fix ec2-api after deprecation of nova-network functions in novaclient

removed NovaEngine in addresses, availability_zones, instances, security_groups
removed unit tests for NovaEngine
disabled some unit tests using NovaEngine for further reworking

Closes-bug: #1691484
Change-Id: I662d5b57b9e46be80c1d2093038ada83897565eb
This commit is contained in:
tikitavi 2017-05-19 00:59:26 +03:00
parent 478a1897c3
commit 20f65fac2d
17 changed files with 176 additions and 759 deletions

View File

@ -149,13 +149,6 @@ function configure_ec2api_networking {
if [[ -n "$ext_net" ]]; then
iniset $EC2API_CONF_FILE DEFAULT external_network $ext_net
fi
if [[ ,${ENABLED_SERVICES} =~ ,"q-" ]]; then
iniset $EC2API_CONF_FILE DEFAULT full_vpc_support True
iniset $EC2API_CONF_FILE DEFAULT disable_ec2_api True
else
iniset $EC2API_CONF_FILE DEFAULT full_vpc_support False
fi
}
# Entry points

View File

@ -16,7 +16,6 @@ try:
from neutronclient.common import exceptions as neutron_exception
except ImportError:
pass # clients will log absense of neutronclient in this case
from novaclient import exceptions as nova_exception
from oslo_config import cfg
from oslo_log import log as logging
@ -39,10 +38,7 @@ Validator = common.Validator
def get_address_engine():
if CONF.full_vpc_support:
return AddressEngineNeutron()
else:
return AddressEngineNova()
return AddressEngineNeutron()
def allocate_address(context, domain=None):
@ -152,18 +148,10 @@ def _format_address(context, address, os_floating_ip, os_ports=[],
fixed_ip_address = os_floating_ip.get('fixed_ip_address')
if fixed_ip_address:
ec2_address['privateIpAddress'] = fixed_ip_address
port_id = os_floating_ip.get('port_id')
os_fip = os_floating_ip.get('instance_id')
if port_id:
port = next((port for port in os_ports
if port['id'] == port_id), None)
if port and port.get('device_id'):
ec2_address['instanceId'] = (
_get_instance_ec2_id_by_os_id(context, port['device_id'],
db_instances_dict))
elif os_fip:
os_instance_id = _get_os_instance_id(context, os_floating_ip, os_ports)
if os_instance_id:
ec2_address['instanceId'] = (
_get_instance_ec2_id_by_os_id(context, os_fip,
_get_instance_ec2_id_by_os_id(context, os_instance_id,
db_instances_dict))
if not address:
ec2_address['domain'] = 'standard'
@ -207,12 +195,21 @@ def _disassociate_address_item(context, address):
db_api.update_item(context, address)
def _get_os_instance_id(context, os_floating_ip,
os_ports=[]):
port_id = os_floating_ip.get('port_id')
os_instance_id = None
if port_id:
port = next((port for port in os_ports
if port['id'] == port_id), None)
if port and port.get('device_id'):
os_instance_id = port['device_id']
return os_instance_id
class AddressEngineNeutron(object):
def allocate_address(self, context, domain=None):
if ((not domain or domain == 'standard') and
not CONF.disable_ec2_classic):
return AddressEngineNova().allocate_address(context)
os_public_network = ec2utils.get_os_public_network(context)
neutron = clients.neutron(context)
@ -224,6 +221,9 @@ class AddressEngineNeutron(object):
except neutron_exception.OverQuotaClient:
raise exception.AddressLimitExceeded()
os_floating_ip = os_floating_ip['floatingip']
if ((not domain or domain == 'standard') and
not CONF.disable_ec2_classic):
return None, os_floating_ip
cleaner.addCleanup(neutron.delete_floatingip, os_floating_ip['id'])
address = {'os_id': os_floating_ip['id'],
@ -242,8 +242,13 @@ class AddressEngineNeutron(object):
msg = _('You must specify an allocation id when releasing a '
'VPC elastic IP address')
raise exception.InvalidParameterValue(msg)
return AddressEngineNova().release_address(context,
public_ip, None)
os_floating_ip = self.get_os_floating_ip_by_public_ip(context,
public_ip)
try:
neutron.delete_floatingip(os_floating_ip['id'])
except neutron_exception.NotFound:
pass
return
address = ec2utils.get_db_item(context, allocation_id)
if not _is_address_valid(context, neutron, address):
@ -308,15 +313,14 @@ class AddressEngineNeutron(object):
"The address '%(public_ip)s' does not belong to you.")
raise exception.AuthFailure(msg % {'public_ip': public_ip})
# NOTE(ft): in fact only the first two parameters are used to
# associate an address in EC2 Classic mode. Other parameters
# are sent to validate their emptiness in one place
return AddressEngineNova().associate_address(
context, public_ip=public_ip, instance_id=instance_id,
allocation_id=allocation_id,
network_interface_id=network_interface_id,
private_ip_address=private_ip_address,
allow_reassociation=allow_reassociation)
os_instance_id = ec2utils.get_db_item(context,
instance_id)['os_id']
# NOTE(ft): check the public IP exists to raise AWS exception
# otherwise
self.get_os_floating_ip_by_public_ip(context, public_ip)
nova = clients.nova(context)
nova.servers.add_floating_ip(os_instance_id, public_ip)
return None
if not address:
msg = _("The address '%(public_ip)s' does not belong to you.")
@ -395,11 +399,18 @@ class AddressEngineNeutron(object):
msg = _('You must specify an association id when '
'unmapping an address from a VPC instance')
raise exception.InvalidParameterValue(msg)
# NOTE(ft): association_id is unused in EC2 Classic mode,
# but it's passed there to validate its emptiness in one place
return AddressEngineNova().disassociate_address(
context, public_ip=public_ip,
association_id=association_id)
# NOTE(tikitavi): check the public IP exists to raise AWS
# exception otherwise
os_floating_ip = self.get_os_floating_ip_by_public_ip(
context,
public_ip)
os_ports = self.get_os_ports(context)
os_instance_id = _get_os_instance_id(context, os_floating_ip,
os_ports)
if os_instance_id:
nova = clients.nova(context)
nova.servers.remove_floating_ip(os_instance_id, public_ip)
return None
if not address:
msg = _("The address '%(public_ip)s' does not belong to you.")
@ -436,72 +447,15 @@ class AddressEngineNeutron(object):
neutron = clients.neutron(context)
return neutron.list_ports(tenant_id=context.project_id)['ports']
class AddressEngineNova(object):
# TODO(ft): check that parameters unused in EC2 Classic mode are not
# specified
def allocate_address(self, context, domain=None):
nova = clients.nova(context)
try:
nova_floating_ip = nova.floating_ips.create()
except nova_exception.Forbidden:
raise exception.AddressLimitExceeded()
return None, self.convert_ips_to_neutron_format(context,
[nova_floating_ip])[0]
def release_address(self, context, public_ip, allocation_id):
nova = clients.nova(context)
nova.floating_ips.delete(self.get_nova_ip_by_public_ip(context,
public_ip).id)
def associate_address(self, context, public_ip=None, instance_id=None,
allocation_id=None, network_interface_id=None,
private_ip_address=None, allow_reassociation=False):
os_instance_id = ec2utils.get_db_item(context, instance_id)['os_id']
# NOTE(ft): check the public IP exists to raise AWS exception otherwise
self.get_nova_ip_by_public_ip(context, public_ip)
nova = clients.nova(context)
nova.servers.add_floating_ip(os_instance_id, public_ip)
return None
def disassociate_address(self, context, public_ip=None,
association_id=None):
os_instance_id = self.get_nova_ip_by_public_ip(context,
public_ip).instance_id
if os_instance_id:
nova = clients.nova(context)
nova.servers.remove_floating_ip(os_instance_id, public_ip)
return None
def get_os_floating_ips(self, context):
nova = clients.nova(context)
return self.convert_ips_to_neutron_format(context,
nova.floating_ips.list())
def convert_ips_to_neutron_format(self, context, nova_ips):
neutron_ips = []
for nova_ip in nova_ips:
neutron_ips.append({'id': nova_ip.id,
'floating_ip_address': nova_ip.ip,
'fixed_ip_address': nova_ip.fixed_ip,
'instance_id': nova_ip.instance_id})
return neutron_ips
def get_os_ports(self, context):
return []
def get_nova_ip_by_public_ip(self, context, public_ip,
nova_floating_ips=None):
if nova_floating_ips is None:
nova = clients.nova(context)
nova_floating_ips = nova.floating_ips.list()
nova_ip = next((ip for ip in nova_floating_ips
if ip.ip == public_ip), None)
if nova_ip is None:
def get_os_floating_ip_by_public_ip(self, context, public_ip):
os_floating_ip = next((addr for addr in
self.get_os_floating_ips(context)
if addr['floating_ip_address'] == public_ip),
None)
if not os_floating_ip:
msg = _("The address '%(public_ip)s' does not belong to you.")
raise exception.AuthFailure(msg % {'public_ip': public_ip})
return nova_ip
return os_floating_ip
address_engine = get_address_engine()

View File

@ -45,10 +45,7 @@ class APIRequest(object):
self.action = action
self.version = version
self.args = args
if CONF.full_vpc_support:
self.controller = cloud.VpcCloudController()
else:
self.controller = cloud.CloudController()
self.controller = cloud.VpcCloudController()
def invoke(self, context):
try:

View File

@ -61,10 +61,7 @@ Validator = common.Validator
def get_account_attribute_engine():
if CONF.full_vpc_support:
return AccountAttributeEngineNeutron()
else:
return AccountAttributeEngineNova()
return AccountAttributeEngineNeutron()
class AvailabilityZoneDescriber(common.UniversalDescriber):
@ -214,13 +211,4 @@ class AccountAttributeEngineNeutron(object):
return 'none'
class AccountAttributeEngineNova(object):
def get_supported_platforms(self):
return ['EC2']
def get_default_vpc(self, context):
return 'none'
account_attribute_engine = get_account_attribute_engine()

View File

@ -30,9 +30,6 @@ from ec2api.i18n import _, _LI, _LW
ec2_opts = [
cfg.BoolOpt('full_vpc_support',
default=True,
help='True if server supports Neutron for full VPC access'),
cfg.BoolOpt('disable_ec2_classic',
help='True if server does not support EC2 Classic mode '
'in favor of default VPC'),

View File

@ -72,10 +72,7 @@ class Validator(common.Validator):
def get_instance_engine():
if CONF.full_vpc_support:
return InstanceEngineNeutron()
else:
return InstanceEngineNova()
return InstanceEngineNeutron()
def run_instances(context, image_id, min_count, max_count,
@ -1469,27 +1466,6 @@ class InstanceEngineNeutron(object):
return ec2_classic_os_networks[0]
class InstanceEngineNova(object):
def get_vpc_and_build_launch_context(
self, context, security_group,
subnet_id, private_ip_address, security_group_id,
network_interface, multiple_instances):
# TODO(ft): check emptiness of vpc related parameters
return None, {'security_groups': security_group}
def get_launch_extra_parameters(self, context, cleaner, launch_context):
return {'security_groups': launch_context['security_groups']}
def post_launch_action(self, context, cleaner, launch_context,
instance_id):
pass
def get_ec2_network_interfaces(self, context, instance_ids=None):
return {}
instance_engine = get_instance_engine()

View File

@ -19,7 +19,6 @@ try:
from neutronclient.common import exceptions as neutron_exception
except ImportError:
pass # clients will log absense of neutronclient in this case
from novaclient import exceptions as nova_exception
from oslo_config import cfg
from oslo_log import log as logging
@ -52,10 +51,7 @@ DEFAULT_GROUP_NAME = 'default'
def get_security_group_engine():
if CONF.full_vpc_support:
return SecurityGroupEngineNeutron()
else:
return SecurityGroupEngineNova()
return SecurityGroupEngineNeutron()
def create_security_group(context, group_name, group_description,
@ -88,19 +84,22 @@ def create_security_group(context, group_name, group_description,
def _create_security_group(context, group_name, group_description,
vpc_id=None, default=False):
nova = clients.nova(context)
neutron = clients.neutron(context)
with common.OnCrashCleaner() as cleaner:
try:
os_security_group = nova.security_groups.create(group_name,
group_description)
except nova_exception.OverLimit:
secgroup_body = (
{'security_group': {'name': group_name,
'description': group_description}})
os_security_group = neutron.create_security_group(
secgroup_body)['security_group']
except neutron_exception.OverQuotaClient:
raise exception.ResourceLimitExceeded(resource='security groups')
cleaner.addCleanup(nova.security_groups.delete,
os_security_group.id)
cleaner.addCleanup(neutron.delete_security_group,
os_security_group['id'])
if vpc_id:
# NOTE(Alex) Check if such vpc exists
ec2utils.get_db_item(context, vpc_id)
item = {'vpc_id': vpc_id, 'os_id': os_security_group.id}
item = {'vpc_id': vpc_id, 'os_id': os_security_group['id']}
if not default:
security_group = db_api.add_item(context, 'sg', item)
else:
@ -494,16 +493,13 @@ class SecurityGroupEngineNeutron(object):
def delete_group(self, context, group_name=None, group_id=None,
delete_default=False):
neutron = clients.neutron(context)
if CONF.disable_ec2_classic and group_name:
if group_name:
sg = describe_security_groups(
context,
group_name=[group_name])['securityGroupInfo'][0]
group_id = sg['groupId']
group_name = None
if group_id is None or not group_id.startswith('sg-'):
return SecurityGroupEngineNova().delete_group(context,
group_name,
group_id)
security_group = ec2utils.get_db_item(context, group_id)
try:
if not delete_default:
@ -553,123 +549,22 @@ class SecurityGroupEngineNeutron(object):
neutron.delete_security_group_rule(os_id)
def get_group_os_id(self, context, group_id, group_name):
if group_name:
return SecurityGroupEngineNova().get_group_os_id(context,
group_id,
group_name)
if group_name and not group_id:
os_group = self.get_os_group_by_name(context, group_name)
return str(os_group['id'])
return ec2utils.get_db_item(context, group_id, 'sg')['os_id']
class SecurityGroupEngineNova(object):
def delete_group(self, context, group_name=None, group_id=None,
delete_default=False):
nova = clients.nova(context)
os_id = self.get_group_os_id(context, group_id, group_name)
try:
nova.security_groups.delete(os_id)
except Exception as ex:
# TODO(Alex): do log error
# nova doesn't differentiate Conflict exception like neutron does
pass
def get_os_groups(self, context):
# Note(tikitavi): Using neutron engine in describing security groups.
# Security-group-list in nova cannot be filtered by tenant,
# listing all secgroups in case of big amount of groups can be slow
# and may have limitations in number.
try:
groups = SecurityGroupEngineNeutron().get_os_groups(context)
except Exception as ex:
groups = []
LOG.warning(_("Failed to get os groups."))
return groups
def authorize_security_group(self, context, rule_body):
nova = clients.nova(context)
try:
os_security_group_rule = nova.security_group_rules.create(
rule_body['security_group_id'],
rule_body.get('protocol'),
rule_body.get('port_range_min', -1),
rule_body.get('port_range_max', -1),
rule_body.get('remote_ip_prefix'),
rule_body.get('remote_group_id'))
except nova_exception.Conflict:
raise exception.InvalidPermissionDuplicate()
except nova_exception.OverLimit:
raise exception.RulesPerSecurityGroupLimitExceeded()
def get_os_group_rules(self, context, os_id):
nova = clients.nova(context)
os_security_group = nova.security_groups.get(os_id)
os_rules = os_security_group.rules
neutron_rules = []
for os_rule in os_rules:
neutron_rules.append(
self.convert_rule_to_neutron(context,
os_rule,
nova.security_groups.list()))
return neutron_rules
def delete_os_group_rule(self, context, os_id):
nova = clients.nova(context)
nova.security_group_rules.delete(os_id)
def convert_groups_to_neutron_format(self, context, nova_security_groups):
neutron_security_groups = []
for nova_group in nova_security_groups:
neutron_group = {'id': str(nova_group.id),
'name': nova_group.name,
'description': nova_group.description,
'tenant_id': nova_group.tenant_id}
neutron_rules = []
for rule in nova_group.rules:
neutron_rules.append(
self.convert_rule_to_neutron(context,
rule, nova_security_groups))
if neutron_rules:
neutron_group['security_group_rules'] = neutron_rules
neutron_security_groups.append(neutron_group)
return neutron_security_groups
def convert_rule_to_neutron(self, context, nova_rule,
nova_security_groups=None):
neutron_rule = {'id': str(nova_rule['id']),
'protocol': nova_rule['ip_protocol'],
'port_range_min': nova_rule['from_port'],
'port_range_max': nova_rule['to_port'],
'remote_ip_prefix': (
nova_rule.get('ip_range') or {}).get('cidr'),
'remote_group_id': None,
'direction': 'ingress',
'ethertype': 'IPv4',
'security_group_id': str(nova_rule['parent_group_id'])}
if (nova_rule.get('group') or {}).get('name'):
neutron_rule['remote_group_id'] = (
self.get_group_os_id(context, None,
nova_rule['group']['name'],
nova_security_groups))
return neutron_rule
def get_group_os_id(self, context, group_id, group_name,
nova_security_groups=None):
if group_id:
return ec2utils.get_db_item(context, group_id, 'sg')['os_id']
nova_group = self.get_nova_group_by_name(context, group_name,
nova_security_groups)
return str(nova_group.id)
def get_nova_group_by_name(self, context, group_name,
nova_security_groups=None):
if nova_security_groups is None:
nova = clients.nova(context)
nova_security_groups = nova.security_groups.list()
nova_group = next((g for g in nova_security_groups
if g.name == group_name), None)
if nova_group is None:
def get_os_group_by_name(self, context, group_name,
os_security_groups=None):
if os_security_groups is None:
neutron = clients.neutron(context)
os_security_groups = (
neutron.list_security_groups()['security_groups'])
os_group = next((g for g in os_security_groups
if g['name'] == group_name), None)
if os_group is None:
raise exception.InvalidGroupNotFound(id=group_name)
return nova_group
return os_group
security_group_engine = get_security_group_engine()

View File

@ -176,26 +176,13 @@ class MetadataRequestHandler(wsgi.Application):
os_instance_id, project_id = (
api.get_os_instance_and_project_id_by_provider_id(
context, provider_id, remote_ip))
elif req.headers.get('X-Instance-ID'):
else:
os_instance_id, project_id, remote_ip = (
self._unpack_neutron_request(req))
else:
remote_ip = self._unpack_nova_network_request(req)
context = ec2_context.get_os_admin_context()
os_instance_id, project_id = (
api.get_os_instance_and_project_id(context, remote_ip))
return {'os_instance_id': os_instance_id,
'project_id': project_id,
'private_ip': remote_ip}
def _unpack_nova_network_request(self, req):
remote_ip = req.remote_addr
if CONF.use_forwarded_for:
remote_ip = req.headers.get('X-Forwarded-For', remote_ip)
if not remote_ip:
raise exception.EC2MetadataInvalidAddress()
return remote_ip
def _unpack_neutron_request(self, req):
os_instance_id = req.headers.get('X-Instance-ID')
project_id = req.headers.get('X-Tenant-ID')

View File

@ -68,23 +68,6 @@ def get_version_list():
return _format_metadata_item(VERSIONS + ["latest"])
def get_os_instance_and_project_id(context, fixed_ip):
try:
nova = clients.nova(context)
os_address = nova.fixed_ips.get(fixed_ip)
os_instances = nova.servers.list(
search_opts={'hostname': os_address.hostname,
'all_tenants': True})
return next((os_instance.id, os_instance.tenant_id)
for os_instance in os_instances
if any((addr['addr'] == fixed_ip and
addr['OS-EXT-IPS:type'] == 'fixed')
for addr in itertools.chain(
*six.itervalues(os_instance.addresses))))
except (nova_exception.NotFound, StopIteration):
raise exception.EC2MetadataNotFound()
def get_os_instance_and_project_id_by_provider_id(context, provider_id,
fixed_ip):
neutron = clients.neutron(context)

View File

@ -1038,14 +1038,6 @@ OS_DHCP_OPTIONS_1 = {'extra_dhcp_opts': [{'opt_name': 'domain-name',
# address objects
class NovaFloatingIp(object):
def __init__(self, nova_ip_dict):
self.id = nova_ip_dict['id']
self.ip = nova_ip_dict['ip']
self.fixed_ip = nova_ip_dict['fixed_ip']
self.instance_id = nova_ip_dict['instance_id']
DB_ADDRESS_DEFAULT = {
'id': ID_EC2_ADDRESS_DEFAULT,
'os_id': ID_OS_FLOATING_IP_2,
@ -1118,31 +1110,9 @@ OS_FLOATING_IP_2 = {
'fixed_ip_address': IP_NETWORK_INTERFACE_2,
}
NOVA_FLOATING_IP_1 = {
'id': ID_OS_FLOATING_IP_1,
'ip': IP_ADDRESS_1,
'instance_id': None,
'fixed_ip': None,
}
NOVA_FLOATING_IP_2 = {
'id': ID_OS_FLOATING_IP_2,
'ip': IP_ADDRESS_2,
'instance_id': ID_OS_INSTANCE_1,
'fixed_ip': IP_NETWORK_INTERFACE_2,
}
# security group objects
class NovaSecurityGroup(object):
def __init__(self, nova_group_dict):
self.id = nova_group_dict['id']
self.name = nova_group_dict['name']
self.description = nova_group_dict['description']
self.tenant_id = ID_OS_PROJECT
self.rules = nova_group_dict['security_group_rules']
DB_SECURITY_GROUP_DEFAULT = {
'id': ID_EC2_SECURITY_GROUP_DEFAULT,
'os_id': ID_OS_SECURITY_GROUP_DEFAULT,

View File

@ -14,9 +14,7 @@
import mock
from neutronclient.common import exceptions as neutron_exception
from novaclient import exceptions as nova_exception
from ec2api.api import address
from ec2api.tests.unit import base
from ec2api.tests.unit import fakes
from ec2api.tests.unit import matchers
@ -27,27 +25,25 @@ class AddressTestCase(base.ApiTestCase):
def setUp(self):
super(AddressTestCase, self).setUp()
self.addCleanup(self._reset_engine)
def _reset_engine(self):
address.address_engine = address.AddressEngineNeutron()
def test_allocate_ec2_classic_address(self):
address.address_engine = (
address.AddressEngineNeutron())
self.nova.floating_ips.create.return_value = (
fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_1))
self.configure(external_network=fakes.NAME_OS_PUBLIC_NETWORK)
self.neutron.list_networks.return_value = (
{'networks': [{'id': fakes.ID_OS_PUBLIC_NETWORK}]})
self.neutron.create_floatingip.return_value = (
{'floatingip': fakes.OS_FLOATING_IP_1})
resp = self.execute('AllocateAddress', {})
self.assertEqual(fakes.IP_ADDRESS_1, resp['publicIp'])
self.assertEqual('standard', resp['domain'])
self.assertNotIn('allocationId', resp)
self.assertEqual(0, self.db_api.add_item.call_count)
self.nova.floating_ips.create.assert_called_once_with()
self.neutron.create_floatingip.assert_called_once_with(
{'floatingip': {
'floating_network_id':
fakes.ID_OS_PUBLIC_NETWORK}})
def test_allocate_vpc_address(self):
address.address_engine = (
address.AddressEngineNeutron())
self.configure(external_network=fakes.NAME_OS_PUBLIC_NETWORK)
self.neutron.list_networks.return_value = (
{'networks': [{'id': fakes.ID_OS_PUBLIC_NETWORK}]})
@ -96,16 +92,12 @@ class AddressTestCase(base.ApiTestCase):
'name': fakes.NAME_OS_PUBLIC_NETWORK})
def test_allocate_address_invalid_parameters(self):
address.address_engine = (
address.AddressEngineNeutron())
self.assert_execution_error('InvalidParameterValue', 'AllocateAddress',
{'Domain': 'fake_domain'})
self.assertEqual(0, self.db_api.add_item.call_count)
self.assertEqual(0, self.neutron.create_floatingip.call_count)
def test_allocate_address_overlimit(self):
address.address_engine = (
address.AddressEngineNeutron())
self.configure(external_network=fakes.NAME_OS_PUBLIC_NETWORK)
self.neutron.list_networks.return_value = (
{'networks': [{'id': fakes.ID_OS_PUBLIC_NETWORK}]})
@ -113,18 +105,11 @@ class AddressTestCase(base.ApiTestCase):
neutron_exception.OverQuotaClient())
self.assert_execution_error('AddressLimitExceeded', 'AllocateAddress',
{'Domain': 'vpc'})
address.address_engine = (
address.AddressEngineNeutron())
self.nova.floating_ips.create.side_effect = (
nova_exception.Forbidden(403))
self.assert_execution_error('AddressLimitExceeded', 'AllocateAddress',
{})
@tools.screen_unexpected_exception_logs
def test_allocate_address_vpc_rollback(self):
address.address_engine = (
address.AddressEngineNeutron())
self.configure(external_network=fakes.NAME_OS_PUBLIC_NETWORK)
self.neutron.list_networks.return_value = (
{'networks': [{'id': fakes.ID_OS_PUBLIC_NETWORK}]})
@ -139,12 +124,10 @@ class AddressTestCase(base.ApiTestCase):
fakes.ID_OS_FLOATING_IP_1)
def test_associate_address_ec2_classic(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_INSTANCE_1)
self.nova.floating_ips.list.return_value = (
[fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_1),
fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_2)])
self.neutron.list_floatingips.return_value = (
{'floatingips': [fakes.OS_FLOATING_IP_1,
fakes.OS_FLOATING_IP_2]})
self.nova.servers.add_floating_ip.return_value = True
resp = self.execute('AssociateAddress',
@ -157,8 +140,6 @@ class AddressTestCase(base.ApiTestCase):
fakes.IP_ADDRESS_1)
def test_associate_address_vpc(self):
address.address_engine = (
address.AddressEngineNeutron())
def do_check(params, fixed_ip):
resp = self.execute('AssociateAddress', params)
@ -223,8 +204,6 @@ class AddressTestCase(base.ApiTestCase):
fakes.IP_NETWORK_INTERFACE_2)
def test_associate_address_vpc_idempotent(self):
address.address_engine = (
address.AddressEngineNeutron())
def do_check(params):
resp = self.execute('AssociateAddress', params)
@ -248,8 +227,6 @@ class AddressTestCase(base.ApiTestCase):
'PrivateIpAddress': fakes.IP_NETWORK_INTERFACE_2})
def test_associate_address_invalid_main_parameters(self):
address.address_engine = (
address.AddressEngineNeutron())
def do_check(params, error):
self.assert_execution_error(error, 'AssociateAddress', params)
@ -268,8 +245,6 @@ class AddressTestCase(base.ApiTestCase):
'MissingParameter')
def test_associate_address_invalid_ec2_classic_parameters(self):
address.address_engine = (
address.AddressEngineNeutron())
# NOTE(ft): ec2 classic instance vs allocation_id parameter
self.set_mock_db_items(fakes.DB_INSTANCE_2)
self.assert_execution_error('InvalidParameterCombination',
@ -278,7 +253,7 @@ class AddressTestCase(base.ApiTestCase):
'InstanceId': fakes.ID_EC2_INSTANCE_2})
# NOTE(ft): ec2 classic instance vs not existing public IP
self.nova.floating_ips.list.return_value = []
self.neutron.list_floatingips.return_value = {'floatingips': []}
self.assert_execution_error('AuthFailure', 'AssociateAddress',
{'PublicIp': fakes.IP_ADDRESS_1,
'InstanceId': fakes.ID_EC2_INSTANCE_2})
@ -292,8 +267,6 @@ class AddressTestCase(base.ApiTestCase):
'InstanceId': fakes.ID_EC2_INSTANCE_2})
def test_associate_address_invalid_vpc_parameters(self):
address.address_engine = (
address.AddressEngineNeutron())
def do_check(params, error):
self.assert_execution_error(error, 'AssociateAddress', params)
@ -381,8 +354,6 @@ class AddressTestCase(base.ApiTestCase):
@tools.screen_unexpected_exception_logs
def test_associate_address_vpc_rollback(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_ADDRESS_1, fakes.DB_IGW_1,
fakes.DB_NETWORK_INTERFACE_1,
fakes.DB_NETWORK_INTERFACE_2)
@ -398,13 +369,15 @@ class AddressTestCase(base.ApiTestCase):
mock.ANY, fakes.DB_ADDRESS_1)
def test_dissassociate_address_ec2_classic(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items()
self.set_mock_db_items(fakes.DB_INSTANCE_1)
self.nova.servers.remove_floating_ip.return_value = True
self.nova.floating_ips.list.return_value = (
[fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_1),
fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_2)])
self.neutron.list_floatingips.return_value = (
{'floatingips': [fakes.OS_FLOATING_IP_1,
fakes.OS_FLOATING_IP_2]})
self.neutron.list_ports.return_value = (
{'ports': [fakes.OS_PORT_1,
fakes.OS_PORT_2]})
resp = self.execute('DisassociateAddress',
{'PublicIp': fakes.IP_ADDRESS_2})
self.assertEqual(True, resp['return'])
@ -419,8 +392,6 @@ class AddressTestCase(base.ApiTestCase):
self.assertEqual(1, self.nova.servers.remove_floating_ip.call_count)
def test_dissassociate_address_vpc(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_ADDRESS_2)
self.neutron.show_floatingip.return_value = (
{'floatingip': fakes.OS_FLOATING_IP_2})
@ -454,8 +425,6 @@ class AddressTestCase(base.ApiTestCase):
'private_ip_address']))
def test_dissassociate_address_vpc_idempotent(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_ADDRESS_1)
self.neutron.show_floatingip.return_value = (
{'floatingip': fakes.OS_FLOATING_IP_1})
@ -468,8 +437,6 @@ class AddressTestCase(base.ApiTestCase):
self.assertEqual(0, self.db_api.update_item.call_count)
def test_disassociate_address_invalid_parameters(self):
address.address_engine = (
address.AddressEngineNeutron())
def do_check(params, error):
self.assert_execution_error(error, 'DisassociateAddress', params)
@ -483,7 +450,8 @@ class AddressTestCase(base.ApiTestCase):
# NOTE(ft): EC2 Classic public IP does not exists
self.set_mock_db_items()
self.nova.floating_ips.list.return_value = []
self.neutron.list_floatingips.return_value = {'floatingips': []}
self.assert_execution_error('AuthFailure', 'DisassociateAddress',
{'PublicIp': fakes.IP_ADDRESS_2})
@ -519,8 +487,6 @@ class AddressTestCase(base.ApiTestCase):
@tools.screen_unexpected_exception_logs
def test_dissassociate_address_vpc_rollback(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_ADDRESS_2)
self.neutron.show_floatingip.return_value = (
{'floatingip': fakes.OS_FLOATING_IP_2})
@ -534,24 +500,20 @@ class AddressTestCase(base.ApiTestCase):
mock.ANY, fakes.DB_ADDRESS_2)
def test_release_address_ec2_classic(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items()
self.nova.floating_ips.delete.return_value = True
self.nova.floating_ips.list.return_value = (
[fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_1),
fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_2)])
self.neutron.delete_floatingip.return_value = True
self.neutron.list_floatingips.return_value = (
{'floatingips': [fakes.OS_FLOATING_IP_1,
fakes.OS_FLOATING_IP_2]})
resp = self.execute('ReleaseAddress',
{'PublicIp': fakes.IP_ADDRESS_1})
self.assertEqual(True, resp['return'])
self.nova.floating_ips.delete.assert_called_once_with(
fakes.NOVA_FLOATING_IP_1['id'])
self.neutron.delete_floatingip.assert_called_once_with(
fakes.OS_FLOATING_IP_1['id'])
def test_release_address_vpc(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_ADDRESS_1)
self.neutron.show_floatingip.return_value = (
{'floatingip': fakes.OS_FLOATING_IP_1})
@ -567,8 +529,6 @@ class AddressTestCase(base.ApiTestCase):
@mock.patch('ec2api.api.address.AddressEngineNeutron.disassociate_address')
def test_release_address_default_vpc(self, disassociate_address):
address.address_engine = (
address.AddressEngineNeutron())
self.configure(disable_ec2_classic=True)
self.set_mock_db_items(fakes.DB_VPC_DEFAULT,
fakes.DB_ADDRESS_DEFAULT,
@ -588,8 +548,6 @@ class AddressTestCase(base.ApiTestCase):
mock.ANY, fakes.ID_EC2_ADDRESS_DEFAULT)
def test_release_address_invalid_parameters(self):
address.address_engine = (
address.AddressEngineNeutron())
def do_check(params, error):
self.assert_execution_error(error, 'ReleaseAddress', params)
@ -602,7 +560,7 @@ class AddressTestCase(base.ApiTestCase):
'InvalidParameterCombination')
# NOTE(ft): EC2 Classic public IP is not found
self.nova.floating_ips.list.return_value = []
self.neutron.list_floatingips.return_value = {'floatingips': []}
do_check({'PublicIp': fakes.IP_ADDRESS_1},
'AuthFailure')
@ -646,8 +604,6 @@ class AddressTestCase(base.ApiTestCase):
@tools.screen_unexpected_exception_logs
def test_release_address_vpc_rollback(self):
address.address_engine = (
address.AddressEngineNeutron())
self.set_mock_db_items(fakes.DB_ADDRESS_1)
self.neutron.show_floatingip.return_value = (
{'floatingip': fakes.OS_FLOATING_IP_1})
@ -660,8 +616,6 @@ class AddressTestCase(base.ApiTestCase):
mock.ANY, 'eipalloc', fakes.DB_ADDRESS_1)
def test_describe_addresses_vpc(self):
address.address_engine = (
address.AddressEngineNeutron())
self.neutron.list_floatingips.return_value = (
{'floatingips': [fakes.OS_FLOATING_IP_1,
fakes.OS_FLOATING_IP_2]})
@ -698,12 +652,13 @@ class AddressTestCase(base.ApiTestCase):
('public-ip', fakes.IP_ADDRESS_2)])
def test_describe_addresses_ec2_classic(self):
address.address_engine = (
address.AddressEngineNova())
self.set_mock_db_items(fakes.DB_INSTANCE_1)
self.nova.floating_ips.list.return_value = [
fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_1),
fakes.NovaFloatingIp(fakes.NOVA_FLOATING_IP_2)]
self.neutron.list_floatingips.return_value = (
{'floatingips': [fakes.OS_FLOATING_IP_1,
fakes.OS_FLOATING_IP_2]})
self.neutron.list_ports.return_value = (
{'ports': [fakes.OS_PORT_1,
fakes.OS_PORT_2]})
resp = self.execute('DescribeAddresses', {})
self.assertThat(resp['addressesSet'],
matchers.ListMatches([fakes.EC2_ADDRESS_CLASSIC_1,

View File

@ -14,7 +14,6 @@
import mock
from ec2api.api import availability_zone
from ec2api.tests.unit import base
from ec2api.tests.unit import fakes
from ec2api.tests.unit import matchers
@ -24,11 +23,6 @@ class AvailabilityZoneCase(base.ApiTestCase):
def setUp(self):
super(AvailabilityZoneCase, self).setUp()
self.addCleanup(self._reset_engine)
def _reset_engine(self):
availability_zone.account_attribute_engine = (
availability_zone.AccountAttributeEngineNeutron())
def test_describe_availability_zones(self):
self.nova.availability_zones.list.return_value = [
@ -60,12 +54,11 @@ class AvailabilityZoneCase(base.ApiTestCase):
self.assertTrue(resp['regionInfo'][0].get('regionEndpoint')
is not None)
# TODO(tikitavi): Rework test to exclude nova-network
@mock.patch('ec2api.api.ec2utils.check_and_create_default_vpc')
def test_describe_account_attributes(self, check_and_create):
self.nova.quotas.get.return_value = mock.Mock(instances=77)
availability_zone.account_attribute_engine = (
availability_zone.AccountAttributeEngineNeutron())
resp = self.execute('DescribeAccountAttributes', {})
self.assertThat(resp['accountAttributeSet'],
matchers.ListMatches(
@ -83,43 +76,7 @@ class AvailabilityZoneCase(base.ApiTestCase):
self.nova.quotas.get.assert_called_once_with(
fakes.ID_OS_PROJECT, fakes.ID_OS_USER)
availability_zone.account_attribute_engine = (
availability_zone.AccountAttributeEngineNova())
resp = self.execute('DescribeAccountAttributes', {})
self.assertThat(resp['accountAttributeSet'],
matchers.ListMatches(
[{'attributeName': 'supported-platforms',
'attributeValueSet': [
{'attributeValue': 'EC2'}]},
{'attributeName': 'default-vpc',
'attributeValueSet': [
{'attributeValue': 'none'}]},
{'attributeName': 'max-instances',
'attributeValueSet': [
{'attributeValue': 77}]}],
orderless_lists=True))
resp = self.execute('DescribeAccountAttributes',
{'AttributeName.1': 'default-vpc',
'AttributeName.2': 'max-instances'})
self.assertThat(resp['accountAttributeSet'],
matchers.ListMatches(
[{'attributeName': 'default-vpc',
'attributeValueSet': [
{'attributeValue': 'none'}]},
{'attributeName': 'max-instances',
'attributeValueSet': [
{'attributeValue': 77}]}],
orderless_lists=True))
self.assert_execution_error('InvalidParameter',
'DescribeAccountAttributes',
{'AttributeName.1': 'fake'})
self.configure(disable_ec2_classic=True)
availability_zone.account_attribute_engine = (
availability_zone.AccountAttributeEngineNeutron())
check_and_create.return_value = fakes.DB_VPC_DEFAULT
resp = self.execute('DescribeAccountAttributes', {})

View File

@ -35,7 +35,6 @@ class InstanceTestCase(base.ApiTestCase):
def setUp(self):
super(InstanceTestCase, self).setUp()
self.addCleanup(self._reset_engine)
self.network_interface_api = self.mock(
'ec2api.api.instance.network_interface_api')
self.address_api = self.mock('ec2api.api.address')
@ -50,17 +49,12 @@ class InstanceTestCase(base.ApiTestCase):
self.nova.flavors.get.return_value = self.fake_flavor
self.nova.flavors.list.return_value = [self.fake_flavor]
def _reset_engine(self):
instance_api.instance_engine = instance_api.InstanceEngineNeutron()
@mock.patch('ec2api.api.instance.describe_instances')
@mock.patch('ec2api.api.instance.InstanceEngineNeutron.'
'get_vpc_default_security_group_id')
def test_run_instances(self, get_vpc_default_security_group_id,
describe_instances):
"""Run instance with various network interface settings."""
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self.set_mock_db_items(
fakes.DB_SUBNET_1, fakes.DB_NETWORK_INTERFACE_1, fakes.DB_IMAGE_1,
fakes.DB_IMAGE_ARI_1, fakes.DB_IMAGE_AKI_1)
@ -190,8 +184,6 @@ class InstanceTestCase(base.ApiTestCase):
get_vpc_default_security_group_id,
describe_instances):
"""Run 2 instances at once on 2 subnets in all combinations."""
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self._build_multiple_data_model()
self.glance.images.get.return_value = fakes.OSImage(fakes.OS_IMAGE_1)
@ -290,9 +282,7 @@ class InstanceTestCase(base.ApiTestCase):
user_data = base64.b64decode(fakes.USER_DATA_INSTANCE_2)
parse_block_device_mapping.return_value = []
def do_check(engine, extra_kwargs={}, extra_db_instance={}):
instance_api.instance_engine = engine
def do_check(extra_kwargs={}, extra_db_instance={}):
describe_instances.side_effect = [
{'reservationSet': []},
{'reservationSet': [{'foo': 'bar'}]}]
@ -338,13 +328,11 @@ class InstanceTestCase(base.ApiTestCase):
parse_block_device_mapping.reset_mock()
do_check(
instance_api.InstanceEngineNeutron(),
extra_kwargs={
'nics': [
{'net-id': get_ec2_classic_os_network.return_value['id']}],
},
extra_db_instance={'vpc_id': None})
do_check(instance_api.InstanceEngineNova())
@mock.patch('ec2api.api.instance.describe_instances')
def test_idempotent_run(self, describe_instances):
@ -376,8 +364,6 @@ class InstanceTestCase(base.ApiTestCase):
'ClientToken': 'client-token-2'})
def test_run_instances_rollback(self):
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self.set_mock_db_items(fakes.DB_IMAGE_1, fakes.DB_SUBNET_1,
fakes.DB_NETWORK_INTERFACE_1)
self.glance.images.get.return_value = fakes.OSImage(fakes.OS_IMAGE_1)
@ -454,9 +440,7 @@ class InstanceTestCase(base.ApiTestCase):
self.utils_generate_uid.return_value = fakes.ID_EC2_RESERVATION_1
def do_check(engine):
instance_api.instance_engine = engine
def do_check():
self.network_interface_api.create_network_interface.side_effect = [
{'networkInterface': {'networkInterfaceId': eni['id']}}
for eni in network_interfaces]
@ -488,15 +472,11 @@ class InstanceTestCase(base.ApiTestCase):
_attach_network_interface_item.side_effect) = [
None, None, Exception()]
with tools.ScreeningLogger(log_name='ec2api.api'):
do_check(instance_api.InstanceEngineNeutron())
do_check()
(self.network_interface_api.delete_network_interface.
assert_called_once_with(
mock.ANY, network_interface_id=network_interfaces[2]['id']))
self.nova.servers.update.side_effect = [None, None, Exception()]
with tools.ScreeningLogger(log_name='ec2api.api'):
do_check(instance_api.InstanceEngineNova())
def test_run_instances_invalid_parameters(self):
self.assert_execution_error('InvalidParameterValue', 'RunInstances',
{'ImageId': fakes.ID_EC2_IMAGE_1,
@ -523,8 +503,6 @@ class InstanceTestCase(base.ApiTestCase):
check_and_create):
"""Run instance without network interface settings."""
self.configure(disable_ec2_classic=True)
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self.set_mock_db_items(fakes.DB_IMAGE_2,
fakes.DB_SUBNET_DEFAULT,
fakes.DB_NETWORK_INTERFACE_DEFAULT)
@ -581,8 +559,6 @@ class InstanceTestCase(base.ApiTestCase):
"""Run instance without network interface settings. """
"""No default vpc"""
self.configure(disable_ec2_classic=True)
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self.set_mock_db_items(fakes.DB_IMAGE_2)
self.glance.images.get.return_value = fakes.OSImage(fakes.OS_IMAGE_2)
@ -603,8 +579,6 @@ class InstanceTestCase(base.ApiTestCase):
@mock.patch.object(fakes.OSInstance, 'get', autospec=True)
def test_terminate_instances(self, os_instance_get, os_instance_delete):
"""Terminate 2 instances in one request."""
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self.set_mock_db_items(fakes.DB_INSTANCE_1, fakes.DB_INSTANCE_2)
os_instances = [fakes.OSInstance(fakes.OS_INSTANCE_1),
fakes.OSInstance(fakes.OS_INSTANCE_2)]
@ -752,8 +726,6 @@ class InstanceTestCase(base.ApiTestCase):
def test_describe_instances(self):
"""Describe 2 instances, one of which is vpc instance."""
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self.set_mock_db_items(
fakes.DB_INSTANCE_1, fakes.DB_INSTANCE_2,
fakes.DB_NETWORK_INTERFACE_1, fakes.DB_NETWORK_INTERFACE_2,
@ -876,7 +848,8 @@ class InstanceTestCase(base.ApiTestCase):
'DescribeInstances', ['reservationSet', 'instancesSet'],
fakes.ID_EC2_INSTANCE_1, 'instanceId')
def test_describe_instances_ec2_classic(self):
# TODO(tikitavi): Rework test to exclude nova-network
def _test_describe_instances_ec2_classic(self):
instance_api.instance_engine = (
instance_api.InstanceEngineNova())
self.set_mock_db_items(
@ -900,8 +873,6 @@ class InstanceTestCase(base.ApiTestCase):
def test_describe_instances_mutliple_networks(self):
"""Describe 2 instances with various combinations of network."""
instance_api.instance_engine = (
instance_api.InstanceEngineNeutron())
self._build_multiple_data_model()
self.set_mock_db_items(*self.DB_INSTANCES)

View File

@ -92,36 +92,17 @@ class ProxyTestCase(test_base.BaseTestCase):
self.assertEqual(500, response.status_int)
self.assertEqual(len(log.mock_calls), 2)
def test_get_requester(self):
def _test_get_requester(self):
expected = {'os_instance_id': mock.sentinel.os_instance_id,
'project_id': mock.sentinel.project_id,
'private_ip': mock.sentinel.private_ip}
req = mock.Mock(headers={})
@mock.patch('ec2api.metadata.api.get_os_instance_and_project_id')
@mock.patch('ec2api.context.get_os_admin_context')
@mock.patch.object(metadata.MetadataRequestHandler,
'_unpack_nova_network_request')
def do_test1(unpack_request, get_context, get_ids):
get_context.return_value = base.create_context(is_os_admin=True)
unpack_request.return_value = mock.sentinel.private_ip
get_ids.return_value = (mock.sentinel.os_instance_id,
mock.sentinel.project_id)
retval = self.handler._get_requester(req)
self.assertEqual(expected, retval)
get_context.assert_called_with()
unpack_request.assert_called_with(req)
get_ids.assert_called_with(get_context.return_value,
mock.sentinel.private_ip)
do_test1()
req.headers['X-Instance-ID'] = mock.sentinel.os_instance_id
@mock.patch.object(metadata.MetadataRequestHandler,
'_unpack_neutron_request')
def do_test2(unpack_request):
def do_test1(unpack_request):
unpack_request.return_value = (mock.sentinel.os_instance_id,
mock.sentinel.project_id,
mock.sentinel.private_ip)
@ -130,7 +111,7 @@ class ProxyTestCase(test_base.BaseTestCase):
self.assertEqual(expected, retval)
unpack_request.assert_called_with(req)
do_test2()
do_test1()
req.headers['X-Metadata-Provider'] = mock.sentinel.provider_id
@ -139,7 +120,7 @@ class ProxyTestCase(test_base.BaseTestCase):
@mock.patch('ec2api.context.get_os_admin_context')
@mock.patch.object(metadata.MetadataRequestHandler,
'_unpack_nsx_request')
def do_test3(unpack_request, get_context, get_ids):
def do_test2(unpack_request, get_context, get_ids):
unpack_request.return_value = (mock.sentinel.provider_id,
mock.sentinel.private_ip)
get_context.return_value = base.create_context(is_os_admin=True)
@ -154,7 +135,7 @@ class ProxyTestCase(test_base.BaseTestCase):
mock.sentinel.provider_id,
mock.sentinel.private_ip)
do_test3()
do_test2()
@mock.patch('ec2api.metadata.api.get_metadata_item')
@mock.patch('ec2api.context.get_os_admin_context')
@ -304,24 +285,6 @@ class ProxyTestCase(test_base.BaseTestCase):
self.handler._sign_instance_id('foo')
)
def test_unpack_nova_network_request(self):
req = mock.Mock(remote_addr='fake_addr', headers={})
self.assertEqual('fake_addr',
self.handler._unpack_nova_network_request(req))
cfg.CONF.set_override('use_forwarded_for', True)
self.assertEqual('fake_addr',
self.handler._unpack_nova_network_request(req))
req.headers['X-Forwarded-For'] = 'fake_forwarded_for'
self.assertEqual('fake_forwarded_for',
self.handler._unpack_nova_network_request(req))
cfg.CONF.set_override('use_forwarded_for', False)
self.assertEqual('fake_addr',
self.handler._unpack_nova_network_request(req))
def test_unpack_neutron_request(self):
sign = (
'97e7709481495f1a3a589e5ee03f8b5d51a3e0196768e300c441b58fe0382f4d')
@ -399,10 +362,9 @@ class ProxyTestCase(test_base.BaseTestCase):
@mock.patch('novaclient.client.Client')
@mock.patch('ec2api.db.api.IMPL')
@mock.patch('ec2api.metadata.api.instance_api')
def test_get_metadata_items(self, instance_api, db_api, nova):
@mock.patch('ec2api.metadata.MetadataRequestHandler._validate_signature')
def test_get_metadata_items(self, validate, instance_api, db_api, nova):
FAKE_USER_DATA = u'fake_user_data-' + six.unichr(1071)
nova.return_value.fixed_ips.get.return_value = (
mock.Mock(hostname='fake_name'))
nova.return_value.servers.list.return_value = [
fakes.OSInstance(fakes.OS_INSTANCE_1)]
keypair = mock.Mock(public_key=fakes.PUBLIC_KEY_KEY_PAIR)
@ -420,8 +382,12 @@ class ProxyTestCase(test_base.BaseTestCase):
def _test_metadata_path(relpath):
# recursively confirm a http 200 from all meta-data elements
# available at relpath.
headers = {'X-Instance-ID': fakes.ID_EC2_INSTANCE_1,
'X-Tenant-ID': fakes.ID_OS_PROJECT,
'X-Forwarded-For': fakes.IP_NETWORK_INTERFACE_2}
request = webob.Request.blank(
relpath, remote_addr=fakes.IP_NETWORK_INTERFACE_2)
relpath, headers=headers)
response = request.get_response(self.handler)
self.assertEqual(200, response.status_int)
for item in response.body.decode("utf-8").split('\n'):
@ -436,7 +402,7 @@ class ProxyTestCase(test_base.BaseTestCase):
path = relpath + '/' + item
request = webob.Request.blank(
path, remote_addr=fakes.IP_NETWORK_INTERFACE_2)
path, headers=headers)
response = request.get_response(self.handler)
self.assertEqual(200, response.status_int, message=path)

