Add create_all and delete_all for AllocationList

This is the object side of creating and deleting allocations in the
placement service.

create_all() checks that the inventory records involved in the
allocations requested do not have their capacities exceeded. If no
inventory capacities are exceeded, allocation records are written to the
database. Afterwards, the resource provider generation for each resource
provider involved in the allocation transaction has its generation
incremented. If another threads has concurrently updated either the
inventory or the usage of resources, this generation increment will
raise ConcurrentUpdateDetected, allowing callers to retry. This patch
does not yet implement this retry, which should be fine for the time
being considering that nothing yet calls POST /allocations.

Partially-Implements: blueprint generic-resource-pools

Co-Authored-By: Jay Pipes <jaypipes@gmail.com>
Co-Authored-By: Ed Leafe <ed@leafe.com>

Change-Id: Ic0bb48dcb1ab33b278f09dee8778fcb7a374caad
This commit is contained in:
Chris Dent 2016-08-30 21:14:50 +00:00 committed by Dan Smith
parent 285a7f0bb6
commit 8ed5120829
5 changed files with 314 additions and 7 deletions

View File

@ -2135,6 +2135,12 @@ class InvalidInventoryNewCapacityExceeded(InvalidInventory):
"minus reserved amount is less than the existing used amount.")
class InvalidAllocationCapacityExceeded(InvalidInventory):
msg_fmt = _("Unable to create allocation for '%(resource_class)s' on "
"resource provider '%(resource_provider)s'. The requested "
"amount would exceed the capacity.")
class UnsupportedPointerModelRequested(Invalid):
msg_fmt = _("Pointer model '%(model)s' requested is not supported by "
"host.")

View File

