fix races

1) fix bug with filtering
filters were applied before unpaired items is added to result

2) fix races in default security group creation

3) add waiter for associate/disassociate address

4) fix security group classic test
it should choose default group group only from classic groups

5) fix describe_vpc_with_filters test
it can run in parallel with other test with same CIDR

6) fix networks list for instance run at subnet creation/deletion

7) fix selective decsribe by names
it should not delete valid items from db

Change-Id: Iadadefb8b4abebbb3b8efc0e536e5de30ed23dab
This commit is contained in:
Andrey Pavlov 2015-12-09 16:55:52 +03:00
parent 64f1544e67
commit 2cc7ec92dc
18 changed files with 172 additions and 75 deletions

View File

@ -26,7 +26,7 @@ from ec2api.api import ec2utils
from ec2api.api import validator
from ec2api.db import api as db_api
from ec2api import exception
from ec2api.i18n import _, _LW
from ec2api.i18n import _, _LI, _LW
ec2_opts = [
@ -307,6 +307,9 @@ class UniversalDescriber(object):
if item is None and self.KIND not in VPC_KINDS:
item = ec2utils.auto_create_db_item(self.context, self.KIND,
self.get_id(os_item))
LOG.info(
_LI('Item %(item)s was updated to %(os_item)s.') %
{'item': str(item), 'os_item': str(os_item)})
return item
def get_id(self, os_item):
@ -316,6 +319,7 @@ class UniversalDescriber(object):
return os_item['name']
def delete_obsolete_item(self, item):
LOG.info(_LI('Deleting obsolete item %(item)s') % {'item': str(item)})
db_api.delete_item(self.context, item['id'])
def is_filtering_value_found(self, filter_value, value):
@ -380,7 +384,7 @@ class UniversalDescriber(object):
return formatted_items
def handle_unpaired_item(self, item, formatted_items):
def handle_unpaired_item(self, item):
self.delete_obsolete_item(item)
def describe(self, context, ids=None, names=None, filter=None,
@ -404,6 +408,8 @@ class UniversalDescriber(object):
os_item_name = self.get_name(os_item)
os_item_id = self.get_id(os_item)
item = self.items_dict.get(os_item_id, None)
if item:
paired_items_ids.add(item['id'])
# NOTE(Alex): Filter out items not requested in names or ids
if (self.selective_describe and
not (os_item_name in self.names or
@ -411,6 +417,8 @@ class UniversalDescriber(object):
continue
# NOTE(Alex): Autoupdate DB for autoupdatable items
item = self.auto_update_db(item, os_item)
# NOTE(andrey-mp): save item id again
# (if item has created by auto update)
if item:
paired_items_ids.add(item['id'])
formatted_item = self.format(item, os_item)
@ -424,8 +432,14 @@ class UniversalDescriber(object):
formatted_items.append(formatted_item)
# NOTE(Alex): delete obsolete items
for item in self.items:
if item['id'] not in paired_items_ids:
self.handle_unpaired_item(item, formatted_items)
if item['id'] in paired_items_ids:
continue
formatted_item = self.handle_unpaired_item(item)
if formatted_item:
if not self.filtered_out(formatted_item, filter):
formatted_items.append(formatted_item)
if item['id'] in self.ids:
self.ids.remove(item['id'])
# NOTE(Alex): some requested items are not found
if self.ids or self.names:
params = {'id': next(iter(self.ids or self.names))}

View File

@ -417,37 +417,36 @@ class ImageDescriber(common.TaggableItemsDescriber):
def get_tags(self):
return db_api.get_tags(self.context, ('ami', 'ari', 'aki'), self.ids)
def handle_unpaired_item(self, item, formatted_items):
def handle_unpaired_item(self, item):
if item['os_id']:
super(ImageDescriber, self).handle_unpaired_item(item,
formatted_items)
return super(ImageDescriber, self).handle_unpaired_item(item)
if 'is_public' not in item:
return None
# NOTE(ft): process creating images, ignoring ids mapping
# NOTE(ft): the image is being creating, Glance had created
# image, but creating thread doesn't yet update db item
os_image = self.ec2_created_os_images.get(item['id'])
if os_image:
item['os_id'] = os_image.id
item['is_public'] = os_image.is_public
db_api.update_item(self.context, item)
image = self.format(item, os_image)
else:
# NOTE(ft): process creating images, ignoring ids mapping
if 'is_public' in item:
# NOTE(ft): the image is being creating, Glance had created
# image, but creating thread doesn't yet update db item
os_image = self.ec2_created_os_images.get(item['id'])
if os_image:
item['os_id'] = os_image.id
item['is_public'] = os_image.is_public
db_api.update_item(self.context, item)
image = self.format(item, os_image)
else:
# NOTE(ft): Glance image is yet not created, but DB item
# exists. So that we adds EC2 image to output results
# with all data we have.
# TODO(ft): check if euca2ools can process such result
image = {'imageId': item['id'],
'imageOwnerId': self.context.project_id,
'imageType': IMAGE_TYPES[
ec2utils.get_ec2_id_kind(item['id'])],
'isPublic': item['is_public']}
if 'description' in item:
image['description'] = item['description']
image['imageState'] = item.get('state', 'pending')
formatted_items.append(image)
if item['id'] in self.ids:
self.ids.remove(item['id'])
# NOTE(ft): Glance image is not yet created, but DB item
# exists. So that we adds EC2 image to output results
# with all data we have.
# TODO(ft): check if euca2ools can process such result
image = {'imageId': item['id'],
'imageOwnerId': self.context.project_id,
'imageType': IMAGE_TYPES[
ec2utils.get_ec2_id_kind(item['id'])],
'isPublic': item['is_public']}
if 'description' in item:
image['description'] = item['description']
image['imageState'] = item.get('state', 'pending')
return image
def describe_images(context, executable_by=None, image_id=None,

View File

@ -1417,14 +1417,16 @@ class InstanceEngineNeutron(object):
os_subnets = neutron.list_subnets(
id=os_subnet_ids, fields=['network_id'],
tenant_id=context.project_id)['subnets']
vpc_os_network_ids = set(sn['network_id'] for sn in os_subnets)
vpc_os_network_ids = set(
sn['network_id'] for sn in os_subnets)
else:
vpc_os_network_ids = []
os_networks = neutron.list_networks(
**{'router:external': False, 'fields': ['id'],
**{'router:external': False, 'fields': ['id', 'name'],
'tenant_id': context.project_id})['networks']
ec2_classic_os_networks = [n for n in os_networks
if n['id'] not in vpc_os_network_ids]
if n['id'] not in vpc_os_network_ids and
not n.get('name').startswith('subnet-')]
if len(ec2_classic_os_networks) == 0:
raise exception.Unsupported(
reason=_('There are no available networks '

View File

@ -86,7 +86,7 @@ def create_security_group(context, group_name, group_description,
def _create_security_group(context, group_name, group_description,
vpc_id=None):
vpc_id=None, default=False):
nova = clients.nova(context)
with common.OnCrashCleaner() as cleaner:
try:
@ -99,9 +99,14 @@ def _create_security_group(context, group_name, group_description,
if vpc_id:
# NOTE(Alex) Check if such vpc exists
ec2utils.get_db_item(context, vpc_id)
security_group = db_api.add_item(context, 'sg',
{'vpc_id': vpc_id,
'os_id': os_security_group.id})
item = {'vpc_id': vpc_id, 'os_id': os_security_group.id}
if not default:
security_group = db_api.add_item(context, 'sg', item)
else:
item['id'] = ec2utils.change_ec2_id_kind(vpc_id, 'sg')
# NOTE(andrey-mp): try to add item with specific id
# and catch exception if it exists
security_group = db_api.restore_item(context, 'sg', item)
return {'return': 'true',
'groupId': security_group['id']}
@ -109,8 +114,16 @@ def _create_security_group(context, group_name, group_description,
def _create_default_security_group(context, vpc):
# NOTE(Alex): OpenStack doesn't allow creation of another group
# named 'default' hence vpc-id is used.
return _create_security_group(context, vpc['id'],
'Default VPC security group', vpc['id'])
try:
_create_security_group(context, vpc['id'],
'Default VPC security group', vpc['id'],
default=True)
except (exception.EC2DBDuplicateEntry, exception.InvalidVpcIDNotFound):
# NOTE(andrey-mp): when this thread tries to recreate default group
# but another thread tries to delete vpc we should pass vpc not found
LOG.exception('Failed to create default security group.')
return False
return True
def delete_security_group(context, group_name=None, group_id=None,
@ -176,8 +189,9 @@ class SecurityGroupDescriber(common.TaggableItemsDescriber):
db_group = db_groups_dict.get(os_group)
if db_group and db_group == vpc['id']:
continue
had_to_repair = True
_create_default_security_group(self.context, vpc)
result = _create_default_security_group(self.context, vpc)
if result:
had_to_repair = True
return had_to_repair

View File

@ -53,7 +53,8 @@ def create_subnet(context, vpc_id, cidr_block,
context, main_route_table, cidr_block)
neutron = clients.neutron(context)
with common.OnCrashCleaner() as cleaner:
os_network_body = {'network': {}}
# NOTE(andrey-mp): set fake name to filter networks in instance api
os_network_body = {'network': {'name': 'subnet-0'}}
try:
os_network = neutron.create_network(os_network_body)['network']
cleaner.addCleanup(neutron.delete_network, os_network['id'])

View File

@ -176,14 +176,17 @@ def delete_item(context, item_id):
@require_context
def restore_item(context, kind, data):
item_ref = models.Item()
item_ref.update({
"project_id": context.project_id,
})
item_ref.id = data['id']
item_ref.update(_pack_item_data(data))
item_ref.save()
return _unpack_item_data(item_ref)
try:
item_ref = models.Item()
item_ref.update({
"project_id": context.project_id,
})
item_ref.id = data['id']
item_ref.update(_pack_item_data(data))
item_ref.save()
return _unpack_item_data(item_ref)
except db_exception.DBDuplicateEntry:
raise exception.EC2DBDuplicateEntry(id=data['id'])
@require_context

View File

@ -105,6 +105,10 @@ class EC2DBInvalidOsIdUpdate(EC2APIException):
'from %(old_os_id)s to %(new_os_id)s')
class EC2DBDuplicateEntry(EC2APIException):
msg_fmt = _('Entry %(id)s already exists in DB.')
# Internal ec2api metadata exceptions
class EC2MetadataException(EC2APIException):

View File

@ -266,6 +266,8 @@ class AddressTest(base.EC2TestCase):
assoc_id = data['AssociationId']
clean_aa = self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id})
kwargs = {
'AllocationIds': [alloc_id],
@ -275,9 +277,7 @@ class AddressTest(base.EC2TestCase):
data = self.client.disassociate_address(AssociationId=assoc_id)
self.cancelResourceCleanUp(clean_aa)
data = self.client.describe_addresses(*[], **kwargs)
self.assertIsNone(data['Addresses'][0].get('InstanceId'))
self.get_address_assoc_waiter().wait_delete({'AllocationId': alloc_id})
# NOTE(andrey-mp): cleanup
time.sleep(3)
@ -316,6 +316,7 @@ class AddressTest(base.EC2TestCase):
PublicIp=ip)
clean_aa = self.addResourceCleanUp(self.client.disassociate_address,
PublicIp=ip)
self.get_address_assoc_waiter().wait_available({'PublicIp': ip})
kwargs = {
'PublicIps': [ip],
@ -325,11 +326,7 @@ class AddressTest(base.EC2TestCase):
data = self.client.disassociate_address(PublicIp=ip)
self.cancelResourceCleanUp(clean_aa)
# NOTE(andrey-mp): Amazon needs some time to diassociate
time.sleep(2)
data = self.client.describe_addresses(*[], **kwargs)
self.assertFalse(data['Addresses'][0].get('InstanceId'))
self.get_address_assoc_waiter().wait_delete({'PublicIp': ip})
time.sleep(3)
@ -418,3 +415,5 @@ class AddressTest(base.EC2TestCase):
assoc_id = data['AssociationId']
self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id})

View File

@ -32,10 +32,14 @@ class SecurityGroupBaseTest(base.EC2TestCase):
def _test_rules(self, add_func, del_func, field, vpc_id=None):
kwargs = dict()
if vpc_id:
kwargs['Filters'] = [{'Name': 'vpc-id',
'Values': [vpc_id]}]
kwargs['Filters'] = [{'Name': 'vpc-id', 'Values': [vpc_id]}]
data = self.client.describe_security_groups(*[], **kwargs)
default_group = data['SecurityGroups'][0]
security_groups = data['SecurityGroups']
if not vpc_id:
# TODO(andrey-mp): remove it when fitering by None will be
security_groups = [sg for sg in security_groups
if sg.get('VpcId') is None]
default_group = security_groups[0]
name = data_utils.rand_name('sgName')
desc = data_utils.rand_name('sgDesc')

View File

@ -100,7 +100,7 @@ class VPCTest(base.EC2TestCase):
self.get_vpc_waiter().wait_delete(vpc_id)
def test_describe_vpcs_filters(self):
cidr = '10.1.0.0/16'
cidr = '10.163.0.0/16'
data = self.client.create_vpc(CidrBlock=cidr)
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.delete_vpc,

View File

@ -336,6 +336,9 @@ class EC2TestCase(base.BaseTestCase):
'delete_vpn_gateway': (
'get_vpn_gateway_waiter',
lambda kwargs: kwargs['VpnGatewayId']),
'disassociate_address': (
'get_address_assoc_waiter',
lambda kwargs: kwargs),
}
@classmethod
@ -460,6 +463,31 @@ class EC2TestCase(base.BaseTestCase):
def get_subnet_waiter(cls):
return EC2Waiter(cls._subnet_get_state)
@classmethod
def _address_assoc_get_state(cls, kwargs):
try:
ip = kwargs.get('PublicIp')
alloc_id = kwargs.get('AllocationId')
assoc_id = kwargs.get('AssociationId')
if ip:
data = cls.client.describe_addresses(PublicIps=[ip])
elif alloc_id:
data = cls.client.describe_addresses(AllocationIds=[alloc_id])
elif assoc_id:
data = cls.client.describe_addresses(
Filters=[{'Name': 'association-id', 'Values': [assoc_id]}])
if ('Addresses' in data and len(data['Addresses']) == 1 and
data['Addresses'][0].get('InstanceId')):
return 'available'
raise exceptions.NotFound()
except botocore.exceptions.ClientError:
raise exceptions.NotFound()
@classmethod
def get_address_assoc_waiter(cls):
return EC2Waiter(cls._address_assoc_get_state)
@classmethod
def _instance_get_state(cls, instance_id):
try:

View File

@ -45,9 +45,13 @@ class BaseScenarioTest(base.EC2TestCase):
if is_vpc:
self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=data['AssociationId'])
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id})
else:
self.addResourceCleanUp(self.client.disassociate_address,
PublicIp=public_ip)
self.get_address_assoc_waiter().wait_available(
{'PublicIp': public_ip})
return public_ip

View File

@ -328,7 +328,9 @@ class InstancePagingTest(scenario_base.BaseScenarioTest):
rcount += 1
self.assertEqual(self.RESERVATIONS_COUNT, rcount)
count = self.RESERVATIONS_COUNT * self.INSTANCES_IN_RESERVATIONS_COUNT
self.assertEqual(count, self._count_own_instances(data))
instances = set()
self._collect_own_instances(data, instances)
self.assertEqual(count, len(instances))
def test_simple_instances_paging_with_min_results(self):
max_results = 5
@ -340,16 +342,17 @@ class InstancePagingTest(scenario_base.BaseScenarioTest):
real_count = 0
max_results = 5
kwargs = {'MaxResults': max_results}
instances = set()
while True:
data = self.client.describe_instances(*[], **kwargs)
self.assertGreaterEqual(max_results, self._count_instances(data))
real_count += self._count_own_instances(data)
self._collect_own_instances(data, instances)
if 'NextToken' not in data:
break
kwargs['NextToken'] = data['NextToken']
count = self.RESERVATIONS_COUNT * self.INSTANCES_IN_RESERVATIONS_COUNT
self.assertEqual(count, real_count)
self.assertEqual(count, len(instances))
def test_invalid_paging(self):
self.assertRaises('InvalidParameterValue',
@ -359,13 +362,11 @@ class InstancePagingTest(scenario_base.BaseScenarioTest):
self.client.describe_instances,
MaxResults=5, InstanceIds=[self.ids[0]])
def _count_own_instances(self, data):
count = 0
def _collect_own_instances(self, data, instances):
for reservation in data['Reservations']:
for instance in reservation['Instances']:
if instance['InstanceId'] in self.ids:
count += 1
return count
instances.add(instance['InstanceId'])
def _count_instances(self, data):
count = 0

View File

@ -80,6 +80,8 @@ class VpcAddressTest(scenario_base.BaseScenarioTest):
assoc_id1 = data['AssociationId']
clean_aa1 = self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id1)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id1})
instance = self.get_instance(instance_id)
nis = instance.get('NetworkInterfaces', [])
@ -106,6 +108,8 @@ class VpcAddressTest(scenario_base.BaseScenarioTest):
assoc_id2 = data['AssociationId']
clean_aa2 = self.addResourceCleanUp(self.client.disassociate_address,
AssociationId=assoc_id2)
self.get_address_assoc_waiter().wait_available(
{'AllocationId': alloc_id2})
kwargs = {
'NetworkInterfaceId': ni_id2,
@ -147,3 +151,5 @@ class VpcAddressTest(scenario_base.BaseScenarioTest):
self.client.disassociate_address(AssociationId=assoc_id1)
self.cancelResourceCleanUp(clean_aa1)
self.get_address_assoc_waiter().wait_delete(
{'AllocationId': alloc_id1})

View File

@ -121,6 +121,15 @@ class DbApiTestCase(base.DbTestCase):
item_id2 = db_api.add_item_id(self.context, 'fake', os_id)
self.assertEqual(item_id1, item_id2)
def test_restore_item(self):
os_id = fakes.random_os_id()
item = {'os_id': os_id, 'key': 'val1'}
new_item = db_api.add_item(self.context, 'fake', item)
item['id'] = new_item['id']
self.assertRaises(
exception.EC2DBDuplicateEntry,
db_api.restore_item, self.context, 'fake', item)
def test_update_item(self):
item = db_api.add_item(self.context, 'fake', {'key': 'val1',
'key1': 'val'})

View File

@ -724,6 +724,11 @@ class ImagePrivateTestCase(base.BaseTestCase):
result = image_api.describe_images(context, image_id=[image_id])
self.assertEqual(expected, result)
# describe with filter
result = image_api.describe_images(
context, filter=[{'name': 'name', 'value': 'noname'}])
self.assertEqual({'imagesSet': []}, result)
# describe failed image
image['state'] = 'failed'
expected['imagesSet'][0]['imageState'] = 'failed'

View File

@ -265,6 +265,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
matchers.ListMatches(
[fakes.EC2_SECURITY_GROUP_2],
orderless_lists=True))
self.assertEqual(0, self.db_api.delete_item.call_count)
self.db_api.get_items_by_ids = tools.CopyingMock(
return_value=[fakes.DB_SECURITY_GROUP_2])
@ -276,6 +277,7 @@ class SecurityGroupTestCase(base.ApiTestCase):
orderless_lists=True))
self.db_api.get_items_by_ids.assert_called_once_with(
mock.ANY, set([fakes.ID_EC2_SECURITY_GROUP_2]))
self.assertEqual(0, self.db_api.delete_item.call_count)
self.check_filtering(
'DescribeSecurityGroups', 'securityGroupInfo',
@ -324,9 +326,11 @@ class SecurityGroupTestCase(base.ApiTestCase):
{'security_groups': [fakes.OS_SECURITY_GROUP_2]})
resp = self.execute('DescribeSecurityGroups', {})
self.db_api.add_item.assert_called_once_with(
self.db_api.restore_item.assert_called_once_with(
mock.ANY, 'sg',
tools.purge_dict(fakes.DB_SECURITY_GROUP_1, ('id',)))
{'id': fakes.ID_EC2_VPC_1.replace('vpc', 'sg'),
'os_id': fakes.ID_OS_SECURITY_GROUP_1,
'vpc_id': fakes.ID_EC2_VPC_1})
self.nova.security_groups.create.assert_called_once_with(
fakes.ID_EC2_VPC_1, 'Default VPC security group')

View File

@ -46,7 +46,7 @@ class SubnetTestCase(base.ApiTestCase):
mock.ANY, 'subnet',
tools.purge_dict(subnet_1, ('id',)))
self.neutron.create_network.assert_called_once_with(
{'network': {}})
{'network': {'name': 'subnet-0'}})
self.neutron.update_network.assert_called_once_with(
fakes.ID_OS_NETWORK_1,
{'network': {'name': fakes.ID_EC2_SUBNET_1}})