Added absent security groups unit tests and functionality

Change-Id: I4004c9c1b68bee707122f45016d3a7dbb4a7beec
This commit is contained in:
alevine 2014-12-20 20:53:40 +04:00
parent daf47b16ef
commit 8053ca77bb
6 changed files with 396 additions and 33 deletions

View File

@ -14,6 +14,8 @@
import re
import netaddr
from ec2api import context
from ec2api.db import api as db_api
from ec2api import exception
@ -456,6 +458,42 @@ def os_id_to_ec2_id(context, kind, os_id, items_by_os_id=None,
return item_id
def _is_valid_cidr(address):
"""Check if address is valid
The provided address can be a IPv6 or a IPv4
CIDR address.
"""
try:
# Validate the correct CIDR Address
netaddr.IPNetwork(address)
except netaddr.core.AddrFormatError:
return False
except UnboundLocalError:
# NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in
# https://github.com/drkjam/netaddr/issues/2)
return False
# Prior validation partially verify /xx part
# Verify it here
ip_segment = address.split('/')
if (len(ip_segment) <= 1 or
ip_segment[1] == ''):
return False
return True
def validate_cidr_with_ipv6(cidr, parameter_name):
invalid_format_exception = exception.InvalidParameterValue(
value=cidr,
parameter=parameter_name,
reason='This is not a valid CIDR block.')
if not _is_valid_cidr(cidr):
raise invalid_format_exception
_cidr_re = re.compile("^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$")

View File

@ -14,11 +14,13 @@
import copy
import re
try:
from neutronclient.common import exceptions as neutron_exception
except ImportError:
pass # clients will log absense of neutronclient in this case
from novaclient import exceptions as nova_exception
from oslo.config import cfg
from ec2api.api import clients
@ -27,6 +29,7 @@ from ec2api.api import ec2utils
from ec2api.api import utils
from ec2api.db import api as db_api
from ec2api import exception
from ec2api.openstack.common.gettextutils import _
from ec2api.openstack.common import log as logging
@ -53,10 +56,14 @@ def get_security_group_engine():
def create_security_group(context, group_name, group_description,
vpc_id=None):
_validate_security_group_naming(group_name, group_description, vpc_id)
nova = clients.nova(context)
with utils.OnCrashCleaner() as cleaner:
os_security_group = nova.security_groups.create(group_name,
group_description)
try:
os_security_group = nova.security_groups.create(group_name,
group_description)
except nova_exception.OverLimit:
raise exception.ResourceLimitExceeded(resource='security groups')
cleaner.addCleanup(nova.security_groups.delete,
os_security_group.id)
if vpc_id:
@ -70,18 +77,53 @@ def create_security_group(context, group_name, group_description,
return {'return': 'true'}
def _validate_security_group_naming(group_name, group_description, vpc_id):
if group_name is None:
raise exception.MissingParameter(param='group name')
if group_description is None:
raise exception.MissingParameter(param='group description')
# NOTE(Alex) Amazon accepts any ASCII for EC2 classic;
# for EC2-VPC: a-z, A-Z, 0-9, spaces, and ._-:/()#,@[]+=&;{}!$*
if vpc_id:
allowed = '^[a-zA-Z0-9\._\-:/\(\)#,@\[\]\+=&;\{\}!\$\*\ ]+$'
else:
allowed = r'^[\x20-\x7E]+$'
_validate_property(group_name, 'name', allowed)
_validate_property(group_description, 'description', allowed)
def _validate_property(value, property, allowed):
msg = ''
try:
val = value.strip()
except AttributeError:
msg = _("Security group %s is not a string or unicode") % property
if not val:
msg = _("Security group %s cannot be empty.") % property
elif allowed and not re.match(allowed, val):
# Some validation to ensure that values match API spec.
# - Alphanumeric characters, spaces, dashes, and underscores.
# TODO(Daviey): LP: #813685 extend beyond group_name checking, and
# probably create a param validator that can be used elsewhere.
msg = (_("Specified value for parameter Group%(property)s is "
"invalid. Content limited to '%(allowed)s'.") %
{'allowed': 'allowed',
'property': property})
elif len(val) > 255:
msg = _("Security group %s should not be greater "
"than 255 characters.") % property
if msg:
raise exception.ValidationError(reason=msg)
def _create_default_security_group(context, vpc):
neutron = clients.neutron(context)
os_security_group = neutron.create_security_group(
{'security_group':
{'name': 'Default',
'description': 'Default VPC security group'}})['security_group']
security_group = db_api.add_item(context, 'sg',
{'vpc_id': vpc['id'],
'os_id': os_security_group['id']})
return create_security_group(context, 'Default',
'Default VPC security group', vpc['id'])
def delete_security_group(context, group_name=None, group_id=None):
if group_name is None and group_id is None:
raise exception.MissingParameter(param='group id or name')
security_group_engine.delete_group(context, group_name, group_id)
return True
@ -93,9 +135,14 @@ class SecurityGroupDescriber(common.UniversalDescriber):
'group-name': 'groupName',
'group-id': 'groupId'}
def __init__(self):
self.all_db_items = None
def format(self, item=None, os_item=None):
if self.all_db_items is None:
self.all_db_items = ec2utils.get_db_items(self.context, 'sg', None)
return _format_security_group(item, os_item,
self.items, self.os_items)
self.all_db_items, self.os_items)
def get_os_items(self):
return security_group_engine.get_os_groups(self.context)
@ -128,7 +175,42 @@ def _authorize_security_group(context, group_id, group_name,
return True
def _validate_parameters(protocol, from_port, to_port):
if (not isinstance(protocol, int) and
protocol not in ['tcp', 'udp', 'icmp']):
raise exception.InvalidParameterValue(
_('Invalid value for IP protocol. Unknown protocol.'))
if (not isinstance(from_port, int) or
not isinstance(to_port, int)):
raise exception.InvalidParameterValue(
_('Integer values should be specified for ports'))
if protocol in ['tcp', 'udp', 6, 17]:
if from_port == -1 or to_port == -1:
raise exception.InvalidParameterValue(
_('Must specify both from and to ports with TCP/UDP.'))
if from_port > to_port:
raise exception.InvalidParameterValue(
_('Invalid TCP/UDP port range.'))
if from_port < 0 or from_port > 65535:
raise exception.InvalidParameterValue(
_('TCP/UDP from port is out of range.'))
if to_port < 0 or to_port > 65535:
raise exception.InvalidParameterValue(
_('TCP/UDP to port is out of range.'))
elif protocol in ['icmp', 1]:
if from_port < -1 or from_port > 255:
raise exception.InvalidParameterValue(
_('ICMP type is out of range.'))
if to_port < -1 or to_port > 255:
raise exception.InvalidParameterValue(
_('ICMP code is out of range.'))
def _build_rules(context, group_id, group_name, ip_permissions, direction):
if group_name is None and group_id is None:
raise exception.MissingParameter(param='group id or name')
if ip_permissions is None:
raise exception.MissingParameter(param='source group or cidr')
os_security_group_id = security_group_engine.get_group_os_id(context,
group_id,
group_name)
@ -140,14 +222,19 @@ def _build_rules(context, group_id, group_name, ip_permissions, direction):
{'security_group_id': os_security_group_id,
'direction': direction,
'ethertype': 'IPv4'})
if rule.get('ip_protocol', -1) != -1:
protocol = rule.get('ip_protocol', -1)
from_port = rule.get('from_port', -1)
to_port = rule.get('to_port', -1)
_validate_parameters(protocol, from_port, to_port)
if protocol != -1:
os_security_group_rule_body['protocol'] = rule['ip_protocol']
if rule.get('from_port', -1) != -1:
if from_port != -1:
os_security_group_rule_body['port_range_min'] = rule['from_port']
if rule.get('to_port', -1) != -1:
if to_port != -1:
os_security_group_rule_body['port_range_max'] = rule['to_port']
# TODO(Alex) AWS protocol claims support of multiple groups and cidrs,
# however, neither aws cli, nor neutron support it at the moment.
# however, neutron doesn't support it at the moment.
# It's possible in the future to convert list values incoming from
# REST API into several neutron rules and squeeze them back into one
# for describing.
@ -161,6 +248,10 @@ def _build_rules(context, group_id, group_name, ip_permissions, direction):
elif rule.get('ip_ranges'):
os_security_group_rule_body['remote_ip_prefix'] = (
rule['ip_ranges'][0]['cidr_ip'])
ec2utils.validate_cidr_with_ipv6(
os_security_group_rule_body['remote_ip_prefix'], 'cidr_ip')
else:
raise exception.MissingParameter(param='source group or cidr')
os_security_group_rule_bodies.append(os_security_group_rule_body)
return os_security_group_rule_bodies
@ -326,8 +417,10 @@ class SecurityGroupEngineNeutron(object):
try:
os_security_group_rule = neutron.create_security_group_rule(
{'security_group_rule': rule_body})['security_group_rule']
except neutron_exception.OverQuotaClient:
raise exception.RulesPerSecurityGroupLimitExceeded()
except neutron_exception.Conflict as ex:
raise exception.RuleAlreadyExists()
raise exception.InvalidPermissionDuplicate()
def get_os_group_rules(self, context, os_id):
neutron = clients.neutron(context)
@ -351,10 +444,9 @@ class SecurityGroupEngineNova(object):
def delete_group(self, context, group_name=None, group_id=None):
nova = clients.nova(context)
os_id = self.get_group_os_id(context, group_id, group_name)
try:
nova.security_groups.delete(self.get_group_os_id(context,
group_id,
group_name))
nova.security_groups.delete(os_id)
except Exception as ex:
# TODO(Alex): do log error
# nova doesn't differentiate Conflict exception like neutron does
@ -376,9 +468,10 @@ class SecurityGroupEngineNova(object):
rule_body.get('port_range_max', -1),
rule_body.get('remote_ip_prefix'),
rule_body.get('remote_group_id'))
except Exception as ex:
# TODO(Alex) resolve Conflict exceptions
raise ex
except nova_exception.Conflict:
raise exception.InvalidPermissionDuplicate()
except nova_exception.OverLimit:
raise exception.RulesPerSecurityGroupLimitExceeded()
def get_os_group_rules(self, context, os_id):
nova = clients.nova(context)
@ -434,6 +527,8 @@ class SecurityGroupEngineNova(object):
def get_group_os_id(self, context, group_id, group_name,
nova_security_groups=None):
if group_id:
return group_id
nova_group = self.get_nova_group_by_name(context, group_name,
nova_security_groups)
return nova_group.id