@ -589,6 +589,9 @@ class Allocation(_HasAResourceProvider):
db_allocation = models.Allocation()
db_allocation.update(updates)
context.session.add(db_allocation)
# We may be in a nested context manager so must flush so the
# caller receives an id.
context.session.flush()
return db_allocation
@staticmethod
@ -613,31 +616,226 @@ class Allocation(_HasAResourceProvider):
self._destroy(self._context, self.id)
def _delete_current_allocs(conn, allocs):
"""Deletes any existing allocations that correspond to the allocations to
be written. This is wrapped in a transaction, so if the write subsequently
fails, the deletion will also be rolled back.
"""
for alloc in allocs:
rp_id = alloc.resource_provider.id
consumer_id = alloc.consumer_id
del_sql = _ALLOC_TBL.delete().where(
sa.and_(_ALLOC_TBL.c.resource_provider_id == rp_id,
_ALLOC_TBL.c.consumer_id == consumer_id))
conn.execute(del_sql)
def _check_capacity_exceeded(conn, allocs):
"""Checks to see if the supplied allocation records would result in any of
the inventories involved having their capacity exceeded.
Raises an InvalidAllocationCapacityExceeded exception if any inventory
would be exhausted by the allocation. If no inventories would be exceeded
by the allocation, the function returns a list of `ResourceProvider`
objects that contain the generation at the time of the check.
:param conn: SQLalchemy Connection object to use
:param allocs: List of `Allocation` objects to check
"""
# The SQL generated below looks like this:
# SELECT
# rp.id,
# rp.uuid,
# rp.generation,
# inv.resource_class_id,
# inv.total,
# inv.reserved,
# inv.allocation_ratio,
# allocs.used
# FROM resource_providers AS rp
# JOIN inventories AS i1
# ON rp.id = i1.resource_provider_id
# LEFT JOIN (
# SELECT resource_provider_id, resource_class_id, SUM(used) AS used
# FROM allocations
# WHERE resource_class_id IN ($RESOURCE_CLASSES)
# GROUP BY resource_provider_id, resource_class_id
# ) AS allocs
# ON inv.resource_provider_id = allocs.resource_provider_id
# AND inv.resource_class_id = allocs.resource_class_id
# WHERE rp.uuid IN ($RESOURCE_PROVIDERS)
# AND inv.resource_class_id IN ($RESOURCE_CLASSES)
#
# We then take the results of the above and determine if any of the
# inventory will have its capacity exceeded.
res_classes = set([fields.ResourceClass.index(a.resource_class)
for a in allocs])
provider_uuids = set([a.resource_provider.uuid for a in allocs])
usage = sa.select([_ALLOC_TBL.c.resource_provider_id,
_ALLOC_TBL.c.consumer_id,
_ALLOC_TBL.c.resource_class_id,
sql.func.sum(_ALLOC_TBL.c.used).label('used')])
usage = usage.where(_ALLOC_TBL.c.resource_class_id.in_(res_classes))
usage = usage.group_by(_ALLOC_TBL.c.resource_provider_id,
_ALLOC_TBL.c.resource_class_id)
usage = sa.alias(usage, name='usage')
inv_join = sql.join(_RP_TBL, _INV_TBL,
sql.and_(_RP_TBL.c.id == _INV_TBL.c.resource_provider_id,
_INV_TBL.c.resource_class_id.in_(res_classes)))
primary_join = sql.outerjoin(inv_join, usage,
_INV_TBL.c.resource_provider_id == usage.c.resource_provider_id)
cols_in_output = [
_RP_TBL.c.id.label('resource_provider_id'),
_RP_TBL.c.uuid,
_RP_TBL.c.generation,
_INV_TBL.c.resource_class_id,
_INV_TBL.c.total,
_INV_TBL.c.reserved,
_INV_TBL.c.allocation_ratio,
usage.c.used,
]
sel = sa.select(cols_in_output).select_from(primary_join)
sel = sel.where(
sa.and_(_RP_TBL.c.uuid.in_(provider_uuids),
_INV_TBL.c.resource_class_id.in_(res_classes)))
records = conn.execute(sel)
# Create a map keyed by (rp_uuid, res_class) for the records in the DB
usage_map = {}
provs_with_inv = set()
for record in records:
usage_map[(record['uuid'], record['resource_class_id'])] = record
provs_with_inv.add(record["uuid"])
# Ensure that all providers have existing inventory
missing_provs = provider_uuids - provs_with_inv
if missing_provs:
raise exception.InvalidInventory(resource_class=str(res_classes),
resource_provider=missing_provs)
res_providers = {}
for alloc in allocs:
res_class = fields.ResourceClass.index(alloc.resource_class)
rp_uuid = alloc.resource_provider.uuid
key = (rp_uuid, res_class)
usage = usage_map[key]
amount_needed = alloc.used
allocation_ratio = usage['allocation_ratio']
# usage["used"] can be returned as None
used = usage['used'] or 0
capacity = (usage['total'] - usage['reserved']) * allocation_ratio
if capacity < (used + amount_needed):
raise exception.InvalidAllocationCapacityExceeded(
resource_class=fields.ResourceClass.from_index(res_class),
resource_provider=rp_uuid)
if rp_uuid not in res_providers:
rp = ResourceProvider(id=usage['resource_provider_id'],
uuid=rp_uuid,
generation=usage['generation'])
res_providers[rp_uuid] = rp
return list(res_providers.values())
@base.NovaObjectRegistry.register
class AllocationList(base.ObjectListBase, base.NovaObject):
# Version 1.0: Initial Version
VERSION = '1.0'
# Version 1.1: Add create_all() and delete_all()
VERSION = '1.1'
fields = {
'objects': fields.ListOfObjectsField('Allocation'),
}
@staticmethod
@db_api.api_context_manager.writer
def _delete_allocations(context, allocations):
for allocation in allocations:
allocation._context = context
allocation.destroy()
@staticmethod
@db_api.api_context_manager.reader
def _get_allocations_from_db(context, rp_uuid):
def _get_allocations_from_db(context, resource_provider_uuid=None,
consumer_id=None):
query = (context.session.query(models.Allocation)
.join(models.Allocation.resource_provider)
.options(contains_eager('resource_provider'))
.filter(models.ResourceProvider.uuid == rp_uuid))
.options(contains_eager('resource_provider')))
if resource_provider_uuid:
query = query.filter(
models.ResourceProvider.uuid == resource_provider_uuid)
if consumer_id:
query = query.filter(
models.Allocation.consumer_id == consumer_id)
return query.all()
@staticmethod
@db_api.api_context_manager.writer
def _set_allocations(context, allocs):
"""Write a set of allocations.
We must check that there is capacity for each allocation.
If there is not we roll back the entire set.
"""
conn = context.session.connection()
# Before writing any allocation records, we check that the submitted
# allocations do not cause any inventory capacity to be exceeded for
# any resource provider and resource class involved in the allocation
# transaction. _check_capacity_exceeded() raises an exception if any
# inventory capacity is exceeded. If capacity is not exceeeded, the
# function returns a list of ResourceProvider objects containing the
# generation of the resource provider at the time of the check. These
# objects are used at the end of the allocation transaction as a guard
# against concurrent updates.
with conn.begin():
# First delete any existing allocations for that rp/consumer combo.
_delete_current_allocs(conn, allocs)
before_gens = _check_capacity_exceeded(conn, allocs)
# Now add the allocations that were passed in.
for alloc in allocs:
rp = alloc.resource_provider
res_class = fields.ResourceClass.index(alloc.resource_class)
ins_stmt = _ALLOC_TBL.insert().values(
resource_provider_id=rp.id,
resource_class_id=res_class,
consumer_id=alloc.consumer_id,
used=alloc.used)
conn.execute(ins_stmt)
# Generation checking happens here. If the inventory for
# this resource provider changed out from under us,
# this will raise a ConcurrentUpdateDetected which can be caught
# by the caller to choose to try again. It will also rollback the
# transaction so that these changes always happen atomically.
for rp in before_gens:
_increment_provider_generation(conn, rp)
@base.remotable_classmethod
def get_all_by_resource_provider_uuid(cls, context, rp_uuid):
db_allocation_list = cls._get_allocations_from_db(
context, rp_uuid)
context, resource_provider_uuid=rp_uuid)
return base.obj_make_list(
context, cls(context), objects.Allocation, db_allocation_list)
@base.remotable_classmethod
def get_all_by_consumer_id(cls, context, consumer_id):
db_allocation_list = cls._get_allocations_from_db(
context, consumer_id=consumer_id)
return base.obj_make_list(
context, cls(context), objects.Allocation, db_allocation_list)
@base.remotable
def create_all(self):
"""Create the supplied allocations."""
# TODO(jaypipes): Retry the allocation writes on
# ConcurrentUpdateDetected
self._set_allocations(self._context, self.objects)
@base.remotable
def delete_all(self):
self._delete_allocations(self._context, self.objects)
@base.NovaObjectRegistry.register
class Usage(base.NovaObject):

View File

@ -572,6 +572,109 @@ class TestAllocation(ResourceProviderBaseCase):
for allocation in allocations])
class TestAllocationListCreateDelete(ResourceProviderBaseCase):
def test_allocation_list_create(self):
consumer_uuid = uuidsentinel.consumer
# Create two resource providers
rp1_name = uuidsentinel.rp1_name
rp1_uuid = uuidsentinel.rp1_uuid
rp1_class = fields.ResourceClass.DISK_GB
rp1_used = 6
rp2_name = uuidsentinel.rp2_name
rp2_uuid = uuidsentinel.rp2_uuid
rp2_class = fields.ResourceClass.IPV4_ADDRESS
rp2_used = 2
rp1 = objects.ResourceProvider(
self.context, name=rp1_name, uuid=rp1_uuid)
rp1.create()
rp2 = objects.ResourceProvider(
self.context, name=rp2_name, uuid=rp2_uuid)
rp2.create()
# Two allocations, one for each resource provider.
allocation_1 = objects.Allocation(resource_provider=rp1,
consumer_id=consumer_uuid,
resource_class=rp1_class,
used=rp1_used)
allocation_2 = objects.Allocation(resource_provider=rp2,
consumer_id=consumer_uuid,
resource_class=rp2_class,
used=rp2_used)
allocation_list = objects.AllocationList(
self.context, objects=[allocation_1, allocation_2])
# There's no inventory, we have a failure.
self.assertRaises(exception.InvalidInventory,
allocation_list.create_all)
# Add inventory for one of the two resource providers. This should also
# fail, since rp2 has no inventory.
inv = objects.Inventory(resource_provider=rp1,
resource_class=rp1_class,
total=1024)
inv.obj_set_defaults()
inv_list = objects.InventoryList(objects=[inv])
rp1.set_inventory(inv_list)
self.assertRaises(exception.InvalidInventory,
allocation_list.create_all)
# Add inventory for the second resource provider
inv = objects.Inventory(resource_provider=rp2,
resource_class=rp2_class,
total=255, reserved=2)
inv.obj_set_defaults()
inv_list = objects.InventoryList(objects=[inv])
rp2.set_inventory(inv_list)
# Now the allocations will work.
allocation_list.create_all()
# Check that those allocations changed usage on each
# resource provider.
rp1_usage = objects.UsageList.get_all_by_resource_provider_uuid(
self.context, rp1_uuid)
rp2_usage = objects.UsageList.get_all_by_resource_provider_uuid(
self.context, rp2_uuid)
self.assertEqual(rp1_used, rp1_usage[0].usage)
self.assertEqual(rp2_used, rp2_usage[0].usage)
# redo one allocation
# TODO(cdent): This does not currently behave as expected
# because a new allocataion is created, adding to the total
# used, not replacing.
rp1_used += 1
allocation_1 = objects.Allocation(resource_provider=rp1,
consumer_id=consumer_uuid,
resource_class=rp1_class,
used=rp1_used)
allocation_list = objects.AllocationList(
self.context, objects=[allocation_1])
allocation_list.create_all()
rp1_usage = objects.UsageList.get_all_by_resource_provider_uuid(
self.context, rp1_uuid)
self.assertEqual(rp1_used, rp1_usage[0].usage)
# delete the allocations for the consumer
# NOTE(cdent): The database uses 'consumer_id' for the
# column, presumably because some ids might not be uuids, at
# some point in the future.
consumer_allocations = objects.AllocationList.get_all_by_consumer_id(
self.context, consumer_uuid)
consumer_allocations.delete_all()
rp1_usage = objects.UsageList.get_all_by_resource_provider_uuid(
self.context, rp1_uuid)
rp2_usage = objects.UsageList.get_all_by_resource_provider_uuid(
self.context, rp2_uuid)
self.assertEqual(0, rp1_usage[0].usage)
self.assertEqual(0, rp2_usage[0].usage)
class UsageListTestCase(ResourceProviderBaseCase):
def test_get_all_null(self):

View File

@ -1100,7 +1100,7 @@ object_data = {
'Aggregate': '1.3-f315cb68906307ca2d1cca84d4753585',
'AggregateList': '1.2-fb6e19f3c3a3186b04eceb98b5dadbfa',
'Allocation': '1.0-864506325f1822f4e4805b56faf51bbe',
'AllocationList': '1.0-de53f0fd078c27cc1d43400f4e8bcef8',
'AllocationList': '1.1-e43fe4a9c9cbbda7438b0e48332f099e',
'BandwidthUsage': '1.2-c6e4c779c7f40f2407e3d70022e3cd1c',
'BandwidthUsageList': '1.2-5fe7475ada6fe62413cbfcc06ec70746',
'BlockDeviceMapping': '1.17-5e094927f1251770dcada6ab05adfcdb',

View File

@ -493,7 +493,7 @@ class _TestAllocationListNoDB(object):
self.assertEqual(1, len(allocations))
mock_get_allocations_from_db.assert_called_once_with(
self.context, uuids.resource_provider)
self.context, resource_provider_uuid=uuids.resource_provider)
self.assertEqual(_ALLOCATION_DB['used'], allocations[0].used)