Default SG api rules template - DB and OVO models

This patch adds DB model, OVO class and DB migration script for
SG rules template used for every new SG created.
It also implements Create/Get/Delete actions for that new resource and
adds API policies for those APIs

Related-Bug: #1983053
Change-Id: Ib3cde1710edd400b972f493b13666d0679a7753c
This commit is contained in:
Slawek Kaplonski 2023-05-16 13:06:23 +02:00
parent eee897323c
commit e41fae522b
15 changed files with 1348 additions and 15 deletions

View File

@ -79,6 +79,7 @@ from neutron_lib.api.definitions import vpn_endpoint_groups
from neutron_lib import constants
from neutron.extensions import quotasv2_detail
from neutron.extensions import security_groups_default_rules
# NOTE(russellb) This remains in its own file (vs constants.py) because we want
# to be able to easily import it and export the info without any dependencies
@ -145,6 +146,7 @@ ML2_SUPPORTED_API_EXTENSIONS = [
rbac_security_groups.ALIAS,
'standard-attr-revisions',
'security-group',
security_groups_default_rules.ALIAS,
security_groups_normalized_cidr.ALIAS,
security_groups_remote_address_group.ALIAS,
security_groups_shared_filtering.ALIAS,

View File

@ -20,6 +20,7 @@ from neutron.conf.policies import agent
from neutron.conf.policies import auto_allocated_topology
from neutron.conf.policies import availability_zone
from neutron.conf.policies import base
from neutron.conf.policies import default_security_group_rules
from neutron.conf.policies import flavor
from neutron.conf.policies import floatingip
from neutron.conf.policies import floatingip_pools
@ -55,6 +56,7 @@ def list_rules():
agent.list_rules(),
auto_allocated_topology.list_rules(),
availability_zone.list_rules(),
default_security_group_rules.list_rules(),
flavor.list_rules(),
floatingip.list_rules(),
floatingip_pools.list_rules(),

View File

@ -0,0 +1,91 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import policy as neutron_policy
from oslo_policy import policy
from neutron.conf.policies import base
DEPRECATED_REASON = (
"The default security group rules API supports "
"system scope and default roles.")
COLLECTION_PATH = '/default-security-group-rules'
RESOURCE_PATH = '/default-security-group-rules/{id}'
rules = [
policy.DocumentedRuleDefault(
name='create_default_security_group_rule',
check_str=base.ADMIN,
scope_types=['project'],
description='Create a templated of the security group rule',
operations=[
{
'method': 'POST',
'path': COLLECTION_PATH,
},
],
deprecated_rule=policy.DeprecatedRule(
name='create_default_security_group_rule',
check_str=neutron_policy.RULE_ADMIN_ONLY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2023.2')
),
policy.DocumentedRuleDefault(
name='get_default_security_group_rule',
# NOTE(slaweq): it can't be ADMIN_OR_PROJECT_READER constant from the
# base module because that is using "project_id" in the check string
# and this resource don't belongs to any project thus such
# check string would fail enforcement.
check_str='role:reader',
scope_types=['project'],
description='Get a templated of the security group rule',
operations=[
{
'method': 'GET',
'path': COLLECTION_PATH,
},
{
'method': 'GET',
'path': RESOURCE_PATH,
},
],
deprecated_rule=policy.DeprecatedRule(
name='get_default_security_group_rule',
check_str=neutron_policy.RULE_ANY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2023.2')
),
policy.DocumentedRuleDefault(
name='delete_default_security_group_rule',
check_str=base.ADMIN,
scope_types=['project'],
description='Delete a templated of the security group rule',
operations=[
{
'method': 'DELETE',
'path': RESOURCE_PATH,
},
],
deprecated_rule=policy.DeprecatedRule(
name='delete_default_security_group_rule',
check_str=neutron_policy.RULE_ADMIN_ONLY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2023.2')
),
]
def list_rules():
return rules

View File

@ -0,0 +1,130 @@
# Copyright 2023 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from alembic import op
from neutron_lib import constants
from neutron_lib.db import constants as db_const
from oslo_utils import uuidutils
import sqlalchemy as sa
"""security group default rules
Revision ID: c33da356b165
Revises: 6f1145bff34c
Create Date: 2023-05-15 12:32:01.915525
"""
# revision identifiers, used by Alembic.
revision = 'c33da356b165'
down_revision = 'b1199a3adbef'
INGRESS_RULE_DESCRIPTION = "Legacy default SG rule for ingress traffic"
EGRESS_RULE_DESCRIPTION = "Legacy default SG rule for egress traffic"
table_name = 'securitygroupdefaultrules'
rule_direction_enum = sa.Enum(constants.INGRESS_DIRECTION,
constants.EGRESS_DIRECTION,
name='defaultsecuritygrouprules_direction')
default_template_rules = [
{
'id': uuidutils.generate_uuid(),
'direction': constants.EGRESS_DIRECTION,
'ethertype': constants.IPv4,
'used_in_default_sg': True,
'used_in_non_default_sg': True,
'description': EGRESS_RULE_DESCRIPTION,
},
{
'id': uuidutils.generate_uuid(),
'direction': constants.EGRESS_DIRECTION,
'ethertype': constants.IPv6,
'used_in_default_sg': True,
'used_in_non_default_sg': True,
'description': EGRESS_RULE_DESCRIPTION,
},
{
'id': uuidutils.generate_uuid(),
'direction': constants.INGRESS_DIRECTION,
'ethertype': constants.IPv4,
'remote_group_id': 'PARENT',
'used_in_default_sg': True,
'used_in_non_default_sg': False,
'description': INGRESS_RULE_DESCRIPTION,
},
{
'id': uuidutils.generate_uuid(),
'direction': constants.INGRESS_DIRECTION,
'ethertype': constants.IPv6,
'remote_group_id': 'PARENT',
'used_in_default_sg': True,
'used_in_non_default_sg': False,
'description': INGRESS_RULE_DESCRIPTION,
},
]
standardattr = sa.Table(
'standardattributes', sa.MetaData(),
sa.Column('id', sa.BigInteger(), primary_key=True, autoincrement=True),
sa.Column('resource_type', sa.String(length=255), nullable=False),
sa.Column('description', sa.String(length=255)))
def upgrade():
connection = op.get_bind()
insp = sa.inspect(connection)
if table_name in insp.get_table_names():
# it means that this table was already there so we don't need to do
# anything else
return
sg_templates_table = op.create_table(
table_name,
sa.Column('id', sa.String(length=db_const.UUID_FIELD_SIZE),
primary_key=True),
sa.Column('standard_attr_id', sa.BigInteger(),
sa.ForeignKey('standardattributes.id', ondelete='CASCADE'),
nullable=False),
sa.Column('remote_group_id',
sa.String(length=db_const.UUID_FIELD_SIZE)),
sa.Column('remote_address_group_id',
sa.String(length=db_const.UUID_FIELD_SIZE)),
sa.Column('direction', rule_direction_enum, nullable=False),
sa.Column('ethertype', sa.String(length=40)),
sa.Column('protocol', sa.String(length=40)),
sa.Column('port_range_min', sa.Integer()),
sa.Column('port_range_max', sa.Integer()),
sa.Column('remote_ip_prefix', sa.String(length=255)),
sa.Column('used_in_default_sg', sa.Boolean(), nullable=False,
server_default=sa.sql.false()),
sa.Column('used_in_non_default_sg', sa.Boolean(), nullable=False,
server_default=sa.sql.true()),
sa.UniqueConstraint('standard_attr_id'))
# To keep backward compatibility with older releases, by default we need
# to have 4 default rules created for each default SG, and two of them are
# also used for every non-default SG as well:
session = sa.orm.Session(bind=connection)
for template_rule in default_template_rules:
res = session.execute(
standardattr.insert({
'description': template_rule.pop('description'),
'resource_type': table_name})
)
template_rule['standard_attr_id'] = res.inserted_primary_key[0]
session.execute(sg_templates_table.insert(template_rule))

View File

@ -1 +1 @@
b1199a3adbef
c33da356b165

View File

@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from neutron_lib.db import constants as db_const
from neutron_lib.db import model_base
from neutron_lib.db import standard_attr
import sqlalchemy as sa
from neutron.extensions import security_groups_default_rules
class SecurityGroupDefaultRule(standard_attr.HasStandardAttributes,
model_base.BASEV2,
model_base.HasId):
"""Represents a template of the default neutron security group rules."""
direction = sa.Column(sa.Enum(constants.INGRESS_DIRECTION,
constants.EGRESS_DIRECTION,
name='defaultsecuritygrouprules_direction'),
nullable=False)
ethertype = sa.Column(sa.String(40))
remote_group_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE))
protocol = sa.Column(sa.String(40))
port_range_min = sa.Column(sa.Integer)
port_range_max = sa.Column(sa.Integer)
remote_ip_prefix = sa.Column(sa.String(255))
remote_address_group_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE))
used_in_default_sg = sa.Column(sa.Boolean(),
server_default=sa.sql.false(),
nullable=False,
default=False)
used_in_non_default_sg = sa.Column(sa.Boolean(),
server_default=sa.sql.true(),
nullable=False,
default=True)
api_collections = [security_groups_default_rules.COLLECTION_NAME]

