Merge "shared policy shouldn't have unshared rules"

This commit is contained in:
Jenkins 2014-09-15 15:09:33 +00:00 committed by Gerrit Code Review
commit 8244413531
2 changed files with 70 additions and 6 deletions

View File

@ -164,7 +164,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
'enabled': firewall_rule['enabled']}
return self._fields(res, fields)
def _set_rules_for_policy(self, context, firewall_policy_db, rule_id_list):
def _set_rules_for_policy(self, context, firewall_policy_db, fwp):
rule_id_list = fwp['firewall_rules']
fwp_db = firewall_policy_db
with context.session.begin(subtransactions=True):
if not rule_id_list:
@ -188,6 +189,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
fwp_db['id']):
raise firewall.FirewallRuleInUse(
firewall_rule_id=fwrule_id)
if 'shared' in fwp:
if fwp['shared'] and not rules_dict[fwrule_id]['shared']:
raise firewall.FirewallRuleSharingConflict(
firewall_rule_id=fwrule_id,
firewall_policy_id=fwp_db['id'])
elif fwp_db['shared'] and not rules_dict[fwrule_id]['shared']:
raise firewall.FirewallRuleSharingConflict(
firewall_rule_id=fwrule_id,
firewall_policy_id=fwp_db['id'])
# New list of rules is valid so we will first reset the existing
# list and then add each rule in order.
# Note that the list could be empty in which case we interpret
@ -198,6 +208,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
fwp_db.firewall_rules.reorder()
fwp_db.audited = False
def _check_unshared_rules_for_policy(self, fwp_db, fwp):
if fwp['shared']:
rules_in_db = fwp_db['firewall_rules']
for fwr_db in rules_in_db:
if not fwr_db['shared']:
raise firewall.FirewallPolicySharingConflict(
firewall_rule_id=fwr_db['id'],
firewall_policy_id=fwp_db['id'])
def _process_rule_for_policy(self, context, firewall_policy_id,
firewall_rule_db, position):
with context.session.begin(subtransactions=True):
@ -303,8 +322,7 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
description=fwp['description'],
shared=fwp['shared'])
context.session.add(fwp_db)
self._set_rules_for_policy(context, fwp_db,
fwp['firewall_rules'])
self._set_rules_for_policy(context, fwp_db, fwp)
fwp_db.audited = fwp['audited']
return self._make_firewall_policy_dict(fwp_db)
@ -313,9 +331,11 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin):
fwp = firewall_policy['firewall_policy']
with context.session.begin(subtransactions=True):
fwp_db = self._get_firewall_policy(context, id)
if 'firewall_rules' in fwp:
self._set_rules_for_policy(context, fwp_db,
fwp['firewall_rules'])
# check any existing rules are not shared
if 'shared' in fwp and 'firewall_rules' not in fwp:
self._check_unshared_rules_for_policy(fwp_db, fwp)
elif 'firewall_rules' in fwp:
self._set_rules_for_policy(context, fwp_db, fwp)
del fwp['firewall_rules']
if 'audited' not in fwp or fwp['audited']:
fwp['audited'] = False

View File

@ -347,6 +347,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
audited=AUDITED)
self.assertEqual(res.status_int, 409)
def test_create_shared_firewall_policy_with_unshared_rule(self):
with self.firewall_rule(shared=False) as fwr:
fw_rule_ids = [fwr['firewall_rule']['id']]
res = self._create_firewall_policy(
None, 'firewall_policy1', description=DESCRIPTION, shared=True,
firewall_rules=fw_rule_ids, audited=AUDITED)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_show_firewall_policy(self):
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
@ -520,6 +528,42 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_policy'][k], v)
def test_update_shared_firewall_policy_with_unshared_rule(self):
with self.firewall_rule(name='fwr1', shared=False) as fr:
with self.firewall_policy() as fwp:
fw_rule_ids = [fr['firewall_rule']['id']]
# update shared policy with unshared rule
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_firewall_policy_with_shared_attr_unshared_rule(self):
with self.firewall_rule(name='fwr1', shared=False) as fr:
with self.firewall_policy(shared=False) as fwp:
fw_rule_ids = [fr['firewall_rule']['id']]
# update shared policy with shared attr and unshared rule
data = {'firewall_policy': {'shared': True,
'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_update_firewall_policy_with_shared_attr_exist_unshare_rule(self):
with self.firewall_rule(name='fwr1', shared=False) as fr:
fw_rule_ids = [fr['firewall_rule']['id']]
with self.firewall_policy(shared=False,
firewall_rules=fw_rule_ids) as fwp:
# update policy with shared attr
data = {'firewall_policy': {'shared': True}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_delete_firewall_policy(self):
ctx = context.get_admin_context()
with self.firewall_policy(do_delete=False) as fwp: