940 lines
37 KiB
Python
940 lines
37 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_log import log as logging
|
|
import six
|
|
import sqlalchemy as sa
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import contains_eager
|
|
from sqlalchemy import sql
|
|
|
|
from nova.db.sqlalchemy import api as db_api
|
|
from nova.db.sqlalchemy import api_models as models
|
|
from nova import exception
|
|
from nova.i18n import _LW
|
|
from nova import objects
|
|
from nova.objects import base
|
|
from nova.objects import fields
|
|
|
|
_ALLOC_TBL = models.Allocation.__table__
|
|
_INV_TBL = models.Inventory.__table__
|
|
_RP_TBL = models.ResourceProvider.__table__
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_current_inventory_resources(conn, rp):
|
|
"""Returns a set() containing the resource class IDs for all resources
|
|
currently having an inventory record for the supplied resource provider.
|
|
|
|
:param conn: DB connection to use.
|
|
:param rp: Resource provider to query inventory for.
|
|
"""
|
|
cur_res_sel = sa.select([_INV_TBL.c.resource_class_id]).where(
|
|
_INV_TBL.c.resource_provider_id == rp.id)
|
|
existing_resources = conn.execute(cur_res_sel).fetchall()
|
|
return set([r[0] for r in existing_resources])
|
|
|
|
|
|
def _delete_inventory_from_provider(conn, rp, to_delete):
|
|
"""Deletes any inventory records from the supplied provider and set() of
|
|
resource class identifiers.
|
|
|
|
If there are allocations for any of the inventories to be deleted raise
|
|
InventoryInUse exception.
|
|
|
|
:param conn: DB connection to use.
|
|
:param rp: Resource provider from which to delete inventory.
|
|
:param to_delete: set() containing resource class IDs for records to
|
|
delete.
|
|
"""
|
|
allocation_query = sa.select(
|
|
[_ALLOC_TBL.c.resource_class_id.label('resource_class')]).where(
|
|
sa.and_(_ALLOC_TBL.c.resource_provider_id == rp.id,
|
|
_ALLOC_TBL.c.resource_class_id.in_(to_delete))
|
|
).group_by(_ALLOC_TBL.c.resource_class_id)
|
|
allocations = conn.execute(allocation_query).fetchall()
|
|
if allocations:
|
|
resource_classes = ', '.join([fields.ResourceClass.from_index(
|
|
allocation.resource_class) for allocation in allocations])
|
|
raise exception.InventoryInUse(resource_classes=resource_classes,
|
|
resource_provider=rp.uuid)
|
|
|
|
del_stmt = _INV_TBL.delete().where(sa.and_(
|
|
_INV_TBL.c.resource_provider_id == rp.id,
|
|
_INV_TBL.c.resource_class_id.in_(to_delete)))
|
|
res = conn.execute(del_stmt)
|
|
return res.rowcount
|
|
|
|
|
|
def _add_inventory_to_provider(conn, rp, inv_list, to_add):
|
|
"""Inserts new inventory records for the supplied resource provider.
|
|
|
|
:param conn: DB connection to use.
|
|
:param rp: Resource provider to add inventory to.
|
|
:param inv_list: InventoryList object
|
|
:param to_add: set() containing resource class IDs to search inv_list for
|
|
adding to resource provider.
|
|
"""
|
|
for res_class in to_add:
|
|
inv_record = inv_list.find(res_class)
|
|
if inv_record.capacity <= 0:
|
|
raise exception.InvalidInventoryCapacity(
|
|
resource_class=fields.ResourceClass.from_index(res_class),
|
|
resource_provider=rp.uuid)
|
|
ins_stmt = _INV_TBL.insert().values(
|
|
resource_provider_id=rp.id,
|
|
resource_class_id=res_class,
|
|
total=inv_record.total,
|
|
reserved=inv_record.reserved,
|
|
min_unit=inv_record.min_unit,
|
|
max_unit=inv_record.max_unit,
|
|
step_size=inv_record.step_size,
|
|
allocation_ratio=inv_record.allocation_ratio)
|
|
conn.execute(ins_stmt)
|
|
|
|
|
|
def _update_inventory_for_provider(conn, rp, inv_list, to_update):
|
|
"""Updates existing inventory records for the supplied resource provider.
|
|
|
|
:param conn: DB connection to use.
|
|
:param rp: Resource provider on which to update inventory.
|
|
:param inv_list: InventoryList object
|
|
:param to_update: set() containing resource class IDs to search inv_list
|
|
for updating in resource provider.
|
|
:returns: A list of (uuid, class) tuples that have exceeded their
|
|
capacity after this inventory update.
|
|
"""
|
|
exceeded = []
|
|
for res_class in to_update:
|
|
inv_record = inv_list.find(res_class)
|
|
if inv_record.capacity <= 0:
|
|
raise exception.InvalidInventoryCapacity(
|
|
resource_class=fields.ResourceClass.from_index(res_class),
|
|
resource_provider=rp.uuid)
|
|
allocation_query = sa.select(
|
|
[func.sum(_ALLOC_TBL.c.used).label('usage')]).\
|
|
where(sa.and_(
|
|
_ALLOC_TBL.c.resource_provider_id == rp.id,
|
|
_ALLOC_TBL.c.resource_class_id == res_class))
|
|
allocations = conn.execute(allocation_query).first()
|
|
if allocations and allocations['usage'] > inv_record.capacity:
|
|
exceeded.append((rp.uuid,
|
|
fields.ResourceClass.from_index(res_class)))
|
|
upd_stmt = _INV_TBL.update().where(sa.and_(
|
|
_INV_TBL.c.resource_provider_id == rp.id,
|
|
_INV_TBL.c.resource_class_id == res_class)).values(
|
|
total=inv_record.total,
|
|
reserved=inv_record.reserved,
|
|
min_unit=inv_record.min_unit,
|
|
max_unit=inv_record.max_unit,
|
|
step_size=inv_record.step_size,
|
|
allocation_ratio=inv_record.allocation_ratio)
|
|
res = conn.execute(upd_stmt)
|
|
if not res.rowcount:
|
|
raise exception.NotFound(
|
|
'No inventory of class %s found for update'
|
|
% fields.ResourceClass.from_index(res_class))
|
|
return exceeded
|
|
|
|
|
|
def _increment_provider_generation(conn, rp):
|
|
"""Increments the supplied provider's generation value, supplying the
|
|
currently-known generation. Returns whether the increment succeeded.
|
|
|
|
:param conn: DB connection to use.
|
|
:param rp: `ResourceProvider` whose generation should be updated.
|
|
:returns: The new resource provider generation value if successful.
|
|
:raises nova.exception.ConcurrentUpdateDetected: if another thread updated
|
|
the same resource provider's view of its inventory or allocations
|
|
in between the time when this object was originally read
|
|
and the call to set the inventory.
|
|
"""
|
|
rp_gen = rp.generation
|
|
new_generation = rp_gen + 1
|
|
upd_stmt = _RP_TBL.update().where(sa.and_(
|
|
_RP_TBL.c.id == rp.id,
|
|
_RP_TBL.c.generation == rp_gen)).values(
|
|
generation=(new_generation))
|
|
|
|
res = conn.execute(upd_stmt)
|
|
if res.rowcount != 1:
|
|
raise exception.ConcurrentUpdateDetected
|
|
return new_generation
|
|
|
|
|
|
@db_api.api_context_manager.writer
|
|
def _add_inventory(context, rp, inventory):
|
|
"""Add one Inventory that wasn't already on the provider."""
|
|
resource_class_id = fields.ResourceClass.index(inventory.resource_class)
|
|
inv_list = InventoryList(objects=[inventory])
|
|
conn = context.session.connection()
|
|
with conn.begin():
|
|
_add_inventory_to_provider(
|
|
conn, rp, inv_list, set([resource_class_id]))
|
|
rp.generation = _increment_provider_generation(conn, rp)
|
|
|
|
|
|
@db_api.api_context_manager.writer
|
|
def _update_inventory(context, rp, inventory):
|
|
"""Update an inventory already on the provider."""
|
|
resource_class_id = fields.ResourceClass.index(inventory.resource_class)
|
|
inv_list = InventoryList(objects=[inventory])
|
|
conn = context.session.connection()
|
|
with conn.begin():
|
|
exceeded = _update_inventory_for_provider(
|
|
conn, rp, inv_list, set([resource_class_id]))
|
|
rp.generation = _increment_provider_generation(conn, rp)
|
|
return exceeded
|
|
|
|
|
|
@db_api.api_context_manager.writer
|
|
def _delete_inventory(context, rp, resource_class_id):
|
|
"""Delete up to one Inventory of the given resource_class id."""
|
|
|
|
conn = context.session.connection()
|
|
with conn.begin():
|
|
if not _delete_inventory_from_provider(conn, rp, [resource_class_id]):
|
|
raise exception.NotFound(
|
|
'No inventory of class %s found for delete'
|
|
% fields.ResourceClass.from_index(resource_class_id))
|
|
rp.generation = _increment_provider_generation(conn, rp)
|
|
|
|
|
|
@db_api.api_context_manager.writer
|
|
def _set_inventory(context, rp, inv_list):
|
|
"""Given an InventoryList object, replaces the inventory of the
|
|
resource provider in a safe, atomic fashion using the resource
|
|
provider's generation as a consistent view marker.
|
|
|
|
:param context: Nova RequestContext.
|
|
:param rp: `ResourceProvider` object upon which to set inventory.
|
|
:param inv_list: `InventoryList` object to save to backend storage.
|
|
:returns: A list of (uuid, class) tuples that have exceeded their
|
|
capacity after this inventory update.
|
|
:raises nova.exception.ConcurrentUpdateDetected: if another thread updated
|
|
the same resource provider's view of its inventory or allocations
|
|
in between the time when this object was originally read
|
|
and the call to set the inventory.
|
|
"""
|
|
|
|
conn = context.session.connection()
|
|
|
|
existing_resources = _get_current_inventory_resources(conn, rp)
|
|
these_resources = set([fields.ResourceClass.index(r.resource_class)
|
|
for r in inv_list.objects])
|
|
|
|
# Determine which resources we should be adding, deleting and/or
|
|
# updating in the resource provider's inventory by comparing sets
|
|
# of resource class identifiers.
|
|
to_add = these_resources - existing_resources
|
|
to_delete = existing_resources - these_resources
|
|
to_update = these_resources & existing_resources
|
|
exceeded = []
|
|
|
|
with conn.begin():
|
|
if to_delete:
|
|
_delete_inventory_from_provider(conn, rp, to_delete)
|
|
if to_add:
|
|
_add_inventory_to_provider(conn, rp, inv_list, to_add)
|
|
if to_update:
|
|
exceeded = _update_inventory_for_provider(conn, rp, inv_list,
|
|
to_update)
|
|
|
|
# Here is where we update the resource provider's generation value.
|
|
# If this update updates zero rows, that means that another
|
|
# thread has updated the inventory for this resource provider
|
|
# between the time the caller originally read the resource provider
|
|
# record and inventory information and this point. We raise an
|
|
# exception here which will rollback the above transaction and
|
|
# return an error to the caller to indicate that they can attempt
|
|
# to retry the inventory save after reverifying any capacity
|
|
# conditions and re-reading the existing inventory information.
|
|
rp.generation = _increment_provider_generation(conn, rp)
|
|
|
|
return exceeded
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class ResourceProvider(base.NovaObject):
|
|
# Version 1.0: Initial version
|
|
# Version 1.1: Add destroy()
|
|
VERSION = '1.1'
|
|
|
|
fields = {
|
|
'id': fields.IntegerField(read_only=True),
|
|
'uuid': fields.UUIDField(nullable=False),
|
|
'name': fields.StringField(nullable=False),
|
|
'generation': fields.IntegerField(nullable=False),
|
|
}
|
|
|
|
@base.remotable
|
|
def create(self):
|
|
if 'id' in self:
|
|
raise exception.ObjectActionError(action='create',
|
|
reason='already created')
|
|
if 'uuid' not in self:
|
|
raise exception.ObjectActionError(action='create',
|
|
reason='uuid is required')
|
|
if 'name' not in self:
|
|
raise exception.ObjectActionError(action='create',
|
|
reason='name is required')
|
|
updates = self.obj_get_changes()
|
|
db_rp = self._create_in_db(self._context, updates)
|
|
self._from_db_object(self._context, self, db_rp)
|
|
|
|
@base.remotable
|
|
def destroy(self):
|
|
self._delete(self._context, self.id)
|
|
|
|
@base.remotable
|
|
def save(self):
|
|
updates = self.obj_get_changes()
|
|
if updates and updates.keys() != ['name']:
|
|
raise exception.ObjectActionError(
|
|
action='save',
|
|
reason='Immutable fields changed')
|
|
self._update_in_db(self._context, self.id, updates)
|
|
|
|
@base.remotable_classmethod
|
|
def get_by_uuid(cls, context, uuid):
|
|
db_resource_provider = cls._get_by_uuid_from_db(context, uuid)
|
|
return cls._from_db_object(context, cls(), db_resource_provider)
|
|
|
|
@base.remotable
|
|
def add_inventory(self, inventory):
|
|
"""Add one new Inventory to the resource provider.
|
|
|
|
Fails if Inventory of the provided resource class is
|
|
already present.
|
|
"""
|
|
_add_inventory(self._context, self, inventory)
|
|
self.obj_reset_changes()
|
|
|
|
@base.remotable
|
|
def delete_inventory(self, resource_class):
|
|
"""Delete Inventory of provided resource_class."""
|
|
resource_class_id = fields.ResourceClass.index(resource_class)
|
|
_delete_inventory(self._context, self, resource_class_id)
|
|
self.obj_reset_changes()
|
|
|
|
@base.remotable
|
|
def set_inventory(self, inv_list):
|
|
"""Set all resource provider Inventory to be the provided list."""
|
|
exceeded = _set_inventory(self._context, self, inv_list)
|
|
for uuid, rclass in exceeded:
|
|
LOG.warning(_LW('Resource provider %(uuid)s is now over-'
|
|
'capacity for %(resource)s'),
|
|
{'uuid': uuid, 'resource': rclass})
|
|
self.obj_reset_changes()
|
|
|
|
@base.remotable
|
|
def update_inventory(self, inventory):
|
|
"""Update one existing Inventory of the same resource class.
|
|
|
|
Fails if no Inventory of the same class is present.
|
|
"""
|
|
exceeded = _update_inventory(self._context, self, inventory)
|
|
for uuid, rclass in exceeded:
|
|
LOG.warning(_LW('Resource provider %(uuid)s is now over-'
|
|
'capacity for %(resource)s'),
|
|
{'uuid': uuid, 'resource': rclass})
|
|
self.obj_reset_changes()
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _create_in_db(context, updates):
|
|
db_rp = models.ResourceProvider()
|
|
db_rp.update(updates)
|
|
context.session.add(db_rp)
|
|
return db_rp
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _delete(context, _id):
|
|
# Don't delete the resource provider if it has allocations.
|
|
rp_allocations = context.session.query(models.Allocation).\
|
|
filter(models.Allocation.resource_provider_id == _id).\
|
|
count()
|
|
if rp_allocations:
|
|
raise exception.ResourceProviderInUse()
|
|
# Delete any inventory associated with the resource provider
|
|
context.session.query(models.Inventory).\
|
|
filter(models.Inventory.resource_provider_id == _id).delete()
|
|
result = context.session.query(models.ResourceProvider).\
|
|
filter(models.ResourceProvider.id == _id).delete()
|
|
if not result:
|
|
raise exception.NotFound()
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _update_in_db(context, id, updates):
|
|
db_rp = context.session.query(models.ResourceProvider).filter_by(
|
|
id=id).first()
|
|
db_rp.update(updates)
|
|
db_rp.save(context.session)
|
|
|
|
@staticmethod
|
|
def _from_db_object(context, resource_provider, db_resource_provider):
|
|
for field in resource_provider.fields:
|
|
setattr(resource_provider, field, db_resource_provider[field])
|
|
resource_provider._context = context
|
|
resource_provider.obj_reset_changes()
|
|
return resource_provider
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.reader
|
|
def _get_by_uuid_from_db(context, uuid):
|
|
result = context.session.query(models.ResourceProvider).filter_by(
|
|
uuid=uuid).first()
|
|
if not result:
|
|
raise exception.NotFound()
|
|
return result
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class ResourceProviderList(base.ObjectListBase, base.NovaObject):
|
|
# Version 1.0: Initial Version
|
|
VERSION = '1.0'
|
|
|
|
fields = {
|
|
'objects': fields.ListOfObjectsField('ResourceProvider'),
|
|
}
|
|
|
|
allowed_filters = (
|
|
'name', 'uuid'
|
|
)
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.reader
|
|
def _get_all_by_filters_from_db(context, filters):
|
|
if not filters:
|
|
filters = {}
|
|
query = context.session.query(models.ResourceProvider)
|
|
for attr in ResourceProviderList.allowed_filters:
|
|
if attr in filters:
|
|
query = query.filter(
|
|
getattr(models.ResourceProvider, attr) == filters[attr])
|
|
query = query.filter_by(can_host=filters.get('can_host', 0))
|
|
return query.all()
|
|
|
|
@base.remotable_classmethod
|
|
def get_all_by_filters(cls, context, filters=None):
|
|
resource_providers = cls._get_all_by_filters_from_db(context, filters)
|
|
return base.obj_make_list(context, cls(context),
|
|
objects.ResourceProvider, resource_providers)
|
|
|
|
|
|
class _HasAResourceProvider(base.NovaObject):
|
|
"""Code shared between Inventory and Allocation
|
|
|
|
Both contain a ResourceProvider.
|
|
"""
|
|
|
|
@staticmethod
|
|
def _make_db(updates):
|
|
try:
|
|
resource_provider = updates.pop('resource_provider')
|
|
updates['resource_provider_id'] = resource_provider.id
|
|
except (KeyError, NotImplementedError):
|
|
raise exception.ObjectActionError(
|
|
action='create',
|
|
reason='resource_provider required')
|
|
try:
|
|
resource_class = updates.pop('resource_class')
|
|
except KeyError:
|
|
raise exception.ObjectActionError(
|
|
action='create',
|
|
reason='resource_class required')
|
|
updates['resource_class_id'] = fields.ResourceClass.index(
|
|
resource_class)
|
|
return updates
|
|
|
|
@staticmethod
|
|
def _from_db_object(context, target, source):
|
|
for field in target.fields:
|
|
if field not in ('resource_provider', 'resource_class'):
|
|
setattr(target, field, source[field])
|
|
|
|
if 'resource_class' not in target:
|
|
target.resource_class = (
|
|
target.fields['resource_class'].from_index(
|
|
source['resource_class_id']))
|
|
if ('resource_provider' not in target and
|
|
'resource_provider' in source):
|
|
target.resource_provider = ResourceProvider()
|
|
ResourceProvider._from_db_object(
|
|
context,
|
|
target.resource_provider,
|
|
source['resource_provider'])
|
|
|
|
target._context = context
|
|
target.obj_reset_changes()
|
|
return target
|
|
|
|
|
|
@db_api.api_context_manager.writer
|
|
def _create_inventory_in_db(context, updates):
|
|
db_inventory = models.Inventory()
|
|
db_inventory.update(updates)
|
|
context.session.add(db_inventory)
|
|
return db_inventory
|
|
|
|
|
|
@db_api.api_context_manager.writer
|
|
def _update_inventory_in_db(context, id_, updates):
|
|
result = context.session.query(
|
|
models.Inventory).filter_by(id=id_).update(updates)
|
|
if not result:
|
|
raise exception.NotFound()
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class Inventory(_HasAResourceProvider):
|
|
# Version 1.0: Initial version
|
|
VERSION = '1.0'
|
|
|
|
fields = {
|
|
'id': fields.IntegerField(read_only=True),
|
|
'resource_provider': fields.ObjectField('ResourceProvider'),
|
|
'resource_class': fields.ResourceClassField(read_only=True),
|
|
'total': fields.NonNegativeIntegerField(),
|
|
'reserved': fields.NonNegativeIntegerField(default=0),
|
|
'min_unit': fields.NonNegativeIntegerField(default=1),
|
|
'max_unit': fields.NonNegativeIntegerField(default=1),
|
|
'step_size': fields.NonNegativeIntegerField(default=1),
|
|
'allocation_ratio': fields.NonNegativeFloatField(default=1.0),
|
|
}
|
|
|
|
@property
|
|
def capacity(self):
|
|
"""Inventory capacity, adjusted by allocation_ratio."""
|
|
return int((self.total - self.reserved) * self.allocation_ratio)
|
|
|
|
@base.remotable
|
|
def create(self):
|
|
if 'id' in self:
|
|
raise exception.ObjectActionError(action='create',
|
|
reason='already created')
|
|
updates = self._make_db(self.obj_get_changes())
|
|
db_inventory = self._create_in_db(self._context, updates)
|
|
self._from_db_object(self._context, self, db_inventory)
|
|
|
|
@base.remotable
|
|
def save(self):
|
|
if 'id' not in self:
|
|
raise exception.ObjectActionError(action='save',
|
|
reason='not created')
|
|
updates = self.obj_get_changes()
|
|
updates.pop('id', None)
|
|
self._update_in_db(self._context, self.id, updates)
|
|
|
|
@staticmethod
|
|
def _create_in_db(context, updates):
|
|
return _create_inventory_in_db(context, updates)
|
|
|
|
@staticmethod
|
|
def _update_in_db(context, id_, updates):
|
|
return _update_inventory_in_db(context, id_, updates)
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class InventoryList(base.ObjectListBase, base.NovaObject):
|
|
# Version 1.0: Initial Version
|
|
VERSION = '1.0'
|
|
|
|
fields = {
|
|
'objects': fields.ListOfObjectsField('Inventory'),
|
|
}
|
|
|
|
def find(self, res_class):
|
|
"""Return the inventory record from the list of Inventory records that
|
|
matches the supplied resource class, or None.
|
|
|
|
:param res_class: An integer or string representing a resource
|
|
class. If the value is a string, the method first
|
|
looks up the resource class identifier from the
|
|
string.
|
|
"""
|
|
if isinstance(res_class, six.string_types):
|
|
try:
|
|
res_class = fields.ResourceClass.index(res_class)
|
|
except ValueError:
|
|
raise exception.NotFound("No such resource class '%s'" %
|
|
res_class)
|
|
|
|
for inv_rec in self.objects:
|
|
if fields.ResourceClass.index(inv_rec.resource_class) == res_class:
|
|
return inv_rec
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.reader
|
|
def _get_all_by_resource_provider(context, rp_uuid):
|
|
return context.session.query(models.Inventory).\
|
|
join(models.Inventory.resource_provider).\
|
|
options(contains_eager('resource_provider')).\
|
|
filter(models.ResourceProvider.uuid == rp_uuid).all()
|
|
|
|
@base.remotable_classmethod
|
|
def get_all_by_resource_provider_uuid(cls, context, rp_uuid):
|
|
db_inventory_list = cls._get_all_by_resource_provider(context,
|
|
rp_uuid)
|
|
return base.obj_make_list(context, cls(context), objects.Inventory,
|
|
db_inventory_list)
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class Allocation(_HasAResourceProvider):
|
|
# Version 1.0: Initial version
|
|
VERSION = '1.0'
|
|
|
|
fields = {
|
|
'id': fields.IntegerField(),
|
|
'resource_provider': fields.ObjectField('ResourceProvider'),
|
|
'consumer_id': fields.UUIDField(),
|
|
'resource_class': fields.ResourceClassField(),
|
|
'used': fields.IntegerField(),
|
|
}
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _create_in_db(context, updates):
|
|
db_allocation = models.Allocation()
|
|
db_allocation.update(updates)
|
|
context.session.add(db_allocation)
|
|
# We may be in a nested context manager so must flush so the
|
|
# caller receives an id.
|
|
context.session.flush()
|
|
return db_allocation
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _destroy(context, id):
|
|
result = context.session.query(models.Allocation).filter_by(
|
|
id=id).delete()
|
|
if not result:
|
|
raise exception.NotFound()
|
|
|
|
@base.remotable
|
|
def create(self):
|
|
if 'id' in self:
|
|
raise exception.ObjectActionError(action='create',
|
|
reason='already created')
|
|
updates = self._make_db(self.obj_get_changes())
|
|
db_allocation = self._create_in_db(self._context, updates)
|
|
self._from_db_object(self._context, self, db_allocation)
|
|
|
|
@base.remotable
|
|
def destroy(self):
|
|
self._destroy(self._context, self.id)
|
|
|
|
|
|
def _delete_current_allocs(conn, allocs):
|
|
"""Deletes any existing allocations that correspond to the allocations to
|
|
be written. This is wrapped in a transaction, so if the write subsequently
|
|
fails, the deletion will also be rolled back.
|
|
"""
|
|
for alloc in allocs:
|
|
rp_id = alloc.resource_provider.id
|
|
consumer_id = alloc.consumer_id
|
|
del_sql = _ALLOC_TBL.delete().where(
|
|
sa.and_(_ALLOC_TBL.c.resource_provider_id == rp_id,
|
|
_ALLOC_TBL.c.consumer_id == consumer_id))
|
|
conn.execute(del_sql)
|
|
|
|
|
|
def _check_capacity_exceeded(conn, allocs):
|
|
"""Checks to see if the supplied allocation records would result in any of
|
|
the inventories involved having their capacity exceeded.
|
|
|
|
Raises an InvalidAllocationCapacityExceeded exception if any inventory
|
|
would be exhausted by the allocation. If no inventories would be exceeded
|
|
by the allocation, the function returns a list of `ResourceProvider`
|
|
objects that contain the generation at the time of the check.
|
|
|
|
:param conn: SQLalchemy Connection object to use
|
|
:param allocs: List of `Allocation` objects to check
|
|
"""
|
|
# The SQL generated below looks like this:
|
|
# SELECT
|
|
# rp.id,
|
|
# rp.uuid,
|
|
# rp.generation,
|
|
# inv.resource_class_id,
|
|
# inv.total,
|
|
# inv.reserved,
|
|
# inv.allocation_ratio,
|
|
# allocs.used
|
|
# FROM resource_providers AS rp
|
|
# JOIN inventories AS i1
|
|
# ON rp.id = i1.resource_provider_id
|
|
# LEFT JOIN (
|
|
# SELECT resource_provider_id, resource_class_id, SUM(used) AS used
|
|
# FROM allocations
|
|
# WHERE resource_class_id IN ($RESOURCE_CLASSES)
|
|
# GROUP BY resource_provider_id, resource_class_id
|
|
# ) AS allocs
|
|
# ON inv.resource_provider_id = allocs.resource_provider_id
|
|
# AND inv.resource_class_id = allocs.resource_class_id
|
|
# WHERE rp.uuid IN ($RESOURCE_PROVIDERS)
|
|
# AND inv.resource_class_id IN ($RESOURCE_CLASSES)
|
|
#
|
|
# We then take the results of the above and determine if any of the
|
|
# inventory will have its capacity exceeded.
|
|
res_classes = set([fields.ResourceClass.index(a.resource_class)
|
|
for a in allocs])
|
|
provider_uuids = set([a.resource_provider.uuid for a in allocs])
|
|
|
|
usage = sa.select([_ALLOC_TBL.c.resource_provider_id,
|
|
_ALLOC_TBL.c.resource_class_id,
|
|
sql.func.sum(_ALLOC_TBL.c.used).label('used')])
|
|
usage = usage.where(_ALLOC_TBL.c.resource_class_id.in_(res_classes))
|
|
usage = usage.group_by(_ALLOC_TBL.c.resource_provider_id,
|
|
_ALLOC_TBL.c.resource_class_id)
|
|
usage = sa.alias(usage, name='usage')
|
|
|
|
inv_join = sql.join(_RP_TBL, _INV_TBL,
|
|
sql.and_(_RP_TBL.c.id == _INV_TBL.c.resource_provider_id,
|
|
_INV_TBL.c.resource_class_id.in_(res_classes)))
|
|
primary_join = sql.outerjoin(inv_join, usage,
|
|
sql.and_(
|
|
_INV_TBL.c.resource_provider_id == usage.c.resource_provider_id,
|
|
_INV_TBL.c.resource_class_id == usage.c.resource_class_id)
|
|
)
|
|
cols_in_output = [
|
|
_RP_TBL.c.id.label('resource_provider_id'),
|
|
_RP_TBL.c.uuid,
|
|
_RP_TBL.c.generation,
|
|
_INV_TBL.c.resource_class_id,
|
|
_INV_TBL.c.total,
|
|
_INV_TBL.c.reserved,
|
|
_INV_TBL.c.allocation_ratio,
|
|
usage.c.used,
|
|
]
|
|
|
|
sel = sa.select(cols_in_output).select_from(primary_join)
|
|
sel = sel.where(
|
|
sa.and_(_RP_TBL.c.uuid.in_(provider_uuids),
|
|
_INV_TBL.c.resource_class_id.in_(res_classes)))
|
|
records = conn.execute(sel)
|
|
# Create a map keyed by (rp_uuid, res_class) for the records in the DB
|
|
usage_map = {}
|
|
provs_with_inv = set()
|
|
for record in records:
|
|
map_key = (record['uuid'], record['resource_class_id'])
|
|
if map_key in usage_map:
|
|
raise KeyError("%s already in usage_map, bad query" % str(map_key))
|
|
usage_map[map_key] = record
|
|
provs_with_inv.add(record["uuid"])
|
|
# Ensure that all providers have existing inventory
|
|
missing_provs = provider_uuids - provs_with_inv
|
|
if missing_provs:
|
|
raise exception.InvalidInventory(resource_class=str(res_classes),
|
|
resource_provider=missing_provs)
|
|
|
|
res_providers = {}
|
|
for alloc in allocs:
|
|
res_class = fields.ResourceClass.index(alloc.resource_class)
|
|
rp_uuid = alloc.resource_provider.uuid
|
|
key = (rp_uuid, res_class)
|
|
usage = usage_map[key]
|
|
amount_needed = alloc.used
|
|
allocation_ratio = usage['allocation_ratio']
|
|
# usage["used"] can be returned as None
|
|
used = usage['used'] or 0
|
|
capacity = (usage['total'] - usage['reserved']) * allocation_ratio
|
|
if capacity < (used + amount_needed):
|
|
LOG.warning(
|
|
_LW("Over capacity for %(rc)s on resource provider %(rp)s. "
|
|
"Needed: %(needed)s, Used: %(used)s, Capacity: %(cap)s"),
|
|
{'rc': fields.ResourceClass.from_index(res_class),
|
|
'rp': rp_uuid,
|
|
'needed': amount_needed,
|
|
'used': used,
|
|
'cap': capacity})
|
|
raise exception.InvalidAllocationCapacityExceeded(
|
|
resource_class=fields.ResourceClass.from_index(res_class),
|
|
resource_provider=rp_uuid)
|
|
if rp_uuid not in res_providers:
|
|
rp = ResourceProvider(id=usage['resource_provider_id'],
|
|
uuid=rp_uuid,
|
|
generation=usage['generation'])
|
|
res_providers[rp_uuid] = rp
|
|
return list(res_providers.values())
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class AllocationList(base.ObjectListBase, base.NovaObject):
|
|
# Version 1.0: Initial Version
|
|
# Version 1.1: Add create_all() and delete_all()
|
|
VERSION = '1.1'
|
|
|
|
fields = {
|
|
'objects': fields.ListOfObjectsField('Allocation'),
|
|
}
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _delete_allocations(context, allocations):
|
|
for allocation in allocations:
|
|
allocation._context = context
|
|
allocation.destroy()
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.reader
|
|
def _get_allocations_from_db(context, resource_provider_uuid=None,
|
|
consumer_id=None):
|
|
query = (context.session.query(models.Allocation)
|
|
.join(models.Allocation.resource_provider)
|
|
.options(contains_eager('resource_provider')))
|
|
if resource_provider_uuid:
|
|
query = query.filter(
|
|
models.ResourceProvider.uuid == resource_provider_uuid)
|
|
if consumer_id:
|
|
query = query.filter(
|
|
models.Allocation.consumer_id == consumer_id)
|
|
return query.all()
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.writer
|
|
def _set_allocations(context, allocs):
|
|
"""Write a set of allocations.
|
|
|
|
We must check that there is capacity for each allocation.
|
|
If there is not we roll back the entire set.
|
|
"""
|
|
conn = context.session.connection()
|
|
# Before writing any allocation records, we check that the submitted
|
|
# allocations do not cause any inventory capacity to be exceeded for
|
|
# any resource provider and resource class involved in the allocation
|
|
# transaction. _check_capacity_exceeded() raises an exception if any
|
|
# inventory capacity is exceeded. If capacity is not exceeeded, the
|
|
# function returns a list of ResourceProvider objects containing the
|
|
# generation of the resource provider at the time of the check. These
|
|
# objects are used at the end of the allocation transaction as a guard
|
|
# against concurrent updates.
|
|
with conn.begin():
|
|
# First delete any existing allocations for that rp/consumer combo.
|
|
_delete_current_allocs(conn, allocs)
|
|
before_gens = _check_capacity_exceeded(conn, allocs)
|
|
# Now add the allocations that were passed in.
|
|
for alloc in allocs:
|
|
rp = alloc.resource_provider
|
|
res_class = fields.ResourceClass.index(alloc.resource_class)
|
|
ins_stmt = _ALLOC_TBL.insert().values(
|
|
resource_provider_id=rp.id,
|
|
resource_class_id=res_class,
|
|
consumer_id=alloc.consumer_id,
|
|
used=alloc.used)
|
|
conn.execute(ins_stmt)
|
|
|
|
# Generation checking happens here. If the inventory for
|
|
# this resource provider changed out from under us,
|
|
# this will raise a ConcurrentUpdateDetected which can be caught
|
|
# by the caller to choose to try again. It will also rollback the
|
|
# transaction so that these changes always happen atomically.
|
|
for rp in before_gens:
|
|
_increment_provider_generation(conn, rp)
|
|
|
|
@base.remotable_classmethod
|
|
def get_all_by_resource_provider_uuid(cls, context, rp_uuid):
|
|
db_allocation_list = cls._get_allocations_from_db(
|
|
context, resource_provider_uuid=rp_uuid)
|
|
return base.obj_make_list(
|
|
context, cls(context), objects.Allocation, db_allocation_list)
|
|
|
|
@base.remotable_classmethod
|
|
def get_all_by_consumer_id(cls, context, consumer_id):
|
|
db_allocation_list = cls._get_allocations_from_db(
|
|
context, consumer_id=consumer_id)
|
|
return base.obj_make_list(
|
|
context, cls(context), objects.Allocation, db_allocation_list)
|
|
|
|
@base.remotable
|
|
def create_all(self):
|
|
"""Create the supplied allocations."""
|
|
# TODO(jaypipes): Retry the allocation writes on
|
|
# ConcurrentUpdateDetected
|
|
self._set_allocations(self._context, self.objects)
|
|
|
|
@base.remotable
|
|
def delete_all(self):
|
|
self._delete_allocations(self._context, self.objects)
|
|
|
|
def __repr__(self):
|
|
strings = [repr(x) for x in self.objects]
|
|
return "AllocationList[" + ", ".join(strings) + "]"
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class Usage(base.NovaObject):
|
|
# Version 1.0: Initial version
|
|
VERSION = '1.0'
|
|
|
|
fields = {
|
|
'resource_class': fields.ResourceClassField(read_only=True),
|
|
'usage': fields.NonNegativeIntegerField(),
|
|
}
|
|
|
|
@staticmethod
|
|
def _from_db_object(context, target, source):
|
|
for field in target.fields:
|
|
if field not in ('resource_class'):
|
|
setattr(target, field, source[field])
|
|
|
|
if 'resource_class' not in target:
|
|
target.resource_class = (
|
|
target.fields['resource_class'].from_index(
|
|
source['resource_class_id']))
|
|
|
|
target._context = context
|
|
target.obj_reset_changes()
|
|
return target
|
|
|
|
|
|
@base.NovaObjectRegistry.register
|
|
class UsageList(base.ObjectListBase, base.NovaObject):
|
|
# Version 1.0: Initial version
|
|
VERSION = '1.0'
|
|
|
|
fields = {
|
|
'objects': fields.ListOfObjectsField('Usage'),
|
|
}
|
|
|
|
@staticmethod
|
|
@db_api.api_context_manager.reader
|
|
def _get_all_by_resource_provider_uuid(context, rp_uuid):
|
|
query = (context.session.query(models.Inventory.resource_class_id,
|
|
func.coalesce(func.sum(models.Allocation.used), 0))
|
|
.join(models.ResourceProvider,
|
|
models.Inventory.resource_provider_id ==
|
|
models.ResourceProvider.id)
|
|
.outerjoin(models.Allocation,
|
|
sql.and_(models.Inventory.resource_provider_id ==
|
|
models.Allocation.resource_provider_id,
|
|
models.Inventory.resource_class_id ==
|
|
models.Allocation.resource_class_id))
|
|
.filter(models.ResourceProvider.uuid == rp_uuid)
|
|
.group_by(models.Inventory.resource_class_id))
|
|
result = [dict(resource_class_id=item[0], usage=item[1])
|
|
for item in query.all()]
|
|
return result
|
|
|
|
@base.remotable_classmethod
|
|
def get_all_by_resource_provider_uuid(cls, context, rp_uuid):
|
|
usage_list = cls._get_all_by_resource_provider_uuid(context, rp_uuid)
|
|
return base.obj_make_list(context, cls(context), Usage, usage_list)
|
|
|
|
def __repr__(self):
|
|
strings = [repr(x) for x in self.objects]
|
|
return "UsageList[" + ", ".join(strings) + "]"
|