View File

@ -111,7 +111,7 @@ def add_item(context, kind, data):
item_ref = models.Item()
item_ref.update({
"project_id": context.project_id,
"id": _new_id(kind, data["os_id"]),
"id": _new_id(kind, data.get("os_id")),
})
item_ref.update(_pack_item_data(data))
try:

View File

@ -99,6 +99,11 @@ class Unsupported(EC2Exception):
code = 400
class Overlimit(EC2Exception):
msg_fmt = _("Limit exceeded.")
code = 400
class Invalid(EC2Exception):
msg_fmt = _("Unacceptable parameters.")
code = 400
@ -147,6 +152,11 @@ class NotFound(EC2Exception):
code = 404
class ValidationError(Invalid):
msg_fmt = _("The input fails to satisfy the constraints "
"specified by an AWS service: '%(reason)s'")
class EC2NotFound(NotFound):
code = 400
@ -282,7 +292,7 @@ class InvalidSubnetConflict(Invalid):
class MissingParameter(Invalid):
pass
msg_fmt = _("The required parameter '%(param)s' is missing")
class InvalidParameterValue(Invalid):
@ -304,7 +314,6 @@ class GatewayNotAttached(Invalid):
class DependencyViolation(Invalid):
ec2_code = 'DependencyViolation'
msg_fmt = _('Object %(obj1_id)s has dependent resource %(obj2_id)s')
@ -335,15 +344,13 @@ class RouteAlreadyExists(Invalid):
'already exists.')
class NetworkInterfaceLimitExceeded(Invalid):
ec2_code = 'NetworkInterfaceLimitExceeded'
class NetworkInterfaceLimitExceeded(Overlimit):
msg_fmt = _('You have reached the limit of network interfaces for subnet'
'%(subnet_id)s.')
# TODO(Alex) Change next class with the real AWS exception
class RuleAlreadyExists(Invalid):
msg_fmt = _('The rule already exists.')
class ResourceLimitExceeded(Overlimit):
msg_fmt = _('You have reached the limit of %(resource)s')
class ImageNotActive(Invalid):
@ -371,3 +378,13 @@ class InvalidAvailabilityZoneNotFound(NotFound):
class KeyPairExists(Invalid):
ec2_code = 'InvalidKeyPair.Duplicate'
msg_fmt = _("Key pair '%(key_name)s' already exists.")
class InvalidPermissionDuplicate(Invalid):
ec2_code = 'InvalidPermission.Duplicate'
msg_fmt = _("The specified rule already exists for that security group.")
class RulesPerSecurityGroupLimitExceeded(Overlimit):
msg_fmt = _("You've reached the limit on the number of rules that "
"you can add to a security group.")

