Merge "Delete SG rules when deleting their remote group"

This commit is contained in:
Zuul 2019-04-15 16:30:51 +00:00 committed by Gerrit Code Review
commit 43113c0300
3 changed files with 89 additions and 0 deletions

View File

@ -446,9 +446,26 @@ def _validate_network_has_subnet(resource, event, trigger, **kwargs):
raise n_exc.InvalidInput(error_message=msg)
def _delete_sg_group_related_rules(resource, event, trigger, **kwargs):
"""Upon SG deletion, call the explicit delete method for rules with that
SG as the remote one.
Otherwise those will be deleted with on_delete cascade, leaving the NSX
backend unaware.
"""
sg_id = kwargs["security_group"]["id"]
context = kwargs["context"]
core_plugin = directory.get_plugin()
filters = {'remote_group_id': [sg_id]}
rules = core_plugin.get_security_group_rules(context, filters=filters)
for rule in rules:
core_plugin.delete_security_group_rule(context, rule["id"])
def subscribe():
registry.subscribe(_validate_network_has_subnet,
resources.ROUTER_GATEWAY, events.BEFORE_CREATE)
registry.subscribe(_delete_sg_group_related_rules,
resources.SECURITY_GROUP, events.PRECOMMIT_DELETE)
subscribe()

View File

@ -46,6 +46,7 @@ from neutron_lib.plugins import directory
from vmware_nsx.common import utils
from vmware_nsx.extensions import providersecuritygroup as provider_sg
from vmware_nsx.plugins.common import plugin as com_plugin
from vmware_nsx.plugins.nsx_p import plugin as nsx_plugin
from vmware_nsx.tests import unit as vmware
from vmware_nsx.tests.unit.common_plugin import common_v3
@ -1296,6 +1297,41 @@ class NsxPTestSecurityGroup(common_v3.FixExternalNetBaseTest,
scope=scope,
logged=False)
def test_create_security_group_rule_with_remote_group(self):
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
keys = [('remote_group_id', remote_group_id),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol)]
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
def test_delete_security_group_rule_with_remote_group(self):
com_plugin.subscribe()
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule,\
mock.patch.object(
self.plugin, "delete_security_group_rule") as del_rule:
# delete sg2
self._delete('security-groups', remote_group_id,
exc.HTTPNoContent.code)
# verify the rule was deleted
del_rule.assert_called_once_with(
mock.ANY, rule["security_group_rule"]["id"])
class NsxPTestL3ExtensionManager(object):

View File

@ -79,6 +79,7 @@ from vmware_nsx.extensions import projectpluginmap
from vmware_nsx.extensions import routersize as router_size
from vmware_nsx.extensions import routertype as router_type
from vmware_nsx.extensions import vnicindex as ext_vnic_idx
from vmware_nsx.plugins.common import plugin as com_plugin
from vmware_nsx.plugins.nsx_v import availability_zones as nsx_az
from vmware_nsx.plugins.nsx_v.drivers import (
distributed_router_driver as dist_router_driver)
@ -4078,6 +4079,41 @@ class NsxVTestSecurityGroup(ext_sg.TestSecurityGroups,
def test_create_security_group_rule_protocol_as_number_with_port(self):
self.skipTest('not supported')
def test_create_security_group_rule_with_remote_group(self):
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
keys = [('remote_group_id', remote_group_id),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol)]
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule:
for k, v, in keys:
self.assertEqual(rule['security_group_rule'][k], v)
def test_delete_security_group_rule_with_remote_group(self):
com_plugin.subscribe()
with self.security_group() as sg1, self.security_group() as sg2:
security_group_id = sg1['security_group']['id']
direction = "ingress"
remote_group_id = sg2['security_group']['id']
protocol = "tcp"
with self.security_group_rule(
security_group_id, direction=direction, protocol=protocol,
remote_group_id=remote_group_id) as rule,\
mock.patch.object(
self.plugin, "delete_security_group_rule") as del_rule:
# delete sg2
self._delete('security-groups', remote_group_id,
webob.exc.HTTPNoContent.code)
# verify the rule was deleted
del_rule.assert_called_once_with(
mock.ANY, rule["security_group_rule"]["id"])
class TestVdrTestCase(L3NatTest, L3NatTestCaseBase,
test_l3_plugin.L3NatDBIntTestCase,