Bump version and fix races

Cherry picked from Iadadefb8b4abebbb3b8efc0e536e5de30ed23dab

1) fix bug with filtering
filters were applied before unpaired items is added to result

2) fix races in default security group creation

3) add waiter for associate/disassociate address

4) fix security group classic test
it should choose default group group only from classic groups

5) fix describe_vpc_with_filters test
it can run in parallel with other test with same CIDR

6) fix networks list for instance run at subnet creation/deletion

7) fix selective decsribe by names
it should not delete valid items from db

Change-Id: I552c321760295ac6061ccf70e5ed4737113162d8
This commit is contained in:
Andrey Pavlov 2015-12-16 18:45:32 +03:00
parent 10c9562437
commit 85986cfc8c
19 changed files with 173 additions and 76 deletions

View File

@ -26,7 +26,7 @@ from ec2api.api import ec2utils
from ec2api.api import validator
from ec2api.db import api as db_api
from ec2api import exception
from ec2api.i18n import _, _LW
from ec2api.i18n import _, _LI, _LW
ec2_opts = [
@ -307,6 +307,9 @@ class UniversalDescriber(object):
if item is None and self.KIND not in VPC_KINDS:
item = ec2utils.auto_create_db_item(self.context, self.KIND,
self.get_id(os_item))
LOG.info(
_LI('Item %(item)s was updated to %(os_item)s.') %
{'item': str(item), 'os_item': str(os_item)})
return item
def get_id(self, os_item):
@ -316,6 +319,7 @@ class UniversalDescriber(object):
return os_item['name']
def delete_obsolete_item(self, item):
LOG.info(_LI('Deleting obsolete item %(item)s') % {'item': str(item)})
db_api.delete_item(self.context, item['id'])
def is_filtering_value_found(self, filter_value, value):
@ -380,7 +384,7 @@ class UniversalDescriber(object):
return formatted_items
def handle_unpaired_item(self, item, formatted_items):
def handle_unpaired_item(self, item):
self.delete_obsolete_item(item)
def describe(self, context, ids=None, names=None, filter=None,
@ -404,6 +408,8 @@ class UniversalDescriber(object):
os_item_name = self.get_name(os_item)
os_item_id = self.get_id(os_item)
item = self.items_dict.get(os_item_id, None)
if item:
paired_items_ids.add(item['id'])
# NOTE(Alex): Filter out items not requested in names or ids
if (self.selective_describe and
not (os_item_name in self.names or
@ -411,6 +417,8 @@ class UniversalDescriber(object):
continue
# NOTE(Alex): Autoupdate DB for autoupdatable items
item = self.auto_update_db(item, os_item)
# NOTE(andrey-mp): save item id again
# (if item has created by auto update)
if item:
paired_items_ids.add(item['id'])
formatted_item = self.format(item, os_item)
@ -424,8 +432,14 @@ class UniversalDescriber(object):
formatted_items.append(formatted_item)
# NOTE(Alex): delete obsolete items
for item in self.items:
if item['id'] not in paired_items_ids:
self.handle_unpaired_item(item, formatted_items)
if item['id'] in paired_items_ids:
continue
formatted_item = self.handle_unpaired_item(item)
if formatted_item:
if not self.filtered_out(formatted_item, filter):
formatted_items.append(formatted_item)
if item['id'] in self.ids:
self.ids.remove(item['id'])
# NOTE(Alex): some requested items are not found
if self.ids or self.names:
params = {'id': next(iter(self.ids or self.names))}

View File

@ -417,37 +417,36 @@ class ImageDescriber(common.TaggableItemsDescriber):
def get_tags(self):
return db_api.get_tags(self.context, ('ami', 'ari', 'aki'), self.ids)
def handle_unpaired_item(self, item, formatted_items):
def handle_unpaired_item(self, item):
if item['os_id']:
super(ImageDescriber, self).handle_unpaired_item(item,
formatted_items)
return super(ImageDescriber, self).handle_unpaired_item(item)
if 'is_public' not in item:
return None
# NOTE(ft): process creating images, ignoring ids mapping
# NOTE(ft): the image is being creating, Glance had created
# image, but creating thread doesn't yet update db item
os_image = self.ec2_created_os_images.get(item['id'])
if os_image:
item['os_id'] = os_image.id
item['is_public'] = os_image.is_public
db_api.update_item(self.context, item)
image = self.format(item, os_image)
else:
# NOTE(ft): process creating images, ignoring ids mapping
if 'is_public' in item:
# NOTE(ft): the image is being creating, Glance had created
# image, but creating thread doesn't yet update db item
os_image = self.ec2_created_os_images.get(item['id'])
if os_image:
item['os_id'] = os_image.id
item['is_public'] = os_image.is_public
db_api.update_item(self.context, item)
image = self.format(item, os_image)
else:
# NOTE(ft): Glance image is yet not created, but DB item
# exists. So that we adds EC2 image to output results
# with all data we have.
# TODO(ft): check if euca2ools can process such result
image = {'imageId': item['id'],
'imageOwnerId': self.context.project_id,
'imageType': IMAGE_TYPES[
ec2utils.get_ec2_id_kind(item['id'])],
'isPublic': item['is_public']}
if 'description' in item:
image['description'] = item['description']
image['imageState'] = item.get('state', 'pending')
formatted_items.append(image)
if item['id'] in self.ids:
self.ids.remove(item['id'])
# NOTE(ft): Glance image is not yet created, but DB item
# exists. So that we adds EC2 image to output results
# with all data we have.
# TODO(ft): check if euca2ools can process such result
image = {'imageId': item['id'],
'imageOwnerId': self.context.project_id,
'imageType': IMAGE_TYPES[
ec2utils.get_ec2_id_kind(item['id'])],
'isPublic': item['is_public']}
if 'description' in item:
image['description'] = item['description']
image['imageState'] = item.get('state', 'pending')
return image
def describe_images(context, executable_by=None, image_id=None,

View File

@ -1417,14 +1417,16 @@ class InstanceEngineNeutron(object):
os_subnets = neutron.list_subnets(
id=os_subnet_ids, fields=['network_id'],
tenant_id=context.project_id)['subnets']
vpc_os_network_ids = set(sn['network_id'] for sn in os_subnets)
vpc_os_network_ids = set(
sn['network_id'] for sn in os_subnets)
else:
vpc_os_network_ids = []
os_networks = neutron.list_networks(
**{'router:external': False, 'fields': ['id'],
**{'router:external': False, 'fields': ['id', 'name'],
'tenant_id': context.project_id})['networks']
ec2_classic_os_networks = [n for n in os_networks
if n['id'] not in vpc_os_network_ids]
if n['id'] not in vpc_os_network_ids and
not n.get('name').startswith('subnet-')]
if len(ec2_classic_os_networks) == 0:
raise exception.Unsupported(
reason=_('There are no available networks '

View File

@ -86,7 +86,7 @@ def create_security_group(context, group_name, group_description,
def _create_security_group(context, group_name, group_description,
vpc_id=None):
vpc_id=None, default=False):
nova = clients.nova(context)
with common.OnCrashCleaner() as cleaner:
try:
@ -99,9 +99,14 @@ def _create_security_group(context, group_name, group_description,
if vpc_id:
# NOTE(Alex) Check if such vpc exists
ec2utils.get_db_item(context, vpc_id)
security_group = db_api.add_item(context, 'sg',
{'vpc_id': vpc_id,
'os_id': os_security_group.id})
item = {'vpc_id': vpc_id, 'os_id': os_security_group.id}
if not default:
security_group = db_api.add_item(context, 'sg', item)
else:
item['id'] = ec2utils.change_ec2_id_kind(vpc_id, 'sg')
# NOTE(andrey-mp): try to add item with specific id
# and catch exception if it exists
security_group = db_api.restore_item(context, 'sg', item)
return {'return': 'true',
'groupId': security_group['id']}
@ -109,8 +114,16 @@ def _create_security_group(context, group_name, group_description,
def _create_default_security_group(context, vpc):
# NOTE(Alex): OpenStack doesn't allow creation of another group
# named 'default' hence vpc-id is used.
return _create_security_group(context, vpc['id'],
'Default VPC security group', vpc['id'])
try:
_create_security_group(context, vpc['id'],
'Default VPC security group', vpc['id'],
default=True)
except (exception.EC2DBDuplicateEntry, exception.InvalidVpcIDNotFound):
# NOTE(andrey-mp): when this thread tries to recreate default group
# but another thread tries to delete vpc we should pass vpc not found
LOG.exception('Failed to create default security group.')
return False
return True
def delete_security_group(context, group_name=None, group_id=None,
@ -176,8 +189,9 @@ class SecurityGroupDescriber(common.TaggableItemsDescriber):
db_group = db_groups_dict.get(os_group)
if db_group and db_group == vpc['id']:
continue
had_to_repair = True
_create_default_security_group(self.context, vpc)
result = _create_default_security_group(self.context, vpc)
if result:
had_to_repair = True
return had_to_repair

View File

@ -53,7 +53,8 @@ def create_subnet(context, vpc_id, cidr_block,
context, main_route_table, cidr_block)
neutron = clients.neutron(context)
with common.OnCrashCleaner() as cleaner:
os_network_body = {'network': {}}
# NOTE(andrey-mp): set fake name to filter networks in instance api
os_network_body = {'network': {'name': 'subnet-0'}}
try:
os_network = neutron.create_network(os_network_body)['network']
cleaner.addCleanup(neutron.delete_network, os_network['id'])

View File

@ -176,14 +176,17 @@ def delete_item(context, item_id):
@require_context
def restore_item(context, kind, data):
item_ref = models.Item()
item_ref.update({
"project_id": context.project_id,
})
item_ref.id = data['id']
item_ref.update(_pack_item_data(data))
item_ref.save()
return _unpack_item_data(item_ref)
try:
item_ref = models.Item()
item_ref.update({
"project_id": context.project_id,
})
item_ref.id = data['id']
item_ref.update(_pack_item_data(data))
item_ref.save()
return _unpack_item_data(item_ref)
except db_exception.DBDuplicateEntry:
raise exception.EC2DBDuplicateEntry(id=data['id'])
@require_context

View File

@ -105,6 +105,10 @@ class EC2DBInvalidOsIdUpdate(EC2APIException):
'from %(old_os_id)s to %(new_os_id)s')
class EC2DBDuplicateEntry(EC2APIException):
msg_fmt = _('Entry %(id)s already exists in DB.')
# Internal ec2api metadata exceptions
class EC2MetadataException(EC2APIException):

View File

@ -266,6 +266,8 @@ class AddressTest(base.EC2TestCase):
assoc_id = data['AssociationId']
clean_aa = self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id})
kwargs = {
'AllocationIds': [alloc_id],
@ -275,9 +277,7 @@ class AddressTest(base.EC2TestCase):
data = self.client.disassociate_address(AssociationId=assoc_id)
self.cancelResourceCleanUp(clean_aa)
data = self.client.describe_addresses(*[], **kwargs)
self.assertIsNone(data['Addresses'][0].get('InstanceId'))
self.get_address_assoc_waiter().wait_delete({'AllocationId': alloc_id})
# NOTE(andrey-mp): cleanup
time.sleep(3)
@ -316,6 +316,7 @@ class AddressTest(base.EC2TestCase):
PublicIp=ip)
clean_aa = self.addResourceCleanUp(self.client.disassociate_address,
PublicIp=ip)
self.get_address_assoc_waiter().wait_available({'PublicIp': ip})
kwargs = {
'PublicIps': [ip],
@ -325,11 +326,7 @@ class AddressTest(base.EC2TestCase):
data = self.client.disassociate_address(PublicIp=ip)
self.cancelResourceCleanUp(clean_aa)
# NOTE(andrey-mp): Amazon needs some time to diassociate
time.sleep(2)
data = self.client.describe_addresses(*[], **kwargs)
self.assertFalse(data['Addresses'][0].get('InstanceId'))
self.get_address_assoc_waiter().wait_delete({'PublicIp': ip})
time.sleep(3)
@ -418,3 +415,5 @@ class AddressTest(base.EC2TestCase):
assoc_id = data['AssociationId']
self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id})

View File

@ -32,10 +32,14 @@ class SecurityGroupBaseTest(base.EC2TestCase):
def _test_rules(self, add_func, del_func, field, vpc_id=None):
kwargs = dict()
if vpc_id:
kwargs['Filters'] = [{'Name': 'vpc-id',
'Values': [vpc_id]}]
kwargs['Filters'] = [{'Name': 'vpc-id', 'Values': [vpc_id]}]
data = self.client.describe_security_groups(*[], **kwargs)
default_group = data['SecurityGroups'][0]
security_groups = data['SecurityGroups']
if not vpc_id:
# TODO(andrey-mp): remove it when fitering by None will be
security_groups = [sg for sg in security_groups
if sg.get('VpcId') is None]
default_group = security_groups[0]
name = data_utils.rand_name('sgName')
desc = data_utils.rand_name('sgDesc')

View File

@ -100,7 +100,7 @@ class VPCTest(base.EC2TestCase):
self.get_vpc_waiter().wait_delete(vpc_id)
def test_describe_vpcs_filters(self):
cidr = '10.1.0.0/16'
cidr = '10.163.0.0/16'
data = self.client.create_vpc(CidrBlock=cidr)
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.delete_vpc,

View File

@ -336,6 +336,9 @@ class EC2TestCase(base.BaseTestCase):
'delete_vpn_gateway': (
'get_vpn_gateway_waiter',
lambda kwargs: kwargs['VpnGatewayId']),
'disassociate_address': (
'get_address_assoc_waiter',
lambda kwargs: kwargs),
}
@classmethod
@ -460,6 +463,31 @@ class EC2TestCase(base.BaseTestCase):
def get_subnet_waiter(cls):
return EC2Waiter(cls._subnet_get_state)
@classmethod
def _address_assoc_get_state(cls, kwargs):
try:
ip = kwargs.get('PublicIp')
alloc_id = kwargs.get('AllocationId')
assoc_id = kwargs.get('AssociationId')
if ip:
data = cls.client.describe_addresses(PublicIps=[ip])
elif alloc_id:
data = cls.client.describe_addresses(AllocationIds=[alloc_id])
elif assoc_id:
data = cls.client.describe_addresses(
Filters=[{'Name': 'association-id', 'Values': [assoc_id]}])
if ('Addresses' in data and len(data['Addresses']) == 1 and
data['Addresses'][0].get('InstanceId')):
return 'available'
raise exceptions.NotFound()
except botocore.exceptions.ClientError:
raise exceptions.NotFound()
@classmethod
def get_address_assoc_waiter(cls):
return EC2Waiter(cls._address_assoc_get_state)
@classmethod
def _instance_get_state(cls, instance_id):
try:

View File

@ -45,9 +45,13 @@ class BaseScenarioTest(base.EC2TestCase):
if is_vpc:
self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=data['AssociationId'])
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id})
else:
self.addResourceCleanUp(self.client.disassociate_address,
PublicIp=public_ip)
self.get_address_assoc_waiter().wait_available(
{'PublicIp': public_ip})
return public_ip

View File

@ -328,7 +328,9 @@ class InstancePagingTest(scenario_base.BaseScenarioTest):
rcount += 1
self.assertEqual(self.RESERVATIONS_COUNT, rcount)
count = self.RESERVATIONS_COUNT * self.INSTANCES_IN_RESERVATIONS_COUNT
self.assertEqual(count, self._count_own_instances(data))
instances = set()
self._collect_own_instances(data, instances)
self.assertEqual(count, len(instances))
def test_simple_instances_paging_with_min_results(self):
max_results = 5
@ -340,16 +342,17 @@ class InstancePagingTest(scenario_base.BaseScenarioTest):
real_count = 0
max_results = 5
kwargs = {'MaxResults': max_results}
instances = set()
while True:
data = self.client.describe_instances(*[], **kwargs)
self.assertGreaterEqual(max_results, self._count_instances(data))
real_count += self._count_own_instances(data)
self._collect_own_instances(data, instances)
if 'NextToken' not in data:
break
kwargs['NextToken'] = data['NextToken']
count = self.RESERVATIONS_COUNT * self.INSTANCES_IN_RESERVATIONS_COUNT
self.assertEqual(count, real_count)
self.assertEqual(count, len(instances))
def test_invalid_paging(self):
self.assertRaises('InvalidParameterValue',
@ -359,13 +362,11 @@ class InstancePagingTest(scenario_base.BaseScenarioTest):
self.client.describe_instances,
MaxResults=5, InstanceIds=[self.ids[0]])
def _count_own_instances(self, data):
count = 0
def _collect_own_instances(self, data, instances):
for reservation in data['Reservations']:
for instance in reservation['Instances']:
if instance['InstanceId'] in self.ids:
count += 1
return count
instances.add(instance['InstanceId'])
def _count_instances(self, data):
count = 0

View File

@ -80,6 +80,8 @@ class VpcAddressTest(scenario_base.BaseScenarioTest):
assoc_id1 = data['AssociationId']
clean_aa1 = self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id1)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id1})
instance = self.get_instance(instance_id)
nis = instance.get('NetworkInterfaces', [])
@ -106,6 +108,8 @@ class VpcAddressTest(scenario_base.BaseScenarioTest):
assoc_id2 = data['AssociationId']
clean_aa2 = self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id2)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id2})
kwargs = {
'NetworkInterfaceId': ni_id2,
@ -147,3 +151,5 @@ class VpcAddressTest(scenario_base.BaseScenarioTest):
self.client.disassociate_address(AssociationId=assoc_id1)
self.cancelResourceCleanUp(clean_aa1)
self.get_address_assoc_waiter().wait_delete(
{'AllocationId': alloc_id1})

View File

@ -121,6 +121,15 @@ class DbApiTestCase(base.DbTestCase):
item_id2 = db_api.add_item_id(self.context, 'fake', os_id)
self.assertEqual(item_id1, item_id2)
def test_restore_item(self):
os_id = fakes.random_os_id()
item = {'os_id': os_id, 'key': 'val1'}
new_item = db_api.add_item(self.context, 'fake', item)
item['id'] = new_item['id']
self.assertRaises(
exception.EC2DBDuplicateEntry,
db_api.restore_item, self.context, 'fake', item)
def test_update_item(self):
item = db_api.add_item(self.context, 'fake', {'key': 'val1',
'key1': 'val'})

View File

@ -724,6 +724,11 @@ class ImagePrivateTestCase(base.BaseTestCase):
result = image_api.describe_images(context, image_id=[image_id])
self.assertEqual(expected, result)
# describe with filter
result = image_api.describe_images(
context, filter=[{'name': 'name', 'value': 'noname'}])
self.assertEqual({'imagesSet': []}, result)
# describe failed image
image['state'] = 'failed'
expected['imagesSet'][0]['imageState'] = 'failed'

View File

@ -265,6 +265,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
matchers.ListMatches(
[fakes.EC2_SECURITY_GROUP_2],
orderless_lists=True))
self.assertEqual(0, self.db_api.delete_item.call_count)
self.db_api.get_items_by_ids = tools.CopyingMock(
return_value=[fakes.DB_SECURITY_GROUP_2])
@ -276,6 +277,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
orderless_lists=True))
self.db_api.get_items_by_ids.assert_called_once_with(
mock.ANY, set([fakes.ID_EC2_SECURITY_GROUP_2]))
self.assertEqual(0, self.db_api.delete_item.call_count)
self.check_filtering(
'DescribeSecurityGroups', 'securityGroupInfo',
@ -324,9 +326,11 @@ class SecurityGroupTestCase(base.ApiTestCase):
{'security_groups': [fakes.OS_SECURITY_GROUP_2]})
resp = self.execute('DescribeSecurityGroups', {})
self.db_api.add_item.assert_called_once_with(
self.db_api.restore_item.assert_called_once_with(
mock.ANY, 'sg',
tools.purge_dict(fakes.DB_SECURITY_GROUP_1, ('id',)))
{'id': fakes.ID_EC2_VPC_1.replace('vpc', 'sg'),
'os_id': fakes.ID_OS_SECURITY_GROUP_1,
'vpc_id': fakes.ID_EC2_VPC_1})
self.nova.security_groups.create.assert_called_once_with(
fakes.ID_EC2_VPC_1, 'Default VPC security group')

View File

@ -46,7 +46,7 @@ class SubnetTestCase(base.ApiTestCase):
mock.ANY, 'subnet',
tools.purge_dict(subnet_1, ('id',)))
self.neutron.create_network.assert_called_once_with(
{'network': {}})
{'network': {'name': 'subnet-0'}})
self.neutron.update_network.assert_called_once_with(
fakes.ID_OS_NETWORK_1,
{'network': {'name': fakes.ID_EC2_SUBNET_1}})

View File

@ -1,6 +1,6 @@
[metadata]
name = ec2-api
version = 1.0.0
version = 1.0.1
summary = OpenStack Ec2api Service
description-file =
README.rst