diff --git a/cyborg/agent/provider_tree.py b/cyborg/agent/provider_tree.py new file mode 100644 index 00000000..0ae558dc --- /dev/null +++ b/cyborg/agent/provider_tree.py @@ -0,0 +1,670 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""An object describing a tree of resource providers and their inventories. + +This object is not stored in the Nova API or cell databases; rather, this +object is constructed and used by the scheduler report client to track state +changes for resources on the hypervisor or baremetal node. As such, there are +no remoteable methods nor is there any interaction with the nova.db modules. +""" + +import collections +import copy + +import os_traits +from oslo_concurrency import lockutils +from oslo_log import log as logging +from oslo_utils import uuidutils + +from cyborg.common.i18n import _ + +LOG = logging.getLogger(__name__) +_LOCK_NAME = 'provider-tree-lock' + +# Point-in-time representation of a resource provider in the tree. +# Note that, whereas namedtuple enforces read-only-ness of instances as a +# whole, nothing prevents modification of the internals of attributes of +# complex types (children/inventory/traits/aggregates). However, any such +# modifications still have no effect on the ProviderTree the instance came +# from. Like, you can Sharpie a moustache on a Polaroid of my face, but that +# doesn't make a moustache appear on my actual face. +ProviderData = collections.namedtuple( + 'ProviderData', ['uuid', 'name', 'generation', 'parent_uuid', 'inventory', + 'traits', 'aggregates']) + + +class _Provider(object): + """Represents a resource provider in the tree. + + All operations against the tree should be done using the ProviderTree + interface, since it controls thread-safety. + """ + + def __init__(self, name, uuid=None, generation=None, parent_uuid=None): + if uuid is None: + uuid = uuidutils.generate_uuid() + self.uuid = uuid + self.name = name + self.generation = generation + self.parent_uuid = parent_uuid + # Contains a dict, keyed by uuid of child resource providers having + # this provider as a parent + self.children = {} + # dict of inventory records, keyed by resource class + self.inventory = {} + # Set of trait names + self.traits = set() + # Set of aggregate UUIDs + self.aggregates = set() + + @classmethod + def from_dict(cls, pdict): + """Factory method producing a _Provider based on a dict with + appropriate keys. + + :param pdict: Dictionary representing a provider, with keys 'name', + 'uuid', 'generation', 'parent_provider_uuid'. Of these, + only 'name' is mandatory. + """ + return cls(pdict['name'], uuid=pdict.get('uuid'), + generation=pdict.get('generation'), + parent_uuid=pdict.get('parent_provider_uuid')) + + def data(self): + """A collection of all informations of a provider. + + :Return: a collections.namedtuple + include inventory, traits, aggregates, uuid, name, generation, + and parent_uuid. 
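+
+        A rough usage sketch (the UUID and trait name are made up) showing
+        that the returned namedtuple is a point-in-time copy, detached from
+        the tree::
+
+            tree = ProviderTree()
+            rp_uuid = tree.new_root('dev_node', uuidutils.generate_uuid())
+            snap = tree.data(rp_uuid)
+            snap.traits.add('CUSTOM_FPGA')   # mutates only the copy
+            assert not tree.has_traits(rp_uuid, ['CUSTOM_FPGA'])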
+ """ + inventory = copy.deepcopy(self.inventory) + traits = copy.copy(self.traits) + aggregates = copy.copy(self.aggregates) + return ProviderData( + self.uuid, self.name, self.generation, self.parent_uuid, + inventory, traits, aggregates) + + def get_provider_uuids(self): + """Returns a list, in top-down traversal order, of UUIDs of this + provider and all its descendants. + """ + ret = [self.uuid] + for child in self.children.values(): + ret.extend(child.get_provider_uuids()) + return ret + + def find(self, search): + """Find an expect one in the provider tree by match the serach. + + :param search: it can be the either name or uuid of an expect provider. + :return: the expect _Provider object or None. + """ + if self.name == search or self.uuid == search: + return self + if search in self.children: + return self.children[search] + if self.children: + for child in self.children.values(): + # We already searched for the child by UUID above, so here we + # just check for a child name match + if child.name == search: + return child + subchild = child.find(search) + if subchild: + return subchild + return None + + def add_child(self, provider): + self.children[provider.uuid] = provider + + def remove_child(self, provider): + if provider.uuid in self.children: + del self.children[provider.uuid] + + def has_inventory(self): + """Returns whether the provider has any inventory records at all.""" + return self.inventory != {} + + def has_inventory_changed(self, new): + """Returns whether the inventory has changed for the provider.""" + cur = self.inventory + if set(cur) != set(new): + return True + for key, cur_rec in cur.items(): + new_rec = new[key] + # If the new record contains new fields (e.g. we're adding on + # `reserved` or `allocation_ratio`) we want to make sure to pick + # them up + if set(new_rec) - set(cur_rec): + return True + for rec_key, cur_val in cur_rec.items(): + if rec_key not in new_rec: + # Deliberately don't want to compare missing keys in the + # *new* inventory record. For instance, we will be passing + # in fields like allocation_ratio in the current dict but + # the resource tracker may only pass in the total field. We + # want to return that inventory didn't change when the + # total field values are the same even if the + # allocation_ratio field is missing from the new record. + continue + if new_rec[rec_key] != cur_val: + return True + return False + + def _update_generation(self, generation): + if generation is not None and generation != self.generation: + msg_args = { + 'rp_uuid': self.uuid, + 'old': self.generation, + 'new': generation, + } + LOG.debug("Updating resource provider %(rp_uuid)s generation " + "from %(old)s to %(new)s", msg_args) + self.generation = generation + + def update_inventory(self, inventory, generation): + """Update the stored inventory for the provider along with a resource + provider generation to set the provider to. The method returns whether + the inventory has changed. + """ + self._update_generation(generation) + if self.has_inventory_changed(inventory): + self.inventory = copy.deepcopy(inventory) + return True + return False + + def have_traits_changed(self, new): + """Returns whether the provider's traits have changed.""" + return set(new) != self.traits + + def update_traits(self, new, generation=None): + """Update the stored traits for the provider along with a resource + provider generation to set the provider to. The method returns whether + the traits have changed. 
+ """ + self._update_generation(generation) + if self.have_traits_changed(new): + self.traits = set(new) # create a copy of the new traits + return True + return False + + def has_traits(self, traits): + """Query whether the provider has certain traits. + + :param traits: Iterable of string trait names to look for. + :return: True if this provider has *all* of the specified traits; False + if any of the specified traits are absent. Returns True if + the traits parameter is empty. + """ + return not bool(set(traits) - self.traits) + + def have_aggregates_changed(self, new): + """Returns whether the provider's aggregates have changed.""" + return set(new) != self.aggregates + + def update_aggregates(self, new, generation=None): + """Update the stored aggregates for the provider along with a resource + provider generation to set the provider to. The method returns whether + the aggregates have changed. + """ + self._update_generation(generation) + if self.have_aggregates_changed(new): + self.aggregates = set(new) # create a copy of the new aggregates + return True + return False + + def in_aggregates(self, aggregates): + """Query whether the provider is a member of certain aggregates. + + :param aggregates: Iterable of string aggregate UUIDs to look for. + :return: True if this provider is a member of *all* of the specified + aggregates; False if any of the specified aggregates are + absent. Returns True if the aggregates parameter is empty. + """ + return not bool(set(aggregates) - self.aggregates) + + +class ProviderTree(object): + + def __init__(self): + """Create an empty provider tree.""" + self.lock = lockutils.internal_lock(_LOCK_NAME) + self.roots = [] + + def get_provider_uuids(self, name_or_uuid=None): + """Return a list, in top-down traversable order, of the UUIDs of all + providers (in a subtree). + + :param name_or_uuid: Provider name or UUID representing the root of a + subtree for which to return UUIDs. If not + specified, the method returns all UUIDs in the + ProviderTree. + """ + if name_or_uuid is not None: + with self.lock: + return self._find_with_lock(name_or_uuid).get_provider_uuids() + + # If no name_or_uuid, get UUIDs for all providers recursively. + ret = [] + with self.lock: + for root in self.roots: + ret.extend(root.get_provider_uuids()) + return ret + + def populate_from_iterable(self, provider_dicts): + """Populates this ProviderTree from an iterable of provider dicts. + + This method will ADD providers to the tree if provider_dicts contains + providers that do not exist in the tree already and will REPLACE + providers in the tree if provider_dicts contains providers that are + already in the tree. This method will NOT remove providers from the + tree that are not in provider_dicts. But if a parent provider is in + provider_dicts and the descendents are not, this method will remove the + descendents from the tree. + + :param provider_dicts: An iterable of dicts of resource provider + information. If a provider is present in + provider_dicts, all its descendants must also be + present. + :raises: ValueError if any provider in provider_dicts has a parent that + is not in this ProviderTree or elsewhere in provider_dicts. + """ + if not provider_dicts: + return + + # Map of provider UUID to provider dict for the providers we're + # *adding* via this method. + to_add_by_uuid = {pd['uuid']: pd for pd in provider_dicts} + + with self.lock: + # Sanity check for orphans. 
Every parent UUID must either be None + # (the provider is a root), or be in the tree already, or exist as + # a key in to_add_by_uuid (we're adding it). + all_parents = set([None]) | set(to_add_by_uuid) + # NOTE(efried): Can't use get_provider_uuids directly because we're + # already under lock. + for root in self.roots: + all_parents |= set(root.get_provider_uuids()) + missing_parents = set() + for pd in to_add_by_uuid.values(): + parent_uuid = pd.get('parent_provider_uuid') + if parent_uuid not in all_parents: + missing_parents.add(parent_uuid) + if missing_parents: + raise ValueError( + _("The following parents were not found: %s") % + ', '.join(missing_parents)) + + # Ready to do the work. + # Use to_add_by_uuid to keep track of which providers are left to + # be added. + while to_add_by_uuid: + # Find a provider that's suitable to inject. + for uuid, pd in to_add_by_uuid.items(): + # Roots are always okay to inject (None won't be a key in + # to_add_by_uuid). Otherwise, we have to make sure we + # already added the parent (and, by recursion, all + # ancestors) if present in the input. + parent_uuid = pd.get('parent_provider_uuid') + if parent_uuid not in to_add_by_uuid: + break + else: + # This should never happen - we already ensured all parents + # exist in the tree, which means we can't have any branches + # that don't wind up at the root, which means we can't have + # cycles. But to quell the paranoia... + raise ValueError( + _("Unexpectedly failed to find parents already in the" + "tree for any of the following: %s") % + ','.join(set(to_add_by_uuid))) + + # Add or replace the provider, either as a root or under its + # parent + try: + self._remove_with_lock(uuid) + except ValueError: + # Wasn't there in the first place - fine. + pass + + provider = _Provider.from_dict(pd) + if parent_uuid is None: + self.roots.append(provider) + else: + parent = self._find_with_lock(parent_uuid) + parent.add_child(provider) + + # Remove this entry to signify we're done with it. + to_add_by_uuid.pop(uuid) + + def _remove_with_lock(self, name_or_uuid): + found = self._find_with_lock(name_or_uuid) + if found.parent_uuid: + parent = self._find_with_lock(found.parent_uuid) + parent.remove_child(found) + else: + self.roots.remove(found) + + def remove(self, name_or_uuid): + """Safely removes the provider identified by the supplied name_or_uuid + parameter and all of its children from the tree. + + :raises ValueError if name_or_uuid points to a non-existing provider. + :param name_or_uuid: Either name or UUID of the resource provider to + remove from the tree. + """ + with self.lock: + self._remove_with_lock(name_or_uuid) + + def new_root(self, name, uuid, generation=None): + """Adds a new root provider to the tree, returning its UUID. + + :param name: The name of the new root provider + :param uuid: The UUID of the new root provider + :param generation: Generation to set for the new root provider + :returns: the UUID of the new provider + :raises: ValueError if a provider with the specified uuid already + exists in the tree. 
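+
+        A minimal sketch (the name and UUID are made up)::
+
+            tree = ProviderTree()
+            root_uuid = tree.new_root('dev_node_0',
+                                      uuidutils.generate_uuid())
+            # Calling new_root again with the same UUID raises ValueError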
+ """ + + with self.lock: + exists = True + try: + self._find_with_lock(uuid) + except ValueError: + exists = False + + if exists: + err = _("Provider %s already exists.") + raise ValueError(err % uuid) + + p = _Provider(name, uuid=uuid, generation=generation) + self.roots.append(p) + return p.uuid + + def _find_with_lock(self, name_or_uuid): + for root in self.roots: + found = root.find(name_or_uuid) + if found: + return found + raise ValueError(_("No such provider %s") % name_or_uuid) + + def data(self, name_or_uuid): + """Return a point-in-time copy of the specified provider's data. + + :param name_or_uuid: Either name or UUID of the resource provider whose + data is to be returned. + :return: ProviderData object representing the specified provider. + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + """ + with self.lock: + return self._find_with_lock(name_or_uuid).data() + + def exists(self, name_or_uuid): + """Given either a name or a UUID, return True if the tree contains the + provider, False otherwise. + """ + with self.lock: + try: + self._find_with_lock(name_or_uuid) + return True + except ValueError: + return False + + def new_child(self, name, parent, uuid=None, generation=None): + """Creates a new child provider with the given name and uuid under the + given parent. + + :param name: The name of the new child provider + :param parent: Either name or UUID of the parent provider + :param uuid: The UUID of the new child provider + :param generation: Generation to set for the new child provider + :returns: the UUID of the new provider + + :raises ValueError if a provider with the specified uuid or name + already exists; or if parent_uuid points to a nonexistent + provider. + """ + with self.lock: + try: + self._find_with_lock(uuid or name) + except ValueError: + pass + else: + err = _("Provider %s already exists.") + raise ValueError(err % (uuid or name)) + + parent_node = self._find_with_lock(parent) + p = _Provider(name, uuid, generation, parent_node.uuid) + parent_node.add_child(p) + return p.uuid + + def has_inventory(self, name_or_uuid): + """Returns True if the provider identified by name_or_uuid has any + inventory records at all. + + :raises: ValueError if a provider with uuid was not found in the tree. + :param name_or_uuid: Either name or UUID of the resource provider + """ + with self.lock: + p = self._find_with_lock(name_or_uuid) + return p.has_inventory() + + def has_inventory_changed(self, name_or_uuid, inventory): + """Returns True if the supplied inventory is different for the provider + with the supplied name or UUID. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + query inventory for. + :param inventory: dict, keyed by resource class, of inventory + information. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.has_inventory_changed(inventory) + + def update_inventory(self, name_or_uuid, inventory, generation=None): + """Given a name or UUID of a provider and a dict of inventory resource + records, update the provider's inventory and set the provider's + generation. + + :returns: True if the inventory has changed. + + :note: The provider's generation is always set to the supplied + generation, even if there were no changes to the inventory. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. 
+ :param name_or_uuid: Either name or UUID of the resource provider to + update inventory for. + :param inventory: dict, keyed by resource class, of inventory + information. + :param generation: The resource provider generation to set. If not + specified, the provider's generation is not changed. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.update_inventory(inventory, generation) + + def has_sharing_provider(self, resource_class): + """Returns whether the specified provider_tree contains any sharing + providers of inventory of the specified resource_class. + """ + for rp_uuid in self.get_provider_uuids(): + pdata = self.data(rp_uuid) + has_rc = resource_class in pdata.inventory + is_sharing = os_traits.MISC_SHARES_VIA_AGGREGATE in pdata.traits + if has_rc and is_sharing: + return True + return False + + def has_traits(self, name_or_uuid, traits): + """Given a name or UUID of a provider, query whether that provider has + *all* of the specified traits. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + query for traits. + :param traits: Iterable of string trait names to search for. + :return: True if this provider has *all* of the specified traits; False + if any of the specified traits are absent. Returns True if + the traits parameter is empty, even if the provider has no + traits. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.has_traits(traits) + + def have_traits_changed(self, name_or_uuid, traits): + """Returns True if the specified traits list is different for the + provider with the specified name or UUID. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + query traits for. + :param traits: Iterable of string trait names to compare against the + provider's traits. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.have_traits_changed(traits) + + def update_traits(self, name_or_uuid, traits, generation=None): + """Given a name or UUID of a provider and an iterable of string trait + names, update the provider's traits and set the provider's generation. + + :returns: True if the traits list has changed. + + :note: The provider's generation is always set to the supplied + generation, even if there were no changes to the traits. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + update traits for. + :param traits: Iterable of string trait names to set. + :param generation: The resource provider generation to set. If None, + the provider's generation is not changed. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.update_traits(traits, generation=generation) + + def add_traits(self, name_or_uuid, *traits): + """Set traits on a provider, without affecting existing traits. + + :param name_or_uuid: The name or UUID of the provider whose traits are + to be affected. + :param traits: String names of traits to be added. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + final_traits = provider.traits | set(traits) + provider.update_traits(final_traits) + + def remove_traits(self, name_or_uuid, *traits): + """Unset traits on a provider, without affecting other existing traits. 
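+
+        A rough sketch (trait names and rp_uuid are made up)::
+
+            tree.add_traits(rp_uuid, 'CUSTOM_FPGA', 'CUSTOM_PROGRAMMABLE')
+            tree.remove_traits(rp_uuid, 'CUSTOM_PROGRAMMABLE')
+            # 'CUSTOM_FPGA' remains set on the provider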
+ + :param name_or_uuid: The name or UUID of the provider whose traits are + to be affected. + :param traits: String names of traits to be removed. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + final_traits = provider.traits - set(traits) + provider.update_traits(final_traits) + + def in_aggregates(self, name_or_uuid, aggregates): + """Given a name or UUID of a provider, query whether that provider is a + member of *all* the specified aggregates. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + query for aggregates. + :param aggregates: Iterable of string aggregate UUIDs to search for. + :return: True if this provider is associated with *all* of the + specified aggregates; False if any of the specified aggregates + are absent. Returns True if the aggregates parameter is + empty, even if the provider has no aggregate associations. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.in_aggregates(aggregates) + + def have_aggregates_changed(self, name_or_uuid, aggregates): + """Returns True if the specified aggregates list is different for the + provider with the specified name or UUID. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + query aggregates for. + :param aggregates: Iterable of string aggregate UUIDs to compare + against the provider's aggregates. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.have_aggregates_changed(aggregates) + + def update_aggregates(self, name_or_uuid, aggregates, generation=None): + """Given a name or UUID of a provider and an iterable of string + aggregate UUIDs, update the provider's aggregates and set the + provider's generation. + + :returns: True if the aggregates list has changed. + + :note: The provider's generation is always set to the supplied + generation, even if there were no changes to the aggregates. + + :raises: ValueError if a provider with name_or_uuid was not found in + the tree. + :param name_or_uuid: Either name or UUID of the resource provider to + update aggregates for. + :param aggregates: Iterable of string aggregate UUIDs to set. + :param generation: The resource provider generation to set. If None, + the provider's generation is not changed. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + return provider.update_aggregates(aggregates, + generation=generation) + + def add_aggregates(self, name_or_uuid, *aggregates): + """Set aggregates on a provider, without affecting existing aggregates. + + :param name_or_uuid: The name or UUID of the provider whose aggregates + are to be affected. + :param aggregates: String UUIDs of aggregates to be added. + """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + final_aggs = provider.aggregates | set(aggregates) + provider.update_aggregates(final_aggs) + + def remove_aggregates(self, name_or_uuid, *aggregates): + """Unset aggregates on a provider, without affecting other existing + aggregates. + + :param name_or_uuid: The name or UUID of the provider whose aggregates + are to be affected. + :param aggregates: String UUIDs of aggregates to be removed. 
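+
+        A rough sketch (agg1, agg2 and rp_uuid are made-up UUIDs)::
+
+            tree.add_aggregates(rp_uuid, agg1, agg2)
+            tree.remove_aggregates(rp_uuid, agg2)
+            # the provider is still associated with agg1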
+ """ + with self.lock: + provider = self._find_with_lock(name_or_uuid) + final_aggs = provider.aggregates - set(aggregates) + provider.update_aggregates(final_aggs) diff --git a/cyborg/agent/rc_fields.py b/cyborg/agent/rc_fields.py new file mode 100644 index 00000000..add6c76a --- /dev/null +++ b/cyborg/agent/rc_fields.py @@ -0,0 +1,70 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Standard Resource Class Fields.""" + +# NOTE(cdent): This is kept as its own independent file as it is used by +# both the placement and nova sides of the placement interaction. On the +# placement side we don't want to import all the nova fields, nor all the +# nova objects (which are automatically loaded and registered if the +# nova.objects package is imported). + +import re + +from oslo_versionedobjects import fields + + +class ResourceClass(fields.StringField): + """Classes of resources provided to consumers.""" + + CUSTOM_NAMESPACE = 'CUSTOM_' + """All non-standard resource classes must begin with this string.""" + + VCPU = 'VCPU' + MEMORY_MB = 'MEMORY_MB' + DISK_GB = 'DISK_GB' + PCI_DEVICE = 'PCI_DEVICE' + SRIOV_NET_VF = 'SRIOV_NET_VF' + NUMA_SOCKET = 'NUMA_SOCKET' + NUMA_CORE = 'NUMA_CORE' + NUMA_THREAD = 'NUMA_THREAD' + NUMA_MEMORY_MB = 'NUMA_MEMORY_MB' + IPV4_ADDRESS = 'IPV4_ADDRESS' + VGPU = 'VGPU' + VGPU_DISPLAY_HEAD = 'VGPU_DISPLAY_HEAD' + + # The ordering here is relevant. If you must add a value, only + # append. + STANDARD = (VCPU, MEMORY_MB, DISK_GB, PCI_DEVICE, SRIOV_NET_VF, + NUMA_SOCKET, NUMA_CORE, NUMA_THREAD, NUMA_MEMORY_MB, + IPV4_ADDRESS, VGPU, VGPU_DISPLAY_HEAD) + + # This is the set of standard resource classes that existed before + # we opened up for custom resource classes in version 1.1 of various + # objects in nova/objects/resource_provider.py + V1_0 = (VCPU, MEMORY_MB, DISK_GB, PCI_DEVICE, SRIOV_NET_VF, NUMA_SOCKET, + NUMA_CORE, NUMA_THREAD, NUMA_MEMORY_MB, IPV4_ADDRESS) + + @classmethod + def normalize_name(cls, rc_name): + if rc_name is None: + return None + # Replace non-alphanumeric characters with underscores + norm_name = re.sub('[^0-9A-Za-z]+', '_', rc_name) + # Bug #1762789: Do .upper after replacing non alphanumerics. 
+ norm_name = norm_name.upper() + norm_name = cls.CUSTOM_NAMESPACE + norm_name + return norm_name + + +class ResourceClassField(fields.AutoTypedField): + AUTO_TYPE = ResourceClass() diff --git a/cyborg/common/exception.py b/cyborg/common/exception.py index 226cd532..a7492d9d 100644 --- a/cyborg/common/exception.py +++ b/cyborg/common/exception.py @@ -139,6 +139,15 @@ class NotFound(CyborgException): code = http_client.NOT_FOUND +class ServiceNotFound(NotFound): + msg_fmt = _("Service %(service_id)s could not be found.") + + +class ConfGroupForServiceTypeNotFound(ServiceNotFound): + msg_fmt = _("No conf group name could be found for service type " + "%(stype)s.") + + class AcceleratorNotFound(NotFound): _msg_fmt = _("Accelerator %(uuid)s could not be found.") @@ -196,3 +205,87 @@ class AttributeInvalid(CyborgException): class AttributeAlreadyExists(CyborgException): _msg_fmt = _("Attribute with uuid %(uuid)s already exists.") + + +# An exception with this name is used on both sides of the placement/ +# cyborg interaction. +class ResourceProviderInUse(CyborgException): + msg_fmt = _("Resource provider has allocations.") + + +class ResourceProviderRetrievalFailed(CyborgException): + msg_fmt = _("Failed to get resource provider with UUID %(uuid)s") + + +class ResourceProviderAggregateRetrievalFailed(CyborgException): + msg_fmt = _("Failed to get aggregates for resource provider with UUID" + " %(uuid)s") + + +class ResourceProviderTraitRetrievalFailed(CyborgException): + msg_fmt = _("Failed to get traits for resource provider with UUID" + " %(uuid)s") + + +class ResourceProviderCreationFailed(CyborgException): + msg_fmt = _("Failed to create resource provider %(name)s") + + +class ResourceProviderDeletionFailed(CyborgException): + msg_fmt = _("Failed to delete resource provider %(uuid)s") + + +class ResourceProviderUpdateFailed(CyborgException): + msg_fmt = _("Failed to update resource provider via URL %(url)s: " + "%(error)s") + + +class ResourceProviderNotFound(NotFound): + msg_fmt = _("No such resource provider %(name_or_uuid)s.") + + +class ResourceProviderSyncFailed(CyborgException): + msg_fmt = _("Failed to synchronize the placement service with resource " + "provider information supplied by the compute host.") + + +class PlacementAPIConnectFailure(CyborgException): + msg_fmt = _("Unable to communicate with the Placement API.") + + +class PlacementAPIConflict(CyborgException): + """Any 409 error from placement APIs should use (a subclass of) this + exception. + """ + msg_fmt = _("A conflict was encountered attempting to invoke the " + "placement API at URL %(url)s: %(error)s") + + +class ResourceProviderUpdateConflict(PlacementAPIConflict): + """A 409 caused by generation mismatch from attempting to update an + existing provider record or its associated data (aggregates, traits, etc.). + """ + msg_fmt = _("A conflict was encountered attempting to update resource " + "provider %(uuid)s (generation %(generation)d): %(error)s") + + +class InvalidResourceClass(Invalid): + msg_fmt = _("Resource class '%(resource_class)s' invalid.") + + +class InvalidResourceAmount(Invalid): + msg_fmt = _("Resource amounts must be integers. Received '%(amount)s'.") + + +class InvalidInventory(Invalid): + msg_fmt = _("Inventory for '%(resource_class)s' on " + "resource provider '%(resource_provider)s' invalid.") + + +# An exception with this name is used on both sides of the placement/ +# cyborg interaction. 
+class InventoryInUse(InvalidInventory): + # NOTE(mriedem): This message cannot change without impacting the + # cyborg.services.client.report._RE_INV_IN_USE regex. + msg_fmt = _("Inventory for '%(resource_classes)s' on " + "resource provider '%(resource_provider)s' in use.") diff --git a/cyborg/common/utils.py b/cyborg/common/utils.py index 79e9f8dd..705f35d3 100644 --- a/cyborg/common/utils.py +++ b/cyborg/common/utils.py @@ -15,15 +15,23 @@ """Utilities and helper functions.""" -from oslo_log import log -from oslo_concurrency import lockutils import six +from keystoneauth1 import loading as ks_loading +from os_service_types import service_types +from oslo_concurrency import lockutils +from oslo_log import log + +from cyborg.common import exception +import cyborg.conf + LOG = log.getLogger(__name__) synchronized = lockutils.synchronized_with_prefix('cyborg-') +_SERVICE_TYPES = service_types.ServiceTypes() +CONF = cyborg.conf.CONF def safe_rstrip(value, chars=None): @@ -41,3 +49,62 @@ def safe_rstrip(value, chars=None): return value return value.rstrip(chars) or value + + +def get_ksa_adapter(service_type, ksa_auth=None, ksa_session=None, + min_version=None, max_version=None): + """Construct a keystoneauth1 Adapter for a given service type. + + We expect to find a conf group whose name corresponds to the service_type's + project according to the service-types-authority. That conf group must + provide at least ksa adapter options. Depending how the result is to be + used, ksa auth and/or session options may also be required, or the relevant + parameter supplied. + + :param service_type: String name of the service type for which the Adapter + is to be constructed. + :param ksa_auth: A keystoneauth1 auth plugin. If not specified, we attempt + to find one in ksa_session. Failing that, we attempt to + load one from the conf. + :param ksa_session: A keystoneauth1 Session. If not specified, we attempt + to load one from the conf. + :param min_version: The minimum major version of the adapter's endpoint, + intended to be used as the lower bound of a range with + max_version. + If min_version is given with no max_version it is as + if max version is 'latest'. + :param max_version: The maximum major version of the adapter's endpoint, + intended to be used as the upper bound of a range with + min_version. + :return: A keystoneauth1 Adapter object for the specified service_type. + :raise: ConfGroupForServiceTypeNotFound If no conf group name could be + found for the specified service_type. + """ + # Get the conf group corresponding to the service type. + confgrp = _SERVICE_TYPES.get_project_name(service_type) + if not confgrp or not hasattr(CONF, confgrp): + # Try the service type as the conf group. This is necessary for e.g. + # placement, while it's still part of the nova project. + # Note that this might become the first thing we try if/as we move to + # using service types for conf group names in general. + confgrp = service_type + if not confgrp or not hasattr(CONF, confgrp): + raise exception.ConfGroupForServiceTypeNotFound(stype=service_type) + + # Ensure we have an auth. + # NOTE(efried): This could be None, and that could be okay - e.g. if the + # result is being used for get_endpoint() and the conf only contains + # endpoint_override. 
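+    # Resolution order (sketch of the logic below): an explicitly passed
+    # ksa_auth wins, then any auth already attached to ksa_session, then
+    # auth loaded from the conf group's options.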
+ if not ksa_auth: + if ksa_session and ksa_session.auth: + ksa_auth = ksa_session.auth + else: + ksa_auth = ks_loading.load_auth_from_conf_options(CONF, confgrp) + + if not ksa_session: + ksa_session = ks_loading.load_session_from_conf_options( + CONF, confgrp, auth=ksa_auth) + + return ks_loading.load_adapter_from_conf_options( + CONF, confgrp, session=ksa_session, auth=ksa_auth, + min_version=min_version, max_version=max_version) diff --git a/cyborg/conf/default.py b/cyborg/conf/default.py index 78aa884c..a2fac775 100644 --- a/cyborg/conf/default.py +++ b/cyborg/conf/default.py @@ -13,13 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. +"""Cyborg Default Config Setting""" + import os import socket -from oslo_config import cfg from keystoneauth1 import loading as k_loading +from oslo_config import cfg from cyborg.common.i18n import _ +from cyborg.conf import utils as confutils exc_log_opts = [ @@ -63,17 +66,30 @@ path_opts = [ ] PLACEMENT_CONF_SECTION = 'placement' +DEFAULT_SERVICE_TYPE = 'placement' + +placement_group = cfg.OptGroup( + PLACEMENT_CONF_SECTION, + title='Placement Service Options', + help="Configuration options for connecting to the placement API service") placement_opts = [ - cfg.StrOpt('region_name', - help=_('Name of placement region to use. Useful if keystone ' - 'manages more than one region.')), cfg.StrOpt('endpoint_type', default='public', choices=['public', 'admin', 'internal'], help=_('Type of the placement endpoint to use. This endpoint ' 'will be looked up in the keystone catalog and should ' 'be one of public, internal or admin.')), + cfg.BoolOpt( + 'randomize_allocation_candidates', + default=False, + help=_('If True, when limiting allocation candidate results, the ' + 'results will be a random sampling of the full result set. ' + 'If False, allocation candidates are returned in a ' + 'deterministic but undefined order. That is, all things ' + 'being equal, two requests for allocation candidates will ' + 'return the same results in the same order; but no guarantees ' + 'are made as to how that order is determined.')), ] @@ -84,9 +100,9 @@ def register_opts(conf): def register_placement_opts(cfg=cfg.CONF): - cfg.register_opts(k_loading.get_session_conf_options(), - group=PLACEMENT_CONF_SECTION) + cfg.register_group(placement_group) cfg.register_opts(placement_opts, group=PLACEMENT_CONF_SECTION) + confutils.register_ksa_opts(cfg, placement_group, DEFAULT_SERVICE_TYPE) DEFAULT_OPTS = (exc_log_opts + service_opts + path_opts) @@ -95,5 +111,13 @@ PLACEMENT_OPTS = (placement_opts) def list_opts(): return { - PLACEMENT_CONF_SECTION: PLACEMENT_OPTS, 'DEFAULT': DEFAULT_OPTS + PLACEMENT_CONF_SECTION: ( + placement_opts + + k_loading.get_session_conf_options() + + k_loading.get_auth_common_conf_options() + + k_loading.get_auth_plugin_conf_options('password') + + k_loading.get_auth_plugin_conf_options('v2password') + + k_loading.get_auth_plugin_conf_options('v3password') + + confutils.get_ksa_adapter_opts(DEFAULT_SERVICE_TYPE)), + 'DEFAULT': DEFAULT_OPTS } diff --git a/cyborg/conf/utils.py b/cyborg/conf/utils.py new file mode 100644 index 00000000..da964440 --- /dev/null +++ b/cyborg/conf/utils.py @@ -0,0 +1,91 @@ +# Copyright 2017 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Common utilities for conf providers. + +This module does not provide any actual conf options. +""" +from keystoneauth1 import loading as ks_loading +from oslo_config import cfg + + +_ADAPTER_VERSION_OPTS = ('version', 'min_version', 'max_version') + + +def get_ksa_adapter_opts(default_service_type, deprecated_opts=None): + """Get auth, Session, and Adapter conf options from keystonauth1.loading. + + :param default_service_type: Default for the service_type conf option on + the Adapter. + :param deprecated_opts: dict of deprecated opts to register with the ksa + Adapter opts. Works the same as the + deprecated_opts kwarg to: + keystoneauth1.loading.session.Session.register_conf_options + :return: List of cfg.Opts. + """ + opts = ks_loading.get_adapter_conf_options(include_deprecated=False, + deprecated_opts=deprecated_opts) + + for opt in opts[:]: + # Remove version-related opts. Required/supported versions are + # something the code knows about, not the operator. + if opt.dest in _ADAPTER_VERSION_OPTS: + opts.remove(opt) + + # Override defaults that make sense for nova + cfg.set_defaults(opts, + valid_interfaces=['internal', 'public'], + service_type=default_service_type) + return opts + + +def _dummy_opt(name): + # A config option that can't be set by the user, so it behaves as if it's + # ignored; but consuming code may expect it to be present in a conf group. + return cfg.Opt(name, type=lambda x: None) + + +def register_ksa_opts(conf, group, default_service_type, include_auth=True, + deprecated_opts=None): + """Register keystoneauth auth, Session, and Adapter opts. + + :param conf: oslo_config.cfg.CONF in which to register the options + :param group: Conf group, or string name thereof, in which to register the + options. + :param default_service_type: Default for the service_type conf option on + the Adapter. + :param include_auth: For service types where Nova is acting on behalf of + the user, auth should come from the user context. + In those cases, set this arg to False to avoid + registering ksa auth options. + :param deprecated_opts: dict of deprecated opts to register with the ksa + Session or Adapter opts. See docstring for + the deprecated_opts param of: + keystoneauth1.loading.session.Session.register_conf_options + """ + # ksa register methods need the group name as a string. oslo doesn't care. + group = getattr(group, 'name', group) + ks_loading.register_session_conf_options( + conf, group, deprecated_opts=deprecated_opts) + if include_auth: + ks_loading.register_auth_conf_options(conf, group) + conf.register_opts(get_ksa_adapter_opts( + default_service_type, deprecated_opts=deprecated_opts), group=group) + # Have to register dummies for the version-related opts we removed + for name in _ADAPTER_VERSION_OPTS: + conf.register_opt(_dummy_opt(name), group=group) + + +# NOTE(efried): Required for docs build. +def list_opts(): + return {} diff --git a/cyborg/services/client/__init__.py b/cyborg/services/client/__init__.py new file mode 100644 index 00000000..f8e3117e --- /dev/null +++ b/cyborg/services/client/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2018 Intel, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Client for call other project API.""" diff --git a/cyborg/services/client/report.py b/cyborg/services/client/report.py new file mode 100644 index 00000000..f8db64aa --- /dev/null +++ b/cyborg/services/client/report.py @@ -0,0 +1,1997 @@ +# Copyright (c) 2014 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Placement Client to Handle Resource Provider Operation.""" + +import contextlib +import copy +import functools +import random +import re +import time + +from keystoneauth1 import exceptions as ks_exc +import os_traits +from oslo_log import log as logging +from oslo_middleware import request_id +from oslo_utils import versionutils + +from cyborg.agent import provider_tree +from cyborg.agent import rc_fields as fields +from cyborg.common import exception +from cyborg.common.i18n import _ +from cyborg.common import utils +import cyborg.conf + +CONF = cyborg.conf.CONF +LOG = logging.getLogger(__name__) +VCPU = fields.ResourceClass.VCPU +MEMORY_MB = fields.ResourceClass.MEMORY_MB +DISK_GB = fields.ResourceClass.DISK_GB +_RE_INV_IN_USE = re.compile("Inventory for (.+) on resource provider " + "(.+) in use") +WARN_EVERY = 10 +PLACEMENT_CLIENT_SEMAPHORE = 'placement_client' +GRANULAR_AC_VERSION = '1.25' +POST_RPS_RETURNS_PAYLOAD_API_VERSION = '1.20' +NESTED_PROVIDER_API_VERSION = '1.14' +POST_ALLOCATIONS_API_VERSION = '1.13' + + +def warn_limit(self, msg): + if self._warn_count: + self._warn_count -= 1 + else: + self._warn_count = WARN_EVERY + LOG.warning(msg) + + +def safe_connect(f): + @functools.wraps(f) + def wrapper(self, *a, **k): + try: + return f(self, *a, **k) + except ks_exc.EndpointNotFound: + warn_limit( + self, 'The placement API endpoint was not found.') + # Reset client session so there is a new catalog, which + # gets cached when keystone is first successfully contacted. + self._client = self._create_client() + except ks_exc.MissingAuthPlugin: + warn_limit( + self, 'No authentication information found for placement API.') + except ks_exc.Unauthorized: + warn_limit( + self, 'Placement service credentials do not work.') + except ks_exc.DiscoveryFailure: + # TODO(_gryf): Looks like DiscoveryFailure is not the only missing + # exception here. In Pike we should take care about keystoneauth1 + # failures handling globally. 
+ warn_limit(self, + 'Discovering suitable URL for placement API failed.') + except ks_exc.ConnectFailure: + LOG.warning('Placement API service is not responding.') + return wrapper + + +class Retry(Exception): + def __init__(self, operation, reason): + self.operation = operation + self.reason = reason + + +def retries(f): + """Decorator to retry a call three times if it raises Retry + + Note that this returns the actual value of the inner call on success + or returns False if all the retries fail. + """ + @functools.wraps(f) + def wrapper(self, *a, **k): + for retry in range(0, 4): + try: + sleep_time = random.uniform(0, retry * 2) + time.sleep(sleep_time) + return f(self, *a, **k) + except Retry as e: + LOG.debug( + 'Unable to %(op)s because %(reason)s; retrying...', + {'op': e.operation, 'reason': e.reason}) + LOG.error('Failed scheduler client operation %s: out of retries', + f.__name__) + return False + return wrapper + + +def merge_resources(original_resources, new_resources, sign=1): + """Merge a list of new resources with existing resources. + + Either add the resources (if sign is 1) or subtract (if sign is -1). + If the resulting value is 0 do not include the resource in the results. + """ + all_keys = set(original_resources.keys()) | set(new_resources.keys()) + for key in all_keys: + value = (original_resources.get(key, 0) + + (sign * new_resources.get(key, 0))) + if value: + original_resources[key] = value + else: + original_resources.pop(key, None) + + +def _compute_node_to_inventory_dict(compute_node): + """Given a supplied `objects.ComputeNode` object, return a dict, keyed + by resource class, of various inventory information. + + :param compute_node: `objects.ComputeNode` object to translate + """ + result = {} + + # NOTE(jaypipes): Ironic virt driver will return 0 values for vcpus, + # memory_mb and disk_gb if the Ironic node is not available/operable + if compute_node.vcpus > 0: + result[VCPU] = { + 'total': compute_node.vcpus, + 'reserved': CONF.reserved_host_cpus, + 'min_unit': 1, + 'max_unit': compute_node.vcpus, + 'step_size': 1, + 'allocation_ratio': compute_node.cpu_allocation_ratio, + } + if compute_node.memory_mb > 0: + result[MEMORY_MB] = { + 'total': compute_node.memory_mb, + 'reserved': CONF.reserved_host_memory_mb, + 'min_unit': 1, + 'max_unit': compute_node.memory_mb, + 'step_size': 1, + 'allocation_ratio': compute_node.ram_allocation_ratio, + } + # if compute_node.local_gb > 0: + # # TODO(johngarbutt) We should either move to reserved_host_disk_gb + # # or start tracking DISK_MB. + # reserved_disk_gb = compute_utils.convert_mb_to_ceil_gb( + # CONF.reserved_host_disk_mb) + # result[DISK_GB] = { + # 'total': compute_node.local_gb, + # 'reserved': reserved_disk_gb, + # 'min_unit': 1, + # 'max_unit': compute_node.local_gb, + # 'step_size': 1, + # 'allocation_ratio': compute_node.disk_allocation_ratio, + # } + return result + + +def _instance_to_allocations_dict(instance): + """Given an `objects.Instance` object, return a dict, keyed by resource + class of the amount used by the instance. + + :param instance: `objects.Instance` object to translate + """ + alloc_dict = {} + + # Remove any zero allocations. 
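+    # e.g. (illustrative values) {'VCPU': 2, 'MEMORY_MB': 0} -> {'VCPU': 2}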
+ return {key: val for key, val in alloc_dict.items() if val} + + +def _move_operation_alloc_request(source_allocs, dest_alloc_req): + """Given existing allocations for a source host and a new allocation + request for a destination host, return a new allocation_request that + contains resources claimed against both source and destination, accounting + for shared providers. + + Also accounts for a resize to the same host where the source and dest + compute node resource providers are going to be the same. In that case + we sum the resource allocations for the single provider. + + :param source_allocs: Dict, keyed by resource provider UUID, of resources + allocated on the source host + :param dest_alloc_req: The allocation_request for resources against the + destination host + """ + LOG.debug("Doubling-up allocation_request for move operation.") + # Remove any allocations against resource providers that are + # already allocated against on the source host (like shared storage + # providers) + cur_rp_uuids = set(source_allocs.keys()) + new_rp_uuids = set(dest_alloc_req['allocations']) - cur_rp_uuids + + current_allocs = { + cur_rp_uuid: {'resources': alloc['resources']} + for cur_rp_uuid, alloc in source_allocs.items() + } + new_alloc_req = {'allocations': current_allocs} + for rp_uuid in dest_alloc_req['allocations']: + if rp_uuid in new_rp_uuids: + new_alloc_req['allocations'][rp_uuid] = dest_alloc_req[ + 'allocations'][rp_uuid] + elif not new_rp_uuids: + # If there are no new_rp_uuids that means we're resizing to + # the same host so we need to sum the allocations for + # the compute node (and possibly shared providers) using both + # the current and new allocations. + # Note that we sum the allocations rather than take the max per + # resource class between the current and new allocations because + # the compute node/resource tracker is going to adjust for + # decrementing any old allocations as necessary, the scheduler + # shouldn't make assumptions about that. + merge_resources( + new_alloc_req['allocations'][rp_uuid]['resources'], + dest_alloc_req['allocations'][rp_uuid]['resources']) + + LOG.debug("New allocation_request containing both source and " + "destination hosts in move operation: %s", new_alloc_req) + return new_alloc_req + + +def _extract_inventory_in_use(body): + """Given an HTTP response body, extract the resource classes that were + still in use when we tried to delete inventory. + + :returns: String of resource classes or None if there was no InventoryInUse + error in the response body. + """ + match = _RE_INV_IN_USE.search(body) + if match: + return match.group(1) + return None + + +def get_placement_request_id(response): + if response is not None: + return response.headers.get(request_id.HTTP_RESP_HEADER_REQUEST_ID) + + +class SchedulerReportClient(object): + """Client class for updating the scheduler.""" + + def __init__(self, adapter=None): + """Initialize the report client. + + :param adapter: A prepared keystoneauth1 Adapter for API communication. + If unspecified, one is created based on config options in the + [placement] section. 
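+
+        A minimal construction sketch (the adapter argument is optional)::
+
+            # Uses the [placement] conf section for auth/session/adapter.
+            client = SchedulerReportClient()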
+ """ + self._adapter = adapter + # An object that contains a nova-compute-side cache of resource + # provider and inventory information + self._provider_tree = provider_tree.ProviderTree() + # Track the last time we updated providers' aggregates and traits + self._association_refresh_time = {} + self._client = self._create_client() + # NOTE(danms): Keep track of how naggy we've been + self._warn_count = 0 + + @utils.synchronized(PLACEMENT_CLIENT_SEMAPHORE) + def _create_client(self): + """Create the HTTP session accessing the placement service.""" + # Flush provider tree and associations so we start from a clean slate. + self._provider_tree = provider_tree.ProviderTree() + self._association_refresh_time = {} + client = self._adapter or utils.get_ksa_adapter('placement') + # Set accept header on every request to ensure we notify placement + # service of our response body media type preferences. + client.additional_headers = {'accept': 'application/json'} + return client + + def get(self, url, version=None, global_request_id=None): + headers = ({request_id.INBOUND_HEADER: global_request_id} + if global_request_id else {}) + return self._client.get(url, microversion=version, headers=headers) + + def post(self, url, data, version=None, global_request_id=None): + headers = ({request_id.INBOUND_HEADER: global_request_id} + if global_request_id else {}) + # NOTE(sdague): using json= instead of data= sets the + # media type to application/json for us. Placement API is + # more sensitive to this than other APIs in the OpenStack + # ecosystem. + return self._client.post(url, json=data, microversion=version, + headers=headers) + + def put(self, url, data, version=None, global_request_id=None): + # NOTE(sdague): using json= instead of data= sets the + # media type to application/json for us. Placement API is + # more sensitive to this than other APIs in the OpenStack + # ecosystem. + kwargs = {'microversion': version, + 'headers': {request_id.INBOUND_HEADER: + global_request_id} if global_request_id else {}} + if data is not None: + kwargs['json'] = data + return self._client.put(url, **kwargs) + + def delete(self, url, version=None, global_request_id=None): + headers = ({request_id.INBOUND_HEADER: global_request_id} + if global_request_id else {}) + return self._client.delete(url, microversion=version, headers=headers) + + @safe_connect + def get_allocation_candidates(self, context, resources): + """Returns a tuple of (allocation_requests, provider_summaries, + allocation_request_version). + + The allocation_requests are a collection of potential JSON objects that + can be passed to the PUT /allocations/{consumer_uuid} Placement REST + API to claim resources against one or more resource providers that meet + the requested resource constraints. + + The provider summaries is a dict, keyed by resource provider UUID, of + inventory and capacity information and traits for any resource + provider involved in the allocation_requests. + + :returns: A tuple with a list of allocation_request dicts, a dict of + provider information, and the microversion used to request + this data from placement, or (None, None, None) if the + request failed + + :param context: The security context + :param nova.scheduler.utils.ResourceRequest resources: + A ResourceRequest object representing the requested resources, + traits, and aggregates from the request spec. 
+ + Example member_of (aggregates) value in resources: + + [('foo', 'bar'), ('baz',)] + + translates to: + + "Candidates are in either 'foo' or 'bar', but definitely in 'baz'" + + """ + version = GRANULAR_AC_VERSION + qparams = resources.to_querystring() + url = "/allocation_candidates?%s" % qparams + resp = self.get(url, version=version, + global_request_id=context.global_id) + if resp.status_code == 200: + data = resp.json() + return (data['allocation_requests'], data['provider_summaries'], + version) + + args = { + 'resource_request': str(resources), + 'status_code': resp.status_code, + 'err_text': resp.text, + } + msg = ("Failed to retrieve allocation candidates from placement " + "API for filters: %(resource_request)s\n" + "Got %(status_code)d: %(err_text)s.") + LOG.error(msg, args) + return None, None, None + + @safe_connect + def _get_provider_aggregates(self, context, rp_uuid): + """Queries the placement API for a resource provider's aggregates. + + :param rp_uuid: UUID of the resource provider to grab aggregates for. + :return: A set() of aggregate UUIDs, which may be empty if the + specified provider has no aggregate associations. + :raise: ResourceProviderAggregateRetrievalFailed on errors. In + particular, we raise this exception (as opposed to returning + None or the empty set()) if the specified resource provider + does not exist. + """ + resp = self.get("/resource_providers/%s/aggregates" % rp_uuid, + version='1.1', global_request_id=context.global_id) + if resp.status_code == 200: + data = resp.json() + return set(data['aggregates']) + + placement_req_id = get_placement_request_id(resp) + msg = ("[%(placement_req_id)s] Failed to retrieve aggregates from " + "placement API for resource provider with UUID %(uuid)s. " + "Got %(status_code)d: %(err_text)s.") + args = { + 'placement_req_id': placement_req_id, + 'uuid': rp_uuid, + 'status_code': resp.status_code, + 'err_text': resp.text, + } + LOG.error(msg, args) + raise exception.ResourceProviderAggregateRetrievalFailed(uuid=rp_uuid) + + @safe_connect + def _get_provider_traits(self, context, rp_uuid): + """Queries the placement API for a resource provider's traits. + + :param context: The security context + :param rp_uuid: UUID of the resource provider to grab traits for. + :return: A set() of string trait names, which may be empty if the + specified provider has no traits. + :raise: ResourceProviderTraitRetrievalFailed on errors. In particular, + we raise this exception (as opposed to returning None or the + empty set()) if the specified resource provider does not exist. + """ + resp = self.get("/resource_providers/%s/traits" % rp_uuid, + version='1.6', global_request_id=context.global_id) + + if resp.status_code == 200: + return set(resp.json()['traits']) + + placement_req_id = get_placement_request_id(resp) + LOG.error( + "[%(placement_req_id)s] Failed to retrieve traits from " + "placement API for resource provider with UUID %(uuid)s. Got " + "%(status_code)d: %(err_text)s.", + {'placement_req_id': placement_req_id, 'uuid': rp_uuid, + 'status_code': resp.status_code, 'err_text': resp.text}) + raise exception.ResourceProviderTraitRetrievalFailed(uuid=rp_uuid) + + @safe_connect + def _get_resource_provider(self, context, uuid): + """Queries the placement API for a resource provider record with the + supplied UUID. 
+ + :param context: The security context + :param uuid: UUID identifier for the resource provider to look up + :return: A dict of resource provider information if found or None if no + such resource provider could be found. + :raise: ResourceProviderRetrievalFailed on error. + """ + resp = self.get("/resource_providers/%s" % uuid, + version=NESTED_PROVIDER_API_VERSION, + global_request_id=context.global_id) + if resp.status_code == 200: + data = resp.json() + return data + elif resp.status_code == 404: + return None + else: + placement_req_id = get_placement_request_id(resp) + msg = ("[%(placement_req_id)s] Failed to retrieve resource " + "provider record from placement API for UUID %(uuid)s. Got " + "%(status_code)d: %(err_text)s.") + args = { + 'uuid': uuid, + 'status_code': resp.status_code, + 'err_text': resp.text, + 'placement_req_id': placement_req_id, + } + LOG.error(msg, args) + raise exception.ResourceProviderRetrievalFailed(uuid=uuid) + + @safe_connect + def _get_sharing_providers(self, context, agg_uuids): + """Queries the placement API for a list of the resource providers + associated with any of the specified aggregates and possessing the + MISC_SHARES_VIA_AGGREGATE trait. + + :param context: The security context + :param agg_uuids: Iterable of string UUIDs of aggregates to filter on. + :return: A list of dicts of resource provider information, which may be + empty if no provider exists with the specified UUID. + :raise: ResourceProviderRetrievalFailed on error. + """ + if not agg_uuids: + return [] + + aggs = ','.join(agg_uuids) + url = "/resource_providers?member_of=in:%s&required=%s" % ( + aggs, os_traits.MISC_SHARES_VIA_AGGREGATE) + resp = self.get(url, version='1.18', + global_request_id=context.global_id) + if resp.status_code == 200: + return resp.json()['resource_providers'] + + msg = _("[%(placement_req_id)s] Failed to retrieve sharing resource " + "providers associated with the following aggregates from " + "placement API: %(aggs)s. Got %(status_code)d: %(err_text)s.") + args = { + 'aggs': aggs, + 'status_code': resp.status_code, + 'err_text': resp.text, + 'placement_req_id': get_placement_request_id(resp), + } + LOG.error(msg, args) + raise exception.ResourceProviderRetrievalFailed(message=msg % args) + + @safe_connect + def _get_providers_in_tree(self, context, uuid): + """Queries the placement API for a list of the resource providers in + the tree associated with the specified UUID. + + :param context: The security context + :param uuid: UUID identifier for the resource provider to look up + :return: A list of dicts of resource provider information, which may be + empty if no provider exists with the specified UUID. + :raise: ResourceProviderRetrievalFailed on error. + """ + resp = self.get("/resource_providers?in_tree=%s" % uuid, + version=NESTED_PROVIDER_API_VERSION, + global_request_id=context.global_id) + + if resp.status_code == 200: + return resp.json()['resource_providers'] + + # Some unexpected error + placement_req_id = get_placement_request_id(resp) + msg = ("[%(placement_req_id)s] Failed to retrieve resource provider " + "tree from placement API for UUID %(uuid)s. 
Got " + "%(status_code)d: %(err_text)s.") + args = { + 'uuid': uuid, + 'status_code': resp.status_code, + 'err_text': resp.text, + 'placement_req_id': placement_req_id, + } + LOG.error(msg, args) + raise exception.ResourceProviderRetrievalFailed(uuid=uuid) + + @safe_connect + def _create_resource_provider(self, context, uuid, name, + parent_provider_uuid=None): + """Calls the placement API to create a new resource provider record. + + :param context: The security context + :param uuid: UUID of the new resource provider + :param name: Name of the resource provider + :param parent_provider_uuid: Optional UUID of the immediate parent + :return: A dict of resource provider information object representing + the newly-created resource provider. + :raise: ResourceProviderCreationFailed or + ResourceProviderRetrievalFailed on error. + """ + url = "/resource_providers" + payload = { + 'uuid': uuid, + 'name': name, + } + if parent_provider_uuid is not None: + payload['parent_provider_uuid'] = parent_provider_uuid + + # Bug #1746075: First try the microversion that returns the new + # provider's payload. + resp = self.post(url, payload, + version=POST_RPS_RETURNS_PAYLOAD_API_VERSION, + global_request_id=context.global_id) + + placement_req_id = get_placement_request_id(resp) + + if resp: + msg = ("[%(placement_req_id)s] Created resource provider record " + "via placement API for resource provider with UUID " + "%(uuid)s and name %(name)s.") + args = { + 'uuid': uuid, + 'name': name, + 'placement_req_id': placement_req_id, + } + LOG.info(msg, args) + return resp.json() + + # TODO(efried): Push error codes from placement, and use 'em. + name_conflict = 'Conflicting resource provider name:' + if resp.status_code == 409 and name_conflict not in resp.text: + # Another thread concurrently created a resource provider with the + # same UUID. Log a warning and then just return the resource + # provider object from _get_resource_provider() + msg = ("[%(placement_req_id)s] Another thread already created a " + "resource provider with the UUID %(uuid)s. Grabbing that " + "record from the placement API.") + args = { + 'uuid': uuid, + 'placement_req_id': placement_req_id, + } + LOG.info(msg, args) + return self._get_resource_provider(context, uuid) + + # A provider with the same *name* already exists, or some other error. + msg = ("[%(placement_req_id)s] Failed to create resource provider " + "record in placement API for UUID %(uuid)s. Got " + "%(status_code)d: %(err_text)s.") + args = { + 'uuid': uuid, + 'status_code': resp.status_code, + 'err_text': resp.text, + 'placement_req_id': placement_req_id, + } + LOG.error(msg, args) + raise exception.ResourceProviderCreationFailed(name=name) + + def _ensure_resource_provider(self, context, uuid, name=None, + parent_provider_uuid=None): + """Ensures that the placement API has a record of a resource provider + with the supplied UUID. If not, creates the resource provider record in + the placement API for the supplied UUID, passing in a name for the + resource provider. + + If found or created, the provider's UUID is returned from this method. + If the resource provider for the supplied uuid was not found and the + resource provider record could not be created in the placement API, an + exception is raised. 
+ + If this method returns successfully, callers are assured that the + placement API contains a record of the provider; and that the local + cache of resource provider information contains a record of: + - The specified provider + - All providers in its tree + - All providers associated via aggregate with all providers in said + tree + and for each of those providers: + - The UUIDs of its aggregates + - The trait strings associated with the provider + + Note that if the provider did not exist prior to this call, the above + reduces to just the specified provider as a root, with no aggregates or + traits. + + :param context: The security context + :param uuid: UUID identifier for the resource provider to ensure exists + :param name: Optional name for the resource provider if the record + does not exist. If empty, the name is set to the UUID + value + :param parent_provider_uuid: Optional UUID of the immediate parent, + which must have been previously _ensured. + """ + # NOTE(efried): We currently have no code path where we need to set the + # parent_provider_uuid on a previously-parent-less provider - so we do + # NOT handle that scenario here. + # TODO(efried): Reinstate this optimization if possible. + # For now, this is removed due to the following: + # - update_provider_tree adds a child with some bogus inventory (bad + # resource class) or trait (invalid trait name). + # - update_from_provider_tree creates the child in placement and adds + # it to the cache, then attempts to add the bogus inventory/trait. + # The latter fails, so update_from_provider_tree invalidates the + # cache entry by removing the child from the cache. + # - Ordinarily, we would rely on the code below (_get_providers_in_tree + # and _provider_tree.populate_from_iterable) to restore the child to + # the cache on the next iteration. BUT since the root is still + # present in the cache, the commented-out block will cause that part + # of this method to be skipped. + # if self._provider_tree.exists(uuid): + # # If we had the requested provider locally, refresh it and its + # # descendants, but only if stale. + # for u in self._provider_tree.get_provider_uuids(uuid): + # self._refresh_associations(context, u, force=False) + # return uuid + + # We don't have it locally; check placement or create it. + created_rp = None + rps_to_refresh = self._get_providers_in_tree(context, uuid) + if not rps_to_refresh: + created_rp = self._create_resource_provider( + context, uuid, name or uuid, + parent_provider_uuid=parent_provider_uuid) + # If @safe_connect can't establish a connection to the placement + # service, like if placement isn't running or nova-compute is + # mis-configured for authentication, we'll get None back and need + # to treat it like we couldn't create the provider (because we + # couldn't). + if created_rp is None: + raise exception.ResourceProviderCreationFailed( + name=name or uuid) + # Don't add the created_rp to rps_to_refresh. Since we just + # created it, it has no aggregates or traits. + + self._provider_tree.populate_from_iterable( + rps_to_refresh or [created_rp]) + + # At this point, the whole tree exists in the local cache. + + for rp_to_refresh in rps_to_refresh: + # NOTE(efried): _refresh_associations doesn't refresh inventory + # (yet) - see that method's docstring for the why. 
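+            # So refresh inventory explicitly here, then aggregates/traits
+            # (and sharing providers) below via _refresh_associations.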
+ self._refresh_and_get_inventory(context, rp_to_refresh['uuid']) + self._refresh_associations( + context, rp_to_refresh['uuid'], + generation=rp_to_refresh.get('generation'), force=True) + + return uuid + + @safe_connect + def _delete_provider(self, rp_uuid, global_request_id=None): + resp = self.delete('/resource_providers/%s' % rp_uuid, + global_request_id=global_request_id) + # Check for 404 since we don't need to warn/raise if we tried to delete + # something which doesn"t actually exist. + if resp or resp.status_code == 404: + if resp: + LOG.info("Deleted resource provider %s", rp_uuid) + # clean the caches + try: + self._provider_tree.remove(rp_uuid) + except ValueError: + pass + self._association_refresh_time.pop(rp_uuid, None) + return + + msg = ("[%(placement_req_id)s] Failed to delete resource provider " + "with UUID %(uuid)s from the placement API. Got " + "%(status_code)d: %(err_text)s.") + args = { + 'placement_req_id': get_placement_request_id(resp), + 'uuid': rp_uuid, + 'status_code': resp.status_code, + 'err_text': resp.text + } + LOG.error(msg, args) + # On conflict, the caller may wish to delete allocations and + # redrive. (Note that this is not the same as a + # PlacementAPIConflict case.) + if resp.status_code == 409: + raise exception.ResourceProviderInUse() + raise exception.ResourceProviderDeletionFailed(uuid=rp_uuid) + + def _get_inventory(self, context, rp_uuid): + url = '/resource_providers/%s/inventories' % rp_uuid + result = self.get(url, global_request_id=context.global_id) + if not result: + return None + return result.json() + + def _refresh_and_get_inventory(self, context, rp_uuid): + """Helper method that retrieves the current inventory for the supplied + resource provider according to the placement API. + + If the cached generation of the resource provider is not the same as + the generation returned from the placement API, we update the cached + generation and attempt to update inventory if any exists, otherwise + return empty inventories. + """ + curr = self._get_inventory(context, rp_uuid) + if curr is None: + return None + + cur_gen = curr['resource_provider_generation'] + # TODO(efried): This condition banks on the generation for a new RP + # starting at zero, which isn't part of the API. It also is only + # useful as an optimization on a freshly-created RP to which nothing + # has ever been done. And it's not much of an optimization, because + # updating the cache is super cheap. We should remove the condition. + if cur_gen: + curr_inv = curr['inventories'] + self._provider_tree.update_inventory(rp_uuid, curr_inv, + generation=cur_gen) + return curr + + def _refresh_associations(self, context, rp_uuid, generation=None, + force=False, refresh_sharing=True): + """Refresh aggregates, traits, and (optionally) aggregate-associated + sharing providers for the specified resource provider uuid. + + Only refresh if there has been no refresh during the lifetime of + this process, CONF.compute.resource_provider_association_refresh + seconds have passed, or the force arg has been set to True. + + Note that we do *not* refresh inventories. The reason is largely + historical: all code paths that get us here are doing inventory refresh + themselves. + + :param context: The security context + :param rp_uuid: UUID of the resource provider to check for fresh + aggregates and traits + :param generation: The resource provider generation to set. If None, + the provider's generation is not updated. 
+ :param force: If True, force the refresh + :param refresh_sharing: If True, fetch all the providers associated + by aggregate with the specified provider, + including their traits and aggregates (but not + *their* sharing providers). + :raise: On various placement API errors, one of: + - ResourceProviderAggregateRetrievalFailed + - ResourceProviderTraitRetrievalFailed + - ResourceProviderRetrievalFailed + """ + if force or self._associations_stale(rp_uuid): + # Refresh aggregates + aggs = self._get_provider_aggregates(context, rp_uuid) + msg = ("Refreshing aggregate associations for resource provider " + "%s, aggregates: %s") + LOG.debug(msg, rp_uuid, ','.join(aggs or ['None'])) + + # NOTE(efried): This will blow up if called for a RP that doesn't + # exist in our _provider_tree. + self._provider_tree.update_aggregates( + rp_uuid, aggs, generation=generation) + + # Refresh traits + traits = self._get_provider_traits(context, rp_uuid) + msg = ("Refreshing trait associations for resource provider %s, " + "traits: %s") + LOG.debug(msg, rp_uuid, ','.join(traits or ['None'])) + # NOTE(efried): This will blow up if called for a RP that doesn't + # exist in our _provider_tree. + self._provider_tree.update_traits( + rp_uuid, traits, generation=generation) + + if refresh_sharing: + # Refresh providers associated by aggregate + for rp in self._get_sharing_providers(context, aggs): + if not self._provider_tree.exists(rp['uuid']): + # NOTE(efried): Right now sharing providers are always + # treated as roots. This is deliberate. From the + # context of this compute's RP, it doesn't matter if a + # sharing RP is part of a tree. + self._provider_tree.new_root( + rp['name'], rp['uuid'], + generation=rp['generation']) + # Now we have to (populate or) refresh that guy's traits + # and aggregates (but not *his* aggregate-associated + # providers). No need to override force=True for newly- + # added providers - the missing timestamp will always + # trigger them to refresh. + self._refresh_associations(context, rp['uuid'], + force=force, + refresh_sharing=False) + self._association_refresh_time[rp_uuid] = time.time() + + def _associations_stale(self, uuid): + """Respond True if aggregates and traits have not been refreshed + "recently". + + Associations are stale if association_refresh_time for this uuid is not + set or is more than CONF.compute.resource_provider_association_refresh + seconds ago. + """ + refresh_time = self._association_refresh_time.get(uuid, 0) + return ((time.time() - refresh_time) > + CONF.compute.resource_provider_association_refresh) + + def _update_inventory_attempt(self, context, rp_uuid, inv_data): + """Update the inventory for this resource provider if needed. + + :param context: The security context + :param rp_uuid: The resource provider UUID for the operation + :param inv_data: The new inventory for the resource provider + :returns: True if the inventory was updated (or did not need to be), + False otherwise. + """ + # TODO(jaypipes): Should we really be calling the placement API to get + # the current inventory for every resource provider each and every time + # update_resource_stats() is called? 
:(
+        curr = self._refresh_and_get_inventory(context, rp_uuid)
+        if curr is None:
+            return False
+
+        cur_gen = curr['resource_provider_generation']
+
+        # Check to see if we need to update placement's view
+        if not self._provider_tree.has_inventory_changed(rp_uuid, inv_data):
+            return True
+
+        payload = {
+            'resource_provider_generation': cur_gen,
+            'inventories': inv_data,
+        }
+        url = '/resource_providers/%s/inventories' % rp_uuid
+        result = self.put(url, payload, global_request_id=context.global_id)
+        if result.status_code == 409:
+            LOG.info('[%(placement_req_id)s] Inventory update conflict for '
+                     '%(resource_provider_uuid)s with generation ID '
+                     '%(generation)s',
+                     {'placement_req_id': get_placement_request_id(result),
+                      'resource_provider_uuid': rp_uuid,
+                      'generation': cur_gen})
+            # NOTE(jaypipes): There may be cases when we try to set a
+            # provider's inventory that results in attempting to delete an
+            # inventory record for a resource class that has an active
+            # allocation. We need to catch this particular case and raise an
+            # exception here instead of returning False, since we should not
+            # re-try the operation in this case.
+            #
+            # A use case for where this can occur is the following:
+            #
+            # 1) Provider created for each Ironic baremetal node in Newton
+            # 2) Inventory records for baremetal node created for VCPU,
+            #    MEMORY_MB and DISK_GB
+            # 3) A Nova instance consumes the baremetal node and allocation
+            #    records are created for VCPU, MEMORY_MB and DISK_GB matching
+            #    the total amount of those resources on the baremetal node.
+            # 4) Upgrade to Ocata and now resource tracker wants to set the
+            #    provider's inventory to a single record of resource class
+            #    CUSTOM_IRON_SILVER (or whatever the Ironic node's
+            #    "resource_class" attribute is)
+            # 5) Scheduler report client sends the inventory list containing a
+            #    single CUSTOM_IRON_SILVER record and placement service
+            #    attempts to delete the inventory records for VCPU, MEMORY_MB
+            #    and DISK_GB. An exception is raised from the placement service
+            #    because allocation records exist for those resource classes,
+            #    and a 409 Conflict is returned to the compute node. We need to
+            #    trigger a delete of the old allocation records and then set
+            #    the new inventory, and then set the allocation record to the
+            #    new CUSTOM_IRON_SILVER record.
+            rc = _extract_inventory_in_use(result.text)
+            if rc is not None:
+                raise exception.InventoryInUse(
+                    resource_classes=rc,
+                    resource_provider=rp_uuid,
+                )
+
+            # Invalidate our cache and re-fetch the resource provider
+            # to be sure to get the latest generation.
+            self._provider_tree.remove(rp_uuid)
+            # NOTE(jaypipes): We don't need to pass a name parameter to
+            # _ensure_resource_provider() because we know the resource provider
+            # record already exists. We're just reloading the record here.
+ self._ensure_resource_provider(context, rp_uuid) + return False + elif not result: + placement_req_id = get_placement_request_id(result) + LOG.warning('[%(placement_req_id)s] Failed to update inventory ' + 'for resource provider %(uuid)s: %(status)i %(text)s', + {'placement_req_id': placement_req_id, + 'uuid': rp_uuid, + 'status': result.status_code, + 'text': result.text}) + # log the body at debug level + LOG.debug('[%(placement_req_id)s] Failed inventory update request ' + 'for resource provider %(uuid)s with body: %(payload)s', + {'placement_req_id': placement_req_id, + 'uuid': rp_uuid, + 'payload': payload}) + return False + + if result.status_code != 200: + placement_req_id = get_placement_request_id(result) + LOG.info('[%(placement_req_id)s] Received unexpected response ' + 'code %(code)i while trying to update inventory for ' + 'resource provider %(uuid)s: %(text)s', + {'placement_req_id': placement_req_id, + 'uuid': rp_uuid, + 'code': result.status_code, + 'text': result.text}) + return False + + # Update our view of the generation for next time + updated_inventories_result = result.json() + new_gen = updated_inventories_result['resource_provider_generation'] + + self._provider_tree.update_inventory(rp_uuid, inv_data, + generation=new_gen) + LOG.debug('Updated inventory for %s at generation %i', + rp_uuid, new_gen) + return True + + @safe_connect + def _update_inventory(self, context, rp_uuid, inv_data): + for attempt in (1, 2, 3): + if not self._provider_tree.exists(rp_uuid): + # NOTE(danms): Either we failed to fetch/create the RP + # on our first attempt, or a previous attempt had to + # invalidate the cache, and we were unable to refresh + # it. Bail and try again next time. + LOG.warning('Unable to refresh my resource provider record') + return False + if self._update_inventory_attempt(context, rp_uuid, inv_data): + return True + time.sleep(1) + return False + + def get_provider_tree_and_ensure_root(self, context, rp_uuid, name=None, + parent_provider_uuid=None): + """Returns a fresh ProviderTree representing all providers which are in + the same tree or in the same aggregate as the specified provider, + including their aggregates, traits, and inventories. + + If the specified provider does not exist, it is created with the + specified UUID, name, and parent provider (which *must* already exist). + + :param context: The security context + :param rp_uuid: UUID of the resource provider for which to populate the + tree. (This doesn't need to be the UUID of the root.) + :param name: Optional name for the resource provider if the record + does not exist. If empty, the name is set to the UUID + value + :param parent_provider_uuid: Optional UUID of the immediate parent, + which must have been previously _ensured. + :return: A new ProviderTree object. + """ + # TODO(efried): We would like to have the caller handle create-and/or- + # cache-if-not-already, but the resource tracker is currently + # structured to handle initialization and update in a single path. At + # some point this should be refactored, and this method can *just* + # return a deep copy of the local _provider_tree cache. + # (Re)populate the local ProviderTree + self._ensure_resource_provider( + context, rp_uuid, name=name, + parent_provider_uuid=parent_provider_uuid) + # Ensure inventories are up to date (for *all* cached RPs) + for uuid in self._provider_tree.get_provider_uuids(): + self._refresh_and_get_inventory(context, uuid) + # Return a *copy* of the tree. 
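+        # A deep copy keeps callers from mutating the report client's cached
+        # tree directly; their changes are flushed back explicitly via
+        # update_from_provider_tree.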
+ return copy.deepcopy(self._provider_tree) + + def set_inventory_for_provider(self, context, rp_uuid, rp_name, inv_data, + parent_provider_uuid=None): + """Given the UUID of a provider, set the inventory records for the + provider to the supplied dict of resources. + + :param context: The security context + :param rp_uuid: UUID of the resource provider to set inventory for + :param rp_name: Name of the resource provider in case we need to create + a record for it in the placement API + :param inv_data: Dict, keyed by resource class name, of inventory data + to set against the provider + :param parent_provider_uuid: + If the provider is not a root, this is required, and represents + the UUID of the immediate parent, which is a provider for which + this method has already been invoked. + + :raises: exc.InvalidResourceClass if a supplied custom resource class + name does not meet the placement API's format requirements. + """ + self._ensure_resource_provider( + context, rp_uuid, rp_name, + parent_provider_uuid=parent_provider_uuid) + + # Auto-create custom resource classes coming from a virt driver + self._ensure_resource_classes(context, set(inv_data)) + + # NOTE(efried): Do not use the DELETE API introduced in microversion + # 1.5, even if the new inventory is empty. It provides no way of + # sending the generation down, so no way to trigger/detect a conflict + # if an out-of-band update occurs between when we GET the latest and + # when we invoke the DELETE. See bug #1746374. + self._update_inventory(context, rp_uuid, inv_data) + + def _set_inventory_for_provider(self, context, rp_uuid, inv_data): + """Given the UUID of a provider, set the inventory records for the + provider to the supplied dict of resources. + + Compare and contrast with set_inventory_for_provider above. This one + is specially formulated for use by update_from_provider_tree. Like the + other method, we DO need to _ensure_resource_class - i.e. automatically + create new resource classes specified in the inv_data. However, UNLIKE + the other method: + - We don't use the DELETE API when inventory is empty, because that guy + doesn't return content, and we need to update the cached provider + tree with the new generation. + - We raise exceptions (rather than returning a boolean) which are + handled in a consistent fashion by update_from_provider_tree. + - We don't invalidate the cache on failure. That's controlled at a + broader scope (based on errors from ANY of the set_*_for_provider + methods, etc.) by update_from_provider_tree. + - We don't retry. In this code path, retries happen at the level of + the resource tracker on the next iteration. + - We take advantage of the cache and no-op if inv_data isn't different + from what we have locally. This is an optimization, not essential. + - We don't _ensure_resource_provider or refresh_and_get_inventory, + because that's already been done in the code paths leading up to + update_from_provider_tree (by get_provider_tree). This is an + optimization, not essential. + + In short, this version is more in the spirit of set_traits_for_provider + and set_aggregates_for_provider. + + :param context: The security context + :param rp_uuid: The UUID of the provider whose inventory is to be + updated. + :param inv_data: Dict, keyed by resource class name, of inventory data + to set for the provider. Use None or the empty dict + to remove all inventory for the provider. 
+ :raises: InventoryInUse if inv_data indicates removal of inventory in a + resource class which has active allocations for this provider. + :raises: InvalidResourceClass if inv_data contains a resource class + which cannot be created. + :raises: ResourceProviderUpdateConflict if the provider's generation + doesn't match the generation in the cache. Callers may choose + to retrieve the provider and its associations afresh and + redrive this operation. + :raises: ResourceProviderUpdateFailed on any other placement API + failure. + """ + # TODO(efried): Consolidate/refactor to one set_inventory_for_provider. + + # NOTE(efried): This is here because _ensure_resource_class already has + # @safe_connect, so we don't want to decorate this whole method with it + @safe_connect + def do_put(url, payload): + return self.put(url, payload, global_request_id=context.global_id) + + # If not different from what we've got, short out + if not self._provider_tree.has_inventory_changed(rp_uuid, inv_data): + return + + # Ensure non-standard resource classes exist, creating them if needed. + self._ensure_resource_classes(context, set(inv_data)) + + url = '/resource_providers/%s/inventories' % rp_uuid + inv_data = inv_data or {} + generation = self._provider_tree.data(rp_uuid).generation + payload = { + 'resource_provider_generation': generation, + 'inventories': inv_data, + } + resp = do_put(url, payload) + + if resp.status_code == 200: + json = resp.json() + self._provider_tree.update_inventory( + rp_uuid, json['inventories'], + generation=json['resource_provider_generation']) + return + + # Some error occurred; log it + msg = ("[%(placement_req_id)s] Failed to update inventory to " + "[%(inv_data)s] for resource provider with UUID %(uuid)s. Got " + "%(status_code)d: %(err_text)s") + args = { + 'placement_req_id': get_placement_request_id(resp), + 'uuid': rp_uuid, + 'inv_data': str(inv_data), + 'status_code': resp.status_code, + 'err_text': resp.text, + } + LOG.error(msg, args) + + if resp.status_code == 409: + # If a conflict attempting to remove inventory in a resource class + # with active allocations, raise InventoryInUse + rc = _extract_inventory_in_use(resp.text) + if rc is not None: + raise exception.InventoryInUse( + resource_classes=rc, + resource_provider=rp_uuid, + ) + # Other conflicts are generation mismatch: raise conflict exception + raise exception.ResourceProviderUpdateConflict( + uuid=rp_uuid, generation=generation, error=resp.text) + + # Otherwise, raise generic exception + raise exception.ResourceProviderUpdateFailed(url=url, error=resp.text) + + @safe_connect + def _ensure_traits(self, context, traits): + """Make sure all specified traits exist in the placement service. + + :param context: The security context + :param traits: Iterable of trait strings to ensure exist. + :raises: TraitCreationFailed if traits contains a trait that did not + exist in placement, and couldn't be created. When this + exception is raised, it is possible that *some* of the + requested traits were created. + :raises: TraitRetrievalFailed if the initial query of existing traits + was unsuccessful. In this scenario, it is guaranteed that + no traits were created. + """ + if not traits: + return + + # Query for all the requested traits. Whichever ones we *don't* get + # back, we need to create. + # NOTE(efried): We don't attempt to filter based on our local idea of + # standard traits, which may not be in sync with what the placement + # service knows. 
If the caller tries to ensure a nonexistent + # "standard" trait, they deserve the TraitCreationFailed exception + # they'll get. + resp = self.get('/traits?name=in:' + ','.join(traits), version='1.6', + global_request_id=context.global_id) + if resp.status_code == 200: + traits_to_create = set(traits) - set(resp.json()['traits']) + # Might be neat to have a batch create. But creating multiple + # traits will generally happen once, at initial startup, if at all. + for trait in traits_to_create: + resp = self.put('/traits/' + trait, None, version='1.6', + global_request_id=context.global_id) + if not resp: + raise exception.TraitCreationFailed(name=trait, + error=resp.text) + return + + # The initial GET failed + msg = ("[%(placement_req_id)s] Failed to retrieve the list of traits. " + "Got %(status_code)d: %(err_text)s") + args = { + 'placement_req_id': get_placement_request_id(resp), + 'status_code': resp.status_code, + 'err_text': resp.text, + } + LOG.error(msg, args) + raise exception.TraitRetrievalFailed(error=resp.text) + + @safe_connect + def set_traits_for_provider(self, context, rp_uuid, traits): + """Replace a provider's traits with those specified. + + The provider must exist - this method does not attempt to create it. + + :param context: The security context + :param rp_uuid: The UUID of the provider whose traits are to be updated + :param traits: Iterable of traits to set on the provider + :raises: ResourceProviderUpdateConflict if the provider's generation + doesn't match the generation in the cache. Callers may choose + to retrieve the provider and its associations afresh and + redrive this operation. + :raises: ResourceProviderUpdateFailed on any other placement API + failure. + :raises: TraitCreationFailed if traits contains a trait that did not + exist in placement, and couldn't be created. + :raises: TraitRetrievalFailed if the initial query of existing traits + was unsuccessful. + """ + # If not different from what we've got, short out + if not self._provider_tree.have_traits_changed(rp_uuid, traits): + return + + self._ensure_traits(context, traits) + + url = '/resource_providers/%s/traits' % rp_uuid + # NOTE(efried): Don't use the DELETE API when traits is empty, because + # that guy doesn't return content, and we need to update the cached + # provider tree with the new generation. + traits = list(traits) if traits else [] + generation = self._provider_tree.data(rp_uuid).generation + payload = { + 'resource_provider_generation': generation, + 'traits': traits, + } + resp = self.put(url, payload, version='1.6', + global_request_id=context.global_id) + + if resp.status_code == 200: + json = resp.json() + self._provider_tree.update_traits( + rp_uuid, json['traits'], + generation=json['resource_provider_generation']) + return + + # Some error occurred; log it + msg = ("[%(placement_req_id)s] Failed to update traits to " + "[%(traits)s] for resource provider with UUID %(uuid)s. 
Got " + "%(status_code)d: %(err_text)s") + args = { + 'placement_req_id': get_placement_request_id(resp), + 'uuid': rp_uuid, + 'traits': ','.join(traits), + 'status_code': resp.status_code, + 'err_text': resp.text, + } + LOG.error(msg, args) + + # If a conflict, raise special conflict exception + if resp.status_code == 409: + raise exception.ResourceProviderUpdateConflict( + uuid=rp_uuid, generation=generation, error=resp.text) + + # Otherwise, raise generic exception + raise exception.ResourceProviderUpdateFailed(url=url, error=resp.text) + + @safe_connect + def set_aggregates_for_provider(self, context, rp_uuid, aggregates, + use_cache=True): + """Replace a provider's aggregates with those specified. + + The provider must exist - this method does not attempt to create it. + + :param context: The security context + :param rp_uuid: The UUID of the provider whose aggregates are to be + updated. + :param aggregates: Iterable of aggregates to set on the provider. + :param use_cache: If False, indicates not to update the cache of + resource providers. + :raises: ResourceProviderUpdateFailed on any placement API failure. + """ + # TODO(efried): Handle generation conflicts when supported by placement + url = '/resource_providers/%s/aggregates' % rp_uuid + aggregates = list(aggregates) if aggregates else [] + resp = self.put(url, aggregates, version='1.1', + global_request_id=context.global_id) + + if resp.status_code == 200: + placement_aggs = resp.json()['aggregates'] + if use_cache: + self._provider_tree.update_aggregates(rp_uuid, placement_aggs) + return + + # Some error occurred; log it + msg = ("[%(placement_req_id)s] Failed to update aggregates to " + "[%(aggs)s] for resource provider with UUID %(uuid)s. Got " + "%(status_code)d: %(err_text)s") + args = { + 'placement_req_id': get_placement_request_id(resp), + 'uuid': rp_uuid, + 'aggs': ','.join(aggregates), + 'status_code': resp.status_code, + 'err_text': resp.text, + } + LOG.error(msg, args) + + raise exception.ResourceProviderUpdateFailed(url=url, error=resp.text) + + @safe_connect + def _ensure_resource_classes(self, context, names): + """Make sure resource classes exist. + + :param context: The security context + :param names: Iterable of string names of the resource classes to + check/create. Must not be None. + :raises: exception.InvalidResourceClass if an attempt is made to create + an invalid resource class. + """ + # Placement API version that supports PUT /resource_classes/CUSTOM_* + # to create (or validate the existence of) a consumer-specified + # resource class. + version = '1.7' + to_ensure = set(n for n in names + if n.startswith(fields.ResourceClass.CUSTOM_NAMESPACE)) + + for name in to_ensure: + # no payload on the put request + resp = self.put( + "/resource_classes/%s" % name, None, version=version, + global_request_id=context.global_id) + if not resp: + msg = ("Failed to ensure resource class record with placement " + "API for resource class %(rc_name)s. Got " + "%(status_code)d: %(err_text)s.") + args = { + 'rc_name': name, + 'status_code': resp.status_code, + 'err_text': resp.text, + } + LOG.error(msg, args) + raise exception.InvalidResourceClass(resource_class=name) + + def update_compute_node(self, context, compute_node): + """Creates or updates stats for the supplied compute node. 
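+
+        Inventory is reported to placement as a dict keyed by resource class;
+        an illustrative (hypothetical values) entry::
+
+            {'VCPU': {'total': 8, 'reserved': 0, 'allocation_ratio': 16.0}}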
+ + :param context: The security context + :param compute_node: updated nova.objects.ComputeNode to report + :raises `exception.InventoryInUse` if the compute node has had changes + to its inventory but there are still active allocations for + resource classes that would be deleted by an update to the + placement API. + """ + self._ensure_resource_provider(context, compute_node.uuid, + compute_node.hypervisor_hostname) + inv_data = _compute_node_to_inventory_dict(compute_node) + # NOTE(efried): Do not use the DELETE API introduced in microversion + # 1.5, even if the new inventory is empty. It provides no way of + # sending the generation down, so no way to trigger/detect a conflict + # if an out-of-band update occurs between when we GET the latest and + # when we invoke the DELETE. See bug #1746374. + self._update_inventory(context, compute_node.uuid, inv_data) + + def update_from_provider_tree(self, context, new_tree): + """Flush changes from a specified ProviderTree back to placement. + + The specified ProviderTree is compared against the local cache. Any + changes are flushed back to the placement service. Upon successful + completion, the local cache should reflect the specified ProviderTree. + + This method is best-effort and not atomic. When exceptions are raised, + it is possible that some of the changes have been flushed back, leaving + the placement database in an inconsistent state. This should be + recoverable through subsequent calls. + + :param context: The security context + :param new_tree: A ProviderTree instance representing the desired state + of providers in placement. + :raises: ResourceProviderSyncFailed if any errors were encountered + attempting to perform the necessary API operations. + """ + # NOTE(efried): We currently do not handle the "rename" case. This is + # where new_tree contains a provider named Y whose UUID already exists + # but is named X. Today the only way the consumer could accomplish + # this is by deleting the provider and recreating it with the new name. + + @contextlib.contextmanager + def catch_all(rp_uuid): + """Convert all "expected" exceptions from placement API helpers to + True or False. Saves having to do try/except for every helper call + below. + """ + class Status(object): + success = True + s = Status() + # TODO(efried): Make a base exception class from which all these + # can inherit. + helper_exceptions = ( + exception.InvalidResourceClass, + exception.InventoryInUse, + exception.ResourceProviderAggregateRetrievalFailed, + exception.ResourceProviderDeletionFailed, + exception.ResourceProviderInUse, + exception.ResourceProviderRetrievalFailed, + exception.ResourceProviderTraitRetrievalFailed, + exception.ResourceProviderUpdateConflict, + exception.ResourceProviderUpdateFailed, + exception.TraitCreationFailed, + exception.TraitRetrievalFailed, + ) + try: + yield s + except helper_exceptions: + s.success = False + # Invalidate the caches + try: + self._provider_tree.remove(rp_uuid) + except ValueError: + pass + self._association_refresh_time.pop(rp_uuid, None) + + # Overall indicator of success. Will be set to False on any exception. + success = True + + # Helper methods herein will be updating the local cache (this is + # intentional) so we need to grab up front any data we need to operate + # on in its "original" form. 
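+        # Rough flow, as implemented below: delete providers that disappeared
+        # (bottom-up), ensure newly added ones (top-down), then flush
+        # inventory, aggregate and trait changes for every provider in
+        # new_tree.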
+ old_tree = self._provider_tree + old_uuids = old_tree.get_provider_uuids() + new_uuids = new_tree.get_provider_uuids() + + # Do provider deletion first, since it has the best chance of failing + # for non-generation-conflict reasons (i.e. allocations). + uuids_to_remove = set(old_uuids) - set(new_uuids) + # We have to do deletions in bottom-up order, so we don't error + # attempting to delete a parent who still has children. + for uuid in reversed(old_uuids): + if uuid not in uuids_to_remove: + continue + with catch_all(uuid) as status: + self._delete_provider(uuid) + success = success and status.success + + # Now create (or load) any "new" providers + uuids_to_add = set(new_uuids) - set(old_uuids) + # We have to do additions in top-down order, so we don't error + # attempting to create a child before its parent exists. + for uuid in new_uuids: + if uuid not in uuids_to_add: + continue + provider = new_tree.data(uuid) + with catch_all(uuid) as status: + self._ensure_resource_provider( + context, uuid, name=provider.name, + parent_provider_uuid=provider.parent_uuid) + success = success and status.success + + # At this point the local cache should have all the same providers as + # new_tree. Whether we added them or not, walk through and diff/flush + # inventories, traits, and aggregates as necessary (the helper methods + # are set up to check and short out when the relevant property does not + # differ from what's in the cache). + # If we encounter any error and remove a provider from the cache, all + # its descendants are also removed, and set_*_for_provider methods on + # it wouldn't be able to get started. Walking the tree in bottom-up + # order ensures we at least try to process all of the providers. + for uuid in reversed(new_uuids): + pd = new_tree.data(uuid) + with catch_all(pd.uuid) as status: + self._set_inventory_for_provider( + context, pd.uuid, pd.inventory) + self.set_aggregates_for_provider( + context, pd.uuid, pd.aggregates) + self.set_traits_for_provider(context, pd.uuid, pd.traits) + success = success and status.success + + if not success: + raise exception.ResourceProviderSyncFailed() + + @safe_connect + def get_allocations_for_consumer(self, context, consumer): + url = '/allocations/%s' % consumer + resp = self.get(url, global_request_id=context.global_id) + if not resp: + return {} + else: + return resp.json()['allocations'] + + def get_allocations_for_consumer_by_provider(self, context, rp_uuid, + consumer): + # NOTE(cdent): This trims to just the allocations being + # used on this resource provider. In the future when there + # are shared resources there might be other providers. 
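+        # The consumer's allocations come back keyed by provider UUID, e.g.
+        # (illustrative): {'<rp_uuid>': {'resources': {'VCPU': 1}}}; only the
+        # 'resources' dict for this provider is returned.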
+ allocations = self.get_allocations_for_consumer(context, consumer) + if allocations is None: + # safe_connect can return None on 404 + allocations = {} + return allocations.get( + rp_uuid, {}).get('resources', {}) + + def _allocate_for_instance(self, context, rp_uuid, instance): + my_allocations = _instance_to_allocations_dict(instance) + current_allocations = self.get_allocations_for_consumer_by_provider( + context, rp_uuid, instance.uuid) + if current_allocations == my_allocations: + allocstr = ','.join(['%s=%s' % (k, v) + for k, v in my_allocations.items()]) + LOG.debug('Instance %(uuid)s allocations are unchanged: %(alloc)s', + {'uuid': instance.uuid, 'alloc': allocstr}) + return + + LOG.debug('Sending allocation for instance %s', + my_allocations, + instance=instance) + res = self.put_allocations(context, rp_uuid, instance.uuid, + my_allocations, instance.project_id, + instance.user_id) + if res: + LOG.info('Submitted allocation for instance', instance=instance) + + # NOTE(jaypipes): Currently, this method is ONLY used in two places: + # 1. By the scheduler to allocate resources on the selected destination + # hosts. + # 2. By the conductor LiveMigrationTask to allocate resources on a forced + # destination host. This is a short-term fix for Pike which should be + # replaced in Queens by conductor calling the scheduler in the force + # host case. + # This method should not be called by the resource tracker; instead, the + # _allocate_for_instance() method is used which does not perform any + # checking that a move operation is in place. + @safe_connect + @retries + def claim_resources(self, context, consumer_uuid, alloc_request, + project_id, user_id, allocation_request_version=None): + """Creates allocation records for the supplied instance UUID against + the supplied resource providers. + + We check to see if resources have already been claimed for this + consumer. If so, we assume that a move operation is underway and the + scheduler is attempting to claim resources against the new (destination + host). In order to prevent compute nodes currently performing move + operations from being scheduled to improperly, we create a "doubled-up" + allocation that consumes resources on *both* the source and the + destination host during the move operation. When the move operation + completes, the destination host (via _allocate_for_instance()) will + end up setting allocations for the instance only on the destination + host thereby freeing up resources on the source host appropriately. + + :param context: The security context + :param consumer_uuid: The instance's UUID. + :param alloc_request: The JSON body of the request to make to the + placement's PUT /allocations API + :param project_id: The project_id associated with the allocations. + :param user_id: The user_id associated with the allocations. + :param allocation_request_version: The microversion used to request the + allocations. + :returns: True if the allocations were created, False otherwise. + """ + # Older clients might not send the allocation_request_version, so + # default to 1.10. + # TODO(alex_xu): In the rocky, all the client should send the + # allocation_request_version. So remove this default value. 
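+        # For reference, the conversion below turns the pre-1.12 list form,
+        # e.g. (illustrative)
+        #   {'allocations': [{'resource_provider': {'uuid': rp},
+        #                     'resources': {'VCPU': 1}}]}
+        # into the 1.12+ dict form
+        #   {'allocations': {rp: {'resources': {'VCPU': 1}}}}.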
+ allocation_request_version = allocation_request_version or '1.10' + # Ensure we don't change the supplied alloc request since it's used in + # a loop within the scheduler against multiple instance claims + ar = copy.deepcopy(alloc_request) + + # If the allocation_request_version less than 1.12, then convert the + # allocation array format to the dict format. This conversion can be + # remove in Rocky release. + if versionutils.convert_version_to_tuple( + allocation_request_version) < (1, 12): + ar = { + 'allocations': { + alloc['resource_provider']['uuid']: { + 'resources': alloc['resources'] + } for alloc in ar['allocations'] + } + } + allocation_request_version = '1.12' + + url = '/allocations/%s' % consumer_uuid + + payload = ar + + # We first need to determine if this is a move operation and if so + # create the "doubled-up" allocation that exists for the duration of + # the move operation against both the source and destination hosts + r = self.get(url, global_request_id=context.global_id) + if r.status_code == 200: + current_allocs = r.json()['allocations'] + if current_allocs: + payload = _move_operation_alloc_request(current_allocs, ar) + + payload['project_id'] = project_id + payload['user_id'] = user_id + r = self.put(url, payload, version=allocation_request_version, + global_request_id=context.global_id) + if r.status_code != 204: + # NOTE(jaypipes): Yes, it sucks doing string comparison like this + # but we have no error codes, only error messages. + if 'concurrently updated' in r.text: + reason = ('another process changed the resource providers ' + 'involved in our attempt to put allocations for ' + 'consumer %s' % consumer_uuid) + raise Retry('claim_resources', reason) + else: + LOG.warning( + 'Unable to submit allocation for instance ' + '%(uuid)s (%(code)i %(text)s)', + {'uuid': consumer_uuid, + 'code': r.status_code, + 'text': r.text}) + return r.status_code == 204 + + @safe_connect + def remove_provider_from_instance_allocation(self, context, consumer_uuid, + rp_uuid, user_id, project_id, + resources): + """Grabs an allocation for a particular consumer UUID, strips parts of + the allocation that refer to a supplied resource provider UUID, and + then PUTs the resulting allocation back to the placement API for the + consumer. + + This is used to reconcile the "doubled-up" allocation that the + scheduler constructs when claiming resources against the destination + host during a move operation. + + If the move was between hosts, the entire allocation for rp_uuid will + be dropped. If the move is a resize on the same host, then we will + subtract resources from the single allocation to ensure we do not + exceed the reserved or max_unit amounts for the resource on the host. + + :param context: The security context + :param consumer_uuid: The instance/consumer UUID + :param rp_uuid: The UUID of the provider whose resources we wish to + remove from the consumer's allocation + :param user_id: The instance's user + :param project_id: The instance's project + :param resources: The resources to be dropped from the allocation + """ + url = '/allocations/%s' % consumer_uuid + + # Grab the "doubled-up" allocation that we will manipulate + r = self.get(url, global_request_id=context.global_id) + if r.status_code != 200: + LOG.warning("Failed to retrieve allocations for %s. 
Got HTTP %s", + consumer_uuid, r.status_code) + return False + + current_allocs = r.json()['allocations'] + if not current_allocs: + LOG.error("Expected to find current allocations for %s, but " + "found none.", consumer_uuid) + return False + + # If the host isn't in the current allocation for the instance, don't + # do anything + if rp_uuid not in current_allocs: + LOG.warning("Expected to find allocations referencing resource " + "provider %s for %s, but found none.", + rp_uuid, consumer_uuid) + return True + + compute_providers = [uuid for uuid, alloc in current_allocs.items() + if 'VCPU' in alloc['resources']] + LOG.debug('Current allocations for instance: %s', current_allocs, + instance_uuid=consumer_uuid) + LOG.debug('Instance %s has resources on %i compute nodes', + consumer_uuid, len(compute_providers)) + + new_allocs = [ + { + 'resource_provider': { + 'uuid': alloc_rp_uuid, + }, + 'resources': alloc['resources'], + } + for alloc_rp_uuid, alloc in current_allocs.items() + if alloc_rp_uuid != rp_uuid + ] + + if len(compute_providers) == 1: + # NOTE(danms): We are in a resize to same host scenario. Since we + # are the only provider then we need to merge back in the doubled + # allocation with our part subtracted + peer_alloc = { + 'resource_provider': { + 'uuid': rp_uuid, + }, + 'resources': current_allocs[rp_uuid]['resources'] + } + LOG.debug('Original resources from same-host ' + 'allocation: %s', peer_alloc['resources']) + merge_resources(peer_alloc['resources'], + resources, -1) + LOG.debug('Subtracting old resources from same-host ' + 'allocation: %s', peer_alloc['resources']) + new_allocs.append(peer_alloc) + + payload = {'allocations': new_allocs} + payload['project_id'] = project_id + payload['user_id'] = user_id + LOG.debug("Sending updated allocation %s for instance %s after " + "removing resources for %s.", + new_allocs, consumer_uuid, rp_uuid) + r = self.put(url, payload, version='1.10', + global_request_id=context.global_id) + if r.status_code != 204: + LOG.warning("Failed to save allocation for %s. Got HTTP %s: %s", + consumer_uuid, r.status_code, r.text) + return r.status_code == 204 + + @safe_connect + @retries + def set_and_clear_allocations(self, context, rp_uuid, consumer_uuid, + alloc_data, project_id, user_id, + consumer_to_clear=None): + """Create allocation records for the supplied consumer UUID while + simultaneously clearing any allocations identified by the uuid + in consumer_to_clear, for example a migration uuid when moving an + instance to another host. This is for atomically managing so-called + "doubled" migration records. + + :note Currently we only allocate against a single resource provider. + Once shared storage and things like NUMA allocations are a + reality, this will change to allocate against multiple providers. + + :param context: The security context + :param rp_uuid: The UUID of the resource provider to allocate against. + :param consumer_uuid: The consumer UUID for which allocations are + being set. + :param alloc_data: Dict, keyed by resource class, of amounts to + consume. + :param project_id: The project_id associated with the allocations. + :param user_id: The user_id associated with the allocations. + :param consumer_to_clear: A UUID identifying allocations for a + consumer that should be cleared. + :returns: True if the allocations were created, False otherwise. + :raises: Retry if the operation should be retried due to a concurrent + update. 
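+
+        For illustration (hypothetical values), the body POSTed to
+        /allocations looks roughly like::
+
+            {consumer_uuid: {'allocations': {rp_uuid: {'resources':
+                 {'VCPU': 1}}}, 'project_id': ..., 'user_id': ...},
+             consumer_to_clear: {'allocations': {}, 'project_id': ...,
+                 'user_id': ...}}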
+ """ + # FIXME(cdent): Fair amount of duplicate with put in here, but now + # just working things through. + payload = { + consumer_uuid: { + 'allocations': { + rp_uuid: { + 'resources': alloc_data + } + }, + 'project_id': project_id, + 'user_id': user_id, + } + } + if consumer_to_clear: + payload[consumer_to_clear] = { + 'allocations': {}, + 'project_id': project_id, + 'user_id': user_id, + } + r = self.post('/allocations', payload, + version=POST_ALLOCATIONS_API_VERSION, + global_request_id=context.global_id) + if r.status_code != 204: + # NOTE(jaypipes): Yes, it sucks doing string comparison like this + # but we have no error codes, only error messages. + if 'concurrently updated' in r.text: + reason = ('another process changed the resource providers ' + 'involved in our attempt to post allocations for ' + 'consumer %s' % consumer_uuid) + raise Retry('set_and_clear_allocations', reason) + else: + LOG.warning( + 'Unable to post allocations for instance ' + '%(uuid)s (%(code)i %(text)s)', + {'uuid': consumer_uuid, + 'code': r.status_code, + 'text': r.text}) + return r.status_code == 204 + + @safe_connect + @retries + def put_allocations(self, context, rp_uuid, consumer_uuid, alloc_data, + project_id, user_id): + """Creates allocation records for the supplied instance UUID against + the supplied resource provider. + + :note Currently we only allocate against a single resource provider. + Once shared storage and things like NUMA allocations are a + reality, this will change to allocate against multiple providers. + + :param context: The security context + :param rp_uuid: The UUID of the resource provider to allocate against. + :param consumer_uuid: The instance's UUID. + :param alloc_data: Dict, keyed by resource class, of amounts to + consume. + :param project_id: The project_id associated with the allocations. + :param user_id: The user_id associated with the allocations. + :returns: True if the allocations were created, False otherwise. + :raises: Retry if the operation should be retried due to a concurrent + update. + """ + payload = { + 'allocations': [ + { + 'resource_provider': { + 'uuid': rp_uuid, + }, + 'resources': alloc_data, + }, + ], + 'project_id': project_id, + 'user_id': user_id, + } + url = '/allocations/%s' % consumer_uuid + r = self.put(url, payload, version='1.8', + global_request_id=context.global_id) + if r.status_code != 204: + # NOTE(jaypipes): Yes, it sucks doing string comparison like this + # but we have no error codes, only error messages. + if 'concurrently updated' in r.text: + reason = ('another process changed the resource providers ' + 'involved in our attempt to put allocations for ' + 'consumer %s' % consumer_uuid) + raise Retry('put_allocations', reason) + else: + LOG.warning( + 'Unable to submit allocation for instance ' + '%(uuid)s (%(code)i %(text)s)', + {'uuid': consumer_uuid, + 'code': r.status_code, + 'text': r.text}) + return r.status_code == 204 + + @safe_connect + def delete_allocation_for_instance(self, context, uuid): + url = '/allocations/%s' % uuid + r = self.delete(url, global_request_id=context.global_id) + if r: + LOG.info('Deleted allocation for instance %s', uuid) + return True + else: + # Check for 404 since we don't need to log a warning if we tried to + # delete something which doesn't actually exist. 
+ if r.status_code != 404: + LOG.warning('Unable to delete allocation for instance ' + '%(uuid)s: (%(code)i %(text)s)', + {'uuid': uuid, + 'code': r.status_code, + 'text': r.text}) + return False + + def update_instance_allocation(self, context, compute_node, instance, + sign): + if sign > 0: + self._allocate_for_instance(context, compute_node.uuid, instance) + else: + self.delete_allocation_for_instance(context, instance.uuid) + + @safe_connect + def get_allocations_for_resource_provider(self, context, rp_uuid): + url = '/resource_providers/%s/allocations' % rp_uuid + resp = self.get(url, global_request_id=context.global_id) + if not resp: + return {} + else: + return resp.json()['allocations'] + + def delete_resource_provider(self, context, compute_node, cascade=False): + """Deletes the ResourceProvider record for the compute_node. + + :param context: The security context + :param compute_node: The nova.objects.ComputeNode object that is the + resource provider being deleted. + :param cascade: Boolean value that, when True, will first delete any + associated Allocation and Inventory records for the + compute node + """ + raise NotImplementedError('Please implement this method to delete a ' + 'resource provider.') + + @safe_connect + def _get_provider_by_name(self, context, name): + """Queries the placement API for resource provider information matching + a supplied name. + + :param context: The security context + :param name: Name of the resource provider to look up + :return: A dict of resource provider information including the + provider's UUID and generation + :raises: `exception.ResourceProviderNotFound` when no such provider was + found + """ + resp = self.get("/resource_providers?name=%s" % name, + global_request_id=context.global_id) + if resp.status_code == 200: + data = resp.json() + records = data['resource_providers'] + num_recs = len(records) + if num_recs == 1: + return records[0] + elif num_recs > 1: + msg = ("Found multiple resource provider records for resource " + "provider name %(rp_name)s: %(rp_uuids)s. " + "This should not happen.") + LOG.warning(msg, { + 'rp_name': name, + 'rp_uuids': ','.join([r['uuid'] for r in records]) + }) + elif resp.status_code != 404: + msg = ("Failed to retrieve resource provider information by name " + "for resource provider %s. Got %d: %s") + LOG.warning(msg, name, resp.status_code, resp.text) + + raise exception.ResourceProviderNotFound(name_or_uuid=name) + + def aggregate_add_host(self, context, agg_uuid, host_name): + """Looks up a resource provider by the supplied host name, and adds the + aggregate with supplied UUID to that resource provider. + + :note: This method does NOT use the cached provider tree. It is only + called from the Compute API when a nova host aggregate is + modified + + :param context: The security context + :param agg_uuid: UUID of the aggregate being modified + :param host_name: Name of the nova-compute service worker to look up a + resource provider for + :raises: `exceptions.ResourceProviderNotFound` if no resource provider + matching the host name could be found from the placement API + :raises: `exception.ResourceProviderAggregateRetrievalFailed` when + failing to get a provider's existing aggregates + :raises: `exception.ResourceProviderUpdateFailed` if there was a + failure attempting to save the provider aggregates + """ + rp = self._get_provider_by_name(context, host_name) + # NOTE(jaypipes): Unfortunately, due to @safe_connect, + # _get_provider_by_name() can return None. 
If that happens, raise an + # error so we can trap for it in the Nova API code and ignore in Rocky, + # blow up in Stein. + if rp is None: + raise exception.PlacementAPIConnectFailure() + rp_uuid = rp['uuid'] + + # Now attempt to add the aggregate to the resource provider. We don't + # want to overwrite any other aggregates the provider may be associated + # with, however, so we first grab the list of aggregates for this + # provider and add the aggregate to the list of aggregates it already + # has + existing_aggs = self._get_provider_aggregates(context, rp_uuid) + if agg_uuid in existing_aggs: + return + + new_aggs = existing_aggs | set([agg_uuid]) + # TODO(jaypipes): Send provider generation (which is in the rp dict) + # along to set_aggregates_for_provider() + self.set_aggregates_for_provider( + context, rp_uuid, new_aggs, use_cache=False) + + def aggregate_remove_host(self, context, agg_uuid, host_name): + """Looks up a resource provider by the supplied host name, and removes + the aggregate with supplied UUID from that resource provider. + + :note: This method does NOT use the cached provider tree. It is only + called from the Compute API when a nova host aggregate is + modified + + :param context: The security context + :param agg_uuid: UUID of the aggregate being modified + :param host_name: Name of the nova-compute service worker to look up a + resource provider for + :raises: `exceptions.ResourceProviderNotFound` if no resource provider + matching the host name could be found from the placement API + :raises: `exception.ResourceProviderAggregateRetrievalFailed` when + failing to get a provider's existing aggregates + :raises: `exception.ResourceProviderUpdateFailed` if there was a + failure attempting to save the provider aggregates + """ + rp = self._get_provider_by_name(context, host_name) + # NOTE(jaypipes): Unfortunately, due to @safe_connect, + # _get_provider_by_name() can return None. If that happens, raise an + # error so we can trap for it in the Nova API code and ignore in Rocky, + # blow up in Stein. + if rp is None: + raise exception.PlacementAPIConnectFailure() + rp_uuid = rp['uuid'] + + # Now attempt to remove the aggregate from the resource provider. We + # don't want to overwrite any other aggregates the provider may be + # associated with, however, so we first grab the list of aggregates for + # this provider and remove the aggregate from the list of aggregates it + # already has + existing_aggs = self._get_provider_aggregates(context, rp_uuid) + if agg_uuid not in existing_aggs: + return + + new_aggs = existing_aggs - set([agg_uuid]) + # TODO(jaypipes): Send provider generation (which is in the rp dict) + # along to set_aggregates_for_provider() + self.set_aggregates_for_provider( + context, rp_uuid, new_aggs, use_cache=False) diff --git a/devstack/lib/cyborg b/devstack/lib/cyborg index f9f00baa..939073f2 100644 --- a/devstack/lib/cyborg +++ b/devstack/lib/cyborg @@ -162,6 +162,26 @@ function configure_auth_for { iniset $CYBORG_CONF_FILE $service_config_section cafile $SSL_BUNDLE_FILE } +function configure_cyborg_placement { + # Use the provided config file path or default to $CYBORG_CONF. 
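+    # The optional arguments select the config section and the auth section
+    # to read credentials from, e.g. (hypothetical override):
+    #   configure_cyborg_placement                # [placement] / keystone_authtoken
+    #   configure_cyborg_placement placement_alt  # alternate section name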
+    local section=${1:-placement}
+    local auth_section=${2:-keystone_authtoken}
+    iniset $CYBORG_CONF_FILE $section auth_section $auth_section
+    iniset $CYBORG_CONF_FILE $section auth_type "password"
+    iniset $CYBORG_CONF_FILE $section auth_url "$KEYSTONE_SERVICE_URI"
+    iniset $CYBORG_CONF_FILE $section username $section
+    iniset $CYBORG_CONF_FILE $section password "$SERVICE_PASSWORD"
+    iniset $CYBORG_CONF_FILE $section user_domain_name "$SERVICE_DOMAIN_NAME"
+    iniset $CYBORG_CONF_FILE $section project_name "$SERVICE_TENANT_NAME"
+    iniset $CYBORG_CONF_FILE $section project_domain_name "$SERVICE_DOMAIN_NAME"
+    # TODO(cdent): auth_strategy, which is common to see in these
+    # blocks, is not currently used here. For the time being the
+    # placement api uses the auth_strategy configuration setting
+    # established by the nova api. This avoids, for the time being,
+    # creating redundant configuration items that are just used for
+    # testing.
+}
+
 # configure_cyborg_conductor() - Is used by configure_cyborg().
 # Sets conductor specific settings.
 function configure_cyborg_conductor {
@@ -171,6 +191,7 @@ function configure_cyborg_conductor {
 
     # this one is needed for lookup of Cyborg API endpoint via Keystone
     configure_auth_for service_catalog
+    configure_cyborg_placement
 
     sudo cp $CYBORG_DIR/etc/cyborg/rootwrap.conf $CYBORG_ROOTWRAP_CONF
    sudo cp -r $CYBORG_DIR/etc/cyborg/rootwrap.d $CYBORG_CONF_DIR