View File

@ -53,37 +53,6 @@ class MetadataApiTestCase(base.ApiTestCase):
retval = api.get_version_list()
self.assertEqual('\n'.join(api.VERSIONS + ['latest']), retval)
def test_get_instance_and_project_id(self):
self.nova.servers.list.return_value = [
fakes.OSInstance(fakes.OS_INSTANCE_1),
fakes.OSInstance(fakes.OS_INSTANCE_2)]
self.nova.fixed_ips.get.return_value = mock.Mock(hostname='fake_name')
self.assertEqual(
(fakes.ID_OS_INSTANCE_1, fakes.ID_OS_PROJECT),
api.get_os_instance_and_project_id(self.fake_context,
fakes.IP_NETWORK_INTERFACE_2))
self.nova.fixed_ips.get.assert_called_with(
fakes.IP_NETWORK_INTERFACE_2)
self.nova.servers.list.assert_called_with(
search_opts={'hostname': 'fake_name',
'all_tenants': True})
def check_raise():
self.assertRaises(exception.EC2MetadataNotFound,
api.get_os_instance_and_project_id,
self.fake_context,
fakes.IP_NETWORK_INTERFACE_2)
self.nova.servers.list.return_value = [
fakes.OSInstance(fakes.OS_INSTANCE_2)]
check_raise()
self.nova.fixed_ips.get.side_effect = nova_exception.NotFound('fake')
self.nova.servers.list.return_value = [
fakes.OSInstance(fakes.OS_INSTANCE_1),
fakes.OSInstance(fakes.OS_INSTANCE_2)]
check_raise()
def test_get_instance_and_project_id_by_provider_id(self):
self.neutron.list_subnets.return_value = {
'subnets': [fakes.OS_SUBNET_1, fakes.OS_SUBNET_2]}

