Default SG rules - use new rules templates to create rules for SGs

Default SG rules created as template in the Neutron DB are now used to
create security group rules for each new default and non-default SG
created in Neutron.

Closes-bug: #1983053
Change-Id: Iaf27deb955c3844409fcd36239511478e9607a82
This commit is contained in:
Slawek Kaplonski 2023-05-26 12:01:31 +02:00
parent 78bc33d300
commit a4c8392209
10 changed files with 218 additions and 60 deletions

View File

@ -120,24 +120,8 @@ class SecurityGroupDbMixin(
name=s['name'], is_default=default_sg, stateful=stateful)
sg.create()
for ethertype in ext_sg.sg_supported_ethertypes:
if default_sg:
# Allow intercommunication
ingress_rule = sg_obj.SecurityGroupRule(
context, id=uuidutils.generate_uuid(),
project_id=tenant_id, security_group_id=sg.id,
direction='ingress', ethertype=ethertype,
remote_group_id=sg.id)
ingress_rule.create()
sg.rules.append(ingress_rule)
egress_rule = sg_obj.SecurityGroupRule(
context, id=uuidutils.generate_uuid(),
project_id=tenant_id, security_group_id=sg.id,
direction='egress', ethertype=ethertype)
egress_rule.create()
sg.rules.append(egress_rule)
sg.obj_reset_changes(['rules'])
self._create_rules_from_template(
context, tenant_id, sg, default_sg)
# fetch sg from db to load the sg rules with sg model.
# NOTE(slaweq): With new system/project scopes it may happen that
@ -632,6 +616,32 @@ class SecurityGroupDbMixin(
default_sg_rule_obj.id = sg_rule_template_id
default_sg_rule_obj.delete()
def _create_rules_from_template(self, context, project_id, sg, default_sg):
if default_sg:
filters = {'used_in_default_sg': True}
else:
filters = {'used_in_non_default_sg': True}
template_sg_rules = self.get_default_security_group_rules(
context, filters=filters)
for rule_args in template_sg_rules:
# We need to filter out attributes which are relevant only to
# the template rule and not to the rule itself
rule_args.pop('standard_attr_id', None)
rule_args.pop('description', None)
rule_args.pop('used_in_default_sg', None)
rule_args.pop('used_in_non_default_sg', None)
rule_args.pop('id', None)
if rule_args.get(
'remote_group_id') == ext_sg_default_rules.PARENT_SG:
rule_args['remote_group_id'] = sg.id
new_rule = sg_obj.SecurityGroupRule(
context, id=uuidutils.generate_uuid(),
project_id=project_id, security_group_id=sg.id,
**rule_args)
new_rule.create()
sg.rules.append(new_rule)
sg.obj_reset_changes(['rules'])
def get_default_security_group_rules(self, context, filters=None,
fields=None, sorts=None, limit=None,
marker=None, page_reverse=False):

View File

@ -25,6 +25,9 @@ from neutron.extensions import securitygroup
from neutron.extensions import standardattrdescription as stdattr_ext
PARENT_SG = 'PARENT'
class DefaultSecurityGroupRuleNotFound(exceptions.NotFound):
message = _("Default Security Group rule %(id)s does not exist")

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from datetime import datetime
import errno
import os
@ -49,6 +50,7 @@ from neutron.tests import base
from neutron.tests.common import base as common_base
from neutron.tests.common import helpers
from neutron.tests.functional.resources import process
from neutron.tests.unit.extensions import test_securitygroup
from neutron.tests.unit.plugins.ml2 import test_plugin
import neutron.wsgi
@ -201,6 +203,11 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self._start_ovn_northd()
self.addCleanup(self._reset_agent_cache_singleton)
self.addCleanup(self._reset_ovn_client_placement_extension)
plugin = directory.get_plugin()
mock.patch.object(
plugin, 'get_default_security_group_rules',
return_value=copy.deepcopy(
test_securitygroup.RULES_TEMPLATE_FOR_DEFAULT_SG)).start()
def _reset_agent_cache_singleton(self):
neutron_agent.AgentCache._instance = None

View File

@ -36,6 +36,7 @@ from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
from neutron.tests import base as tests_base
from neutron.tests.functional import base
from neutron.tests.unit.extensions import test_securitygroup as test_sg
VHU_MODE = 'server'
OVS_VIF_DETAILS = {
@ -838,6 +839,10 @@ class TestSecurityGroup(base.TestOVNFunctionalBase):
'tenant_id': self._tenant_id,
'is_default': True,
}
mock.patch.object(
self.plugin, 'get_default_security_group_rules',
return_value=copy.deepcopy(
test_sg.RULES_TEMPLATE_FOR_CUSTOM_SG)).start()
def _find_acls_for_sg(self, sg_id):
rows = self.nb_api.db_find_rows('ACL').execute(check_error=True)

View File

@ -170,6 +170,32 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
super(SGServerRpcCallBackTestCase, self).setUp(plugin)
self.notifier = directory.get_plugin().notifier
self.rpc = securitygroups_rpc.SecurityGroupServerRpcCallback()
default_sg_rules = [
{
'direction': 'egress',
'ethertype': const.IPv4,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': True
}, {
'direction': 'egress',
'ethertype': const.IPv6,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': True
}
]
mock.patch.object(
SecurityGroupRpcTestPlugin, 'get_default_security_group_rules',
return_value=default_sg_rules).start()
def _test_security_group_port(self, device_owner, gw_ip,
cidr, ip_version, ip_address):

View File

@ -294,15 +294,18 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
'standard_attr_id': mock.ANY,
'shared': False,
'security_group_rules': [
# Four rules for egress/ingress and ipv4/ipv6
mock.ANY, mock.ANY, mock.ANY, mock.ANY,
# 2 Custom rules from template
mock.ANY, mock.ANY
],
}
if with_revisions:
DEFAULT_SECGROUP_DICT.update({
'revision_number': mock.ANY,
})
with mock.patch.object(registry, 'publish') as publish:
with mock.patch.object(registry, 'publish') as publish, \
mock.patch.object(
self.mixin, 'get_default_security_group_rules',
return_value=[mock.MagicMock(), mock.MagicMock()]):
sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
publish.assert_has_calls([
@ -382,7 +385,10 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
self.assertEqual(sg_dict, payload.latest_state)
def test_security_group_precommit_and_after_delete_event(self):
sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
with mock.patch.object(
self.mixin, 'get_default_security_group_rules',
return_value=[mock.MagicMock(), mock.MagicMock()]):
sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
with mock.patch.object(registry, "publish") as mock_publish:
self.mixin.delete_security_group(self.ctx, sg_dict['id'])
sg_dict['security_group_rules'] = mock.ANY

View File

@ -38,6 +38,8 @@ from neutron.db import address_group_db
from neutron.db import db_base_plugin_v2
from neutron.db import securitygroups_db
from neutron.extensions import address_group as ext_ag
from neutron.extensions import security_groups_default_rules as \
ext_sg_default_rules
from neutron.extensions import securitygroup as ext_sg
from neutron.extensions import standardattrdescription
from neutron.tests import base
@ -49,6 +51,53 @@ DB_PLUGIN_KLASS = ('neutron.tests.unit.extensions.test_securitygroup.'
LONG_NAME_OK = 'x' * (db_const.NAME_FIELD_SIZE)
LONG_NAME_NG = 'x' * (db_const.NAME_FIELD_SIZE + 1)
RULES_TEMPLATE_FOR_CUSTOM_SG = [
{
'direction': 'egress',
'ethertype': const.IPv4,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': True
}, {
'direction': 'egress',
'ethertype': const.IPv6,
'remote_group_id': None,
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': True
}
]
RULES_TEMPLATE_FOR_DEFAULT_SG = RULES_TEMPLATE_FOR_CUSTOM_SG + [
{
'direction': 'ingress',
'ethertype': const.IPv4,
'remote_group_id': ext_sg_default_rules.PARENT_SG,
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': False
}, {
'direction': 'ingress',
'ethertype': const.IPv6,
'remote_group_id': ext_sg_default_rules.PARENT_SG,
'remote_ip_prefix': None,
'protocol': None,
'port_range_max': None,
'port_range_min': None,
'used_in_default_sg': True,
'used_in_non_default_sg': False
}
]
class SecurityGroupTestExtensionManager(object):
@ -293,11 +342,16 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
description = 'my webservers'
keys = [('name', name,), ('description', description),
('shared', False)]
with self.security_group(name, description) as security_group:
for k, v, in keys:
self.assertEqual(v, security_group['security_group'][k])
with mock.patch.object(
SecurityGroupTestPlugin,
'get_default_security_group_rules',
return_value=RULES_TEMPLATE_FOR_CUSTOM_SG):
with self.security_group(name, description) as security_group:
for k, v, in keys:
self.assertEqual(v, security_group['security_group'][k])
# Verify that default egress rules have been created
# Verify that egress rules have been created as defined in the template
# above
sg_rules = security_group['security_group']['security_group_rules']
self.assertEqual(2, len(sg_rules))
@ -717,37 +771,40 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
def test_get_security_group(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
remote_group_id = sg['security_group']['id']
res = self.new_show_request('security-groups', remote_group_id)
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(security_group_id,
direction=direction,
protocol=protocol,
port_range_min=port_range_min,
port_range_max=port_range_max,
remote_ip_prefix=remote_ip_prefix):
with mock.patch.object(
SecurityGroupTestPlugin,
'get_default_security_group_rules', return_value=[]):
with self.security_group(name, description) as sg:
remote_group_id = sg['security_group']['id']
res = self.new_show_request('security-groups', remote_group_id)
security_group_id = sg['security_group']['id']
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
('security_group_id', security_group_id),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.security_group_rule(
security_group_id,
direction=direction,
protocol=protocol,
port_range_min=port_range_min,
port_range_max=port_range_max,
remote_ip_prefix=remote_ip_prefix):
group = self.deserialize(
self.fmt, res.get_response(self.ext_api))
sg_rule = group['security_group']['security_group_rules']
self.assertEqual(remote_group_id,
group['security_group']['id'])
self.assertEqual(3, len(sg_rule))
sg_rule = [r for r in sg_rule if r['direction'] == 'ingress']
for k, v, in keys:
self.assertEqual(v, sg_rule[0][k])
group = self.deserialize(
self.fmt, res.get_response(self.ext_api))
sg_rules = group['security_group']['security_group_rules']
self.assertEqual(remote_group_id,
group['security_group']['id'])
self.assertEqual(1, len(sg_rules))
for k, v, in keys:
self.assertEqual(v, sg_rules[0][k])
def test_get_security_group_empty_rules(self):
name = 'webservers'
@ -825,9 +882,9 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
for sg in sgs['security_groups']:
if sg['name'] == "webservers":
rules = sg['security_group_rules']
self.assertEqual(5, len(rules))
self.assertNotEqual('admin-tenant', rules[3]['tenant_id'])
self.assertEqual('admin-tenant', rules[4]['tenant_id'])
self.assertEqual(3, len(rules))
self.assertNotEqual('admin-tenant', rules[1]['tenant_id'])
self.assertEqual('admin-tenant', rules[2]['tenant_id'])
def test_get_security_group_on_port_from_wrong_tenant(self):
plugin = directory.get_plugin()
@ -900,7 +957,10 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(1, len(sg))
def test_default_security_group_rules(self):
with self.network():
with mock.patch.object(
SecurityGroupTestPlugin,
'get_default_security_group_rules',
return_value=copy.deepcopy(RULES_TEMPLATE_FOR_DEFAULT_SG)):
res = self.new_list_request('security-groups')
groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
self.assertEqual(1, len(groups['security_groups']))

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import math
from unittest import mock
@ -44,6 +45,11 @@ class Ml2SecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
notifier_cls.return_value = self.notifier
self.useFixture(fixture.APIDefinitionFixture())
super(Ml2SecurityGroupsTestCase, self).setUp('ml2')
plugin = directory.get_plugin()
mock.patch.object(
plugin, 'get_default_security_group_rules',
return_value=copy.deepcopy(
test_sg.RULES_TEMPLATE_FOR_CUSTOM_SG)).start()
class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
@ -91,6 +97,28 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase,
port_dict['fixed_ips'])
self._delete('ports', p['id'])
def test_default_security_group_rules(self):
plugin = directory.get_plugin()
with mock.patch.object(
plugin, 'get_default_security_group_rules',
return_value=copy.deepcopy(
test_sg.RULES_TEMPLATE_FOR_DEFAULT_SG)):
super().test_default_security_group_rules()
def test_get_security_group(self):
plugin = directory.get_plugin()
with mock.patch.object(
plugin, 'get_default_security_group_rules',
return_value=[]):
super().test_get_security_group()
def test_create_security_group_rules_admin_tenant(self):
plugin = directory.get_plugin()
with mock.patch.object(
plugin, 'get_default_security_group_rules',
return_value=[]):
super().test_create_security_group_rules_admin_tenant()
def test_security_group_get_ports_from_devices_with_bad_id(self):
plugin = directory.get_plugin()
ports = plugin.get_ports_from_devices(self.ctx, ['bad_device_id'])

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from unittest import mock
from neutron_lib import context
@ -66,6 +67,11 @@ class BaseTestEventHandler(object):
def_sec_group_patch = mock.patch(
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'
'_ensure_default_security_group')
mock.patch(
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'
'get_default_security_group_rules',
return_value=copy.deepcopy(
test_securitygroup.RULES_TEMPLATE_FOR_CUSTOM_SG)).start()
def_sec_group_patch.start()
get_sec_group_port_patch = mock.patch(
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'

View File

@ -13,11 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from unittest import mock
from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.db import api as n_db_api
from neutron_lib.plugins import directory
from neutron_lib.services.logapi import constants as log_const
from neutron_lib.utils import net as net_utils
from oslo_utils import uuidutils
@ -188,6 +190,11 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
def setUp(self):
super(LoggingRpcCallbackTestCase, self).setUp()
plugin = directory.get_plugin()
mock.patch.object(
plugin, 'get_default_security_group_rules',
return_value=copy.deepcopy(
test_sg.RULES_TEMPLATE_FOR_CUSTOM_SG)).start()
self.context = context.get_admin_context()
self.rpc_callback = server_rpc.LoggingApiSkeleton()