Merge "Delete SG rules when deleting their remote group"
commit
43113c0300
|
@ -446,9 +446,26 @@ def _validate_network_has_subnet(resource, event, trigger, **kwargs):
|
|||
raise n_exc.InvalidInput(error_message=msg)
|
||||
|
||||
|
||||
def _delete_sg_group_related_rules(resource, event, trigger, **kwargs):
|
||||
"""Upon SG deletion, call the explicit delete method for rules with that
|
||||
SG as the remote one.
|
||||
Otherwise those will be deleted with on_delete cascade, leaving the NSX
|
||||
backend unaware.
|
||||
"""
|
||||
sg_id = kwargs["security_group"]["id"]
|
||||
context = kwargs["context"]
|
||||
core_plugin = directory.get_plugin()
|
||||
filters = {'remote_group_id': [sg_id]}
|
||||
rules = core_plugin.get_security_group_rules(context, filters=filters)
|
||||
for rule in rules:
|
||||
core_plugin.delete_security_group_rule(context, rule["id"])
|
||||
|
||||
|
||||
def subscribe():
|
||||
registry.subscribe(_validate_network_has_subnet,
|
||||
resources.ROUTER_GATEWAY, events.BEFORE_CREATE)
|
||||
registry.subscribe(_delete_sg_group_related_rules,
|
||||
resources.SECURITY_GROUP, events.PRECOMMIT_DELETE)
|
||||
|
||||
|
||||
subscribe()
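Review bot: `_delete_sg_group_related_rules` looks up the dependent rules with the caller's `context`, so if that context is project-scoped, rules owned by another project that name this group as `remote_group_id` (possible where security groups are shared) would presumably not be returned. Those rules would then be removed only by the DB cascade and stay on the NSX backend, which is the stale-firewall-rule case the docstring sets out to prevent. Consider querying and deleting with an elevated context, e.g. `admin_ctx = context.elevated()` followed by `rules = core_plugin.get_security_group_rules(admin_ctx, filters=filters)`. Separately, the callback is registered for `events.PRECOMMIT_DELETE`, so the backend delete runs before the security-group transaction commits; a comment stating what happens to the backend rules if that transaction later rolls back would help.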
|
||||
|
|
|
@ -46,6 +46,7 @@ from neutron_lib.plugins import directory
|
|||
|
||||
from vmware_nsx.common import utils
|
||||
from vmware_nsx.extensions import providersecuritygroup as provider_sg
|
||||
from vmware_nsx.plugins.common import plugin as com_plugin
|
||||
from vmware_nsx.plugins.nsx_p import plugin as nsx_plugin
|
||||
from vmware_nsx.tests import unit as vmware
|
||||
from vmware_nsx.tests.unit.common_plugin import common_v3
|
||||
|
@ -1296,6 +1297,41 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
|
|||
scope=scope,
|
||||
logged=False)
|
||||
|
||||
def test_create_security_group_rule_with_remote_group(self):
|
||||
with self.security_group() as sg1, self.security_group() as sg2:
|
||||
security_group_id = sg1['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_group_id = sg2['security_group']['id']
|
||||
protocol = "tcp"
|
||||
keys = [('remote_group_id', remote_group_id),
|
||||
('security_group_id', security_group_id),
|
||||
('direction', direction),
|
||||
('protocol', protocol)]
|
||||
with self.security_group_rule(
|
||||
security_group_id, direction=direction, protocol=protocol,
|
||||
remote_group_id=remote_group_id) as rule:
|
||||
for k, v, in keys:
|
||||
self.assertEqual(rule['security_group_rule'][k], v)
|
||||
|
||||
def test_delete_security_group_rule_with_remote_group(self):
|
||||
com_plugin.subscribe()
|
||||
with self.security_group() as sg1, self.security_group() as sg2:
|
||||
security_group_id = sg1['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_group_id = sg2['security_group']['id']
|
||||
protocol = "tcp"
|
||||
with self.security_group_rule(
|
||||
security_group_id, direction=direction, protocol=protocol,
|
||||
remote_group_id=remote_group_id) as rule,\
|
||||
mock.patch.object(
|
||||
self.plugin, "delete_security_group_rule") as del_rule:
|
||||
# delete sg2
|
||||
self._delete('security-groups', remote_group_id,
|
||||
exc.HTTPNoContent.code)
|
||||
# verify the rule was deleted
|
||||
del_rule.assert_called_once_with(
|
||||
mock.ANY, rule["security_group_rule"]["id"])
|
||||
|
||||
|
||||
class NsxPTestL3ExtensionManager(object):
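Review bot: `test_delete_security_group_rule_with_remote_group` replaces `delete_security_group_rule` with a bare mock, so it shows only that the callback fired with the right rule id, not that the rule is actually removed or that the real delete works when invoked from the precommit callback. The same applies to the identical test added to `NsxVTestSecurityGroup`. If the real path can run in this fixture, patch with `wraps=self.plugin.delete_security_group_rule` and assert afterwards that no rule whose `remote_group_id` is the deleted group remains; otherwise add a comment such as `# plugin delete is mocked: this only checks the callback wiring`. The explicit `com_plugin.subscribe()` call also deserves a one-line comment saying why the subscription presumably made at module import is not enough here. The enclosing class starts outside the hunk.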
|
||||
|
||||
|
|
|
@ -79,6 +79,7 @@ from vmware_nsx.extensions import projectpluginmap
|
|||
from vmware_nsx.extensions import routersize as router_size
|
||||
from vmware_nsx.extensions import routertype as router_type
|
||||
from vmware_nsx.extensions import vnicindex as ext_vnic_idx
|
||||
from vmware_nsx.plugins.common import plugin as com_plugin
|
||||
from vmware_nsx.plugins.nsx_v import availability_zones as nsx_az
|
||||
from vmware_nsx.plugins.nsx_v.drivers import (
|
||||
distributed_router_driver as dist_router_driver)
|
||||
|
@ -4078,6 +4079,41 @@ class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups,
|
|||
def test_create_security_group_rule_protocol_as_number_with_port(self):
|
||||
self.skipTest('not supported')
|
||||
|
||||
def test_create_security_group_rule_with_remote_group(self):
|
||||
with self.security_group() as sg1, self.security_group() as sg2:
|
||||
security_group_id = sg1['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_group_id = sg2['security_group']['id']
|
||||
protocol = "tcp"
|
||||
keys = [('remote_group_id', remote_group_id),
|
||||
('security_group_id', security_group_id),
|
||||
('direction', direction),
|
||||
('protocol', protocol)]
|
||||
with self.security_group_rule(
|
||||
security_group_id, direction=direction, protocol=protocol,
|
||||
remote_group_id=remote_group_id) as rule:
|
||||
for k, v, in keys:
|
||||
self.assertEqual(rule['security_group_rule'][k], v)
|
||||
|
||||
def test_delete_security_group_rule_with_remote_group(self):
|
||||
com_plugin.subscribe()
|
||||
with self.security_group() as sg1, self.security_group() as sg2:
|
||||
security_group_id = sg1['security_group']['id']
|
||||
direction = "ingress"
|
||||
remote_group_id = sg2['security_group']['id']
|
||||
protocol = "tcp"
|
||||
with self.security_group_rule(
|
||||
security_group_id, direction=direction, protocol=protocol,
|
||||
remote_group_id=remote_group_id) as rule,\
|
||||
mock.patch.object(
|
||||
self.plugin, "delete_security_group_rule") as del_rule:
|
||||
# delete sg2
|
||||
self._delete('security-groups', remote_group_id,
|
||||
webob.exc.HTTPNoContent.code)
|
||||
# verify the rule was deleted
|
||||
del_rule.assert_called_once_with(
|
||||
mock.ANY, rule["security_group_rule"]["id"])
|
||||
|
||||
|
||||
class TestVdrTestCase(L3NatTest, L3NatTestCaseBase,
|
||||
test_l3_plugin.L3NatDBIntTestCase,
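Review bot: Both tests added here repeat the pair added to `NsxPTestSecurityGroup` line for line, differing only in `webob.exc.HTTPNoContent.code` versus `exc.HTTPNoContent.code`. Since the callback under test appears to live in the common plugin module (`com_plugin`), a shared mixin holding the two tests would keep the copies from drifting; if that is not practical, a comment pointing at the sibling copy would do. Minor style: `for k, v, in keys:` carries a stray comma and reads better as `for k, v in keys:` (also in the NSX-P copy).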
|
||||
|
|