support transactional insert of multiple rules
The goal is to make rule inserts all succeed or all fail with no in-between states or side-effects caused by temporary in-between states. The function is needed to support policy library. The patch takes present persistent_insert_rule method and extends it into the persistent_insert_rules method to work for multiple rules. By packaging multiple rules into a single call to agnostic.update, trigger activation is evaluated only after all rule insertions succeed. No rule is persisted in DB until all have succeeded in insertion to policy engine. Limitation: if unexpected DB error occurs after inserts into policy engine have succeeded, the inserts are undone, but not before the rules may have already have an effect on triggers and queries. This behavior affects single rule insert as well as batch rule insert. Then the single rule insert method is defined in terms of the multi rule insert method. Change-Id: Id22d52ed32904174564b7c8f6427822e553db27a
This commit is contained in:
parent
53f0133991
commit
7973b95d1f
|
@ -95,6 +95,19 @@ def commit_unlock_tables(session):
|
|||
# postgres automatically releases lock at transaction end
|
||||
|
||||
|
||||
def rollback_unlock_tables(session):
|
||||
"""Rollback and unlock tables
|
||||
|
||||
supported backends: MySQL and PostgreSQL
|
||||
"""
|
||||
session.rollback()
|
||||
|
||||
# unlock
|
||||
if is_mysql():
|
||||
session.execute('UNLOCK TABLES')
|
||||
# postgres automatically releases lock at transaction end
|
||||
|
||||
|
||||
def is_mysql():
|
||||
"""Return true if and only if database backend is mysql"""
|
||||
return (cfg.CONF.database.connection is not None and
|
||||
|
|
|
@ -331,13 +331,28 @@ class Runtime (object):
|
|||
rules = db_policy_rules.get_policy_rules(policy_name)
|
||||
return [rule.to_dict() for rule in rules]
|
||||
|
||||
def persistent_insert_rule(self, policy_name, str_rule, rule_name,
|
||||
comment):
|
||||
rule_data = {'str_rule': str_rule, 'rule_name': rule_name,
|
||||
'comment': comment}
|
||||
return_data = self.persistent_insert_rules(policy_name, [rule_data])
|
||||
return (return_data[0]['id'], return_data[0])
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
# acquire lock to avoid periodic sync from undoing insert before persisted
|
||||
@lockutils.synchronized('congress_synchronize_rules')
|
||||
def persistent_insert_rule(self, policy_name, str_rule, rule_name,
|
||||
comment):
|
||||
def persistent_insert_rules(self, policy_name, rules):
|
||||
"""Insert and persists rule into policy_name."""
|
||||
|
||||
def uninsert_rules(rules_inserted):
|
||||
for d in rules_inserted:
|
||||
self._safe_process_policy_update(
|
||||
[d['input_rule_str']], policy_name, insert=False)
|
||||
|
||||
success = False # used to rollback DB if not set to success
|
||||
try:
|
||||
rules_to_persist = []
|
||||
return_data = []
|
||||
if cfg.CONF.replicated_policy_engine:
|
||||
# get session
|
||||
db_session = db_api.get_locking_session()
|
||||
|
@ -373,65 +388,99 @@ class Runtime (object):
|
|||
raise exception.PolicyRuntimeException(
|
||||
name='rule_not_permitted')
|
||||
|
||||
id_ = uuidutils.generate_uuid()
|
||||
try:
|
||||
rule = self.parse(str_rule)
|
||||
except exception.PolicyException as e:
|
||||
# TODO(thinrichs): change compiler to provide these error_code
|
||||
# names directly.
|
||||
raise exception.PolicyException(str(e), name='rule_syntax')
|
||||
rules_to_insert = []
|
||||
for rule_data in rules:
|
||||
str_rule = rule_data['str_rule']
|
||||
rule_name = rule_data['rule_name']
|
||||
comment = rule_data['comment']
|
||||
|
||||
if len(rule) == 1:
|
||||
rule = rule[0]
|
||||
else:
|
||||
msg = ("Received multiple rules: " +
|
||||
"; ".join(str(x) for x in rule))
|
||||
raise exception.PolicyRuntimeException(
|
||||
msg, name='multiple_rules')
|
||||
id_ = uuidutils.generate_uuid()
|
||||
try:
|
||||
rule = self.parse(str_rule)
|
||||
except exception.PolicyException as e:
|
||||
# TODO(thinrichs): change compiler to provide these
|
||||
# error_code names directly.
|
||||
raise exception.PolicyException(
|
||||
str(e), name='rule_syntax')
|
||||
|
||||
rule.set_id(id_)
|
||||
rule.set_name(rule_name)
|
||||
rule.set_comment(comment or "")
|
||||
rule.set_original_str(str_rule)
|
||||
if len(rule) == 1:
|
||||
rule = rule[0]
|
||||
else:
|
||||
msg = ("Received multiple rules: " +
|
||||
"; ".join(str(x) for x in rule))
|
||||
raise exception.PolicyRuntimeException(
|
||||
msg, name='multiple_rules')
|
||||
|
||||
rule.set_id(id_)
|
||||
rule.set_name(rule_name)
|
||||
rule.set_comment(comment or "")
|
||||
rule.set_original_str(str_rule)
|
||||
rules_to_insert.append(rule)
|
||||
changes = self._safe_process_policy_update(
|
||||
rule, policy_name, persistent=True)
|
||||
rules_to_insert, policy_name, persistent=True)
|
||||
|
||||
if len(changes) > 0:
|
||||
# remember the rule for possible undo
|
||||
rules_inserted = [
|
||||
change_event.formula for change_event in changes]
|
||||
|
||||
# remember the rule for insert into DB
|
||||
rules_to_persist = [{
|
||||
'original_str': change_event.formula.original_str,
|
||||
'id': str(change_event.formula.id),
|
||||
'comment': change_event.formula.comment,
|
||||
'name': change_event.formula.name}
|
||||
for change_event in changes]
|
||||
|
||||
# prepare return data based on rules inserted
|
||||
return_data = [{
|
||||
'rule': change_event.formula.pretty_str(),
|
||||
'id': str(change_event.formula.id),
|
||||
'comment': change_event.formula.comment,
|
||||
'name': change_event.formula.name}
|
||||
for change_event in changes]
|
||||
|
||||
# save rule to database if change actually happened.
|
||||
# Note: change produced may not be equivalent to original rule
|
||||
# because of column-reference elimination.
|
||||
if len(changes) > 0:
|
||||
d = {'rule': rule.pretty_str(),
|
||||
'id': str(rule.id),
|
||||
'comment': rule.comment,
|
||||
'name': rule.name}
|
||||
try:
|
||||
if len(rules_to_persist) == 0 and len(rules) > 0:
|
||||
# change not accepted means it was already there
|
||||
raise exception.PolicyRuntimeException(
|
||||
name='rule_already_exists')
|
||||
try:
|
||||
for d in rules_to_persist:
|
||||
# Note(thread-safety): blocking call
|
||||
db_policy_rules.add_policy_rule(
|
||||
d['id'], policy_name, str_rule, d['comment'],
|
||||
rule_name=d['name'], session=db_session)
|
||||
d['id'], policy_name, d['original_str'],
|
||||
d['comment'], rule_name=d['name'], session=db_session)
|
||||
# do not begin to avoid implicitly releasing table
|
||||
# lock due to starting new transaction
|
||||
return (d['id'], d)
|
||||
except Exception as db_exception:
|
||||
try:
|
||||
self._safe_process_policy_update(
|
||||
rule, policy_name, insert=False)
|
||||
raise exception.PolicyRuntimeException(
|
||||
"Error while writing to DB: %s."
|
||||
% str(db_exception))
|
||||
except Exception as change_exception:
|
||||
raise exception.PolicyRuntimeException(
|
||||
"Error thrown during recovery from DB error. "
|
||||
"Inconsistent state. DB error: %s. "
|
||||
"New error: %s." % (str(db_exception),
|
||||
str(change_exception)))
|
||||
|
||||
# change not accepted means it was already there
|
||||
raise exception.PolicyRuntimeException(
|
||||
name='rule_already_exists')
|
||||
success = True
|
||||
return return_data
|
||||
except Exception as db_exception:
|
||||
try:
|
||||
# un-insert all rules from engine unless all db inserts
|
||||
# succeeded
|
||||
# Note limitation: if an unexpected DB error is encountered
|
||||
# the rule insertions into policy engine are undone, but
|
||||
# may have already had effects on actions and query results
|
||||
uninsert_rules(rules_inserted)
|
||||
raise exception.PolicyRuntimeException(
|
||||
"Error while writing to DB: %s."
|
||||
% str(db_exception))
|
||||
except Exception as change_exception:
|
||||
raise exception.PolicyRuntimeException(
|
||||
"Error thrown during recovery from DB error. "
|
||||
"Inconsistent state. DB error: %s. "
|
||||
"New error: %s." % (str(db_exception),
|
||||
str(change_exception)))
|
||||
finally:
|
||||
# commit, unlock, and close db_session
|
||||
# commit/rollback, unlock, and close db_session
|
||||
if db_session:
|
||||
db_api.commit_unlock_tables(session=db_session)
|
||||
if success:
|
||||
db_api.commit_unlock_tables(session=db_session)
|
||||
else:
|
||||
db_api.rollback_unlock_tables(session=db_session)
|
||||
db_session.close()
|
||||
|
||||
# Note(thread-safety): blocking function
|
||||
|
@ -445,7 +494,7 @@ class Runtime (object):
|
|||
name='rule_not_exists',
|
||||
data='ID: %s, policy_name: %s' % (id_, policy_name))
|
||||
rule = self.parse1(item['rule'])
|
||||
self._safe_process_policy_update(rule, policy_name, insert=False)
|
||||
self._safe_process_policy_update([rule], policy_name, insert=False)
|
||||
# Note(thread-safety): blocking call
|
||||
db_policy_rules.delete_policy_rule(id_)
|
||||
return item
|
||||
|
@ -466,21 +515,20 @@ class Runtime (object):
|
|||
parsed_rule.set_comment(rule.comment)
|
||||
parsed_rule.set_original_str(rule.rule)
|
||||
self._safe_process_policy_update(
|
||||
parsed_rule,
|
||||
[parsed_rule],
|
||||
rule.policy_name)
|
||||
|
||||
def _safe_process_policy_update(self, parsed_rule, policy_name,
|
||||
def _safe_process_policy_update(self, parsed_rules, policy_name,
|
||||
insert=True, persistent=False):
|
||||
if policy_name not in self.theory:
|
||||
raise exception.PolicyRuntimeException(
|
||||
'Policy ID %s does not exist' % policy_name,
|
||||
name='policy_not_exist')
|
||||
event = compile.Event(
|
||||
formula=parsed_rule,
|
||||
insert=insert,
|
||||
target=policy_name)
|
||||
events = [compile.Event(
|
||||
formula=parsed_rule, insert=insert, target=policy_name)
|
||||
for parsed_rule in parsed_rules]
|
||||
(permitted, changes) = self.process_policy_update(
|
||||
[event], persistent=persistent)
|
||||
events, persistent=persistent)
|
||||
if not permitted:
|
||||
raise exception.PolicyException(
|
||||
";".join([str(x) for x in changes]),
|
||||
|
|
|
@ -265,6 +265,35 @@ class TestRuntime(base.TestCase):
|
|||
self.assertFalse(run.synchronizer.sync_one_policy.called)
|
||||
self.assertNotIn('test_policy', run.policy_names())
|
||||
|
||||
mock_db_policy_obj = lambda: None
|
||||
setattr(mock_db_policy_obj, 'name', 'test_policy')
|
||||
|
||||
@mock.patch.object(db_policy_rules, 'add_policy_rule')
|
||||
@mock.patch.object(db_policy_rules, 'policy_name', side_effect=lambda x: x)
|
||||
@mock.patch.object(
|
||||
db_policy_rules, 'get_policies', return_value=[mock_db_policy_obj])
|
||||
def test_persistent_insert_rules(
|
||||
self, mock_add, mock_policy_name, mock_get_policies):
|
||||
run = agnostic.Runtime()
|
||||
run.synchronizer = mock.MagicMock()
|
||||
run.create_policy('test_policy')
|
||||
|
||||
# test empty insert
|
||||
result = run.persistent_insert_rules('test_policy', [])
|
||||
self.assertEqual(len(result), 0)
|
||||
self.assertTrue(helper.datalog_equal(
|
||||
run.select('p(x)'), ''))
|
||||
|
||||
# test duplicated insert, 3 rules, 2 unique
|
||||
result = run.persistent_insert_rules(
|
||||
'test_policy',
|
||||
[{'str_rule': 'p(1)', 'rule_name': '', 'comment': ''},
|
||||
{'str_rule': 'p(2)', 'rule_name': '', 'comment': ''},
|
||||
{'str_rule': 'p(1)', 'rule_name': '', 'comment': ''}])
|
||||
self.assertEqual(len(result), 2)
|
||||
self.assertTrue(helper.datalog_equal(
|
||||
run.select('p(x)'), 'p(1) p(2)'))
|
||||
|
||||
def test_tablenames_theory_name(self):
|
||||
run = agnostic.Runtime()
|
||||
run.create_policy('test')
|
||||
|
@ -544,17 +573,39 @@ class TestTriggers(base.TestCase):
|
|||
run.insert('p(1)')
|
||||
self.assertEqual(0, obj.value)
|
||||
|
||||
def test_batch_change(self):
|
||||
def test_batch_change_succeed(self):
|
||||
obj = self.MyObject()
|
||||
run = agnostic.Runtime()
|
||||
run.create_policy('test')
|
||||
run.register_trigger('p', lambda tbl, old, new: obj.increment())
|
||||
p1 = compile.parse1('p(1)')
|
||||
result = run.update([compile.Event(p1, target='test')])
|
||||
p2 = compile.parse1('p(2)')
|
||||
p3 = compile.parse1('p(3)')
|
||||
result = run.update([compile.Event(p1, target='test'),
|
||||
compile.Event(p2, target='test'),
|
||||
compile.Event(p3, target='test')])
|
||||
self.assertTrue(result[0], ("Update failed with errors: " +
|
||||
";".join(str(x) for x in result[1])))
|
||||
# IMPORTANT: 3 tuples inserted into p in a single batch triggers once
|
||||
self.assertEqual(1, obj.value)
|
||||
|
||||
def test_batch_change_fail(self):
|
||||
obj = self.MyObject()
|
||||
run = agnostic.Runtime()
|
||||
run.create_policy('test')
|
||||
run.register_trigger('p', lambda tbl, old, new: obj.increment())
|
||||
p1 = compile.parse1('p(1)')
|
||||
p2 = compile.parse1('p(x) :- q(x)')
|
||||
p3 = compile.parse1('q(x) :- p(x)')
|
||||
result = run.update([compile.Event(p1, target='test'),
|
||||
compile.Event(p2, target='test'),
|
||||
compile.Event(p3, target='test')])
|
||||
self.assertFalse(result[0],
|
||||
("Update should have failed with recursion: " +
|
||||
";".join(str(x) for x in result[1])))
|
||||
# IMPORTANT: trigger not activated even though initial events succeed
|
||||
self.assertEqual(0, obj.value)
|
||||
|
||||
def test_dependency(self):
|
||||
obj = self.MyObject()
|
||||
run = agnostic.Runtime()
|
||||
|
@ -1513,6 +1564,19 @@ class TestActionExecution(base.TestCase):
|
|||
self.assertEqual(expected_args, args)
|
||||
self.assertEqual(expected_kwargs, kwargs)
|
||||
|
||||
def test_insert_multiple_rules(self):
|
||||
# basic test
|
||||
|
||||
# test recursion caused at nth rule
|
||||
|
||||
# test transactional trigger activation:
|
||||
# e.g.
|
||||
# a(1)
|
||||
# trigger(x) :- a(x), not b(x)
|
||||
# b(x) :- a(x)
|
||||
# trigger should not be activated
|
||||
pass
|
||||
|
||||
def test_disjunction(self):
|
||||
run = self.run
|
||||
run.create_policy('test')
|
||||
|
|
Loading…
Reference in New Issue