Merge "Enable to associate ports with default fwg for non-admin users"
This commit is contained in:
commit
e5f5c3f445
|
@ -48,13 +48,13 @@ class FirewallDefaultParameterExists(exceptions.InUse):
|
|||
Occurs when user creates/updates any existing firewall resource with
|
||||
reserved parameter names.
|
||||
"""
|
||||
message = _("Operation cannot be performed since '%(name)s' "
|
||||
"is a reserved name for %(resource_type)s.")
|
||||
message = ("Operation cannot be performed since '%(name)s' "
|
||||
"is a reserved name for %(resource_type)s.")
|
||||
|
||||
|
||||
class FirewallDefaultObjectUpdateRestricted(FirewallDefaultParameterExists):
|
||||
message = _("Operation cannot be performed on default object "
|
||||
"'%(resource_id)s' of type %(resource_type)s.")
|
||||
message = ("Operation cannot be performed on default object "
|
||||
"'%(resource_id)s' of type %(resource_type)s.")
|
||||
|
||||
|
||||
class HasName(object):
|
||||
|
@ -978,14 +978,18 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
self._ensure_not_default_resource(firewall_group, 'firewall_group')
|
||||
with context.session.begin(subtransactions=True):
|
||||
fwg_db = self.get_firewall_group(context, id)
|
||||
# XXX NOTE(ivasilevskaya) because delete_firewall_group method in
|
||||
# plugin calls DB's update_firewall_group when fw has rules
|
||||
# the check above is necessary for admin to be able to delete
|
||||
# default group. It's clumsy and should be improved I believe
|
||||
if (not context.is_admin or
|
||||
fwg.get("status", "") != nl_constants.PENDING_DELETE):
|
||||
self._ensure_not_default_resource(fwg_db,
|
||||
'firewall_group', action="update")
|
||||
if _is_default(fwg_db):
|
||||
attrs = [
|
||||
'name', 'description', 'admin_state_up',
|
||||
'ingress_firewall_policy_id', 'egress_firewall_policy_id'
|
||||
]
|
||||
if context.is_admin:
|
||||
attrs = ['name']
|
||||
for attr in attrs:
|
||||
if attr in fwg:
|
||||
raise FirewallDefaultObjectUpdateRestricted(
|
||||
resource_type='Firewall Group',
|
||||
resource_id=fwg_db['id'])
|
||||
self._validate_tenant_for_fwg_policies(context,
|
||||
fwg, fwg_db['tenant_id'])
|
||||
if 'ports' in fwg:
|
||||
|
@ -993,10 +997,12 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
self._delete_ports_in_firewall_group(context, id)
|
||||
self._set_ports_for_firewall_group(context, fwg_db, fwg)
|
||||
del fwg['ports']
|
||||
count = context.session.query(
|
||||
FirewallGroup).filter_by(id=id).update(fwg)
|
||||
if not count:
|
||||
raise f_exc.FirewallGroupNotFound(firewall_id=id)
|
||||
# If fwg is empty, skip updating
|
||||
if fwg:
|
||||
count = context.session.query(
|
||||
FirewallGroup).filter_by(id=id).update(fwg)
|
||||
if not count:
|
||||
raise f_exc.FirewallGroupNotFound(firewall_id=id)
|
||||
return self.get_firewall_group(context, id)
|
||||
|
||||
def update_firewall_group_status(self, context, id, status, not_in=None):
|
||||
|
@ -1017,9 +1023,6 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
# firewall is active
|
||||
LOG.debug("delete_firewall_group() called")
|
||||
|
||||
def _is_default(fwg_db):
|
||||
return fwg_db['name'] == const.DEFAULT_FWG
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
# if no such group exists -> don't raise an exception according to
|
||||
# 80fe2ba1, return None
|
||||
|
@ -1115,3 +1118,7 @@ class Firewall_db_mixin_v2(fw_ext.Firewallv2PluginBase, base_db.CommonDbMixin):
|
|||
self._set_ports_for_firewall_group(context, def_fwg_db, fwg_ports)
|
||||
return self._make_firewall_group_dict_with_rules(
|
||||
context, def_fwg_db['id'])
|
||||
|
||||
|
||||
def _is_default(fwg_db):
|
||||
return fwg_db['name'] == const.DEFAULT_FWG
|
||||
|
|
|
@ -38,7 +38,7 @@ FW_V2_OPTS = [
|
|||
cfg.BoolOpt(
|
||||
'auto_associate_default_firewall_group',
|
||||
default=False,
|
||||
help=_("Apply default fwg to all new VM ports within a project")),
|
||||
help=("Apply default fwg to all new VM ports within a project")),
|
||||
]
|
||||
cfg.CONF.register_opts(FW_V2_OPTS, 'fwaas')
|
||||
|
||||
|
|
|
@ -1380,35 +1380,101 @@ class TestFirewallDBPluginV2(FirewallPluginV2DbTestCase):
|
|||
egress_firewall_policy_id=None,
|
||||
expected_res_status=409)
|
||||
|
||||
def test_update_default_firewall_group(self):
|
||||
def test_update_default_firewall_group_with_non_admin_success(self):
|
||||
ctx = self._get_nonadmin_context()
|
||||
# try to rename a default group
|
||||
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
|
||||
data = {'firewall_group': {'name': 'not-default-anymore'}}
|
||||
req = self.new_update_request('firewall_groups', data, def_fwg_id,
|
||||
context=ctx)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(409, res.status_int)
|
||||
# create non default group and try to make it default by renaming
|
||||
with self.firewall_group(name="not-default-at-all",
|
||||
tenant_id=ctx.tenant_id,
|
||||
context=ctx) as not_default:
|
||||
data = {'firewall_group': {'name': constants.DEFAULT_FWG}}
|
||||
req = self.new_update_request('firewall_groups',
|
||||
data,
|
||||
not_default['firewall_group']['id'],
|
||||
context=ctx)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(409, res.status_int)
|
||||
with self.port(
|
||||
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
|
||||
ctx=ctx) as dummy_port:
|
||||
port_id = dummy_port['port']['id']
|
||||
success_cases = [
|
||||
{'ports': [port_id]},
|
||||
{'ports': []},
|
||||
{'ports': None},
|
||||
{},
|
||||
]
|
||||
for attr in success_cases:
|
||||
data = {'firewall_group': attr}
|
||||
req = self.new_update_request(
|
||||
'firewall_groups', data, def_fwg_id)
|
||||
req.environ['neutron.context'] = ctx
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(200, res.status_int)
|
||||
|
||||
def test_update_default_firewall_group_with_non_admin(self):
|
||||
def test_update_default_firewall_group_with_non_admin_failure(self):
|
||||
ctx = self._get_nonadmin_context()
|
||||
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
|
||||
data = {'firewall_group': {'name': 'default'}}
|
||||
req = self.new_update_request('firewall_groups', data, def_fwg_id)
|
||||
req.environ['neutron.context'] = ctx
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(409, res.status_int)
|
||||
with self.port(
|
||||
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
|
||||
ctx=ctx) as dummy_port:
|
||||
port_id = dummy_port['port']['id']
|
||||
conflict_cases = [
|
||||
{'name': ''},
|
||||
{'name': 'default'},
|
||||
{'name': 'non-default'},
|
||||
{'ingress_firewall_policy_id': None},
|
||||
{'egress_firewall_policy_id': None},
|
||||
{'description': 'try to modify'},
|
||||
{'admin_state_up': True},
|
||||
{'ports': [port_id], 'name': ''},
|
||||
{'ports': [], 'name': 'default'},
|
||||
{'ports': None, 'name': 'non-default'},
|
||||
]
|
||||
for attr in conflict_cases:
|
||||
data = {'firewall_group': attr}
|
||||
req = self.new_update_request(
|
||||
'firewall_groups', data, def_fwg_id)
|
||||
req.environ['neutron.context'] = ctx
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(409, res.status_int)
|
||||
|
||||
def test_update_default_firewall_group_with_admin_success(self):
|
||||
ctx = self._get_admin_context()
|
||||
with self.port(
|
||||
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
|
||||
ctx=ctx) as dummy_port:
|
||||
port_id = dummy_port['port']['id']
|
||||
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
|
||||
success_cases = [
|
||||
{'ports': [port_id]},
|
||||
{'ports': []},
|
||||
{'ports': None},
|
||||
{'ingress_firewall_policy_id': None},
|
||||
{'egress_firewall_policy_id': None},
|
||||
{'description': 'try to modify'},
|
||||
{'admin_state_up': True},
|
||||
{},
|
||||
]
|
||||
for attr in success_cases:
|
||||
data = {'firewall_group': attr}
|
||||
req = self.new_update_request(
|
||||
'firewall_groups', data, def_fwg_id)
|
||||
req.environ['neutron.context'] = ctx
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(200, res.status_int)
|
||||
|
||||
def test_update_default_firewall_group_with_admin_failure(self):
|
||||
ctx = self._get_admin_context()
|
||||
with self.port(
|
||||
device_owner=nl_constants.DEVICE_OWNER_ROUTER_INTF,
|
||||
ctx=ctx) as dummy_port:
|
||||
port_id = dummy_port['port']['id']
|
||||
def_fwg_id = self._build_default_fwg(ctx=ctx)['id']
|
||||
conflict_cases = [
|
||||
{'name': 'default'},
|
||||
{'name': 'non-default'},
|
||||
{'name': ''},
|
||||
{'ports': [port_id], 'name': ''},
|
||||
{'ports': [], 'name': 'default'},
|
||||
{'ports': None, 'name': 'non-default'},
|
||||
]
|
||||
for attr in conflict_cases:
|
||||
data = {'firewall_group': attr}
|
||||
req = self.new_update_request(
|
||||
'firewall_groups', data, def_fwg_id)
|
||||
req.environ['neutron.context'] = ctx
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(409, res.status_int)
|
||||
|
||||
def test_update_firewall_group_with_fwp(self):
|
||||
ctx = self._get_nonadmin_context()
|
||||
|
|
Loading…
Reference in New Issue