Merge "Enable to associate ports with default fwg for non-admin users"

This commit is contained in:
Zuul 2018-01-24 18:20:44 +00:00 committed by Gerrit Code Review
commit e5f5c3f445
3 changed files with 117 additions and 44 deletions

View File

@ -48,13 +48,13 @@ class FirewallDefaultParameterExists(exceptions.InUse):
Occurs when user creates/updates any existing firewall resource with
reserved parameter names.
"""
message = _("Operation cannot be performed since '%(name)s' "
"is a reserved name for %(resource_type)s.")
message = ("Operation cannot be performed since '%(name)s' "
"is a reserved name for %(resource_type)s.")
class FirewallDefaultObjectUpdateRestricted(FirewallDefaultParameterExists):
message = _("Operation cannot be performed on default object "
"'%(resource_id)s' of type %(resource_type)s.")
message = ("Operation cannot be performed on default object "
"'%(resource_id)s' of type %(resource_type)s.")
class HasName(object):
@ -978,14 +978,18 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
self._ensure_not_default_resource(firewall_group, 'firewall_group')
with context.session.begin(subtransactions=True):
fwg_db = self.get_firewall_group(context, id)
# XXX NOTE(ivasilevskaya) because delete_firewall_group method in
# plugin calls DB's update_firewall_group when fw has rules
# the check above is necessary for admin to be able to delete
# default group. It's clumsy and should be improved I believe
if (not context.is_admin or
fwg.get("status", "") != nl_constants.PENDING_DELETE):
self._ensure_not_default_resource(fwg_db,
'firewall_group', action="update")
if _is_default(fwg_db):
attrs = [
'name', 'description', 'admin_state_up',
'ingress_firewall_policy_id', 'egress_firewall_policy_id'
]
if context.is_admin:
attrs = ['name']
for attr in attrs:
if attr in fwg:
raise FirewallDefaultObjectUpdateRestricted(
resource_type='Firewall Group',
resource_id=fwg_db['id'])
self._validate_tenant_for_fwg_policies(context,
fwg, fwg_db['tenant_id'])
if 'ports' in fwg:
@ -993,10 +997,12 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
self._delete_ports_in_firewall_group(context, id)
self._set_ports_for_firewall_group(context, fwg_db, fwg)
del fwg['ports']
count = context.session.query(
FirewallGroup).filter_by(id=id).update(fwg)
if not count:
raise f_exc.FirewallGroupNotFound(firewall_id=id)
# If fwg is empty, skip updating
if fwg:
count = context.session.query(
FirewallGroup).filter_by(id=id).update(fwg)
if not count:
raise f_exc.FirewallGroupNotFound(firewall_id=id)
return self.get_firewall_group(context, id)
def update_firewall_group_status(self, context, id, status, not_in=None):
@ -1017,9 +1023,6 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
# firewall is active
LOG.debug("delete_firewall_group() called")
def _is_default(fwg_db):
return fwg_db['name'] == const.DEFAULT_FWG
with context.session.begin(subtransactions=True):
# if no such group exists -> don't raise an exception according to
# 80fe2ba1, return None
@ -1115,3 +1118,7 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
self._set_ports_for_firewall_group(context, def_fwg_db, fwg_ports)
return self._make_firewall_group_dict_with_rules(
context, def_fwg_db['id'])
def _is_default(fwg_db):
return fwg_db['name'] == const.DEFAULT_FWG

View File

@ -38,7 +38,7 @@ FW_V2_OPTS = [
cfg.BoolOpt(
'auto_associate_default_firewall_group',
default=False,
help=_("Apply default fwg to all new VM ports within a project")),
help=("Apply default fwg to all new VM ports within a project")),
]
cfg.CONF.register_opts(FW_V2_OPTS, 'fwaas')

View File

@ -1380,35 +1380,101 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
egress_firewall_policy_id=None,
expected_res_status=409)
def test_update_default_firewall_group(self):
def test_update_default_firewall_group_with_non_admin_success(self):
ctx = self._get_nonadmin_context()
# try to rename a default group
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
data = {'firewall_group': {'name': 'not-default-anymore'}}
req = self.new_update_request('firewall_groups', data, def_fwg_id,
context=ctx)
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
# create non default group and try to make it default by renaming
with self.firewall_group(name="not-default-at-all",
tenant_id=ctx.tenant_id,
context=ctx) as not_default:
data = {'firewall_group': {'name': constants.DEFAULT_FWG}}
req = self.new_update_request('firewall_groups',
data,
not_default['firewall_group']['id'],
context=ctx)
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
with self.port(
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
ctx=ctx) as dummy_port:
port_id = dummy_port['port']['id']
success_cases = [
{'ports': [port_id]},
{'ports': []},
{'ports': None},
{},
]
for attr in success_cases:
data = {'firewall_group': attr}
req = self.new_update_request(
'firewall_groups', data, def_fwg_id)
req.environ['neutron.context'] = ctx
res = req.get_response(self.ext_api)
self.assertEqual(200, res.status_int)
def test_update_default_firewall_group_with_non_admin(self):
def test_update_default_firewall_group_with_non_admin_failure(self):
ctx = self._get_nonadmin_context()
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
data = {'firewall_group': {'name': 'default'}}
req = self.new_update_request('firewall_groups', data, def_fwg_id)
req.environ['neutron.context'] = ctx
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
with self.port(
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
ctx=ctx) as dummy_port:
port_id = dummy_port['port']['id']
conflict_cases = [
{'name': ''},
{'name': 'default'},
{'name': 'non-default'},
{'ingress_firewall_policy_id': None},
{'egress_firewall_policy_id': None},
{'description': 'try to modify'},
{'admin_state_up': True},
{'ports': [port_id], 'name': ''},
{'ports': [], 'name': 'default'},
{'ports': None, 'name': 'non-default'},
]
for attr in conflict_cases:
data = {'firewall_group': attr}
req = self.new_update_request(
'firewall_groups', data, def_fwg_id)
req.environ['neutron.context'] = ctx
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
def test_update_default_firewall_group_with_admin_success(self):
ctx = self._get_admin_context()
with self.port(
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
ctx=ctx) as dummy_port:
port_id = dummy_port['port']['id']
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
success_cases = [
{'ports': [port_id]},
{'ports': []},
{'ports': None},
{'ingress_firewall_policy_id': None},
{'egress_firewall_policy_id': None},
{'description': 'try to modify'},
{'admin_state_up': True},
{},
]
for attr in success_cases:
data = {'firewall_group': attr}
req = self.new_update_request(
'firewall_groups', data, def_fwg_id)
req.environ['neutron.context'] = ctx
res = req.get_response(self.ext_api)
self.assertEqual(200, res.status_int)
def test_update_default_firewall_group_with_admin_failure(self):
ctx = self._get_admin_context()
with self.port(
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
ctx=ctx) as dummy_port:
port_id = dummy_port['port']['id']
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
conflict_cases = [
{'name': 'default'},
{'name': 'non-default'},
{'name': ''},
{'ports': [port_id], 'name': ''},
{'ports': [], 'name': 'default'},
{'ports': None, 'name': 'non-default'},
]
for attr in conflict_cases:
data = {'firewall_group': attr}
req = self.new_update_request(
'firewall_groups', data, def_fwg_id)
req.environ['neutron.context'] = ctx
res = req.get_response(self.ext_api)
self.assertEqual(409, res.status_int)
def test_update_firewall_group_with_fwp(self):
ctx = self._get_nonadmin_context()