perform reshaper operations in a single transaction

Adds a nova.api.openstack.placement.objects.resource_provider.reshape()
function that accepts a dict, keyed by provider UUID, of inventory
information for a set of providers, along with an AllocationList object
containing all of the rejiggered allocation records for all consumers on
the providers involved in the reshape operation.

The reshape() function is decorated with the placement API's DB
transaction context manager, which catches any exception and issues a
ROLLBACK of the single writer transaction spanning the myriad
sub-operations that happen inside reshape(). Likewise, a single COMMIT
is executed in the writer transaction when reshape() completes
without an exception.
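
For illustration, a caller would assemble the two arguments along these
lines (a minimal sketch; the provider/consumer variables and the surrounding
handler are hypothetical, only reshape() and the object classes below are
part of this change):

    from nova.api.openstack.placement.objects import resource_provider as rp_obj

    # 'ctx' is a placement RequestContext; 'cn', 'numa0' and 'inst' are
    # pre-existing ResourceProvider and Consumer objects (hypothetical names).
    inventories = {
        cn.uuid: rp_obj.InventoryList(ctx, objects=[
            rp_obj.Inventory(
                ctx, resource_provider=cn, resource_class='MEMORY_MB',
                total=32768, reserved=0, max_unit=32768, min_unit=1,
                step_size=1, allocation_ratio=1.0),
        ]),
        numa0.uuid: rp_obj.InventoryList(ctx, objects=[
            rp_obj.Inventory(
                ctx, resource_provider=numa0, resource_class='VCPU',
                total=8, reserved=0, max_unit=8, min_unit=1,
                step_size=1, allocation_ratio=1.0),
        ]),
    }
    allocations = rp_obj.AllocationList(ctx, objects=[
        rp_obj.Allocation(
            ctx, resource_provider=numa0, resource_class='VCPU',
            consumer=inst, used=2),
        rp_obj.Allocation(
            ctx, resource_provider=cn, resource_class='MEMORY_MB',
            consumer=inst, used=1024),
    ])
    # One writer transaction: either every change lands or none does.
    rp_obj.reshape(ctx, inventories, allocations)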

Change-Id: I527de486eda63b8272ffbfe42f6475907304556c
blueprint: reshape-provider-tree
Jay Pipes 2018-07-12 16:51:37 -04:00 committed by Eric Fried
parent 24bd27fcfd
commit 19eed8592b
2 changed files with 445 additions and 0 deletions

nova/api/openstack/placement/objects/resource_provider.py

@@ -4034,3 +4034,89 @@ class AllocationCandidates(base.VersionedObject):
            kept_summary_objs = summary_objs

        return alloc_request_objs, kept_summary_objs


@db_api.placement_context_manager.writer
def reshape(ctx, inventories, allocations):
    """The 'replace the world' strategy that is executed when we want to
    completely replace a set of provider inventory, allocation and consumer
    information in a single transaction.

    :note: The reason this has to be done in a single monolithic function is so
           we have a single top-level function on which to decorate with the
           @db_api.placement_context_manager.writer transaction context
           manager. Each time a top-level function that is decorated with this
           exits, the transaction is either COMMIT'd or ROLLBACK'd. We need to
           avoid calling two functions that are already decorated with a
           transaction context manager from a function that *isn't* decorated
           with the transaction context manager if we want all changes involved
           in the sub-functions to operate within a single DB transaction.

    :param ctx: `nova.api.openstack.placement.context.RequestContext` object
                containing the DB transaction context.
    :param inventories: dict, keyed by resource provider UUID, of
                        `InventoryList` objects representing the replaced
                        inventory information for the provider.
    :param allocations: `AllocationList` object containing all allocations for
                        all consumers being modified by the reshape operation.
    :raises: `exception.ConcurrentUpdateDetected` when any resource provider or
             consumer generation increment fails due to concurrent changes to
             the same objects.
    """
    # The resource provider objects, keyed by provider UUID, that are involved
    # in this transaction. We keep a cache of these because as we perform the
    # various operations on the providers, their generations increment and we
    # want to "inject" the changed resource provider objects into the
    # AllocationList's objects before calling AllocationList.replace_all()
    affected_providers = {}

    # We have to do the inventory changes in two steps because:
    # - we can't delete inventories with allocations; and
    # - we can't create allocations on nonexistent inventories.
    # So in the first step we create a kind of "union" inventory for each
    # provider. It contains all the inventories that the request wishes to
    # exist in the end, PLUS any inventories that the request wished to remove
    # (in their original form).
    # Note that this can cause us to end up with an interim situation where we
    # have modified an inventory to have less capacity than is currently
    # allocated, but that's allowed by the code. If the final picture is
    # overcommitted, we'll get an appropriate exception when we replace the
    # allocations at the end.
    for rp_uuid, new_inv_list in inventories.items():
        LOG.debug("reshaping: *interim* inventory replacement for provider %s",
                  rp_uuid)
        rp = new_inv_list[0].resource_provider
        # A dict, keyed by resource class, of the Inventory objects. We start
        # with the original inventory list.
        inv_by_rc = {inv.resource_class: inv for inv in
                     InventoryList.get_all_by_resource_provider(ctx, rp)}
        # Now add each inventory in the new inventory list. If an inventory for
        # that resource class existed in the original inventory list, it is
        # overwritten.
        for inv in new_inv_list:
            inv_by_rc[inv.resource_class] = inv
        # Set the interim inventory structure.
        rp.set_inventory(InventoryList(objects=list(inv_by_rc.values())))
        affected_providers[rp_uuid] = rp

    # NOTE(jaypipes): The above inventory replacements will have
    # incremented the resource provider generations, so we need to look in
    # the AllocationList and swap the resource provider object with the one we
    # saved above that has the updated provider generation in it.
    for alloc in allocations:
        rp_uuid = alloc.resource_provider.uuid
        if rp_uuid in affected_providers:
            alloc.resource_provider = affected_providers[rp_uuid]

    # Now we can replace all the allocations
    LOG.debug("reshaping: attempting allocation replacement")
    allocations.replace_all()

    # And finally, we can set the inventories to their actual desired state.
    for rp_uuid, new_inv_list in inventories.items():
        LOG.debug("reshaping: *final* inventory replacement for provider %s",
                  rp_uuid)
        # TODO(efried): If we wanted this to be more efficient, we could keep
        # track of providers for which all inventories are being deleted in the
        # above loop and just do those and skip the rest, since they're already
        # in their final form.
        new_inv_list[0].resource_provider.set_inventory(new_inv_list)
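
To make the two-step inventory handling above concrete, here is a minimal,
self-contained illustration of the merge the first loop performs, using plain
dicts keyed by resource class in place of the real Inventory objects (the
names below are illustrative only, not part of the change):

    # Existing inventory on a provider and the requested replacement:
    # DISK_GB is being removed, VCPU resized, MEMORY_MB left untouched.
    original = {'VCPU': 16, 'MEMORY_MB': 32768, 'DISK_GB': 1000}
    requested = {'VCPU': 8, 'MEMORY_MB': 32768}

    # Step 1: the *interim* inventory is the original overlaid with the
    # request, so DISK_GB survives while its allocations are still present.
    interim = dict(original)
    interim.update(requested)
    assert interim == {'VCPU': 8, 'MEMORY_MB': 32768, 'DISK_GB': 1000}

    # Step 2 (after allocations.replace_all() has moved the consumers): set
    # the requested inventory verbatim, which finally drops DISK_GB.
    final = dict(requested)
    assert final == {'VCPU': 8, 'MEMORY_MB': 32768}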


@@ -0,0 +1,359 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from nova.api.openstack.placement import exception
from nova.api.openstack.placement.objects import consumer as consumer_obj
from nova.api.openstack.placement.objects import resource_provider as rp_obj
from nova.tests.functional.api.openstack.placement.db import test_base as tb
from nova.tests import uuidsentinel as uuids


def alloc_for_rc(alloc_list, rc):
    for alloc in alloc_list:
        if alloc.resource_class == rc:
            return alloc


class ReshapeTestCase(tb.PlacementDbBaseTestCase):
    """Test 'replace the world' reshape transaction."""

    def test_reshape(self):
        """We set up the following scenario:

        BEFORE: single compute node setup

          A single compute node with:
            - VCPU, MEMORY_MB, DISK_GB inventory
            - Two instances consuming CPU, RAM and DISK from that compute node

        AFTER: hierarchical + shared storage setup

          A compute node parent provider with:
            - MEMORY_MB
          Two NUMA node child providers containing:
            - VCPU
          Shared storage provider with:
            - DISK_GB

          Both instances have their resources split among the providers and
          shared storage accordingly
        """
        # First create our consumers
        i1_uuid = uuids.instance1
        i1_consumer = consumer_obj.Consumer(
            self.ctx, uuid=i1_uuid, user=self.user_obj,
            project=self.project_obj)
        i1_consumer.create()

        i2_uuid = uuids.instance2
        i2_consumer = consumer_obj.Consumer(
            self.ctx, uuid=i2_uuid, user=self.user_obj,
            project=self.project_obj)
        i2_consumer.create()

        cn1 = self._create_provider('cn1')
        tb.add_inventory(cn1, 'VCPU', 16)
        tb.add_inventory(cn1, 'MEMORY_MB', 32768)
        tb.add_inventory(cn1, 'DISK_GB', 1000)

        # Allocate both instances against the single compute node
        for consumer in (i1_consumer, i2_consumer):
            allocs = [
                rp_obj.Allocation(
                    self.ctx, resource_provider=cn1,
                    resource_class='VCPU', consumer=consumer, used=2),
                rp_obj.Allocation(
                    self.ctx, resource_provider=cn1,
                    resource_class='MEMORY_MB', consumer=consumer, used=1024),
                rp_obj.Allocation(
                    self.ctx, resource_provider=cn1,
                    resource_class='DISK_GB', consumer=consumer, used=100),
            ]
            alloc_list = rp_obj.AllocationList(self.ctx, objects=allocs)
            alloc_list.replace_all()

        # Verify we have the allocations we expect for the BEFORE scenario
        before_allocs_i1 = rp_obj.AllocationList.get_all_by_consumer_id(
            self.ctx, i1_uuid)
        self.assertEqual(3, len(before_allocs_i1))
        self.assertEqual(cn1.uuid, before_allocs_i1[0].resource_provider.uuid)
        before_allocs_i2 = rp_obj.AllocationList.get_all_by_consumer_id(
            self.ctx, i2_uuid)
        self.assertEqual(3, len(before_allocs_i2))
        self.assertEqual(cn1.uuid, before_allocs_i2[2].resource_provider.uuid)

        # Before we issue the actual reshape() call, we need to first create
        # the child providers and sharing storage provider. These are actions
        # that the virt driver or external agent is responsible for performing
        # *before* attempting any reshape activity.
        cn1_numa0 = self._create_provider('cn1_numa0', parent=cn1.uuid)
        cn1_numa1 = self._create_provider('cn1_numa1', parent=cn1.uuid)
        ss = self._create_provider('ss')

        # OK, now emulate the call to POST /reshaper that will be triggered by
        # a virt driver wanting to replace the world and change its modeling
        # from a single provider to a nested provider tree along with a sharing
        # storage provider.
        after_inventories = {
            # cn1 keeps the RAM only
            cn1.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=cn1,
                    resource_class='MEMORY_MB', total=32768, reserved=0,
                    max_unit=32768, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
            # each NUMA node gets half of the CPUs
            cn1_numa0.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=cn1_numa0,
                    resource_class='VCPU', total=8, reserved=0,
                    max_unit=8, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
            cn1_numa1.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=cn1_numa1,
                    resource_class='VCPU', total=8, reserved=0,
                    max_unit=8, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
            # The sharing provider gets a bunch of disk
            ss.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=ss,
                    resource_class='DISK_GB', total=100000, reserved=0,
                    max_unit=1000, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
        }

        # We do a fetch from the DB for each instance to get its latest
        # generation. This would be done by the resource tracker or scheduler
        # report client before issuing the call to reshape() because the
        # consumers representing the two instances above will have had their
        # generations incremented in the original call to PUT
        # /allocations/{consumer_uuid}
        i1_consumer = consumer_obj.Consumer.get_by_uuid(self.ctx, i1_uuid)
        i2_consumer = consumer_obj.Consumer.get_by_uuid(self.ctx, i2_uuid)

        after_allocs = rp_obj.AllocationList(self.ctx, objects=[
            # instance1 gets VCPU from NUMA0, MEMORY_MB from cn1 and DISK_GB
            # from the sharing storage provider
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1_numa0, resource_class='VCPU',
                consumer=i1_consumer, used=2),
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1, resource_class='MEMORY_MB',
                consumer=i1_consumer, used=1024),
            rp_obj.Allocation(
                self.ctx, resource_provider=ss, resource_class='DISK_GB',
                consumer=i1_consumer, used=100),
            # instance2 gets VCPU from NUMA1, MEMORY_MB from cn1 and DISK_GB
            # from the sharing storage provider
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1_numa1, resource_class='VCPU',
                consumer=i2_consumer, used=2),
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1, resource_class='MEMORY_MB',
                consumer=i2_consumer, used=1024),
            rp_obj.Allocation(
                self.ctx, resource_provider=ss, resource_class='DISK_GB',
                consumer=i2_consumer, used=100),
        ])

        rp_obj.reshape(self.ctx, after_inventories, after_allocs)

        # Verify that the inventories have been moved to the appropriate
        # providers in the AFTER scenario

        # The root compute node should only have MEMORY_MB, nothing else
        cn1_inv = rp_obj.InventoryList.get_all_by_resource_provider(
            self.ctx, cn1)
        self.assertEqual(1, len(cn1_inv))
        self.assertEqual('MEMORY_MB', cn1_inv[0].resource_class)
        self.assertEqual(32768, cn1_inv[0].total)
        # Each NUMA node should only have half the original VCPU, nothing else
        numa0_inv = rp_obj.InventoryList.get_all_by_resource_provider(
            self.ctx, cn1_numa0)
        self.assertEqual(1, len(numa0_inv))
        self.assertEqual('VCPU', numa0_inv[0].resource_class)
        self.assertEqual(8, numa0_inv[0].total)
        numa1_inv = rp_obj.InventoryList.get_all_by_resource_provider(
            self.ctx, cn1_numa1)
        self.assertEqual(1, len(numa1_inv))
        self.assertEqual('VCPU', numa1_inv[0].resource_class)
        self.assertEqual(8, numa1_inv[0].total)
        # The sharing storage provider should only have DISK_GB, nothing else
        ss_inv = rp_obj.InventoryList.get_all_by_resource_provider(
            self.ctx, ss)
        self.assertEqual(1, len(ss_inv))
        self.assertEqual('DISK_GB', ss_inv[0].resource_class)
        self.assertEqual(100000, ss_inv[0].total)

        # Verify we have the allocations we expect for the AFTER scenario
        after_allocs_i1 = rp_obj.AllocationList.get_all_by_consumer_id(
            self.ctx, i1_uuid)
        self.assertEqual(3, len(after_allocs_i1))
        # Our VCPU allocation should be in the NUMA0 node
        vcpu_alloc = alloc_for_rc(after_allocs_i1, 'VCPU')
        self.assertIsNotNone(vcpu_alloc)
        self.assertEqual(cn1_numa0.uuid, vcpu_alloc.resource_provider.uuid)
        # Our DISK_GB allocation should be in the sharing provider
        disk_alloc = alloc_for_rc(after_allocs_i1, 'DISK_GB')
        self.assertIsNotNone(disk_alloc)
        self.assertEqual(ss.uuid, disk_alloc.resource_provider.uuid)
        # And our MEMORY_MB should remain on the root compute node
        ram_alloc = alloc_for_rc(after_allocs_i1, 'MEMORY_MB')
        self.assertIsNotNone(ram_alloc)
        self.assertEqual(cn1.uuid, ram_alloc.resource_provider.uuid)

        after_allocs_i2 = rp_obj.AllocationList.get_all_by_consumer_id(
            self.ctx, i2_uuid)
        self.assertEqual(3, len(after_allocs_i2))
        # Our VCPU allocation should be in the NUMA1 node
        vcpu_alloc = alloc_for_rc(after_allocs_i2, 'VCPU')
        self.assertIsNotNone(vcpu_alloc)
        self.assertEqual(cn1_numa1.uuid, vcpu_alloc.resource_provider.uuid)
        # Our DISK_GB allocation should be in the sharing provider
        disk_alloc = alloc_for_rc(after_allocs_i2, 'DISK_GB')
        self.assertIsNotNone(disk_alloc)
        self.assertEqual(ss.uuid, disk_alloc.resource_provider.uuid)
        # And our MEMORY_MB should remain on the root compute node
        ram_alloc = alloc_for_rc(after_allocs_i2, 'MEMORY_MB')
        self.assertIsNotNone(ram_alloc)
        self.assertEqual(cn1.uuid, ram_alloc.resource_provider.uuid)

    def test_reshape_concurrent_inventory_update(self):
        """Valid failure scenario for reshape(). We test a situation where the
        virt driver has constructed its "after inventories and allocations"
        and sent those to the POST /reshaper endpoint. The reshaper POST
        handler does a quick check of the resource provider generations sent
        in the payload and they all check out.

        However, right before the call to resource_provider.reshape(), another
        thread legitimately changes the inventory of one of the providers
        involved in the reshape transaction. We should get a
        ConcurrentUpdateDetected in this case.
        """
        # First create our consumer
        i1_uuid = uuids.instance1
        i1_consumer = consumer_obj.Consumer(
            self.ctx, uuid=i1_uuid, user=self.user_obj,
            project=self.project_obj)
        i1_consumer.create()

        # then all our original providers
        cn1 = self._create_provider('cn1')
        tb.add_inventory(cn1, 'VCPU', 16)
        tb.add_inventory(cn1, 'MEMORY_MB', 32768)
        tb.add_inventory(cn1, 'DISK_GB', 1000)

        # Allocate an instance on our compute node
        allocs = [
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1,
                resource_class='VCPU', consumer=i1_consumer, used=2),
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1,
                resource_class='MEMORY_MB', consumer=i1_consumer, used=1024),
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1,
                resource_class='DISK_GB', consumer=i1_consumer, used=100),
        ]
        alloc_list = rp_obj.AllocationList(self.ctx, objects=allocs)
        alloc_list.replace_all()

        # Before we issue the actual reshape() call, we need to first create
        # the child providers and sharing storage provider. These are actions
        # that the virt driver or external agent is responsible for performing
        # *before* attempting any reshape activity.
        cn1_numa0 = self._create_provider('cn1_numa0', parent=cn1.uuid)
        cn1_numa1 = self._create_provider('cn1_numa1', parent=cn1.uuid)
        ss = self._create_provider('ss')

        # OK, now emulate the call to POST /reshaper that will be triggered by
        # a virt driver wanting to replace the world and change its modeling
        # from a single provider to a nested provider tree along with a sharing
        # storage provider.
        after_inventories = {
            # cn1 keeps the RAM only
            cn1.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=cn1,
                    resource_class='MEMORY_MB', total=32768, reserved=0,
                    max_unit=32768, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
            # each NUMA node gets half of the CPUs
            cn1_numa0.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=cn1_numa0,
                    resource_class='VCPU', total=8, reserved=0,
                    max_unit=8, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
            cn1_numa1.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=cn1_numa1,
                    resource_class='VCPU', total=8, reserved=0,
                    max_unit=8, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
            # The sharing provider gets a bunch of disk
            ss.uuid: rp_obj.InventoryList(self.ctx, objects=[
                rp_obj.Inventory(
                    self.ctx, resource_provider=ss,
                    resource_class='DISK_GB', total=100000, reserved=0,
                    max_unit=1000, min_unit=1, step_size=1,
                    allocation_ratio=1.0),
            ]),
        }

        # We do a fetch from the DB for the instance to get its latest
        # generation. This would be done by the resource tracker or scheduler
        # report client before issuing the call to reshape() because the
        # consumer representing the instance above will have had its
        # generation incremented in the original call to PUT
        # /allocations/{consumer_uuid}
        i1_consumer = consumer_obj.Consumer.get_by_uuid(self.ctx, i1_uuid)

        after_allocs = rp_obj.AllocationList(self.ctx, objects=[
            # instance1 gets VCPU from NUMA0, MEMORY_MB from cn1 and DISK_GB
            # from the sharing storage provider
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1_numa0, resource_class='VCPU',
                consumer=i1_consumer, used=2),
            rp_obj.Allocation(
                self.ctx, resource_provider=cn1, resource_class='MEMORY_MB',
                consumer=i1_consumer, used=1024),
            rp_obj.Allocation(
                self.ctx, resource_provider=ss, resource_class='DISK_GB',
                consumer=i1_consumer, used=100),
        ])

        # OK, now before we call reshape(), here we emulate another thread
        # changing the inventory for the sharing storage provider in between
        # the time in the REST handler when the sharing storage provider's
        # generation was validated and the actual call to reshape()
        ss_threadB = rp_obj.ResourceProvider.get_by_uuid(self.ctx, ss.uuid)
        # Reduce the amount of storage to 2000, from 100000.
        new_ss_inv = rp_obj.InventoryList(self.ctx, objects=[
            rp_obj.Inventory(
                self.ctx, resource_provider=ss_threadB,
                resource_class='DISK_GB', total=2000, reserved=0,
                max_unit=1000, min_unit=1, step_size=1,
                allocation_ratio=1.0)])
        ss_threadB.set_inventory(new_ss_inv)
        # Double check our storage provider's generation is now greater than
        # the original storage provider record being sent to reshape()
        self.assertGreater(ss_threadB.generation, ss.generation)

        # And we should legitimately get a failure now to reshape() due to
        # another thread updating one of the involved providers' generations
        self.assertRaises(
            exception.ConcurrentUpdateDetected,
            rp_obj.reshape, self.ctx, after_inventories, after_allocs)
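
As documented in the :raises: section of reshape(), a caller that loses this
race would typically refetch the affected providers and consumers and retry
the operation. A rough sketch, assuming hypothetical build_inventories() and
build_allocations() helpers that re-read the current generations (neither the
helpers nor the retry policy is part of this change):

    from nova.api.openstack.placement import exception
    from nova.api.openstack.placement.objects import resource_provider as rp_obj

    for _attempt in range(3):  # arbitrary retry budget, for illustration only
        try:
            rp_obj.reshape(ctx, build_inventories(ctx), build_allocations(ctx))
            break
        except exception.ConcurrentUpdateDetected:
            # Another thread bumped a provider or consumer generation between
            # our read and the reshape; rebuild from fresh data and try again.
            continue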