View File

@ -39,10 +39,13 @@ from neutron.common import _constants as const
from neutron.db import address_group_db as ag_db
from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_mixin as rbac_mixin
from neutron.extensions import security_groups_default_rules as \
ext_sg_default_rules
from neutron.extensions import securitygroup as ext_sg
from neutron.objects import base as base_obj
from neutron.objects import ports as port_obj
from neutron.objects import securitygroup as sg_obj
from neutron.objects import securitygroup_default_rules as sg_default_rules_obj
from neutron import quota
@ -53,8 +56,10 @@ DEFAULT_SG_DESCRIPTION = _('Default security group')
@resource_extend.has_resource_extenders
@registry.has_registry_receivers
class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
rbac_mixin.RbacPluginMixin):
class SecurityGroupDbMixin(
ext_sg.SecurityGroupPluginBase,
ext_sg_default_rules.SecurityGroupDefaultRulesPluginBase,
rbac_mixin.RbacPluginMixin):
"""Mixin class to add security group to db_base_plugin_v2."""
__native_bulk_support = True
@ -482,6 +487,190 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
return res_rule_dict
def _validate_multiple_remote_entites(self, rule):
remote = None
for key in ['remote_ip_prefix', 'remote_group_id',
'remote_address_group_id']:
if remote and rule.get(key):
raise ext_sg.SecurityGroupMultipleRemoteEntites()
remote = rule.get(key) or remote
def _validate_default_security_group_rule(self, rule):
self._validate_base_security_group_rule_attributes(rule)
def _make_default_security_group_rule_dict(self, rule_obj, fields=None):
res = {
'id': rule_obj['id'],
'ethertype': rule_obj['ethertype'],
'direction': rule_obj['direction'],
'protocol': rule_obj['protocol'],
'port_range_min': rule_obj['port_range_min'],
'port_range_max': rule_obj['port_range_max'],
'remote_ip_prefix': rule_obj['remote_ip_prefix'],
'remote_address_group_id': rule_obj[
'remote_address_group_id'],
'remote_group_id': rule_obj['remote_group_id'],
'standard_attr_id': rule_obj.db_obj.standard_attr.id,
'description': rule_obj['description'],
'used_in_default_sg': rule_obj['used_in_default_sg'],
'used_in_non_default_sg': rule_obj['used_in_non_default_sg']
}
return db_utils.resource_fields(res, fields)
def _get_default_security_group_rule(self, context, rule_id):
rule_obj = sg_default_rules_obj.SecurityGroupDefaultRule.get_object(
context, id=rule_id)
if rule_obj is None:
raise ext_sg_default_rules.DefaultSecurityGroupRuleNotFound(
id=rule_id)
return rule_obj
def _check_for_duplicate_default_rules(self, context, new_rules):
# We need to divide rules for those used in default security groups for
# projects and for those which are used only for custom security groups
self._check_for_duplicate_default_rules_in_template(
context,
rules=[rule['default_security_group_rule'] for rule in new_rules
if rule.get('used_in_default_sg', False)],
filters={'used_in_default_sg': True})
self._check_for_duplicate_default_rules_in_template(
context,
rules=[rule['default_security_group_rule'] for rule in new_rules
if rule.get('used_in_non_default_sg', True)],
filters={'used_in_non_default_sg': True})
def _check_for_duplicate_default_rules_in_template(self, context,
rules, filters):
new_rules_set = set()
for i in rules:
rule_key = self._rule_to_key(rule=i)
if rule_key in new_rules_set:
raise ext_sg_default_rules.DuplicateDefaultSgRuleInPost(rule=i)
new_rules_set.add(rule_key)
# Now, let's make sure none of the new rules conflict with
# existing rules; note that we do *not* store the db rules
# in the set, as we assume they were already checked,
# when added.
template_sg_rules = self.get_default_security_group_rules(
context, filters=filters) or []
for i in template_sg_rules:
rule_key = self._rule_to_key(i)
if rule_key in new_rules_set:
raise ext_sg_default_rules.DefaultSecurityGroupRuleExists(
rule_id=i.get('id'))
def create_default_security_group_rule(self, context,
default_security_group_rule):
"""Create a default security rule template.
:param context: neutron api request context
:type context: neutron.context.Context
:param default_security_group_rule: security group rule template data
to be applied
:type sg_rule_template: dict
:returns: a SecurityGroupDefaultRule object
"""
self._validate_default_security_group_rule(
default_security_group_rule['default_security_group_rule'])
self._check_for_duplicate_default_rules(context,
[default_security_group_rule])
rule_dict = default_security_group_rule['default_security_group_rule']
remote_ip_prefix = rule_dict.get('remote_ip_prefix')
if remote_ip_prefix:
remote_ip_prefix = net.AuthenticIPNetwork(remote_ip_prefix)
protocol = rule_dict.get('protocol')
if protocol:
# object expects strings only
protocol = str(protocol)
args = {
'id': (rule_dict.get('id') or
uuidutils.generate_uuid()),
'direction': rule_dict.get('direction'),
'remote_group_id': rule_dict.get('remote_group_id'),
'remote_address_group_id': rule_dict.get(
'remote_address_group_id'),
'ethertype': rule_dict.get('ethertype'),
'protocol': protocol,
'remote_ip_prefix': remote_ip_prefix,
'description': rule_dict.get('description'),
'used_in_default_sg': rule_dict.get('used_in_default_sg'),
'used_in_non_default_sg': rule_dict.get('used_in_non_default_sg')
}
port_range_min = self._safe_int(rule_dict.get('port_range_min'))
if port_range_min is not None:
args['port_range_min'] = port_range_min
port_range_max = self._safe_int(rule_dict.get('port_range_max'))
if port_range_max is not None:
args['port_range_max'] = port_range_max
with db_api.CONTEXT_WRITER.using(context):
default_sg_rule_obj = (
sg_default_rules_obj.SecurityGroupDefaultRule(context, **args))
default_sg_rule_obj.create()
return self._make_default_security_group_rule_dict(default_sg_rule_obj)
@db_api.CONTEXT_WRITER
def delete_default_security_group_rule(self, context, sg_rule_template_id):
"""Delete a default security rule template.
:param context: neutron api request context
:type context: neutron.context.Context
:param sg_rule_template_id: the id of the SecurityGroupDefaultRule to
delete
:type sg_rule_template_id: str uuid
:returns: None
"""
default_sg_rule_obj = (
sg_default_rules_obj.SecurityGroupDefaultRule(context))
default_sg_rule_obj.id = sg_rule_template_id
default_sg_rule_obj.delete()
def get_default_security_group_rules(self, context, filters=None,
fields=None, sorts=None, limit=None,
marker=None, page_reverse=False):
"""Get default security rule templates.
:param context: neutron api request context
:type context: neutron.context.Context
:param filters: search criteria
:type filters: dict
:returns: SecurityGroupDefaultRule objects meeting the search criteria
"""
filters = filters or {}
pager = base_obj.Pager(
sorts=sorts, marker=marker, limit=limit, page_reverse=page_reverse)
rule_objs = sg_default_rules_obj.SecurityGroupDefaultRule.get_objects(
context, _pager=pager, **filters)
return [
self._make_default_security_group_rule_dict(obj, fields)
for obj in rule_objs
]
def get_default_security_group_rule(self, context, sg_rule_template_id,
fields=None):
"""Get default security rule template.
:param context: neutron api request context
:type context: neutron.context.Context
:param sg_rule_template_id: the id of the SecurityGroupDefaultRule to
get
:type sg_rule_template_id: str uuid
:returns: a SecurityGroupDefaultRule object
"""
rule_obj = self._get_default_security_group_rule(context,
sg_rule_template_id)
return self._make_default_security_group_rule_dict(
rule_obj, fields=fields)
def _get_ip_proto_number(self, protocol):
if protocol is None:
return
@ -629,20 +818,22 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
'new_protocol': str(constants.PROTO_NUM_IPV6_ICMP)})
rule['protocol'] = str(constants.PROTO_NUM_IPV6_ICMP)
def _validate_security_group_rule(self, context, security_group_rule):
rule = security_group_rule['security_group_rule']
def _validate_base_security_group_rule_attributes(self, rule):
"""Validate values of the basic attributes of the SG rule.
This method validates attributes which are common for the actual SG
rule as well as SG rule template.
"""
self._make_canonical_ipv6_icmp_protocol(rule)
self._make_canonical_port_range(rule)
self._validate_port_range(rule)
self._validate_ip_prefix(rule)
self._validate_ethertype_and_protocol(rule)
self._validate_multiple_remote_entites(rule)
remote = None
for key in ['remote_ip_prefix', 'remote_group_id',
'remote_address_group_id']:
if remote and rule.get(key):
raise ext_sg.SecurityGroupMultipleRemoteEntites()
remote = rule.get(key) or remote
def _validate_security_group_rule(self, context, security_group_rule):
rule = security_group_rule['security_group_rule']
self._validate_base_security_group_rule_attributes(rule)
remote_group_id = rule['remote_group_id']
# Check that remote_group_id exists for tenant

