Update allocations for failed over instances

This change updates the allocations stored in the placement
service when handling instance failovers.

For proper scheduling, the placement service needs an accurate
view of each resource provider's usage.

We're using Placement API microversion 1.12 (Queens), which keeps
this patch easy to backport.

Change-Id: Ie155ab280609ccff310c82c68dcdcbe3c4fc7f95
Closes-Bug: #1800437
Lucian Petrut 2018-10-29 11:21:02 +02:00
parent 4608d199a4
commit 635963186a
6 changed files with 286 additions and 0 deletions
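
As a quick illustration of the microversion mentioned in the commit message, here is a minimal sketch (not part of the patch) of the payload shape exchanged with GET/PUT /allocations/{consumer_uuid} at microversion 1.12. The provider UUID and resource amounts are the fake values from the unit tests below; move_allocations() reads this body, rewrites the keys under 'allocations', and PUTs the whole thing back.

# Illustrative only: approximate 1.12 allocations payload, keyed by
# resource provider UUID (values borrowed from the fake test data below).
example_allocs = {
    'allocations': {
        'fdb5c6d0-e0e9-4411-b952-fb05d6133718': {
            'resources': {'VCPU': 1, 'MEMORY_MB': 512, 'DISK_GB': 1},
        },
    },
    # Any other top-level fields returned by GET (e.g. consumer project/user
    # details at this microversion) are round-tripped unmodified.
}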


@@ -36,6 +36,7 @@ import compute_hyperv.nova.conf
from compute_hyperv.nova import coordination
from compute_hyperv.nova import hostops
from compute_hyperv.nova import serialconsoleops
from compute_hyperv.nova.utils import placement as placement_utils
from compute_hyperv.nova import vmops
LOG = logging.getLogger(__name__)
@@ -56,6 +57,7 @@ class ClusterOps(object):
self._network_api = network.API()
self._vmops = vmops.VMOps()
self._serial_console_ops = serialconsoleops.SerialConsoleOps()
self._placement = placement_utils.PlacementUtils()
def get_instance_host(self, instance):
return self._clustutils.get_vm_host(instance.name)
@@ -179,6 +181,13 @@ class ClusterOps(object):
self._nova_failover_server(instance, new_host)
if host_changed:
self._failover_migrate_networks(instance, old_host)
try:
self._placement.move_compute_node_allocations(
self._context, instance, old_host, new_host,
merge_existing=False)
except Exception:
LOG.exception("Could not update failed over instance '%s' "
"allocations.", instance)
self._vmops.plug_vifs(instance, nw_info)
self._serial_console_ops.start_console_handler(instance_name)


@@ -0,0 +1,110 @@
# Copyright 2018 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import exception
from nova import objects
from nova.scheduler.client import report
from nova.scheduler import utils as scheduler_utils
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
SYMMETRIC_GET_PUT_ALLOCATIONS = "1.12" # Queens
class PlacementUtils(object):
def __init__(self):
self.reportclient = report.SchedulerReportClient()
def move_compute_node_allocations(self, context, instance, old_host,
new_host, merge_existing=True):
LOG.info("Moving instance allocations from compute node %s to %s.",
old_host, new_host, instance=instance)
cn_uuid = objects.ComputeNode.get_by_host_and_nodename(
context, old_host, old_host).uuid
new_cn_uuid = objects.ComputeNode.get_by_host_and_nodename(
context, new_host, new_host).uuid
self.move_allocations(context, instance.uuid, cn_uuid,
new_cn_uuid,
merge_existing=merge_existing)
@report.retries
def move_allocations(self, context, consumer_uuid, old_rp_uuid,
new_rp_uuid, merge_existing=True):
allocs = self._get_allocs_for_consumer(
context, consumer_uuid,
version=SYMMETRIC_GET_PUT_ALLOCATIONS)
allocations = allocs['allocations']
if old_rp_uuid == new_rp_uuid:
LOG.debug("Requested to move allocations to the "
"same provider: %s.", old_rp_uuid)
return
if old_rp_uuid not in allocations:
LOG.warning("Expected to find allocations referencing resource "
"provider %s for %s, but found none.",
old_rp_uuid, consumer_uuid)
return
if merge_existing and new_rp_uuid in allocations:
LOG.info("Merging existing allocations for consumer %s on "
"provider %s: %s.",
consumer_uuid, new_rp_uuid, allocations)
scheduler_utils.merge_resources(
allocations[new_rp_uuid]['resources'],
allocations[old_rp_uuid]['resources'])
else:
if new_rp_uuid in allocations:
LOG.info("Replacing existing allocations for consumer %s "
"on provider %s: %s",
consumer_uuid, new_rp_uuid, allocations)
allocations[new_rp_uuid] = allocations[old_rp_uuid]
del allocations[old_rp_uuid]
self._put_allocs(context, consumer_uuid, allocs,
version=SYMMETRIC_GET_PUT_ALLOCATIONS)
def _put_allocs(self, context, consumer_uuid, allocations, version=None):
url = '/allocations/%s' % consumer_uuid
r = self.reportclient.put(url, allocations,
version=version)
if r.status_code != 204:
errors = r.json().get('errors') or []
            # Check the granular error codes returned by placement in order
            # to detect a concurrent update, which is safe to retry.
for err in errors:
if err.get('code') == 'placement.concurrent_update':
reason = (
'another process changed the resource providers '
'involved in our attempt to put allocations for '
'consumer %s' % consumer_uuid)
raise report.Retry('put_allocations', reason)
raise exception.AllocationUpdateFailed(
consumer_uuid=consumer_uuid, error=errors)
def _get_allocs_for_consumer(self, context, consumer, version=None):
resp = self.reportclient.get('/allocations/%s' % consumer,
version=version)
if not resp:
# TODO(efried): Use code/title/detail to make a better exception
raise exception.ConsumerAllocationRetrievalFailed(
consumer_uuid=consumer, error=resp.text)
return resp.json()
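
For context on the merge branch above: nova.scheduler.utils.merge_resources sums per-resource-class amounts, which is what produces the doubled values expected by test_merge_allocations further down. A rough, hypothetical stand-in (an assumption about its behavior, not the actual nova implementation):

def merge_resources_sketch(target, source):
    # Hypothetical equivalent of nova.scheduler.utils.merge_resources:
    # add each resource class amount from `source` into `target`, in place.
    for rclass, amount in source.items():
        target[rclass] = target.get(rclass, 0) + amount

# With the fake resources used in the tests, merging an identical allocation
# into an existing one yields {'VCPU': 2, 'MEMORY_MB': 1024, 'DISK_GB': 2}.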


@@ -18,6 +18,7 @@ import mock
from nova.compute import power_state
from nova.compute import task_states
from nova.compute import vm_states
from nova import exception
from nova.network.neutronv2 import api as network_api
from nova import objects
from nova.virt import event as virtevent
@@ -41,6 +42,7 @@ class ClusterOpsTestCase(test_base.HyperVBaseTestCase):
network_api.API,
clusterops.vmops.VMOps,
clusterops.serialconsoleops.SerialConsoleOps,
clusterops.placement_utils.PlacementUtils,
]
_FAKE_INSTANCE_NAME = 'fake_instance_name'
@@ -51,8 +53,10 @@ class ClusterOpsTestCase(test_base.HyperVBaseTestCase):
self.clusterops = clusterops.ClusterOps()
self.clusterops._context = self.context
self._clustutils = self.clusterops._clustutils
self._network_api = self.clusterops._network_api
self._placement = self.clusterops._placement
def test_get_instance_host(self):
mock_instance = fake_instance.fake_instance_obj(self.context)
@@ -219,6 +223,9 @@ class ClusterOpsTestCase(test_base.HyperVBaseTestCase):
instance.host = old_host
self.clusterops._this_node = new_host
self._clustutils.get_vm_host.return_value = new_host
# Placement exceptions shouldn't break the rest of the failover logic.
self._placement.move_compute_node_allocations.side_effect = (
exception.NovaException)
self.clusterops._failover_migrate(mock.sentinel.instance_name,
new_host)
@@ -235,6 +242,9 @@ class ClusterOpsTestCase(test_base.HyperVBaseTestCase):
mock_nova_failover_server.assert_called_once_with(instance, new_host)
mock_failover_migrate_networks.assert_called_once_with(
instance, old_host)
self._placement.move_compute_node_allocations.assert_called_once_with(
self.clusterops._context, instance, old_host, new_host,
merge_existing=False)
self.clusterops._vmops.plug_vifs.assert_called_once_with(
instance, get_inst_nw_info.return_value)
c_handler = self.clusterops._serial_console_ops.start_console_handler
@@ -268,6 +278,7 @@ class ClusterOpsTestCase(test_base.HyperVBaseTestCase):
self.clusterops._vmops.unplug_vifs.assert_not_called()
self.clusterops._vmops.plug_vifs.assert_called_once_with(
instance, get_inst_nw_info.return_value)
self._placement.move_compute_node_allocations.assert_not_called()
mock_failover_migrate_networks.assert_not_called()
c_handler = self.clusterops._serial_console_ops.start_console_handler
c_handler.assert_called_once_with(mock.sentinel.instance_name)


@@ -0,0 +1,156 @@
# Copyright 2018 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from nova import context
from nova import exception
from nova import objects
from nova.tests.unit import fake_requests
from oslo_serialization import jsonutils
from compute_hyperv.nova.utils import placement
from compute_hyperv.tests import fake_instance
from compute_hyperv.tests.unit import test_base
@ddt.ddt
class PlacementUtilsTestCase(test_base.HyperVBaseTestCase):
_autospec_classes = [
placement.report.SchedulerReportClient
]
_FAKE_PROVIDER = 'fdb5c6d0-e0e9-4411-b952-fb05d6133718'
_FAKE_RESOURCES = {'VCPU': 1, 'MEMORY_MB': 512, 'DISK_GB': 1}
_FAKE_ALLOCATIONS = {
_FAKE_PROVIDER: {'resources': _FAKE_RESOURCES}
}
def setUp(self):
super(PlacementUtilsTestCase, self).setUp()
self.context = context.get_admin_context()
self.instance = fake_instance.fake_instance_obj(self.context)
self.placement = placement.PlacementUtils()
self.client = self.placement.reportclient
@mock.patch.object(objects.ComputeNode, 'get_by_host_and_nodename')
@mock.patch.object(placement.PlacementUtils, 'move_allocations')
def test_move_compute_node_allocations(self, mock_move_alloc,
mock_get_comp_node):
mock_get_comp_node.side_effect = [
mock.Mock(uuid=uuid) for uuid in [mock.sentinel.old_host_uuid,
mock.sentinel.new_host_uuid]]
self.placement.move_compute_node_allocations(
self.context, self.instance, mock.sentinel.old_host,
mock.sentinel.new_host,
merge_existing=mock.sentinel.merge_existing)
mock_move_alloc.assert_called_once_with(
self.context, self.instance.uuid,
mock.sentinel.old_host_uuid,
mock.sentinel.new_host_uuid,
merge_existing=mock.sentinel.merge_existing)
mock_get_comp_node.assert_has_calls(
mock.call(self.context, host, host) for host in
[mock.sentinel.old_host, mock.sentinel.new_host])
@ddt.data({}, # provider did not change
{'old_rp': 'fake_rp'}) # provider not included in allocations
@ddt.unpack
@mock.patch.object(placement.PlacementUtils, '_get_allocs_for_consumer')
@mock.patch.object(placement.PlacementUtils, '_put_allocs')
def test_move_allocations_noop(self, mock_put, mock_get_allocs,
old_rp=_FAKE_PROVIDER,
new_rp=_FAKE_PROVIDER):
mock_get_allocs.return_value = {'allocations': self._FAKE_ALLOCATIONS}
self.placement.move_allocations(
self.context, mock.sentinel.consumer, old_rp, new_rp)
mock_get_allocs.assert_called_once_with(
self.context, mock.sentinel.consumer,
version=placement.SYMMETRIC_GET_PUT_ALLOCATIONS)
mock_put.assert_not_called()
@ddt.data(True, False)
@mock.patch.object(placement.PlacementUtils, '_get_allocs_for_consumer')
@mock.patch.object(placement.PlacementUtils, '_put_allocs')
def test_merge_allocations(self, merge_existing,
mock_put, mock_get_allocs):
old_rp = self._FAKE_PROVIDER
new_rp = 'new_rp'
allocs = self._FAKE_ALLOCATIONS.copy()
allocs[new_rp] = {'resources': self._FAKE_RESOURCES.copy()}
mock_get_allocs.return_value = {'allocations': allocs}
if merge_existing:
exp_resources = {'VCPU': 2, 'MEMORY_MB': 1024, 'DISK_GB': 2}
else:
exp_resources = self._FAKE_RESOURCES
exp_allocs = {new_rp: {'resources': exp_resources}}
self.placement.move_allocations(
self.context, mock.sentinel.consumer, old_rp, new_rp,
merge_existing=merge_existing)
mock_put.assert_called_once_with(
self.context, mock.sentinel.consumer,
{'allocations': exp_allocs},
version=placement.SYMMETRIC_GET_PUT_ALLOCATIONS)
@ddt.data({}, # no errors
{'status_code': 409,
'errors': [{'code': 'placement.concurrent_update'}],
'expected_exc': placement.report.Retry},
{'status_code': 500,
'expected_exc': exception.AllocationUpdateFailed})
@ddt.unpack
def test_put_allocs(self, status_code=204, expected_exc=None, errors=None):
response = fake_requests.FakeResponse(
status_code,
content=jsonutils.dumps({'errors': errors}))
self.client.put.return_value = response
args = (self.context, mock.sentinel.consumer, mock.sentinel.allocs,
mock.sentinel.version)
if expected_exc:
self.assertRaises(expected_exc, self.placement._put_allocs, *args)
else:
self.placement._put_allocs(*args)
self.client.put.assert_called_once_with(
'/allocations/%s' % mock.sentinel.consumer,
mock.sentinel.allocs,
version=mock.sentinel.version)
def test_get_allocs(self):
ret_val = self.placement._get_allocs_for_consumer(
self.context, mock.sentinel.consumer, mock.sentinel.version)
exp_val = self.client.get.return_value.json.return_value
self.assertEqual(exp_val, ret_val)
self.client.get.assert_called_once_with(
'/allocations/%s' % mock.sentinel.consumer,
version=mock.sentinel.version)
def test_get_allocs_missing(self):
self.client.get.return_value = fake_requests.FakeResponse(500)
self.assertRaises(
exception.ConsumerAllocationRetrievalFailed,
self.placement._get_allocs_for_consumer,
self.context, mock.sentinel.consumer, mock.sentinel.version)