rebuild: send additional args over RPC and perform claims

If a migration record was created for an evacuation operation, we want
to pass it over to the compute node: it is required for resource
tracking, and making it part of the RPC call avoids a round trip back
to the conductor.

Also, if we called the scheduler, we know the node of the chosen host
and the limits it set; this information is needed for making claims.

We also make sure that rebuild now uses claims and resource tracking.

The final missing piece of the puzzle is the follow-up patch, which
makes the resource tracker update its resources based on the evacuation
records, and adds the relevant tests.
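
To make the shape of the change concrete, here is a minimal, runnable
sketch of the conductor-side decision this patch introduces. The names
(pick_destination, fake_scheduler) are illustrative stand-ins, not Nova
code; see the nova/conductor/manager.py hunk below for the real thing.

def pick_destination(host, select_destinations):
    # A host specified by the user wins; in that case the scheduler is
    # never consulted and node/limits stay None (the compute manager
    # then falls back to a NopClaim).
    node = limits = None
    if not host:
        chosen = select_destinations()[0]
        host, node, limits = (chosen['host'],
                              chosen['nodename'],
                              chosen['limits'])
    return host, node, limits

def fake_scheduler():
    return [{'host': 'h1', 'nodename': 'n1',
             'limits': {'ram_allocation_ratio': 1.5}}]

# Scheduler-chosen destination: node and limits ride along for the claim.
print(pick_destination(None, fake_scheduler))
# -> ('h1', 'n1', {'ram_allocation_ratio': 1.5})
print(pick_destination('user-host', None))
# -> ('user-host', None, None)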

Related-bug: #1417667
Change-Id: I0233f964d8f294f0ffd9edcb16b1aaf93486177f
Author: Nikola Dipanov
Date:   2015-08-19 17:55:49 +01:00
Parent: ff412c3888
Commit: dc0221d724
10 changed files with 294 additions and 50 deletions

nova/compute/claims.py

@@ -33,8 +33,8 @@ LOG = logging.getLogger(__name__)
class NopClaim(object):
"""For use with compute drivers that do not support resource tracking."""
def __init__(self, migration=None):
self.migration = migration
def __init__(self, *args, **kwargs):
self.migration = kwargs.pop('migration', None)
self.claimed_numa_topology = None
@property
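
The reason NopClaim grows a *args/**kwargs signature: the compute
manager (next file) picks either rt.rebuild_claim or claims.NopClaim
and calls whichever it got with the same arguments, so both must accept
the same call shape and both must work as context managers. A
standalone illustration of that pattern, with simplified stand-in
classes rather than the real Nova ones:

class NopClaim:
    # Accepts any call signature and tracks nothing.
    def __init__(self, *args, **kwargs):
        self.migration = kwargs.pop('migration', None)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

def rebuild(claim_factory, context=None, instance=None, **claim_kwargs):
    # Identical call site whether claim_factory is a real claim (which
    # may raise ComputeResourcesUnavailable) or the no-op fallback.
    with claim_factory(context, instance, **claim_kwargs):
        pass  # ... the actual rebuild would run here, under the claim ...

rebuild(NopClaim, limits=None, image_meta={}, migration=None)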

nova/compute/manager.py

@@ -57,6 +57,7 @@ from nova.cells import rpcapi as cells_rpcapi
from nova.cloudpipe import pipelib
from nova import compute
from nova.compute import build_results
from nova.compute import claims
from nova.compute import power_state
from nova.compute import resource_tracker
from nova.compute import rpcapi as compute_rpcapi
@@ -665,7 +666,7 @@ class ComputeVirtAPI(virtapi.VirtAPI):
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
target = messaging.Target(version='4.4')
target = messaging.Target(version='4.5')
# How long to wait in seconds before re-issuing a shutdown
# signal to an instance during power off. The overall
@@ -2606,7 +2607,8 @@ class ComputeManager(manager.Manager):
def rebuild_instance(self, context, instance, orig_image_ref, image_ref,
injected_files, new_pass, orig_sys_metadata,
bdms, recreate, on_shared_storage=None,
preserve_ephemeral=False):
preserve_ephemeral=False, migration=None,
scheduled_node=None, limits=None):
"""Destroy and re-make this instance.
A 'rebuild' effectively purges all existing data from the system and
@@ -2629,18 +2631,92 @@
files are available or not on the target host
:param preserve_ephemeral: True if the default ephemeral storage
partition must be preserved on rebuild
:param migration: a Migration object if one was created for this
rebuild operation (if it is part of an evacuation)
:param scheduled_node: The node of the host chosen by the scheduler. If a
host was specified by the user, this will be
None
:param limits: Overcommit limits set by the scheduler. If a host was
specified by the user, this will be None
"""
context = context.elevated()
LOG.info(_LI("Rebuilding instance"), context=context,
instance=instance)
if scheduled_node is not None:
rt = self._get_resource_tracker(scheduled_node)
rebuild_claim = rt.rebuild_claim
else:
rebuild_claim = claims.NopClaim
image_meta = {}
if image_ref:
image_meta = self.image_api.get(context, image_ref)
# NOTE(mriedem): On a recreate (evacuate), we need to update
# the instance's host and node properties to reflect its
# destination node for the recreate.
if not scheduled_node:
try:
compute_node = self._get_compute_info(context, self.host)
scheduled_node = compute_node.hypervisor_hostname
except exception.ComputeHostNotFound:
LOG.exception(_LE('Failed to get compute_info for %s'),
self.host)
def _fail_migration(migration):
if migration:
migration.status = 'failed'
migration.save()
with self._error_out_instance_on_exception(context, instance):
self._do_rebuild_instance(context, instance, orig_image_ref,
image_ref, injected_files, new_pass,
orig_sys_metadata, bdms, recreate,
on_shared_storage,
preserve_ephemeral)
try:
claim_ctxt = rebuild_claim(
context, instance, limits=limits, image_meta=image_meta,
migration=migration)
self._do_rebuild_instance_with_claim(
claim_ctxt, context, instance, orig_image_ref,
image_ref, injected_files, new_pass, orig_sys_metadata,
bdms, recreate, on_shared_storage, preserve_ephemeral)
except exception.ComputeResourcesUnavailable as e:
LOG.debug("Could not rebuild instance on this host, not "
"enough resources available.", instance=instance)
# NOTE(ndipanov): We just abort the build for now and leave a
# migration record for potential cleanup later
_fail_migration(migration)
self._notify_about_instance_usage(context, instance,
'rebuild.error', fault=e)
raise exception.BuildAbortException(
instance_uuid=instance.uuid, reason=e.format_message())
except Exception as e:
_fail_migration(migration)
self._notify_about_instance_usage(context, instance,
'rebuild.error', fault=e)
raise
else:
instance.apply_migration_context()
# NOTE (ndipanov): This save will now update the host and node
# attributes making sure that next RT pass is consistent since
# it will be based on the instance and not the migration DB
# entry.
instance.host = self.host
instance.node = scheduled_node
instance.save()
instance.drop_migration_context()
# NOTE (ndipanov): Mark the migration as done only after we
# mark the instance as belonging to this host.
if migration:
migration.status = 'done'
migration.save()
def _do_rebuild_instance_with_claim(self, claim_context, *args, **kwargs):
"""Helper to avoid deep nesting in the top-level method."""
with claim_context:
self._do_rebuild_instance(*args, **kwargs)
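
The claim returned by rebuild_claim (or NopClaim) is used as a context
manager: if _do_rebuild_instance raises, the claim is expected to abort
and return the claimed resources to the tracker. A simplified, runnable
model of that contract (the real claim and tracker classes live in
nova/compute/claims.py and the resource tracker; this is not their
actual implementation):

class Claim:
    def __init__(self, tracker, amount):
        self.tracker, self.amount = tracker, amount
        tracker['used'] += amount            # resources claimed up front

    def abort(self):
        self.tracker['used'] -= self.amount  # hand the resources back

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:             # rebuild failed under the claim
            self.abort()

tracker = {'used': 0}
try:
    with Claim(tracker, 512):
        raise RuntimeError('rebuild failed')
except RuntimeError:
    pass
assert tracker['used'] == 0                  # the claim was rolled back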
def _do_rebuild_instance(self, context, instance, orig_image_ref,
image_ref, injected_files, new_pass,
@@ -2676,21 +2752,6 @@
LOG.info(_LI("disk not on shared storage, rebuilding from:"
" '%s'"), str(image_ref))
# NOTE(mriedem): On a recreate (evacuate), we need to update
# the instance's host and node properties to reflect its
# destination node for the recreate.
node_name = None
try:
compute_node = self._get_compute_info(context, self.host)
node_name = compute_node.hypervisor_hostname
except exception.ComputeHostNotFound:
LOG.exception(_LE('Failed to get compute_info for %s'),
self.host)
finally:
instance.host = self.host
instance.node = node_name
instance.save()
if image_ref:
image_meta = self.image_api.get(context, image_ref)
else:

nova/compute/resource_tracker.py

@@ -188,6 +188,15 @@ class ResourceTracker(object):
return claim
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def rebuild_claim(self, context, instance, limits=None, image_meta=None,
migration=None):
"""Create a claim for a rebuild operation."""
instance_type = instance.flavor
return self._move_claim(context, instance, instance_type,
move_type='evacuation', limits=limits,
image_meta=image_meta, migration=migration)
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def resize_claim(self, context, instance, instance_type,
image_meta=None, limits=None):

nova/compute/rpcapi.py

@@ -1,5 +1,4 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -301,6 +300,8 @@ class ComputeAPI(object):
* 4.2 - Add migration argument to live_migration()
* 4.3 - Added get_mks_console method
* 4.4 - Make refresh_instance_security_rules send an instance object
* 4.5 - Add migration, scheduled_node and limits arguments to
rebuild_instance()
'''
VERSION_ALIASES = {
@@ -606,12 +607,21 @@ class ComputeAPI(object):
def rebuild_instance(self, ctxt, instance, new_pass, injected_files,
image_ref, orig_image_ref, orig_sys_metadata, bdms,
recreate=False, on_shared_storage=False, host=None,
preserve_ephemeral=False, kwargs=None):
recreate=False, on_shared_storage=False, host=None, node=None,
preserve_ephemeral=False, migration=None, limits=None,
kwargs=None):
# NOTE(danms): kwargs is only here for cells compatibility, don't
# actually send it to compute
extra = {'preserve_ephemeral': preserve_ephemeral}
version = '4.0'
extra = {'preserve_ephemeral': preserve_ephemeral,
'migration': migration,
'scheduled_node': node,
'limits': limits}
version = '4.5'
if not self.client.can_send_version(version):
version = '4.0'
extra.pop('migration')
extra.pop('scheduled_node')
extra.pop('limits')
cctxt = self.client.prepare(server=_compute_host(host, instance),
version=version)
cctxt.cast(ctxt, 'rebuild_instance',
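
The version-capping dance above is the standard compute rpcapi pattern:
send the new arguments only when the peer can speak 4.5, otherwise
strip them and fall back to 4.0 so older computes still understand the
cast. A standalone sketch of the logic (FakeClient stands in for the
oslo.messaging client and its can_send_version check; not Nova code):

class FakeClient:
    def __init__(self, pinned_version):
        self.pinned = tuple(int(p) for p in pinned_version.split('.'))

    def can_send_version(self, version):
        return tuple(int(p) for p in version.split('.')) <= self.pinned

def build_rebuild_kwargs(client, migration=None, node=None, limits=None):
    extra = {'migration': migration, 'scheduled_node': node,
             'limits': limits}
    version = '4.5'
    if not client.can_send_version(version):
        # Peer pinned below 4.5: drop the arguments it would not recognize.
        version = '4.0'
        for key in ('migration', 'scheduled_node', 'limits'):
            extra.pop(key)
    return version, extra

print(build_rebuild_kwargs(FakeClient('4.5'), node='n1', limits={}))
# -> ('4.5', {'migration': None, 'scheduled_node': 'n1', 'limits': {}})
print(build_rebuild_kwargs(FakeClient('4.0'), node='n1', limits={}))
# -> ('4.0', {})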

nova/conductor/manager.py

@@ -859,6 +859,7 @@ class ComputeTaskManager(base.Base):
with compute_utils.EventReporter(context, 'rebuild_server',
instance.uuid):
node = limits = None
if not host:
# NOTE(lcostantino): Retrieve scheduler filters for the
# instance when the feature is available
@@ -872,7 +873,10 @@ class ComputeTaskManager(base.Base):
hosts = self.scheduler_client.select_destinations(context,
request_spec,
filter_properties)
host = hosts.pop(0)['host']
host_dict = hosts.pop(0)
host, node, limits = (host_dict['host'],
host_dict['nodename'],
host_dict['limits'])
except exception.NoValidHost as ex:
with excutils.save_and_reraise_exception():
self._set_vm_state_and_notify(context, instance.uuid,
@@ -891,6 +895,14 @@ class ComputeTaskManager(base.Base):
"cannot be rebuilt"),
instance=instance)
try:
migration = objects.Migration.get_by_instance_and_status(
context, instance.uuid, 'accepted')
except exception.MigrationNotFoundByStatus:
LOG.debug("No migration record for the rebuild/evacuate "
"request.", instance=instance)
migration = None
compute_utils.notify_about_instance_usage(
self.notifier, context, instance, "rebuild.scheduled")
@@ -905,7 +917,8 @@ class ComputeTaskManager(base.Base):
recreate=recreate,
on_shared_storage=on_shared_storage,
preserve_ephemeral=preserve_ephemeral,
host=host)
migration=migration,
host=host, node=node, limits=limits)
class _ConductorManagerV3Proxy(object):

nova/objects/service.py

@@ -27,7 +27,7 @@ LOG = logging.getLogger(__name__)
# NOTE(danms): This is the global service version counter
SERVICE_VERSION = 1
SERVICE_VERSION = 2
# NOTE(danms): This is our SERVICE_VERSION history. The idea is that any
@@ -53,6 +53,8 @@ SERVICE_VERSION_HISTORY = (
# Version 1: Introduction of SERVICE_VERSION
{'compute_rpc': '4.4'},
# Version 2: Changes to rebuild_instance signature in the compute_rpc
{'compute_rpc': '4.5'},
)
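
One intended use of such a history is letting a deployment derive the
newest RPC version that every running service understands during a
rolling upgrade. The automatic lookup is not part of this patch, so the
following is only a simplified, assumed model of the idea:

SERVICE_VERSION_HISTORY = {
    1: {'compute_rpc': '4.4'},  # Version 1: introduction of SERVICE_VERSION
    2: {'compute_rpc': '4.5'},  # Version 2: rebuild_instance signature change
}

def max_common_rpc_version(service_versions):
    # The oldest service in the deployment caps the RPC version for all.
    return SERVICE_VERSION_HISTORY[min(service_versions)]['compute_rpc']

print(max_common_rpc_version([1, 2]))  # mid-upgrade -> '4.4'
print(max_common_rpc_version([2, 2]))  # all upgraded -> '4.5'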

nova/tests/unit/compute/test_compute.py

@@ -11179,12 +11179,18 @@ class EvacuateHostTestCase(BaseTestCase):
db.instance_destroy(self.context, self.inst.uuid)
super(EvacuateHostTestCase, self).tearDown()
def _rebuild(self, on_shared_storage=True):
def _rebuild(self, on_shared_storage=True, migration=None,
send_node=False):
network_api = self.compute.network_api
ctxt = context.get_admin_context()
mock_context = mock.Mock()
mock_context.elevated.return_value = ctxt
node = limits = None
if send_node:
node = NODENAME
limits = {}
@mock.patch.object(network_api, 'setup_networks_on_host')
@mock.patch.object(network_api, 'setup_instance_network_on_host')
def _test_rebuild(mock_setup_instance_network_on_host,
@@ -11197,7 +11203,8 @@ class EvacuateHostTestCase(BaseTestCase):
self.compute.rebuild_instance(
mock_context, self.inst, orig_image_ref,
image_ref, injected_files, 'newpass', {}, bdms, recreate=True,
on_shared_storage=on_shared_storage)
on_shared_storage=on_shared_storage, migration=migration,
scheduled_node=node, limits=limits)
mock_setup_networks_on_host.assert_called_once_with(
ctxt, self.inst, self.inst.host)
mock_setup_instance_network_on_host.assert_called_once_with(
@@ -11244,6 +11251,19 @@ class EvacuateHostTestCase(BaseTestCase):
self.assertEqual(instance['host'], self.compute.host)
self.assertIsNone(instance['node'])
def test_rebuild_on_host_node_passed(self):
patch_get_info = mock.patch.object(self.compute, '_get_compute_info')
patch_on_disk = mock.patch.object(
self.compute.driver, 'instance_on_disk', return_value=True)
with patch_get_info as get_compute_info, patch_on_disk:
self._rebuild(send_node=True)
self.assertEqual(0, get_compute_info.call_count)
# Should be on destination host and node set to what was passed in
instance = db.instance_get(self.context, self.inst.id)
self.assertEqual(instance['host'], self.compute.host)
self.assertEqual(instance['node'], NODENAME)
def test_rebuild_with_instance_in_stopped_state(self):
"""Confirm evacuate scenario updates vm_state to stopped
if instance is in stopped state
@@ -11413,6 +11433,79 @@ class EvacuateHostTestCase(BaseTestCase):
self._rebuild(on_shared_storage=None)
def test_rebuild_migration_passed_in(self):
migration = mock.Mock(spec=objects.Migration)
patch_spawn = mock.patch.object(self.compute.driver, 'spawn')
patch_on_disk = mock.patch.object(
self.compute.driver, 'instance_on_disk', return_value=True)
with patch_spawn, patch_on_disk:
self._rebuild(migration=migration)
self.assertEqual('done', migration.status)
migration.save.assert_called_once_with()
def test_rebuild_migration_node_passed_in(self):
patch_spawn = mock.patch.object(self.compute.driver, 'spawn')
patch_on_disk = mock.patch.object(
self.compute.driver, 'instance_on_disk', return_value=True)
with patch_spawn, patch_on_disk:
self._rebuild(send_node=True)
migrations = objects.MigrationList.get_in_progress_by_host_and_node(
self.context, self.compute.host, NODENAME)
self.assertEqual(1, len(migrations))
migration = migrations[0]
self.assertEqual("evacuation", migration.migration_type)
self.assertEqual("pre-migrating", migration.status)
def test_rebuild_migration_claim_fails(self):
migration = mock.Mock(spec=objects.Migration)
patch_spawn = mock.patch.object(self.compute.driver, 'spawn')
patch_on_disk = mock.patch.object(
self.compute.driver, 'instance_on_disk', return_value=True)
patch_claim = mock.patch.object(
self.compute._resource_tracker_dict[NODENAME], 'rebuild_claim',
side_effect=exception.ComputeResourcesUnavailable(reason="boom"))
with patch_spawn, patch_on_disk, patch_claim:
self.assertRaises(exception.BuildAbortException,
self._rebuild, migration=migration,
send_node=True)
self.assertEqual("failed", migration.status)
migration.save.assert_called_once_with()
def test_rebuild_fails_migration_failed(self):
migration = mock.Mock(spec=objects.Migration)
patch_spawn = mock.patch.object(self.compute.driver, 'spawn')
patch_on_disk = mock.patch.object(
self.compute.driver, 'instance_on_disk', return_value=True)
patch_claim = mock.patch.object(
self.compute._resource_tracker_dict[NODENAME], 'rebuild_claim')
patch_rebuild = mock.patch.object(
self.compute, '_do_rebuild_instance_with_claim',
side_effect=test.TestingException())
with patch_spawn, patch_on_disk, patch_claim, patch_rebuild:
self.assertRaises(test.TestingException,
self._rebuild, migration=migration,
send_node=True)
self.assertEqual("failed", migration.status)
migration.save.assert_called_once_with()
def test_rebuild_numa_migration_context_honoured(self):
numa_topology = (
test_instance_numa_topology.get_fake_obj_numa_topology(
self.context))
self.inst.numa_topology = numa_topology
patch_spawn = mock.patch.object(self.compute.driver, 'spawn')
patch_on_disk = mock.patch.object(
self.compute.driver, 'instance_on_disk', return_value=True)
with patch_spawn, patch_on_disk:
self._rebuild(send_node=True)
self.assertIsNone(self.inst.numa_topology)
self.assertIsNone(self.inst.migration_context)
class ComputeInjectedFilesTestCase(BaseTestCase):
# Test that running instances with injected_files decodes files correctly

nova/tests/unit/compute/test_rpcapi.py

@@ -85,6 +85,9 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
else:
host = kwargs['instance']['host']
if method == 'rebuild_instance' and 'node' in expected_kwargs:
expected_kwargs['scheduled_node'] = expected_kwargs.pop('node')
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
mock.patch.object(rpcapi.client, 'prepare'),
@@ -301,6 +304,15 @@ class ComputeRpcAPITestCase(test.NoDBTestCase):
reboot_type='type')
def test_rebuild_instance(self):
self._test_compute_api('rebuild_instance', 'cast', new_pass='None',
injected_files='None', image_ref='None', orig_image_ref='None',
bdms=[], instance=self.fake_instance_obj, host='new_host',
orig_sys_metadata=None, recreate=True, on_shared_storage=True,
preserve_ephemeral=True, migration=None, node=None,
limits=None, version='4.5')
def test_rebuild_instance_downgrade(self):
self.flags(group='upgrade_levels', compute='4.0')
self._test_compute_api('rebuild_instance', 'cast', new_pass='None',
injected_files='None', image_ref='None', orig_image_ref='None',
bdms=[], instance=self.fake_instance_obj, host='new_host',

nova/tests/unit/conductor/test_conductor.py

@@ -16,6 +16,7 @@
"""Tests for the conductor service."""
import contextlib
import copy
import uuid
import mock
@@ -949,6 +950,11 @@ class _BaseTaskTestCase(object):
fake_deserialize_context)
def _prepare_rebuild_args(self, update_args=None):
# Args that don't get passed in to the method but do get passed to RPC
migration = update_args and update_args.pop('migration', None)
node = update_args and update_args.pop('node', None)
limits = update_args and update_args.pop('limits', None)
rebuild_args = {'new_pass': 'admin_password',
'injected_files': 'files_to_inject',
'image_ref': 'image_ref',
@@ -961,7 +967,11 @@ class _BaseTaskTestCase(object):
'host': 'compute-host'}
if update_args:
rebuild_args.update(update_args)
return rebuild_args
compute_rebuild_args = copy.deepcopy(rebuild_args)
compute_rebuild_args['migration'] = migration
compute_rebuild_args['node'] = node
compute_rebuild_args['limits'] = limits
return rebuild_args, compute_rebuild_args
@mock.patch('nova.objects.Migration')
def test_live_migrate(self, migobj):
@@ -1397,7 +1407,8 @@ class _BaseTaskTestCase(object):
def test_rebuild_instance(self):
inst_obj = self._create_fake_instance_obj()
rebuild_args = self._prepare_rebuild_args({'host': inst_obj.host})
rebuild_args, compute_args = self._prepare_rebuild_args(
{'host': inst_obj.host})
with contextlib.nested(
mock.patch.object(self.conductor_manager.compute_rpcapi,
@@ -1411,13 +1422,16 @@ class _BaseTaskTestCase(object):
self.assertFalse(select_dest_mock.called)
rebuild_mock.assert_called_once_with(self.context,
instance=inst_obj,
**rebuild_args)
**compute_args)
def test_rebuild_instance_with_scheduler(self):
inst_obj = self._create_fake_instance_obj()
inst_obj.host = 'noselect'
rebuild_args = self._prepare_rebuild_args({'host': None})
expected_host = 'thebesthost'
expected_node = 'thebestnode'
expected_limits = 'fake-limits'
rebuild_args, compute_args = self._prepare_rebuild_args(
{'host': None, 'node': expected_node, 'limits': expected_limits})
request_spec = {}
filter_properties = {'ignore_hosts': [(inst_obj.host)]}
@@ -1428,7 +1442,9 @@ class _BaseTaskTestCase(object):
return_value=False),
mock.patch.object(self.conductor_manager.scheduler_client,
'select_destinations',
return_value=[{'host': expected_host}]),
return_value=[{'host': expected_host,
'nodename': expected_node,
'limits': expected_limits}]),
mock.patch('nova.scheduler.utils.build_request_spec',
return_value=request_spec)
) as (rebuild_mock, sig_mock, select_dest_mock, bs_mock):
@@ -1438,17 +1454,17 @@ class _BaseTaskTestCase(object):
select_dest_mock.assert_called_once_with(self.context,
request_spec,
filter_properties)
rebuild_args['host'] = expected_host
compute_args['host'] = expected_host
rebuild_mock.assert_called_once_with(self.context,
instance=inst_obj,
**rebuild_args)
**compute_args)
self.assertEqual('compute.instance.rebuild.scheduled',
fake_notifier.NOTIFICATIONS[0].event_type)
def test_rebuild_instance_with_scheduler_no_host(self):
inst_obj = self._create_fake_instance_obj()
inst_obj.host = 'noselect'
rebuild_args = self._prepare_rebuild_args({'host': None})
rebuild_args, _ = self._prepare_rebuild_args({'host': None})
request_spec = {}
filter_properties = {'ignore_hosts': [(inst_obj.host)]}
@@ -1489,7 +1505,7 @@ class _BaseTaskTestCase(object):
rebuild_mock,
spawn_mock):
inst_obj = self._create_fake_instance_obj()
rebuild_args = self._prepare_rebuild_args({'host': None})
rebuild_args, _ = self._prepare_rebuild_args({'host': None})
request_spec = {}
bs_mock.return_value = request_spec
@@ -1515,6 +1531,33 @@ class _BaseTaskTestCase(object):
self.assertFalse(select_dest_mock.called)
self.assertFalse(rebuild_mock.called)
def test_rebuild_instance_evacuate_migration_record(self):
inst_obj = self._create_fake_instance_obj()
migration = objects.Migration(context=self.context,
source_compute=inst_obj.host,
source_node=inst_obj.node,
instance_uuid=inst_obj.uuid,
status='accepted',
migration_type='evacuation')
rebuild_args, compute_args = self._prepare_rebuild_args(
{'host': inst_obj.host, 'migration': migration})
with contextlib.nested(
mock.patch.object(self.conductor_manager.compute_rpcapi,
'rebuild_instance'),
mock.patch.object(self.conductor_manager.scheduler_client,
'select_destinations'),
mock.patch.object(objects.Migration, 'get_by_instance_and_status',
return_value=migration)
) as (rebuild_mock, select_dest_mock, get_migration_mock):
self.conductor_manager.rebuild_instance(context=self.context,
instance=inst_obj,
**rebuild_args)
self.assertFalse(select_dest_mock.called)
rebuild_mock.assert_called_once_with(self.context,
instance=inst_obj,
**compute_args)
class ConductorTaskTestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
"""ComputeTaskManager Tests."""

nova/tests/unit/objects/test_service.py

@@ -111,8 +111,8 @@ class _TestServiceObject(object):
def test_create(self):
self.mox.StubOutWithMock(db, 'service_create')
db.service_create(self.context, {'host': 'fake-host',
'version': 1}).AndReturn(
fake_service)
'version': fake_service['version']}
).AndReturn(fake_service)
self.mox.ReplayAll()
service_obj = service.Service(context=self.context)
service_obj.host = 'fake-host'
@@ -123,8 +123,8 @@ class _TestServiceObject(object):
def test_recreate_fails(self):
self.mox.StubOutWithMock(db, 'service_create')
db.service_create(self.context, {'host': 'fake-host',
'version': 1}).AndReturn(
fake_service)
'version': fake_service['version']}
).AndReturn(fake_service)
self.mox.ReplayAll()
service_obj = service.Service(context=self.context)
service_obj.host = 'fake-host'
@@ -133,9 +133,10 @@ class _TestServiceObject(object):
def test_save(self):
self.mox.StubOutWithMock(db, 'service_update')
db.service_update(self.context, 123, {'host': 'fake-host',
'version': 1}).AndReturn(
fake_service)
db.service_update(self.context, 123,
{'host': 'fake-host',
'version': fake_service['version']}
).AndReturn(fake_service)
self.mox.ReplayAll()
service_obj = service.Service(context=self.context)
service_obj.id = 123