From 8c2d47c0767b934eea1728a7bcb7d4410b8c5b5f Mon Sep 17 00:00:00 2001 From: Koteswara Rao Kelam Date: Thu, 21 Aug 2014 02:33:57 -0700 Subject: [PATCH] shared policy shouldn't have unshared rules A shared firewall policy should always have shared rules. So the following cases should not be allowed: 1.Create shared policy with unshared rules 2.Update policy shared=True when it has unshared rules 3.Update policy with shared=True and unshared rules 4.Update shared policy with unshared rules Change-Id: I3d71899c328d3fefa96c1f99d6ba706160e445cc Closes-bug: 1334981 --- neutron/db/firewall/firewall_db.py | 32 +++++++++++--- .../unit/db/firewall/test_db_firewall.py | 44 +++++++++++++++++++ 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/neutron/db/firewall/firewall_db.py b/neutron/db/firewall/firewall_db.py index 1a1bafee6..8704c76a5 100644 --- a/neutron/db/firewall/firewall_db.py +++ b/neutron/db/firewall/firewall_db.py @@ -164,7 +164,8 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): 'enabled': firewall_rule['enabled']} return self._fields(res, fields) - def _set_rules_for_policy(self, context, firewall_policy_db, rule_id_list): + def _set_rules_for_policy(self, context, firewall_policy_db, fwp): + rule_id_list = fwp['firewall_rules'] fwp_db = firewall_policy_db with context.session.begin(subtransactions=True): if not rule_id_list: @@ -188,6 +189,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): fwp_db['id']): raise firewall.FirewallRuleInUse( firewall_rule_id=fwrule_id) + if 'shared' in fwp: + if fwp['shared'] and not rules_dict[fwrule_id]['shared']: + raise firewall.FirewallRuleSharingConflict( + firewall_rule_id=fwrule_id, + firewall_policy_id=fwp_db['id']) + elif fwp_db['shared'] and not rules_dict[fwrule_id]['shared']: + raise firewall.FirewallRuleSharingConflict( + firewall_rule_id=fwrule_id, + firewall_policy_id=fwp_db['id']) # New list of rules is valid so we will first reset the existing # list and then add each rule in order. # Note that the list could be empty in which case we interpret @@ -198,6 +208,15 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): fwp_db.firewall_rules.reorder() fwp_db.audited = False + def _check_unshared_rules_for_policy(self, fwp_db, fwp): + if fwp['shared']: + rules_in_db = fwp_db['firewall_rules'] + for fwr_db in rules_in_db: + if not fwr_db['shared']: + raise firewall.FirewallPolicySharingConflict( + firewall_rule_id=fwr_db['id'], + firewall_policy_id=fwp_db['id']) + def _process_rule_for_policy(self, context, firewall_policy_id, firewall_rule_db, position): with context.session.begin(subtransactions=True): @@ -304,8 +323,7 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): description=fwp['description'], shared=fwp['shared']) context.session.add(fwp_db) - self._set_rules_for_policy(context, fwp_db, - fwp['firewall_rules']) + self._set_rules_for_policy(context, fwp_db, fwp) fwp_db.audited = fwp['audited'] return self._make_firewall_policy_dict(fwp_db) @@ -314,9 +332,11 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): fwp = firewall_policy['firewall_policy'] with context.session.begin(subtransactions=True): fwp_db = self._get_firewall_policy(context, id) - if 'firewall_rules' in fwp: - self._set_rules_for_policy(context, fwp_db, - fwp['firewall_rules']) + # check any existing rules are not shared + if 'shared' in fwp and 'firewall_rules' not in fwp: + self._check_unshared_rules_for_policy(fwp_db, fwp) + elif 'firewall_rules' in fwp: + self._set_rules_for_policy(context, fwp_db, fwp) del fwp['firewall_rules'] if 'audited' not in fwp or fwp['audited']: fwp['audited'] = False diff --git a/neutron/tests/unit/db/firewall/test_db_firewall.py b/neutron/tests/unit/db/firewall/test_db_firewall.py index 5e27f5fc4..a4d29450d 100644 --- a/neutron/tests/unit/db/firewall/test_db_firewall.py +++ b/neutron/tests/unit/db/firewall/test_db_firewall.py @@ -347,6 +347,14 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): audited=AUDITED) self.assertEqual(res.status_int, 409) + def test_create_shared_firewall_policy_with_unshared_rule(self): + with self.firewall_rule(shared=False) as fwr: + fw_rule_ids = [fwr['firewall_rule']['id']] + res = self._create_firewall_policy( + None, 'firewall_policy1', description=DESCRIPTION, shared=True, + firewall_rules=fw_rule_ids, audited=AUDITED) + self.assertEqual(webob.exc.HTTPConflict.code, res.status_int) + def test_show_firewall_policy(self): name = "firewall_policy1" attrs = self._get_test_firewall_policy_attrs(name) @@ -520,6 +528,42 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): for k, v in attrs.iteritems(): self.assertEqual(res['firewall_policy'][k], v) + def test_update_shared_firewall_policy_with_unshared_rule(self): + with self.firewall_rule(name='fwr1', shared=False) as fr: + with self.firewall_policy() as fwp: + fw_rule_ids = [fr['firewall_rule']['id']] + # update shared policy with unshared rule + data = {'firewall_policy': + {'firewall_rules': fw_rule_ids}} + req = self.new_update_request('firewall_policies', data, + fwp['firewall_policy']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(webob.exc.HTTPConflict.code, res.status_int) + + def test_update_firewall_policy_with_shared_attr_unshared_rule(self): + with self.firewall_rule(name='fwr1', shared=False) as fr: + with self.firewall_policy(shared=False) as fwp: + fw_rule_ids = [fr['firewall_rule']['id']] + # update shared policy with shared attr and unshared rule + data = {'firewall_policy': {'shared': True, + 'firewall_rules': fw_rule_ids}} + req = self.new_update_request('firewall_policies', data, + fwp['firewall_policy']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(webob.exc.HTTPConflict.code, res.status_int) + + def test_update_firewall_policy_with_shared_attr_exist_unshare_rule(self): + with self.firewall_rule(name='fwr1', shared=False) as fr: + fw_rule_ids = [fr['firewall_rule']['id']] + with self.firewall_policy(shared=False, + firewall_rules=fw_rule_ids) as fwp: + # update policy with shared attr + data = {'firewall_policy': {'shared': True}} + req = self.new_update_request('firewall_policies', data, + fwp['firewall_policy']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(webob.exc.HTTPConflict.code, res.status_int) + def test_delete_firewall_policy(self): ctx = context.get_admin_context() with self.firewall_policy(do_delete=False) as fwp: