support transactional insert of multiple rules

The goal is to make rule inserts all succeed or all fail with no
in-between states or side-effects caused by temporary in-between
states.

The function is needed to support policy library.

The patch takes present persistent_insert_rule method and extends it
into the persistent_insert_rules method to work for multiple rules.

By packaging multiple rules into a single call to agnostic.update,
trigger activation is evaluated only after all rule insertions
succeed.

No rule is persisted in DB until all have succeeded in insertion to
policy engine.

Limitation: if unexpected DB error occurs after inserts into policy
engine have succeeded, the inserts are undone, but not before the
rules may have already have an effect on triggers and queries. This
behavior affects single rule insert as well as batch rule insert.

Then the single rule insert method is defined in terms of the multi
rule insert method.

Change-Id: Id22d52ed32904174564b7c8f6427822e553db27a
This commit is contained in:
Eric Kao 2017-04-26 16:25:56 -07:00
parent 53f0133991
commit 7973b95d1f
3 changed files with 184 additions and 59 deletions

View File

@ -95,6 +95,19 @@ def commit_unlock_tables(session):
# postgres automatically releases lock at transaction end
def rollback_unlock_tables(session):
"""Rollback and unlock tables
supported backends: MySQL and PostgreSQL
"""
session.rollback()
# unlock
if is_mysql():
session.execute('UNLOCK TABLES')
# postgres automatically releases lock at transaction end
def is_mysql():
"""Return true if and only if database backend is mysql"""
return (cfg.CONF.database.connection is not None and

View File

@ -331,13 +331,28 @@ class Runtime (object):
rules = db_policy_rules.get_policy_rules(policy_name)
return [rule.to_dict() for rule in rules]
def persistent_insert_rule(self, policy_name, str_rule, rule_name,
comment):
rule_data = {'str_rule': str_rule, 'rule_name': rule_name,
'comment': comment}
return_data = self.persistent_insert_rules(policy_name, [rule_data])
return (return_data[0]['id'], return_data[0])
# Note(thread-safety): blocking function
# acquire lock to avoid periodic sync from undoing insert before persisted
@lockutils.synchronized('congress_synchronize_rules')
def persistent_insert_rule(self, policy_name, str_rule, rule_name,
comment):
def persistent_insert_rules(self, policy_name, rules):
"""Insert and persists rule into policy_name."""
def uninsert_rules(rules_inserted):
for d in rules_inserted:
self._safe_process_policy_update(
[d['input_rule_str']], policy_name, insert=False)
success = False # used to rollback DB if not set to success
try:
rules_to_persist = []
return_data = []
if cfg.CONF.replicated_policy_engine:
# get session
db_session = db_api.get_locking_session()
@ -373,65 +388,99 @@ class Runtime (object):
raise exception.PolicyRuntimeException(
name='rule_not_permitted')
id_ = uuidutils.generate_uuid()
try:
rule = self.parse(str_rule)
except exception.PolicyException as e:
# TODO(thinrichs): change compiler to provide these error_code
# names directly.
raise exception.PolicyException(str(e), name='rule_syntax')
rules_to_insert = []
for rule_data in rules:
str_rule = rule_data['str_rule']
rule_name = rule_data['rule_name']
comment = rule_data['comment']
if len(rule) == 1:
rule = rule[0]
else:
msg = ("Received multiple rules: " +
"; ".join(str(x) for x in rule))
raise exception.PolicyRuntimeException(
msg, name='multiple_rules')
id_ = uuidutils.generate_uuid()
try:
rule = self.parse(str_rule)
except exception.PolicyException as e:
# TODO(thinrichs): change compiler to provide these
# error_code names directly.
raise exception.PolicyException(
str(e), name='rule_syntax')
rule.set_id(id_)
rule.set_name(rule_name)
rule.set_comment(comment or "")
rule.set_original_str(str_rule)
if len(rule) == 1:
rule = rule[0]
else:
msg = ("Received multiple rules: " +
"; ".join(str(x) for x in rule))
raise exception.PolicyRuntimeException(
msg, name='multiple_rules')
rule.set_id(id_)
rule.set_name(rule_name)
rule.set_comment(comment or "")
rule.set_original_str(str_rule)
rules_to_insert.append(rule)
changes = self._safe_process_policy_update(
rule, policy_name, persistent=True)
rules_to_insert, policy_name, persistent=True)
if len(changes) > 0:
# remember the rule for possible undo
rules_inserted = [
change_event.formula for change_event in changes]
# remember the rule for insert into DB
rules_to_persist = [{
'original_str': change_event.formula.original_str,
'id': str(change_event.formula.id),
'comment': change_event.formula.comment,
'name': change_event.formula.name}
for change_event in changes]
# prepare return data based on rules inserted
return_data = [{
'rule': change_event.formula.pretty_str(),
'id': str(change_event.formula.id),
'comment': change_event.formula.comment,
'name': change_event.formula.name}
for change_event in changes]
# save rule to database if change actually happened.
# Note: change produced may not be equivalent to original rule
# because of column-reference elimination.
if len(changes) > 0:
d = {'rule': rule.pretty_str(),
'id': str(rule.id),
'comment': rule.comment,
'name': rule.name}
try:
if len(rules_to_persist) == 0 and len(rules) > 0:
# change not accepted means it was already there
raise exception.PolicyRuntimeException(
name='rule_already_exists')
try:
for d in rules_to_persist:
# Note(thread-safety): blocking call
db_policy_rules.add_policy_rule(
d['id'], policy_name, str_rule, d['comment'],
rule_name=d['name'], session=db_session)
d['id'], policy_name, d['original_str'],
d['comment'], rule_name=d['name'], session=db_session)
# do not begin to avoid implicitly releasing table
# lock due to starting new transaction
return (d['id'], d)
except Exception as db_exception:
try:
self._safe_process_policy_update(
rule, policy_name, insert=False)
raise exception.PolicyRuntimeException(
"Error while writing to DB: %s."
% str(db_exception))
except Exception as change_exception:
raise exception.PolicyRuntimeException(
"Error thrown during recovery from DB error. "
"Inconsistent state. DB error: %s. "
"New error: %s." % (str(db_exception),
str(change_exception)))
# change not accepted means it was already there
raise exception.PolicyRuntimeException(
name='rule_already_exists')
success = True
return return_data
except Exception as db_exception:
try:
# un-insert all rules from engine unless all db inserts
# succeeded
# Note limitation: if an unexpected DB error is encountered
# the rule insertions into policy engine are undone, but
# may have already had effects on actions and query results
uninsert_rules(rules_inserted)
raise exception.PolicyRuntimeException(
"Error while writing to DB: %s."
% str(db_exception))
except Exception as change_exception:
raise exception.PolicyRuntimeException(
"Error thrown during recovery from DB error. "
"Inconsistent state. DB error: %s. "
"New error: %s." % (str(db_exception),
str(change_exception)))
finally:
# commit, unlock, and close db_session
# commit/rollback, unlock, and close db_session
if db_session:
db_api.commit_unlock_tables(session=db_session)
if success:
db_api.commit_unlock_tables(session=db_session)
else:
db_api.rollback_unlock_tables(session=db_session)
db_session.close()
# Note(thread-safety): blocking function
@ -445,7 +494,7 @@ class Runtime (object):
name='rule_not_exists',
data='ID: %s, policy_name: %s' % (id_, policy_name))
rule = self.parse1(item['rule'])
self._safe_process_policy_update(rule, policy_name, insert=False)
self._safe_process_policy_update([rule], policy_name, insert=False)
# Note(thread-safety): blocking call
db_policy_rules.delete_policy_rule(id_)
return item
@ -466,21 +515,20 @@ class Runtime (object):
parsed_rule.set_comment(rule.comment)
parsed_rule.set_original_str(rule.rule)
self._safe_process_policy_update(
parsed_rule,
[parsed_rule],
rule.policy_name)
def _safe_process_policy_update(self, parsed_rule, policy_name,
def _safe_process_policy_update(self, parsed_rules, policy_name,
insert=True, persistent=False):
if policy_name not in self.theory:
raise exception.PolicyRuntimeException(
'Policy ID %s does not exist' % policy_name,
name='policy_not_exist')
event = compile.Event(
formula=parsed_rule,
insert=insert,
target=policy_name)
events = [compile.Event(
formula=parsed_rule, insert=insert, target=policy_name)
for parsed_rule in parsed_rules]
(permitted, changes) = self.process_policy_update(
[event], persistent=persistent)
events, persistent=persistent)
if not permitted:
raise exception.PolicyException(
";".join([str(x) for x in changes]),

View File

@ -265,6 +265,35 @@ class TestRuntime(base.TestCase):
self.assertFalse(run.synchronizer.sync_one_policy.called)
self.assertNotIn('test_policy', run.policy_names())
mock_db_policy_obj = lambda: None
setattr(mock_db_policy_obj, 'name', 'test_policy')
@mock.patch.object(db_policy_rules, 'add_policy_rule')
@mock.patch.object(db_policy_rules, 'policy_name', side_effect=lambda x: x)
@mock.patch.object(
db_policy_rules, 'get_policies', return_value=[mock_db_policy_obj])
def test_persistent_insert_rules(
self, mock_add, mock_policy_name, mock_get_policies):
run = agnostic.Runtime()
run.synchronizer = mock.MagicMock()
run.create_policy('test_policy')
# test empty insert
result = run.persistent_insert_rules('test_policy', [])
self.assertEqual(len(result), 0)
self.assertTrue(helper.datalog_equal(
run.select('p(x)'), ''))
# test duplicated insert, 3 rules, 2 unique
result = run.persistent_insert_rules(
'test_policy',
[{'str_rule': 'p(1)', 'rule_name': '', 'comment': ''},
{'str_rule': 'p(2)', 'rule_name': '', 'comment': ''},
{'str_rule': 'p(1)', 'rule_name': '', 'comment': ''}])
self.assertEqual(len(result), 2)
self.assertTrue(helper.datalog_equal(
run.select('p(x)'), 'p(1) p(2)'))
def test_tablenames_theory_name(self):
run = agnostic.Runtime()
run.create_policy('test')
@ -544,17 +573,39 @@ class TestTriggers(base.TestCase):
run.insert('p(1)')
self.assertEqual(0, obj.value)
def test_batch_change(self):
def test_batch_change_succeed(self):
obj = self.MyObject()
run = agnostic.Runtime()
run.create_policy('test')
run.register_trigger('p', lambda tbl, old, new: obj.increment())
p1 = compile.parse1('p(1)')
result = run.update([compile.Event(p1, target='test')])
p2 = compile.parse1('p(2)')
p3 = compile.parse1('p(3)')
result = run.update([compile.Event(p1, target='test'),
compile.Event(p2, target='test'),
compile.Event(p3, target='test')])
self.assertTrue(result[0], ("Update failed with errors: " +
";".join(str(x) for x in result[1])))
# IMPORTANT: 3 tuples inserted into p in a single batch triggers once
self.assertEqual(1, obj.value)
def test_batch_change_fail(self):
obj = self.MyObject()
run = agnostic.Runtime()
run.create_policy('test')
run.register_trigger('p', lambda tbl, old, new: obj.increment())
p1 = compile.parse1('p(1)')
p2 = compile.parse1('p(x) :- q(x)')
p3 = compile.parse1('q(x) :- p(x)')
result = run.update([compile.Event(p1, target='test'),
compile.Event(p2, target='test'),
compile.Event(p3, target='test')])
self.assertFalse(result[0],
("Update should have failed with recursion: " +
";".join(str(x) for x in result[1])))
# IMPORTANT: trigger not activated even though initial events succeed
self.assertEqual(0, obj.value)
def test_dependency(self):
obj = self.MyObject()
run = agnostic.Runtime()
@ -1513,6 +1564,19 @@ class TestActionExecution(base.TestCase):
self.assertEqual(expected_args, args)
self.assertEqual(expected_kwargs, kwargs)
def test_insert_multiple_rules(self):
# basic test
# test recursion caused at nth rule
# test transactional trigger activation:
# e.g.
# a(1)
# trigger(x) :- a(x), not b(x)
# b(x) :- a(x)
# trigger should not be activated
pass
def test_disjunction(self):
run = self.run
run.create_policy('test')