View File

@ -10,16 +10,34 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
from neutron_lib.api import converters
from neutron_lib.api import extensions as api_extensions
from neutron_lib.db import constants as db_const
from neutron_lib import exceptions
from neutron_lib.plugins import directory
from neutron._i18n import _
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.extensions import securitygroup
from neutron.extensions import standardattrdescription as stdattr_ext
class DefaultSecurityGroupRuleNotFound(exceptions.NotFound):
message = _("Default Security Group rule %(id)s does not exist")
class DefaultSecurityGroupRuleExists(exceptions.InUse):
message = _("Default Security group rule already exists. "
"Rule id is %(rule_id)s.")
class DuplicateDefaultSgRuleInPost(exceptions.InUse):
message = _("Duplicate Default Security Group Rule in POST.")
# TODO(slaweq): rehome API definition to neutron-lib together with
# securitygroup API definition
@ -44,6 +62,12 @@ RESOURCE_ATTRIBUTE_MAP = {
'is_filter': True,
'is_sort_key': True,
'primary_key': True},
'tenant_id': {
'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'is_sort_key': False,
'validate': {'type:string': db_const.PROJECT_ID_FIELD_SIZE},
'is_visible': False, 'is_filter': False},
'description': {
'allow_post': True, 'allow_put': False, 'default': '',
'validate': {'type:string': db_const.LONG_DESCRIPTION_FIELD_SIZE},
@ -138,12 +162,37 @@ class Security_groups_default_rules(api_extensions.ExtensionDescriptor):
def get_resources(cls):
"""Returns Ext Resources."""
plugin = directory.get_plugin()
params = RESOURCE_ATTRIBUTE_MAP.get(COLLECTION_NAME)
collection_name = COLLECTION_NAME.replace('_', '-')
params = RESOURCE_ATTRIBUTE_MAP.get(COLLECTION_NAME, dict())
controller = base.create_resource(COLLECTION_NAME,
RESOURCE_NAME,
plugin, params)
plugin, params,
allow_pagination=True,
allow_sorting=True)
ex = extensions.ResourceExtension(COLLECTION_NAME,
controller)
ex = extensions.ResourceExtension(collection_name, controller,
attr_map=params)
return [ex]
class SecurityGroupDefaultRulesPluginBase(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_default_security_group_rule(self, context, sg_rule_template):
pass
@abc.abstractmethod
def delete_default_security_group_rule(self, context, sg_rule_template_id):
pass
@abc.abstractmethod
def get_default_security_group_rules(self, context, filters=None,
fields=None, sorts=None, limit=None,
marker=None, page_reverse=False):
pass
@abc.abstractmethod
def get_default_security_group_rule(self, context, sg_rule_template_id,
fields=None):
pass

View File

@ -0,0 +1,58 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.objects import common_types
from neutron_lib.utils import net as net_utils
from oslo_versionedobjects import fields as obj_fields
from neutron.db.models import securitygroup_default_rules
from neutron.objects import base
@base.NeutronObjectRegistry.register
class SecurityGroupDefaultRule(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = securitygroup_default_rules.SecurityGroupDefaultRule
fields = {
'id': common_types.UUIDField(),
'remote_group_id': obj_fields.StringField(nullable=True),
'direction': common_types.FlowDirectionEnumField(nullable=True),
'ethertype': common_types.EtherTypeEnumField(nullable=True),
'protocol': common_types.IpProtocolEnumField(nullable=True),
'port_range_min': common_types.PortRangeWith0Field(nullable=True),
'port_range_max': common_types.PortRangeWith0Field(nullable=True),
'remote_ip_prefix': common_types.IPNetworkField(nullable=True),
'remote_address_group_id': common_types.UUIDField(nullable=True),
'used_in_default_sg': obj_fields.BooleanField(),
'used_in_non_default_sg': obj_fields.BooleanField(),
}
@classmethod
def modify_fields_to_db(cls, fields):
result = super(SecurityGroupDefaultRule,
cls).modify_fields_to_db(fields)
remote_ip_prefix = result.get('remote_ip_prefix')
if remote_ip_prefix:
result['remote_ip_prefix'] = cls.filter_to_str(remote_ip_prefix)
return result
@classmethod
def modify_fields_from_db(cls, db_obj):
fields = super(SecurityGroupDefaultRule,
cls).modify_fields_from_db(db_obj)
if 'remote_ip_prefix' in fields:
fields['remote_ip_prefix'] = (
net_utils.AuthenticIPNetwork(fields['remote_ip_prefix']))
return fields

View File

@ -128,6 +128,8 @@ from neutron.db import subnet_service_type_mixin
from neutron.db import vlantransparent_db
from neutron.extensions import dhcpagentscheduler as dhcp_ext
from neutron.extensions import filter_validation
from neutron.extensions import security_groups_default_rules as \
sg_default_rules_ext
from neutron.extensions import vlantransparent
from neutron.ipam import exceptions as ipam_exc
from neutron.objects import base as base_obj
@ -239,6 +241,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
pdp_def.ALIAS,
quota_check_limit.ALIAS,
port_mac_address_override.ALIAS,
sg_default_rules_ext.ALIAS,
]
# List of agent types for which all binding_failed ports should try to be

View File

@ -0,0 +1,133 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_policy import policy as base_policy
from neutron import policy
from neutron.tests.unit.conf.policies import test_base as base
class DefaultSecurityGroupRuleAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
super(DefaultSecurityGroupRuleAPITestCase, self).setUp()
self.target = {}
class SystemAdminDefaultSecurityGroupRuleTests(
DefaultSecurityGroupRuleAPITestCase):
def setUp(self):
super(SystemAdminDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.system_admin_ctx
def test_create_default_security_group_rule(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'create_default_security_group_rule', self.target)
def test_get_default_security_group_rule(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'get_default_security_group_rule', self.target)
def test_delete_default_security_group_rule(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce,
self.context, 'delete_default_security_group_rule', self.target)
class SystemMemberDefaultSecurityGroupRuleTests(
SystemAdminDefaultSecurityGroupRuleTests):
def setUp(self):
super(SystemMemberDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.system_member_ctx
class SystemReaderDefaultSecurityGroupRuleTests(
SystemMemberDefaultSecurityGroupRuleTests):
def setUp(self):
super(SystemReaderDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.system_reader_ctx
class AdminDefaultSecurityGroupRuleTests(DefaultSecurityGroupRuleAPITestCase):
def setUp(self):
super(AdminDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.project_admin_ctx
def test_create_default_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,
'create_default_security_group_rule', self.target))
def test_get_default_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,
'get_default_security_group_rule', self.target))
def test_delete_default_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,
'delete_default_security_group_rule', self.target))
class ProjectMemberDefaultSecurityGroupRuleTests(
AdminDefaultSecurityGroupRuleTests):
def setUp(self):
super(ProjectMemberDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.project_member_ctx
def test_create_default_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_default_security_group_rule', self.target)
def test_get_default_security_group_rule(self):
self.assertTrue(
policy.enforce(self.context,
'get_default_security_group_rule', self.target))
def test_delete_default_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_default_security_group_rule', self.target)
class ProjectReaderDefaultSecurityGroupRuleTests(
ProjectMemberDefaultSecurityGroupRuleTests):
def setUp(self):
super(ProjectReaderDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.project_reader_ctx
def test_create_default_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_default_security_group_rule', self.target)
def test_delete_default_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_default_security_group_rule', self.target)

View File

@ -25,6 +25,8 @@ import sqlalchemy
import testtools
from neutron.db import securitygroups_db
from neutron.extensions import security_groups_default_rules as \
ext_sg_default_rules
from neutron.extensions import securitygroup
from neutron import quota
from neutron.services.revisions import revision_plugin
@ -672,3 +674,116 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
self.mixin._ensure_default_security_group(context, 'tenant_1')
create_sg.assert_not_called()
get_default_sg_id.assert_not_called()
def test__check_for_duplicate_default_rules_does_not_drop_protocol(self):
with mock.patch.object(self.mixin, 'get_default_security_group_rules',
return_value=None):
context = mock.Mock()
rule_dict = {
'default_security_group_rule': {'protocol': None,
'direction': 'fake'}
}
self.mixin._check_for_duplicate_default_rules(
context, [rule_dict])
self.assertIn('protocol', rule_dict['default_security_group_rule'])
def test__check_for_duplicate_default_rules_ignores_rule_id(self):
rules = [
{'default_security_group_rule': {
'protocol': 'tcp', 'id': 'fake1'}},
{'default_security_group_rule': {
'protocol': 'tcp', 'id': 'fake2'}}]
# NOTE(arosen): the name of this exception is a little misleading
# in this case as this test, tests that the id fields are dropped
# while being compared. This is in the case if a plugin specifies
# the rule ids themselves.
with mock.patch.object(self.mixin, 'get_default_security_group_rules',
return_value=None):
self.assertRaises(
ext_sg_default_rules.DuplicateDefaultSgRuleInPost,
self.mixin._check_for_duplicate_default_rules, context, rules)
def test__check_for_duplicate_default_rules_rule_used_in_non_default_sg(
self):
fake_rules = [
{'id': 'fake',
'used_in_default_sg': True,
'used_in_non_default_sg': True}
]
with mock.patch.object(self.mixin, 'get_default_security_group_rules',
return_value=fake_rules):
context = mock.Mock()
rule_dict = {
'default_security_group_rule': {
'id': 'fake2',
'used_in_default_sg': False,
'used_in_non_default_sg': True}
}
self.assertRaises(
ext_sg_default_rules.DefaultSecurityGroupRuleExists,
self.mixin._check_for_duplicate_default_rules,
context, [rule_dict])
def test__check_for_duplicate_default_rules_rule_used_in_default_sg(
self):
fake_rules = [
{'id': 'fake',
'used_in_default_sg': True,
'used_in_non_default_sg': True}
]
with mock.patch.object(self.mixin, 'get_default_security_group_rules',
return_value=fake_rules):
context = mock.Mock()
rule_dict = {
'default_security_group_rule': {
'id': 'fake2',
'used_in_default_sg': True,
'used_in_non_default_sg': False}
}
self.assertRaises(
ext_sg_default_rules.DefaultSecurityGroupRuleExists,
self.mixin._check_for_duplicate_default_rules,
context, [rule_dict])
def test__check_for_duplicate_diff_default_rules_remote_ip_prefix_ipv4(
self):
fake_rules = [
{'id': 'fake', 'ethertype': 'IPv4',
'direction': 'ingress', 'remote_ip_prefix': None}
]
with mock.patch.object(self.mixin, 'get_default_security_group_rules',
return_value=fake_rules):
context = mock.Mock()
rule_dict = {
'default_security_group_rule': {
'id': 'fake2',
'ethertype': 'IPv4',
'direction': 'ingress',
'remote_ip_prefix': '0.0.0.0/0'}
}
self.assertRaises(
ext_sg_default_rules.DefaultSecurityGroupRuleExists,
self.mixin._check_for_duplicate_default_rules,
context, [rule_dict])
def test__check_for_duplicate_diff_default_rules_remote_ip_prefix_ipv6(
self):
fake_rules = [
{'id': 'fake', 'ethertype': 'IPv6',
'direction': 'ingress', 'remote_ip_prefix': None}
]
with mock.patch.object(self.mixin, 'get_default_security_group_rules',
return_value=fake_rules):
context = mock.Mock()
rule_dict = {
'default_security_group_rule': {
'id': 'fake2',
'ethertype': 'IPv6',
'direction': 'ingress',
'remote_ip_prefix': '::/0'}
}
self.assertRaises(
ext_sg_default_rules.DefaultSecurityGroupRuleExists,
self.mixin._check_for_duplicate_default_rules,
context, [rule_dict])

View File

@ -0,0 +1,484 @@
# Copyright (c) 2023 OpenStack Foundation.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
from neutron_lib import constants as const
from oslo_utils import uuidutils
import webob.exc
from neutron.extensions import security_groups_default_rules as sgdf_ext
from neutron.tests.unit.extensions import test_securitygroup
DB_PLUGIN_KLASS = ('neutron.tests.unit.extensions.test_securitygroup.'
'SecurityGroupTestPlugin')
class DefaultSecurityGroupRulesTestExtensionManager(object):
def get_resources(self):
return sgdf_ext.Security_groups_default_rules.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class TestDefaultSecurityGroupRules(
test_securitygroup.SecurityGroupDBTestCase):
def setUp(self, plugin=None, ext_mgr=None):
plugin = DB_PLUGIN_KLASS
ext_mgr = DefaultSecurityGroupRulesTestExtensionManager()
super(TestDefaultSecurityGroupRules, self).setUp(
plugin=plugin, ext_mgr=ext_mgr)
def _build_default_security_group_rule(
self, direction, proto,
port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None,
remote_address_group_id=None,
used_in_default_sg=False,
used_in_non_default_sg=True,
description=None,
ethertype=const.IPv4,
as_admin=True):
sg_rule_template = {
'direction': direction,
'protocol': proto,
'ethertype': ethertype,
'used_in_default_sg': used_in_default_sg,
'used_in_non_default_sg': used_in_non_default_sg}
if port_range_min:
sg_rule_template['port_range_min'] = port_range_min
if port_range_max:
sg_rule_template['port_range_max'] = port_range_max
if remote_ip_prefix:
sg_rule_template['remote_ip_prefix'] = remote_ip_prefix
if remote_group_id:
sg_rule_template['remote_group_id'] = remote_group_id
if remote_address_group_id:
sg_rule_template['remote_address_group_id'] = (
remote_address_group_id)
if description:
sg_rule_template['description'] = description
return {'default_security_group_rule': sg_rule_template}
def _create_default_security_group_rule(self, fmt, rules, as_admin=True,
**kwargs):
default_security_group_rule_req = self.new_create_request(
'default-security-group-rules', rules, fmt, as_admin=as_admin)
return default_security_group_rule_req.get_response(self.ext_api)
def _make_default_security_group_rule(self, fmt, rules, as_admin=True,
**kwargs):
res = self._create_default_security_group_rule(
self.fmt, rules, as_admin=as_admin)
if res.status_int >= webob.exc.HTTPBadRequest.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
@contextlib.contextmanager
def default_security_group_rule(self, direction='ingress',
protocol=const.PROTO_NAME_TCP,
port_range_min='22', port_range_max='22',
remote_ip_prefix=None,
remote_group_id=None,
remote_address_group_id=None,
used_in_default_sg=False,
used_in_non_default_sg=True,
description=None,
fmt=None, ethertype=const.IPv4):
if not fmt:
fmt = self.fmt
rule = self._build_default_security_group_rule(
direction=direction,
proto=protocol,
port_range_min=port_range_min,
port_range_max=port_range_max,
remote_ip_prefix=remote_ip_prefix,
remote_group_id=remote_group_id,
remote_address_group_id=remote_address_group_id,
used_in_default_sg=used_in_default_sg,
used_in_non_default_sg=used_in_non_default_sg,
description=description,
ethertype=ethertype)
default_security_group_rule = self._make_default_security_group_rule(
self.fmt, rule)
yield default_security_group_rule
def test_create_default_security_group_rule(self):
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = const.PROTO_NAME_TCP
port_range_min = 22
port_range_max = 22
keys = [('remote_ip_prefix', remote_ip_prefix),
('direction', direction),
('protocol', protocol),
('port_range_min', port_range_min),
('port_range_max', port_range_max)]
with self.default_security_group_rule(
direction=direction,
protocol=protocol,
port_range_min=port_range_min,
port_range_max=port_range_max,
remote_ip_prefix=remote_ip_prefix) as rule:
for k, v, in keys:
self.assertEqual(v, rule['default_security_group_rule'][k])
def test_create_default_security_group_rule_ethertype_invalid_as_number(
self):
ethertype = 2
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_TCP, '22',
'22', None, None, ethertype=ethertype)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_ethertype_invalid_for_protocol(
self):
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_IPV6_FRAG)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_invalid_ip_prefix(self):
for bad_prefix in ['bad_ip', 256, "2001:db8:a::123/129", '172.30./24']:
rule = self._build_default_security_group_rule(
'ingress',
const.PROTO_NAME_TCP,
'22', '22',
bad_prefix)
res = self._create_default_security_group_rule(self.fmt, rule)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_invalid_ethertype_for_prefix(
self):
test_addr = {'192.168.1.1/24': 'IPv6',
'2001:db8:1234::/48': 'IPv4',
'192.168.2.1/24': 'BadEthertype'}
for remote_ip_prefix, ethertype in test_addr.items():
rule = self._build_default_security_group_rule(
'ingress',
const.PROTO_NAME_TCP,
'22', '22',
remote_ip_prefix,
None,
ethertype=ethertype)
res = self._create_default_security_group_rule(self.fmt, rule)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_with_unmasked_prefix(self):
addr = {'10.1.2.3': {'mask': '32', 'ethertype': 'IPv4'},
'fe80::2677:3ff:fe7d:4c': {'mask': '128', 'ethertype': 'IPv6'}}
for remote_ip_prefix in addr:
ethertype = addr[remote_ip_prefix]['ethertype']
rule = self._build_default_security_group_rule(
'ingress',
const.PROTO_NAME_TCP,
'22', '22',
remote_ip_prefix,
None,
ethertype=ethertype)
res = self._create_default_security_group_rule(self.fmt, rule)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
res_sg = self.deserialize(self.fmt, res)
prefix = res_sg['default_security_group_rule']['remote_ip_prefix']
self.assertEqual('%s/%s' % (
remote_ip_prefix, addr[remote_ip_prefix]['mask']), prefix)
def test_create_default_security_group_rule_tcp_protocol_as_number(self):
protocol = const.PROTO_NUM_TCP # TCP
rule = self._build_default_security_group_rule(
'ingress', protocol, '22', '22')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_default_security_group_rule_protocol_as_number(self):
protocol = 2
rule = self._build_default_security_group_rule(
'ingress', protocol)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_default_security_group_rule_proto_as_number_with_port_bad(
self):
# When specifying ports, neither can be None
protocol = 6
rule = self._build_default_security_group_rule(
'ingress', protocol, '70', None)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_protocol_as_number_range(self):
# This is a SG rule with a port range, but treated as a single
# port since min/max are the same.
protocol = 6
rule = self._build_default_security_group_rule(
'ingress', protocol, '70', '70')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_default_security_group_rule_protocol_as_number_port_bad(
self):
# Only certain protocols support a SG rule with a port
protocol = 111
rule = self._build_default_security_group_rule(
'ingress', protocol, '70', '70')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_case_insensitive(self):
direction = "ingress"
remote_ip_prefix = "10.0.0.0/24"
protocol = 'TCP'
port_range_min = 22
port_range_max = 22
ethertype = 'ipV4'
with self.default_security_group_rule(
direction=direction, protocol=protocol,
port_range_min=port_range_min, port_range_max=port_range_max,
remote_ip_prefix=remote_ip_prefix,
ethertype=ethertype) as rule:
# the lower case value will be return
self.assertEqual(protocol.lower(),
rule['default_security_group_rule']['protocol'])
self.assertEqual(const.IPv4,
rule['default_security_group_rule']['ethertype'])
def test_create_default_security_group_rule_multiple_remotes(self):
sg_id = uuidutils.generate_uuid()
ag_id = uuidutils.generate_uuid()
for remote in [
{'remote_ip_prefix': '10.0.0.0/8', 'remote_group_id': sg_id},
{'remote_group_id': sg_id, 'remote_address_group_id': ag_id},
{'remote_ip_prefix': '10.0.0.0/8',
'remote_address_group_id': ag_id},
{'remote_ip_prefix': '10.0.0.0/8', 'remote_group_id': sg_id,
'remote_address_group_id': ag_id}
]:
rule = self._build_default_security_group_rule(
"ingress", const.PROTO_NAME_TCP, **remote)
res = self._create_default_security_group_rule(self.fmt, rule)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_port_range_min_max_limits(
self):
direction = "ingress"
protocol = const.PROTO_NAME_TCP
port_range_min = const.PORT_RANGE_MIN
port_range_max = const.PORT_RANGE_MAX
# The returned rule should have port range min/max as None
keys = [('direction', direction),
('protocol', protocol),
('port_range_min', None),
('port_range_max', None)]
with self.default_security_group_rule(direction=direction,
protocol=protocol,
port_range_min=port_range_min,
port_range_max=port_range_max
) as rule:
for k, v, in keys:
self.assertEqual(v, rule['default_security_group_rule'][k])
def test_create_default_security_group_rule_duplicate_rules(self):
with self.default_security_group_rule() as sgr:
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_TCP, '22', '22')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
self.assertIn(sgr['default_security_group_rule']['id'],
res.json['NeutronError']['message'])
def test_default_create_security_group_rule_duplicate_rules_diff_desc(
self):
with self.default_security_group_rule() as sgr:
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_TCP, '22', '22')
rule['default_security_group_rule']['description'] = "description"
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
self.assertIn(sgr['default_security_group_rule']['id'],
res.json['NeutronError']['message'])
def test_create_default_security_group_rule_duplicate_rules_proto_name_num(
self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_TCP, '22', '22')
self._create_default_security_group_rule(self.fmt, rule)
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NUM_TCP, '22', '22')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_default_security_group_rule_duplicate_rules_proto_num_name(
self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NUM_UDP, '50', '100')
self._create_default_security_group_rule(self.fmt, rule)
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_UDP, '50', '100')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
def test_create_default_security_group_rule_min_port_greater_max(self):
with self.default_security_group_rule():
for protocol in [const.PROTO_NAME_TCP, const.PROTO_NAME_UDP,
const.PROTO_NUM_TCP, const.PROTO_NUM_UDP]:
rule = self._build_default_security_group_rule(
'ingress', protocol, '50', '22')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code,
res.status_int)
def test_create_default_security_group_rule_ports_but_no_protocol(self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', None, '22', '22')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_port_range_min_only(self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_TCP, '22', None)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_port_range_max_only(self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_TCP, None, '22')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_icmp_type_too_big(self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_ICMP, '256', None)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_icmp_code_too_big(self):
with self.default_security_group_rule():
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_ICMP, '8', '256')
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_default_security_group_rule_icmp_with_code_only(self):
with self.default_security_group_rule():
for code in ['2', '0']:
rule = self._build_default_security_group_rule(
'ingress', const.PROTO_NAME_ICMP, None, code)
res = self._create_default_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code,
res.status_int)
def test_list_defaut_security_group_rules(self):
with self.default_security_group_rule(
direction='egress', port_range_min=22,
port_range_max=22) as rule1,\
self.default_security_group_rule(
direction='egress', port_range_min=23,
port_range_max=23) as rule2,\
self.default_security_group_rule(
direction='egress', port_range_min=24,
port_range_max=24) as rule3:
self._test_list_resources('default-security-group-rule',
[rule1, rule2, rule3],
query_params='direction=egress')
def test_list_defaut_security_group_rules_with_sort(self):
with self.default_security_group_rule(
direction='egress', port_range_min=22,
port_range_max=22) as rule1,\
self.default_security_group_rule(
direction='egress', port_range_min=23,
port_range_max=23) as rule2,\
self.default_security_group_rule(
direction='egress', port_range_min=24,
port_range_max=24) as rule3:
self._test_list_with_sort('default-security-group-rule',
(rule3, rule2, rule1),
[('port_range_max', 'desc')],
query_params='direction=egress')
def test_list_defaut_security_group_rules_with_pagination(self):
with self.default_security_group_rule(
direction='egress', port_range_min=22,
port_range_max=22) as rule1,\
self.default_security_group_rule(
direction='egress', port_range_min=23,
port_range_max=23) as rule2,\
self.default_security_group_rule(
direction='egress', port_range_min=24,
port_range_max=24) as rule3:
self._test_list_with_pagination(
'default-security-group-rule', (rule3, rule2, rule1),
('port_range_max', 'desc'), 2, 2)
def test_list_defaut_security_group_rules_with_pagination_reverse(self):
with self.default_security_group_rule(
direction='egress', port_range_min=22,
port_range_max=22) as rule1,\
self.default_security_group_rule(
direction='egress', port_range_min=23,
port_range_max=23) as rule2,\
self.default_security_group_rule(
direction='egress', port_range_min=24,
port_range_max=24) as rule3:
self._test_list_with_pagination_reverse(
'default-security-group-rule', (rule3, rule2, rule1),
('port_range_max', 'desc'), 2, 2)

View File

@ -112,6 +112,7 @@ object_data = {
'RouterPort': '1.0-c8c8f499bcdd59186fcd83f323106908',
'RouterRoute': '1.0-07fc5337c801fb8c6ccfbcc5afb45907',
'SecurityGroup': '1.5-7eb8e44c327512e7bb1759ab41ede44b',
'SecurityGroupDefaultRule': '1.0-d498fd4993b6732f3f266c4b7e292e22',
'SecurityGroupPortBinding': '1.0-6879d5c0af80396ef5a72934b6a6ef20',
'SecurityGroupRBAC': '1.1-be82ed54376b85ee4f963d479ac48c91',
'SecurityGroupRule': '1.2-27793368d4ac35f2ed6e0bb653c6aaad',

View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects import securitygroup_default_rules
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class SecurityGroupDefaultRuleIfaceObjectTestCase(
test_base.BaseObjectIfaceTestCase):
_test_class = securitygroup_default_rules.SecurityGroupDefaultRule
class SecurityGroupDefaultRuleDbObjectTestCase(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = securitygroup_default_rules.SecurityGroupDefaultRule