View File

@ -17,9 +17,7 @@ import copy
import mock
from neutronclient.common import exceptions as neutron_exception
from novaclient import exceptions as nova_exception
from ec2api.api import security_group
from ec2api.tests.unit import base
from ec2api.tests.unit import fakes
from ec2api.tests.unit import matchers
@ -30,33 +28,29 @@ class SecurityGroupTestCase(base.ApiTestCase):
def setUp(self):
super(SecurityGroupTestCase, self).setUp()
self.addCleanup(self._reset_engine)
def _reset_engine(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
def test_create_security_group(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_VPC_1,
fakes.DB_SECURITY_GROUP_1)
self.neutron.list_security_groups.return_value = (
{'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1)]})
self.db_api.add_item.return_value = fakes.DB_SECURITY_GROUP_2
self.nova.security_groups.create.return_value = (
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_2))
self.neutron.create_security_group.return_value = (
{'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_2)})
resp = self.execute(
'CreateSecurityGroup',
{'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.nova.security_groups.create.assert_called_once_with(
'groupname', 'Group description')
secgroup_body = (
{'security_group': {'name': 'groupname',
'description': 'Group description'}})
self.neutron.create_security_group.assert_called_once_with(
secgroup_body)
db_group = tools.purge_dict(fakes.DB_SECURITY_GROUP_2, ('id',))
db_group['vpc_id'] = None
self.db_api.add_item.assert_called_once_with(mock.ANY, 'sg', db_group)
self.nova.security_groups.reset_mock()
self.neutron.create_security_group.reset_mock()
self.db_api.add_item.reset_mock()
self.neutron.list_security_groups.return_value = (
@ -70,16 +64,16 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'sg',
tools.purge_dict(fakes.DB_SECURITY_GROUP_2, ('id',)))
self.nova.security_groups.create.assert_called_once_with(
'groupname', 'Group description')
self.nova.security_groups.reset_mock()
self.neutron.create_security_group.assert_called_once_with(
secgroup_body)
self.neutron.create_security_group.reset_mock()
self.db_api.add_item.reset_mock()
self.configure(disable_ec2_classic=True)
self.add_mock_db_items(fakes.DB_VPC_DEFAULT,
fakes.DB_SECURITY_GROUP_DEFAULT)
self.nova.security_groups.create.return_value = (
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_5))
self.neutron.create_security_group.return_value = (
{'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_5)})
self.neutron.list_security_groups.return_value = (
{'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1),
fakes.OS_SECURITY_GROUP_DEFAULT]})
@ -93,12 +87,13 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'sg',
tools.purge_dict(fakes.DB_SECURITY_GROUP_5, ('id',)))
self.nova.security_groups.create.assert_called_once_with(
'groupname2', 'Group description')
secgroup_body = (
{'security_group': {'name': 'groupname2',
'description': 'Group description'}})
self.neutron.create_security_group.assert_called_once_with(
secgroup_body)
def test_create_security_group_invalid(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
def do_check(args, error_code):
self.neutron.reset_mock()
@ -155,37 +150,35 @@ class SecurityGroupTestCase(base.ApiTestCase):
'InvalidGroup.Duplicate')
def test_create_security_group_over_quota(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.nova.security_groups.create.side_effect = (
nova_exception.OverLimit(413))
self.neutron.create_security_group.side_effect = (
neutron_exception.OverQuotaClient(413))
self.assert_execution_error(
'ResourceLimitExceeded', 'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.nova.security_groups.create.assert_called_once_with(
'groupname', 'Group description')
secgroup_body = (
{'security_group': {'name': 'groupname',
'description': 'Group description'}})
self.neutron.create_security_group.assert_called_once_with(
secgroup_body)
# TODO(tikitavi): Rework test to exclude nova-network
@tools.screen_unexpected_exception_logs
def test_create_security_group_rollback(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.set_mock_db_items(fakes.DB_VPC_1)
self.db_api.add_item.side_effect = Exception()
self.nova.security_groups.create.return_value = (
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1))
self.neutron.create_security_group.return_value = (
{'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_1)})
self.assert_execution_error(
self.ANY_EXECUTE_ERROR, 'CreateSecurityGroup',
{'VpcId': fakes.ID_EC2_VPC_1,
'GroupName': 'groupname',
'GroupDescription': 'Group description'})
self.nova.security_groups.delete.assert_called_once_with(
self.neutron.delete_security_group.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_1)
def test_delete_security_group(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1)
resp = self.execute(
'DeleteSecurityGroup',
@ -241,28 +234,12 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.neutron.delete_security_group.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_5)
def test_delete_security_group_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova.security_groups.list.return_value = (
[fakes.NovaSecurityGroup(copy.deepcopy(fakes.OS_SECURITY_GROUP_1)),
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_2)])
resp = self.execute(
'DeleteSecurityGroup',
{'GroupName':
fakes.EC2_SECURITY_GROUP_2['groupName']})
self.assertEqual(True, resp['return'])
self.nova.security_groups.delete.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_2)
# NOTE(Alex) This test is disabled because it checks using non-AWS id.
@base.skip_not_implemented
def test_delete_security_group_nova_os_id(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova.security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_2)])
[fakes.OS_SECURITY_GROUP_1,
fakes.OS_SECURITY_GROUP_2])
resp = self.execute(
'DeleteSecurityGroup',
{'GroupId':
@ -272,8 +249,6 @@ class SecurityGroupTestCase(base.ApiTestCase):
fakes.ID_OS_SECURITY_GROUP_2)
def test_delete_security_group_invalid(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1, fakes.DB_VPC_1)
self.neutron.show_security_group.return_value = (
{'security_group': fakes.OS_SECURITY_GROUP_1})
@ -293,8 +268,6 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.assertEqual(0, self.neutron.delete_port.call_count)
def test_delete_security_group_is_in_use(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1)
self.neutron.delete_security_group.side_effect = (
neutron_exception.Conflict())
@ -304,8 +277,6 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.assertEqual(0, self.db_api.delete_item.call_count)
def test_describe_security_groups(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2,
fakes.DB_SECURITY_GROUP_3,
@ -378,37 +349,9 @@ class SecurityGroupTestCase(base.ApiTestCase):
[fakes.EC2_SECURITY_GROUP_5],
orderless_lists=True))
def test_describe_security_groups_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2,
fakes.DB_SECURITY_GROUP_3,
fakes.DB_SECURITY_GROUP_4,
fakes.DB_SECURITY_GROUP_5,)
self.neutron.list_security_groups.return_value = (
{'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1),
fakes.OS_SECURITY_GROUP_2,
fakes.OS_SECURITY_GROUP_3,
fakes.OS_SECURITY_GROUP_4,
fakes.OS_SECURITY_GROUP_5]})
resp = self.execute('DescribeSecurityGroups', {})
self.assertThat(resp['securityGroupInfo'],
matchers.ListMatches(
[fakes.EC2_SECURITY_GROUP_1,
fakes.EC2_SECURITY_GROUP_2,
fakes.EC2_SECURITY_GROUP_3,
fakes.EC2_SECURITY_GROUP_4,
fakes.EC2_SECURITY_GROUP_5],
orderless_lists=True))
self.neutron.list_security_groups.assert_called_once_with(
tenant_id=fakes.ID_OS_PROJECT)
@mock.patch('ec2api.api.ec2utils.check_and_create_default_vpc')
def test_describe_security_groups_no_default_vpc(self, check_and_create):
self.configure(disable_ec2_classic=True)
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
def mock_check_and_create(context):
self.set_mock_db_items(fakes.DB_VPC_DEFAULT,
@ -426,11 +369,9 @@ class SecurityGroupTestCase(base.ApiTestCase):
check_and_create.assert_called_once_with(mock.ANY)
def test_repair_default_security_group(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.db_api.add_item.return_value = fakes.DB_SECURITY_GROUP_1
self.nova.security_groups.create.return_value = (
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1))
self.neutron.create_security_group.return_value = (
{'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_1)})
self.set_mock_db_items(fakes.DB_VPC_1,
fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2)
@ -443,12 +384,13 @@ class SecurityGroupTestCase(base.ApiTestCase):
{'id': fakes.ID_EC2_VPC_1.replace('vpc', 'sg'),
'os_id': fakes.ID_OS_SECURITY_GROUP_1,
'vpc_id': fakes.ID_EC2_VPC_1})
self.nova.security_groups.create.assert_called_once_with(
fakes.ID_EC2_VPC_1, 'Default VPC security group')
secgroup_body = (
{'security_group': {'name': fakes.ID_EC2_VPC_1,
'description': 'Default VPC security group'}})
self.neutron.create_security_group.assert_called_once_with(
secgroup_body)
def test_authorize_security_group_invalid(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
def check_response(error_code, protocol, from_port, to_port, cidr,
group_id=fakes.ID_EC2_SECURITY_GROUP_2):
@ -518,8 +460,6 @@ class SecurityGroupTestCase(base.ApiTestCase):
'IpPermissions.1.Groups.1.UserId': 'i-99999999'})
def test_authorize_security_group_ingress_ip_ranges(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2)
self.neutron.create_security_group_rule.return_value = (
@ -577,28 +517,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.neutron.create_security_group_rule.assert_called_with(
{'security_group_rule': security_group_rule})
def test_authorize_security_group_ip_ranges_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova.security_group_rules.create.return_value = (
{'security_group_rule': [fakes.NOVA_SECURITY_GROUP_RULE_1]})
self.nova.security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.FromPort': '10',
'IpPermissions.1.ToPort': '10',
'IpPermissions.1.IpProtocol': 'tcp',
'IpPermissions.1.IpRanges.1.CidrIp': '192.168.1.0/24'})
self.nova.security_group_rules.create.assert_called_once_with(
str(fakes.ID_NOVA_OS_SECURITY_GROUP_2), 'tcp', 10, 10,
'192.168.1.0/24', None)
def test_authorize_security_group_egress_groups(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2)
self.neutron.create_security_group_rule.return_value = (
@ -616,27 +535,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
{'id', 'remote_ip_prefix', 'tenant_id',
'port_range_max'})})
def test_authorize_security_group_groups_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova.security_group_rules.create.return_value = (
{'security_group_rule': [fakes.NOVA_SECURITY_GROUP_RULE_2]})
self.nova.security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
self.execute(
'AuthorizeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.IpProtocol': 'icmp',
'IpPermissions.1.Groups.1.GroupName':
fakes.EC2_NOVA_SECURITY_GROUP_1['groupName']})
self.nova.security_group_rules.create.assert_called_once_with(
str(fakes.ID_NOVA_OS_SECURITY_GROUP_2), 'icmp', -1, -1,
None, str(fakes.ID_NOVA_OS_SECURITY_GROUP_1))
def test_revoke_security_group_ingress_ip_ranges(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2)
self.neutron.show_security_group.return_value = {
@ -654,28 +553,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.neutron.delete_security_group_rule.assert_called_once_with(
fakes.OS_SECURITY_GROUP_RULE_1['id'])
def test_revoke_security_group_ingress_ip_ranges_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova.security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
self.nova.security_groups.get.return_value = (
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2))
self.nova.security_group_rules.delete.return_value = True
self.execute(
'RevokeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.FromPort': '10',
'IpPermissions.1.ToPort': '10',
'IpPermissions.1.IpProtocol': 'tcp',
'IpPermissions.1.IpRanges.1.CidrIp': '192.168.1.0/24'})
self.nova.security_group_rules.delete.assert_called_once_with(
fakes.NOVA_SECURITY_GROUP_RULE_1['id'])
def test_revoke_security_group_egress_groups(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNeutron())
self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
fakes.DB_SECURITY_GROUP_2)
self.neutron.show_security_group.return_value = {
@ -692,21 +570,3 @@ class SecurityGroupTestCase(base.ApiTestCase):
fakes.ID_OS_SECURITY_GROUP_2)
self.neutron.delete_security_group_rule.assert_called_once_with(
fakes.OS_SECURITY_GROUP_RULE_2['id'])
def test_revoke_security_group_groups_nova(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova.security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2)])
self.nova.security_groups.get.return_value = (
fakes.NovaSecurityGroup(fakes.NOVA_SECURITY_GROUP_2))
self.nova.security_group_rules.delete.return_value = True
self.execute(
'RevokeSecurityGroupIngress',
{'GroupName': fakes.EC2_NOVA_SECURITY_GROUP_2['groupName'],
'IpPermissions.1.IpProtocol': 'icmp',
'IpPermissions.1.Groups.1.GroupName':
fakes.EC2_NOVA_SECURITY_GROUP_1['groupName']})
self.nova.security_group_rules.delete.assert_called_once_with(
fakes.NOVA_SECURITY_GROUP_RULE_2['id'])

View File

@ -274,7 +274,6 @@ iniset $CONF_FILE DEFAULT log_dir "$LOG_DIR"
iniset $CONF_FILE DEFAULT verbose True
iniset $CONF_FILE DEFAULT keystone_ec2_tokens_url "$OS_AUTH_URL_WO_PATH/v3/ec2tokens"
iniset $CONF_FILE database connection "$CONNECTION"
iniset $CONF_FILE DEFAULT full_vpc_support "$VPC_SUPPORT"
iniset $CONF_FILE DEFAULT disable_ec2_classic "$DISABLE_EC2_CLASSIC"
iniset $CONF_FILE DEFAULT external_network "$EXTERNAL_NETWORK"