View File

@ -751,7 +751,7 @@ OS_SECURITY_GROUP_1 = {
}
OS_SECURITY_GROUP_2 = {
'id': ID_OS_SECURITY_GROUP_2,
'name': 'groupname',
'name': 'groupname2',
'security_group_rules': [
OS_SECURITY_GROUP_RULE_1,
OS_SECURITY_GROUP_RULE_2
@ -823,7 +823,7 @@ EC2_SECURITY_GROUP_2 = {
'ipRanges':
[{'cidrIp': '192.168.1.0/24'}]
}],
'groupName': 'groupname',
'groupName': 'groupname2',
'ipPermissionsEgress':
[{'toPort': -1,
'ipProtocol': 100,

View File

@ -17,6 +17,7 @@ import copy
import mock
from neutronclient.common import exceptions as neutron_exception
from novaclient import exceptions as nova_exception
from ec2api.api import security_group
from ec2api.tests import base
@ -57,6 +58,89 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.nova_security_groups.create.assert_called_once_with(
'groupname', 'Group description')
def test_create_security_group_invalid(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
def check_response(resp, error_code):
self.assertEqual(400, resp['status'])
self.assertEqual(error_code, resp['Error']['Code'])
self.neutron.reset_mock()
self.db_api.reset_mock()
self.db_api.get_item_by_id.return_value = None
resp = self.execute(
'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.db_api.get_item_by_id.assert_called_once_with(mock.ANY, 'vpc',
fakes.ID_EC2_VPC_1)
check_response(resp, 'InvalidVpcID.NotFound')
resp = self.execute(
'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'aa #^% -=99',
'GroupDescription': 'Group description'})
check_response(resp, 'ValidationError')
resp = self.execute(
'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'aa #^% -=99'})
check_response(resp, 'ValidationError')
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'aa \t\x01\x02\x7f',
'GroupDescription': 'Group description'})
check_response(resp, 'ValidationError')
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'groupname',
'GroupDescription': 'aa \t\x01\x02\x7f'})
check_response(resp, 'ValidationError')
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'x' * 256,
'GroupDescription': 'Group description'})
check_response(resp, 'ValidationError')
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'groupname',
'GroupDescription': 'x' * 256})
check_response(resp, 'ValidationError')
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'groupname'})
check_response(resp, 'MissingParameter')
resp = self.execute(
'CreateSecurityGroup',
{'GroupDescription': 'description'})
check_response(resp, 'MissingParameter')
def test_create_security_group_over_quota(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.nova_security_groups.create.side_effect = (
nova_exception.OverLimit(413))
resp = self.execute(
'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.assertEqual(400, resp['status'])
self.assertEqual('ResourceLimitExceeded', resp['Error']['Code'])
self.nova_security_groups.create.assert_called_once_with(
'groupname', 'Group description')
def test_create_security_group_rollback(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
@ -107,7 +191,16 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.nova_security_groups.delete.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_1)
def test_delete_security_group_no_security_group(self):
resp = self.execute(
'DeleteSecurityGroup',
{'GroupId':
fakes.ID_OS_SECURITY_GROUP_2})
self.assertEqual(200, resp['status'])
self.assertEqual(True, resp['return'])
self.nova_security_groups.delete.assert_any_call(
fakes.ID_OS_SECURITY_GROUP_2)
def test_delete_security_group_invalid(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.get_item_by_id.return_value = None
@ -119,6 +212,20 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.assertEqual('InvalidGroup.NotFound',
resp['Error']['Code'])
self.assertEqual(0, self.neutron.delete_port.call_count)
resp = self.execute(
'DeleteSecurityGroup',
{'GroupName':
'badname'})
self.assertEqual(400, resp['status'])
self.assertEqual('InvalidGroup.NotFound',
resp['Error']['Code'])
self.assertEqual(0, self.neutron.delete_port.call_count)
resp = self.execute(
'DeleteSecurityGroup', {})
self.assertEqual(400, resp['status'])
self.assertEqual('MissingParameter',
resp['Error']['Code'])
self.assertEqual(0, self.neutron.delete_port.call_count)
def test_delete_security_group_is_in_use(self):
security_group.security_group_engine = (
@ -150,6 +257,21 @@ class SecurityGroupTestCase(base.ApiTestCase):
[fakes.EC2_SECURITY_GROUP_1,
fakes.EC2_SECURITY_GROUP_2],
orderless_lists=True))
resp = self.execute('DescribeSecurityGroups',
{'GroupName.1': 'groupname2'})
self.assertEqual(200, resp['status'])
self.assertThat(resp['securityGroupInfo'],
matchers.ListMatches(
[fakes.EC2_SECURITY_GROUP_2],
orderless_lists=True))
self.db_api.get_items_by_ids.return_value = [fakes.DB_SECURITY_GROUP_2]
resp = self.execute('DescribeSecurityGroups',
{'GroupId.1': fakes.ID_EC2_SECURITY_GROUP_2})
self.assertEqual(200, resp['status'])
self.assertThat(resp['securityGroupInfo'],
matchers.ListMatches(
[fakes.EC2_SECURITY_GROUP_2],
orderless_lists=True))
def test_describe_security_groups_nova(self):
security_group.security_group_engine = (
@ -165,6 +287,82 @@ class SecurityGroupTestCase(base.ApiTestCase):
fakes.EC2_NOVA_SECURITY_GROUP_2],
orderless_lists=True))
def test_authorize_security_group_invalid(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
def check_response(error_code, protocol, from_port, to_port, cidr,
group_id=fakes.ID_EC2_SECURITY_GROUP_2):
resp = self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupId': group_id,
'IpPermissions.1.FromPort': str(from_port),
'IpPermissions.1.ToPort': str(to_port),
'IpPermissions.1.IpProtocol': protocol,
'IpPermissions.1.IpRanges.1.CidrIp': cidr})
self.assertEqual(400, resp['status'])
self.assertEqual(error_code, resp['Error']['Code'])
self.neutron.reset_mock()
self.db_api.reset_mock()
resp = self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
'IpPermissions.1.FromPort': '-1',
'IpPermissions.1.ToPort': '-1',
'IpPermissions.1.IpProtocol': 'icmp',
'IpPermissions.1.IpRanges.1.CidrIp': '0.0.0.0/0'})
self.assertEqual(200, resp['status'])
# Duplicate rule
self.db_api.get_item_by_id.side_effect = copy.deepcopy(
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_SECURITY_GROUP_1: fakes.DB_SECURITY_GROUP_1,
fakes.ID_EC2_SECURITY_GROUP_2: fakes.DB_SECURITY_GROUP_2}))
self.neutron.create_security_group_rule.side_effect = (
neutron_exception.Conflict)
check_response('InvalidPermission.Duplicate', 'icmp',
-1, -1, '0.0.0.0/0')
# Over quota
self.neutron.create_security_group_rule.side_effect = (
neutron_exception.OverQuotaClient)
check_response('RulesPerSecurityGroupLimitExceeded', 'icmp', -1, -1,
'0.0.0.0/0')
# Invalid CIDR address
check_response('InvalidParameterValue', 'tcp', 80, 81, '0.0.0.0/0444')
# Missing ports
check_response('InvalidParameterValue', 'tcp', -1, -1, '0.0.0.0/0')
# from port cannot be greater than to port
check_response('InvalidParameterValue', 'tcp', 100, 1, '0.0.0.0/0')
# For tcp, negative values are not allowed
check_response('InvalidParameterValue', 'tcp', -1, 1, '0.0.0.0/0')
# For tcp, valid port range 1-65535
check_response('InvalidParameterValue', 'tcp', 1, 65599, '0.0.0.0/0')
# Invalid protocol
check_response('InvalidParameterValue', 'xyz', 1, 14, '0.0.0.0/0')
# Invalid port
check_response('InvalidParameterValue', 'tcp', " ", "gg", '0.0.0.0/0')
# Invalid icmp port
check_response('InvalidParameterValue', 'icmp', " ", "gg", '0.0.0.0/0')
# Invalid CIDR Address
check_response('InvalidParameterValue', 'icmp', -1, -1, '0.0.0.0')
# Invalid CIDR Address
check_response('InvalidParameterValue', 'icmp', 5, 10, '0.0.0.0/')
# Invalid Cidr ports
check_response('InvalidParameterValue', 'icmp', 1, 256, '0.0.0.0/0')
# Missing group
check_response('MissingParameter', 'tcp', 1, 255, '0.0.0.0/0', None)
# Missing cidr
check_response('MissingParameter', 'tcp', 1, 255, None)
# Invalid remote group
resp = self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
'IpPermissions.1.IpProtocol': 'icmp',
'IpPermissions.1.Groups.1.GroupName': 'somegroup',
'IpPermissions.1.Groups.1.UserId': 'i-99999999'})
self.assertEqual(400, resp['status'])
self.assertEqual('InvalidGroup.NotFound', resp['Error']['Code'])
def test_authorize_security_group_ingress_ip_ranges(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
@ -186,6 +384,21 @@ class SecurityGroupTestCase(base.ApiTestCase):
{'security_group_rule':
tools.purge_dict(fakes.OS_SECURITY_GROUP_RULE_1,
{'id', 'remote_group_id', 'tenant_id'})})
# NOTE(Alex): Openstack extension, AWS-incompability
# IPv6 is not supported by Amazon.
resp = self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
'IpPermissions.1.FromPort': '10',
'IpPermissions.1.ToPort': '10',
'IpPermissions.1.IpProtocol': 'tcp',
'IpPermissions.1.IpRanges.1.CidrIp': '::/0'})
self.assertEqual(200, resp['status'])
self.neutron.create_security_group_rule.assert_called_with(
{'security_group_rule':
tools.patch_dict(
fakes.OS_SECURITY_GROUP_RULE_1, {'remote_ip_prefix': '::/0'},
{'id', 'remote_group_id', 'tenant_id'})})
def test_authorize_security_group_ip_ranges_nova(self):
security_group.security_group_engine = (