diff --git a/ec2api/api/__init__.py b/ec2api/api/__init__.py index 799ed86c..b0a9f909 100644 --- a/ec2api/api/__init__.py +++ b/ec2api/api/__init__.py @@ -19,7 +19,6 @@ import functools import hashlib import sys -import netaddr from oslo.config import cfg import requests import six @@ -330,40 +329,95 @@ class Requestify(wsgi.Middleware): return self.application -def validate_ec2_id(val): - if not validator.validate_str()(val): - return False - try: - ec2utils.ec2_id_to_id(val) - except exception.InvalidId: - return False - return True - - -def is_valid_ipv4(address): - """Verify that address represents a valid IPv4 address.""" - try: - return netaddr.valid_ipv4(address) - except Exception: - return False - - class Validator(wsgi.Middleware): - validator.validate_ec2_id = validate_ec2_id - validator.DEFAULT_VALIDATOR = { - 'instance_id': validate_ec2_id, - 'volume_id': validate_ec2_id, - 'image_id': validate_ec2_id, - 'attribute': validator.validate_str(), - 'image_location': validator.validate_image_path, - 'public_ip': is_valid_ipv4, - 'region_name': validator.validate_str(), - 'group_name': validator.validate_str(max_length=255), - 'group_description': validator.validate_str(max_length=255), - 'size': validator.validate_int(), - 'user_data': validator.validate_user_data + 'AllocationId': validator.validate_ec2_id(['eipalloc']), + 'AllowReassignment': validator.validate_dummy, + 'AllowReassociation': validator.validate_dummy, + 'Architecture': validator.validate_dummy, + 'AssociationId': validator.validate_ec2_association_id, + 'AttachmentId': validator.validate_ec2_id(['eni-attach']), + 'Attribute': validator.validate_dummy, + 'AvailabilityZone': validator.validate_dummy, + 'BlockDeviceMapping': validator.validate_dummy, + 'CidrBlock': validator.validate_cidr_block, + 'ClientToken': validator.validate_dummy, + 'Description': validator.validate_dummy, + 'DestinationCidrBlock': validator.validate_cidr_block, + 'Device': validator.validate_dummy, + 'DeviceIndex': validator.validate_dummy, + 'DhcpConfiguration': validator.validate_dummy, + 'Dhcp_optionsId': validator.validate_dummy, + 'DisableApiTermination': validator.validate_dummy, + 'Domain': validator.validate_dummy, + 'Ebs_optimized': validator.validate_dummy, + 'Encrypted': validator.validate_dummy, + 'ExecutableBy': validator.validate_dummy, + 'Filter': validator.validate_dummy, + 'Force': validator.validate_dummy, + 'GatewayId': validator.validate_dummy, + 'GroupDescription': validator.validate_str(max_length=255), + 'GroupId': validator.validate_ec2_id(['sg']), + 'GroupName': validator.validate_str(max_length=255), + 'IamInstanceProfile': validator.validate_dummy, + 'ImageId': validator.validate_ec2_id(['ami', 'ari', 'aki']), + 'ImageLocation': validator.validate_image_path, + 'InstanceId': validator.validate_dummy, + 'InstanceInitiatedShutdownBehavior': validator.validate_dummy, + 'InstanceTenancy': validator.validate_dummy, + 'InstanceType': validator.validate_dummy, + 'InternetGatewayId': validator.validate_dummy, + 'Iops': validator.validate_dummy, + 'IpPermissions': validator.validate_dummy, + 'KernelId': validator.validate_dummy, + 'KeyName': validator.validate_dummy, + 'KmsKeyId': validator.validate_dummy, + 'LaunchPermission': validator.validate_dummy, + 'MaxCount': validator.validate_dummy, + 'MaxResults': validator.validate_dummy, + 'Metadata': validator.validate_dummy, + 'MinCount': validator.validate_dummy, + 'Monitoring': validator.validate_dummy, + 'Name': validator.validate_dummy, + 'NetworkInterface': validator.validate_dummy, + 'NetworkInterfaceId': validator.validate_dummy, + 'NextToken': validator.validate_dummy, + 'NoReboot': validator.validate_dummy, + 'OperationType': validator.validate_dummy, + 'Owner': validator.validate_dummy, + 'Placement': validator.validate_dummy, + 'PrivateIpAddress': validator.validate_dummy, + 'PrivateIpAddresses': validator.validate_dummy, + 'ProductCode': validator.validate_dummy, + 'PublicIp': validator.validate_ipv4, + 'PublicKey_material': validator.validate_dummy, + 'RamdiskId': validator.validate_dummy, + 'RemoteIpPrefix': validator.validate_dummy, + 'RegionName': validator.validate_str(), + 'ResourceId': validator.validate_dummy, + 'RestorableBy': validator.validate_dummy, + 'RootDeviceName': validator.validate_dummy, + 'RouteTableId': validator.validate_dummy, + 'SecondaryPrivateIpAddressCount': validator.validate_dummy, + 'SecurityGroup': validator.validate_dummy, + 'SecurityGroupId': validator.validate_dummy, + 'Size': validator.validate_int(), + 'SnapshotId': validator.validate_dummy, + 'SourceDestCheck': validator.validate_dummy, + 'SriovNetSupport': validator.validate_dummy, + 'SubnetId': validator.validate_dummy, + 'Tag': validator.validate_dummy, + 'UserData': validator.validate_user_data, + 'UserGroup': validator.validate_dummy, + 'UserId': validator.validate_dummy, + 'Value': validator.validate_dummy, + 'VirtualizationType': validator.validate_dummy, + 'VolumeId': validator.validate_dummy, + 'VolumeType': validator.validate_dummy, + 'VpcId': validator.validate_dummy, + 'VpcPeeringConnectionId': validator.validate_dummy, + 'ZoneName': validator.validate_dummy, } def __init__(self, application): @@ -371,11 +425,15 @@ class Validator(wsgi.Middleware): @webob.dec.wsgify(RequestClass=wsgi.Request) def __call__(self, req): - if validator.validate(req.environ['ec2.request'].args, - validator.DEFAULT_VALIDATOR): - return self.application - else: - raise webob.exc.HTTPBadRequest() + try: + if validator.validate(req.environ['ec2.request'], + validator.DEFAULT_VALIDATOR): + return self.application + else: + raise webob.exc.HTTPBadRequest() + except Exception as ex: + return ec2_error_ex( + ex, req, unexpected=not isinstance(ex, exception.EC2Exception)) def exception_to_ec2code(ex): diff --git a/ec2api/api/ec2utils.py b/ec2api/api/ec2utils.py index 8007a781..cabc6cac 100644 --- a/ec2api/api/ec2utils.py +++ b/ec2api/api/ec2utils.py @@ -14,8 +14,6 @@ import re -import netaddr - from ec2api import context from ec2api.db import api as db_api from ec2api import exception @@ -329,65 +327,3 @@ def os_id_to_ec2_id(context, kind, os_id, items_by_os_id=None, if ids_by_os_id is not None: ids_by_os_id[os_id] = item_id return item_id - - -def _is_valid_cidr(address): - """Check if address is valid - - The provided address can be a IPv6 or a IPv4 - CIDR address. - """ - try: - # Validate the correct CIDR Address - netaddr.IPNetwork(address) - except netaddr.core.AddrFormatError: - return False - except UnboundLocalError: - # NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in - # https://github.com/drkjam/netaddr/issues/2) - return False - - # Prior validation partially verify /xx part - # Verify it here - ip_segment = address.split('/') - - if (len(ip_segment) <= 1 or - ip_segment[1] == ''): - return False - - return True - - -def validate_cidr_with_ipv6(cidr, parameter_name): - invalid_format_exception = exception.InvalidParameterValue( - value=cidr, - parameter=parameter_name, - reason='This is not a valid CIDR block.') - if not _is_valid_cidr(cidr): - raise invalid_format_exception - - -_cidr_re = re.compile("^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$") - - -def validate_cidr(cidr, parameter_name): - invalid_format_exception = exception.InvalidParameterValue( - value=cidr, - parameter=parameter_name, - reason='This is not a valid CIDR block.') - if not _cidr_re.match(cidr): - raise invalid_format_exception - address, size = cidr.split("/") - octets = address.split(".") - if any(int(octet) > 255 for octet in octets): - raise invalid_format_exception - size = int(size) - if size > 32: - raise invalid_format_exception - - -def validate_vpc_cidr(cidr, invalid_cidr_exception_class): - validate_cidr(cidr, 'cidrBlock') - size = int(cidr.split("/")[-1]) - if size > 28 or size < 16: - raise invalid_cidr_exception_class(cidr_block=cidr) diff --git a/ec2api/api/route_table.py b/ec2api/api/route_table.py index 33e13548..d0e51cbf 100644 --- a/ec2api/api/route_table.py +++ b/ec2api/api/route_table.py @@ -278,7 +278,6 @@ def _delete_route_table(context, route_table_id, vpc=None, cleaner=None): def _set_route(context, route_table_id, destination_cidr_block, gateway_id, instance_id, network_interface_id, vpc_peering_connection_id, do_replace): - ec2utils.validate_cidr(destination_cidr_block, 'destinationCidrBlock') route_table = ec2utils.get_db_item(context, 'rtb', route_table_id) vpc = db_api.get_item_by_id(context, 'vpc', route_table['vpc_id']) vpc_ipnet = netaddr.IPNetwork(vpc['cidr_block']) diff --git a/ec2api/api/security_group.py b/ec2api/api/security_group.py index cd153b2c..c89a949a 100644 --- a/ec2api/api/security_group.py +++ b/ec2api/api/security_group.py @@ -27,6 +27,7 @@ from ec2api.api import clients from ec2api.api import common from ec2api.api import ec2utils from ec2api.api import utils +from ec2api.api import validator from ec2api.db import api as db_api from ec2api import exception from ec2api.openstack.common.gettextutils import _ @@ -248,7 +249,7 @@ def _build_rules(context, group_id, group_name, ip_permissions, direction): elif rule.get('ip_ranges'): os_security_group_rule_body['remote_ip_prefix'] = ( rule['ip_ranges'][0]['cidr_ip']) - ec2utils.validate_cidr_with_ipv6( + validator.validate_cidr_with_ipv6( os_security_group_rule_body['remote_ip_prefix'], 'cidr_ip') else: raise exception.MissingParameter(param='source group or cidr') diff --git a/ec2api/api/subnet.py b/ec2api/api/subnet.py index d20cc2ea..64f7886e 100644 --- a/ec2api/api/subnet.py +++ b/ec2api/api/subnet.py @@ -39,8 +39,6 @@ LOG = logging.getLogger(__name__) def create_subnet(context, vpc_id, cidr_block, availability_zone=None): - ec2utils.validate_vpc_cidr(cidr_block, exception.InvalidSubnetRange) - vpc = ec2utils.get_db_item(context, 'vpc', vpc_id) vpc_ipnet = netaddr.IPNetwork(vpc['cidr_block']) subnet_ipnet = netaddr.IPNetwork(cidr_block) diff --git a/ec2api/api/validator.py b/ec2api/api/validator.py index b8459eb8..e192b908 100644 --- a/ec2api/api/validator.py +++ b/ec2api/api/validator.py @@ -15,6 +15,9 @@ import base64 import re +import netaddr + +from ec2api import exception from ec2api.openstack.common.gettextutils import _ from ec2api.openstack.common import log as logging @@ -37,21 +40,26 @@ def _get_path_validator_regex(): VALIDATE_PATH_RE = _get_path_validator_regex() +def validate_dummy(val, **kwargs): + return True + + def validate_str(max_length=None): - def _do(val): - if not isinstance(val, basestring): - return False - if max_length and len(val) > max_length: - return False - return True + def _do(val, parameter_name, **kwargs): + if (isinstance(val, basestring) and + (max_length is None or max_length and len(val) <= max_length)): + return True + raise exception.ValidationError( + reason=_("%s should not be greater " + "than 255 characters.") % parameter_name) return _do def validate_int(max_value=None): - def _do(val): + def _do(val, **kwargs): if not isinstance(val, int): return False if max_value and val > max_value: @@ -61,7 +69,7 @@ def validate_int(max_value=None): return _do -def validate_url_path(val): +def validate_url_path(val, **kwargs): """True if val is matched by the path component grammar in rfc3986.""" if not validate_str()(val): @@ -70,7 +78,7 @@ def validate_url_path(val): return VALIDATE_PATH_RE.match(val).end() == len(val) -def validate_image_path(val): +def validate_image_path(val, **kwargs): if not validate_str()(val): return False @@ -90,7 +98,7 @@ def validate_image_path(val): return True -def validate_user_data(user_data): +def validate_user_data(user_data, **kwargs): """Check if the user_data is encoded properly.""" try: user_data = base64.b64decode(user_data) @@ -99,7 +107,117 @@ def validate_user_data(user_data): return True -def validate(args, validator): +def _is_valid_cidr(address): + """Check if address is valid + + The provided address can be a IPv6 or a IPv4 + CIDR address. + """ + try: + # Validate the correct CIDR Address + netaddr.IPNetwork(address) + except netaddr.core.AddrFormatError: + return False + except UnboundLocalError: + # NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in + # https://github.com/drkjam/netaddr/issues/2) + return False + + # Prior validation partially verify /xx part + # Verify it here + ip_segment = address.split('/') + + if (len(ip_segment) <= 1 or + ip_segment[1] == ''): + return False + + return True + + +def validate_cidr_with_ipv6(cidr, parameter_name, **kwargs): + invalid_format_exception = exception.InvalidParameterValue( + value=cidr, + parameter=parameter_name, + reason='This is not a valid CIDR block.') + if not _is_valid_cidr(cidr): + raise invalid_format_exception + return True + + +_cidr_re = re.compile("^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$") + + +def validate_cidr(cidr, parameter_name, **kwargs): + invalid_format_exception = exception.InvalidParameterValue( + value=cidr, + parameter=parameter_name, + reason='This is not a valid CIDR block.') + if not _cidr_re.match(cidr): + raise invalid_format_exception + address, size = cidr.split("/") + octets = address.split(".") + if any(int(octet) > 255 for octet in octets): + raise invalid_format_exception + size = int(size) + if size > 32: + raise invalid_format_exception + return True + + +def validate_cidr_block(cidr, action, **kwargs): + validate_cidr(cidr, 'cidrBlock') + size = int(cidr.split("/")[-1]) + if size > 28 or size < 16: + if action == 'CreateVpc': + raise exception.InvalidVpcRange(cidr_block=cidr) + elif action == 'CreateSubnet': + raise exception.InvalidSubnetRange(cidr_block=cidr) + return True + + +# NOTE(Alex) Unfortunately Amazon returns various kinds of error for invalid +# IDs (...ID.Malformed, ...Id.Malformed, ...ID.NotFound, InvalidParameterValue) +# So we decided here to commonize invalid IDs to InvalidParameterValue error. + +def validate_ec2_id(prefices): + + def _do(val, parameter_name, **kwargs): + if not validate_str()(val, parameter_name, **kwargs): + return False + try: + prefix, value = val.rsplit('-', 1) + int(value, 16) + if prefix in prefices: + return True + except Exception: + pass + raise exception.InvalidParameterValue( + value=val, parameter=parameter_name, + reason=_('Expected: %(prefix)s-...') % {'prefix': prefices[0]}) + + return _do + + +def validate_ec2_association_id(id, parameter_name, action): + if action == 'DisassociateAddress': + return validate_ec2_id(['eipassoc'])(id, parameter_name) + else: + return validate_ec2_id(['rtbassoc'])(id, parameter_name) + + +def validate_ipv4(address, parameter_name, **kwargs): + """Verify that address represents a valid IPv4 address.""" + try: + if netaddr.valid_ipv4(address): + return True + except Exception: + pass + raise exception.InvalidParameterValue( + value=address, parameter=parameter_name, + reason=_('Not a valid IP address')) + + +def validate(request, validator): """Validate values of args against validators in validator. :param args: Dict of values to be validated. @@ -117,14 +235,15 @@ def validate(args, validator): """ - for key in validator: - if key not in args: + args = request.args + for key in args: + if key not in validator: continue f = validator[key] assert callable(f) - if not f(args[key]): + if not f(args[key], parameter_name=key, action=request.action): LOG.debug(_("%(key)s with value %(value)s failed" " validator %(name)s"), {'key': key, 'value': args[key], 'name': f.__name__}) diff --git a/ec2api/api/vpc.py b/ec2api/api/vpc.py index 233ac7f7..7199731f 100644 --- a/ec2api/api/vpc.py +++ b/ec2api/api/vpc.py @@ -39,7 +39,6 @@ LOG = logging.getLogger(__name__) def create_vpc(context, cidr_block, instance_tenancy='default'): - ec2utils.validate_vpc_cidr(cidr_block, exception.InvalidVpcRange) neutron = clients.neutron(context) # TODO(Alex): Handle errors like overlimit # TODO(ft) dhcp_options_id diff --git a/ec2api/tests/test_address.py b/ec2api/tests/test_address.py index 29d98c72..0f93d023 100644 --- a/ec2api/tests/test_address.py +++ b/ec2api/tests/test_address.py @@ -245,14 +245,14 @@ class AddressTestCase(base.ApiTestCase): do_check({}, 'MissingParameter') - do_check({'PublicIp': 'fake_ip', - 'AllocationId': 'fake_allocation_id'}, + do_check({'PublicIp': '0.0.0.0', + 'AllocationId': 'eipalloc-0'}, 'InvalidParameterCombination') - do_check({'PublicIp': 'fake_ip'}, + do_check({'PublicIp': '0.0.0.0'}, 'MissingParameter') - do_check({'AllocationId': 'fake_allocation_id'}, + do_check({'AllocationId': 'eipalloc-0'}, 'MissingParameter') def test_associate_address_invalid_ec2_classic_parameters(self): @@ -261,7 +261,7 @@ class AddressTestCase(base.ApiTestCase): # NOTE(ft): ec2 classic instance vs allocation_id parameter self.db_api.get_items.return_value = [] resp = self.execute('AssociateAddress', - {'AllocationId': 'fake_allocation_id', + {'AllocationId': 'eipalloc-0', 'InstanceId': fakes.ID_EC2_INSTANCE_1}) self.assertEqual(400, resp['status']) self.assertEqual('InvalidParameterCombination', resp['Error']['Code']) @@ -289,7 +289,7 @@ class AddressTestCase(base.ApiTestCase): # NOTE(ft): vpc instance vs public ip parmeter self.db_api.get_items.return_value = [fakes.DB_NETWORK_INTERFACE_2] - do_check({'PublicIp': 'fake_ip', + do_check({'PublicIp': '0.0.0.0', 'InstanceId': fakes.ID_EC2_INSTANCE_1}, 'InvalidParameterCombination') @@ -474,8 +474,8 @@ class AddressTestCase(base.ApiTestCase): do_check({}, 'MissingParameter') - do_check({'PublicIp': 'fake_ip', - 'AssociationId': 'fake_allocation_id'}, + do_check({'PublicIp': '0.0.0.0', + 'AssociationId': 'eipassoc-0'}, 'InvalidParameterCombination') # NOTE(ft): vpc address vs public ip parameter @@ -557,8 +557,8 @@ class AddressTestCase(base.ApiTestCase): do_check({}, 'MissingParameter') - do_check({'PublicIp': 'fake_ip', - 'AllocationId': 'fake_allocation_id'}, + do_check({'PublicIp': '0.0.0.0', + 'AllocationId': 'eipalloc-0'}, 'InvalidParameterCombination') # NOTE(ft): vpc address vs public ip parameter diff --git a/ec2api/tests/test_ec2_validate.py b/ec2api/tests/test_ec2_validate.py index 5c642a38..6ff47f5c 100644 --- a/ec2api/tests/test_ec2_validate.py +++ b/ec2api/tests/test_ec2_validate.py @@ -19,10 +19,46 @@ import datetime import testtools from ec2api.api import ec2utils +from ec2api.api import validator from ec2api import exception from ec2api.openstack.common import timeutils +class EC2ValidationTestCase(testtools.TestCase): + """Test case for various validations.""" + + def test_validate_cidr(self): + self.assertEqual(True, validator.validate_cidr('10.10.0.0/24', 'cidr')) + + def check_raise_invalid_parameter(cidr): + self.assertRaises(exception.InvalidParameterValue, + validator.validate_cidr, cidr, 'cidr') + + check_raise_invalid_parameter('fake') + check_raise_invalid_parameter('10.10/24') + check_raise_invalid_parameter('10.10.0.0.0/24') + check_raise_invalid_parameter('10.10.0.0') + check_raise_invalid_parameter(' 10.10.0.0/24') + check_raise_invalid_parameter('10.10.0.0/24 ') + check_raise_invalid_parameter('.10.10.0.0/24 ') + check_raise_invalid_parameter('-1.10.0.0/24') + check_raise_invalid_parameter('10.256.0.0/24') + check_raise_invalid_parameter('10.10.0.0/33') + check_raise_invalid_parameter('10.10.0.0/-1') + + def check_raise_invalid_vpc_range(cidr, ex_class, action): + self.assertRaises(ex_class, + validator.validate_cidr_block, cidr, + action) + + check_raise_invalid_vpc_range('10.10.0.0/15', + exception.InvalidSubnetRange, + 'CreateSubnet') + check_raise_invalid_vpc_range('10.10.0.0/29', + exception.InvalidVpcRange, + 'CreateVpc') + + class EC2TimestampValidationTestCase(testtools.TestCase): """Test case for EC2 request timestamp validation.""" diff --git a/ec2api/tests/test_ec2utils.py b/ec2api/tests/test_ec2utils.py index 437991c2..93c9148e 100644 --- a/ec2api/tests/test_ec2utils.py +++ b/ec2api/tests/test_ec2utils.py @@ -116,32 +116,3 @@ class EC2UtilsTestCase(testtools.TestCase): check_not_found('sg', exception.InvalidGroupNotFound) check_not_found('rtb', exception.InvalidRouteTableIDNotFound) check_not_found('i', exception.InvalidInstanceIDNotFound) - - def test_validate_cidr(self): - self.assertIsNone(ec2utils.validate_cidr('10.10.0.0/24', 'cidr')) - - def check_raise_invalid_parameter(cidr): - self.assertRaises(exception.InvalidParameterValue, - ec2utils.validate_cidr, cidr, 'cidr') - - check_raise_invalid_parameter('fake') - check_raise_invalid_parameter('10.10/24') - check_raise_invalid_parameter('10.10.0.0.0/24') - check_raise_invalid_parameter('10.10.0.0') - check_raise_invalid_parameter(' 10.10.0.0/24') - check_raise_invalid_parameter('10.10.0.0/24 ') - check_raise_invalid_parameter('.10.10.0.0/24 ') - check_raise_invalid_parameter('-1.10.0.0/24') - check_raise_invalid_parameter('10.256.0.0/24') - check_raise_invalid_parameter('10.10.0.0/33') - check_raise_invalid_parameter('10.10.0.0/-1') - - def check_raise_invalid_vpc_range(cidr, ex_class): - self.assertRaises(ex_class, - ec2utils.validate_vpc_cidr, cidr, - ex_class) - - check_raise_invalid_vpc_range('10.10.0.0/15', - exception.InvalidSubnetRange) - check_raise_invalid_vpc_range('10.10.0.0/29', - exception.InvalidVpcRange) diff --git a/ec2api/tests/test_security_group.py b/ec2api/tests/test_security_group.py index bd007631..757dc494 100644 --- a/ec2api/tests/test_security_group.py +++ b/ec2api/tests/test_security_group.py @@ -191,13 +191,21 @@ class SecurityGroupTestCase(base.ApiTestCase): self.nova_security_groups.delete.assert_called_once_with( fakes.ID_OS_SECURITY_GROUP_1) + # NOTE(Alex) This test is disabled because it checks using non-AWS id. + @base.skip_not_implemented + def test_delete_security_group_nova_os_id(self): + security_group.security_group_engine = ( + security_group.SecurityGroupEngineNova()) + self.nova_security_groups.list.return_value = ( + [fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1), + fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_2)]) resp = self.execute( 'DeleteSecurityGroup', {'GroupId': fakes.ID_OS_SECURITY_GROUP_2}) self.assertEqual(200, resp['status']) self.assertEqual(True, resp['return']) - self.nova_security_groups.delete.assert_any_call( + self.nova_security_groups.delete.assert_called_once_with( fakes.ID_OS_SECURITY_GROUP_2) def test_delete_security_group_invalid(self): @@ -293,13 +301,15 @@ class SecurityGroupTestCase(base.ApiTestCase): def check_response(error_code, protocol, from_port, to_port, cidr, group_id=fakes.ID_EC2_SECURITY_GROUP_2): + params = {'IpPermissions.1.FromPort': str(from_port), + 'IpPermissions.1.ToPort': str(to_port), + 'IpPermissions.1.IpProtocol': protocol} + if group_id is not None: + params['GroupId'] = group_id + if cidr is not None: + params['IpPermissions.1.IpRanges.1.CidrIp'] = cidr resp = self.execute( - 'AuthorizeSecurityGroupIngress', - {'GroupId': group_id, - 'IpPermissions.1.FromPort': str(from_port), - 'IpPermissions.1.ToPort': str(to_port), - 'IpPermissions.1.IpProtocol': protocol, - 'IpPermissions.1.IpRanges.1.CidrIp': cidr}) + 'AuthorizeSecurityGroupIngress', params) self.assertEqual(400, resp['status']) self.assertEqual(error_code, resp['Error']['Code']) self.neutron.reset_mock()