Send events to all relevant hosts if migrating

Previously, external events were sent to the instance object's host
field. This patch fixes the external event dispatching to check for
migration. If an instance is being migrated, the source and
destination compute are added to the set of hosts to which the event
is sent.

Change-Id: If00736ab36df4a5a3be4f02b0a550e4bcae77b1b
Closes-bug: 1535918
Closes-bug: 1624052
This commit is contained in:
Artom Lifshitz 2016-10-05 14:37:03 -04:00
parent 7067ca71fb
commit a5b920a197
5 changed files with 91 additions and 21 deletions

View File

@ -65,8 +65,11 @@ class ServerExternalEventsController(wsgi.Controller):
instance = instances.get(event.instance_uuid)
if not instance:
try:
# Load migration_context here in a single DB operation
# because we need it later on
instance = objects.Instance.get_by_uuid(
context, event.instance_uuid)
context, event.instance_uuid,
expected_attrs='migration_context')
instances[event.instance_uuid] = instance
except exception.InstanceNotFound:
LOG.debug('Dropping event %(name)s:%(tag)s for unknown '

View File

@ -20,6 +20,7 @@
networking and storage of VMs, and compute hosts on which they run)."""
import base64
import collections
import copy
import functools
import re
@ -3883,27 +3884,39 @@ class API(base.Base):
# but doesn't know where they go. We need to collate lists
# by the host the affected instance is on and dispatch them
# according to host
instances_by_host = {}
events_by_host = {}
hosts_by_instance = {}
instances_by_host = collections.defaultdict(list)
events_by_host = collections.defaultdict(list)
hosts_by_instance = collections.defaultdict(list)
for instance in instances:
instances_on_host = instances_by_host.get(instance.host, [])
instances_on_host.append(instance)
instances_by_host[instance.host] = instances_on_host
hosts_by_instance[instance.uuid] = instance.host
for host in self._get_relevant_hosts(context, instance):
instances_by_host[host].append(instance)
hosts_by_instance[instance.uuid].append(host)
for event in events:
host = hosts_by_instance[event.instance_uuid]
events_on_host = events_by_host.get(host, [])
events_on_host.append(event)
events_by_host[host] = events_on_host
for host in hosts_by_instance[event.instance_uuid]:
events_by_host[host].append(event)
for host in instances_by_host:
# TODO(salv-orlando): Handle exceptions raised by the rpc api layer
# in order to ensure that a failure in processing events on a host
# will not prevent processing events on other hosts
self.compute_rpcapi.external_instance_event(
context, instances_by_host[host], events_by_host[host])
context, instances_by_host[host], events_by_host[host],
host=host)
def _get_relevant_hosts(self, context, instance):
hosts = set()
hosts.add(instance.host)
if instance.migration_context is not None:
migration_id = instance.migration_context.migration_id
migration = objects.Migration.get_by_id(context, migration_id)
hosts.add(migration.dest_compute)
hosts.add(migration.source_compute)
LOG.debug('Instance %(instance)s is migrating, '
'copying events to all relevant hosts: '
'%(hosts)s', {'instance': instance.uuid,
'hosts': hosts})
return hosts
def get_instance_host_status(self, instance):
if instance.host:

View File

@ -1049,10 +1049,10 @@ class ComputeAPI(object):
volume_id=volume_id, snapshot_id=snapshot_id,
delete_info=delete_info)
def external_instance_event(self, ctxt, instances, events):
def external_instance_event(self, ctxt, instances, events, host=None):
instance = instances[0]
cctxt = self.router.by_instance(ctxt, instance).prepare(
server=_compute_host(None, instance),
server=_compute_host(host, instance),
version='4.0')
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
events=events)

View File

@ -37,7 +37,7 @@ MISSING_UUID = '00000000-0000-0000-0000-000000000005'
@classmethod
def fake_get_by_uuid(cls, context, uuid):
def fake_get_by_uuid(cls, context, uuid, **kwargs):
try:
return fake_instances[uuid]
except KeyError:

View File

@ -3186,9 +3186,12 @@ class _ComputeAPIUnitTestMixIn(object):
def test_external_instance_event(self):
instances = [
objects.Instance(uuid=uuids.instance_1, host='host1'),
objects.Instance(uuid=uuids.instance_2, host='host1'),
objects.Instance(uuid=uuids.instance_3, host='host2'),
objects.Instance(uuid=uuids.instance_1, host='host1',
migration_context=None),
objects.Instance(uuid=uuids.instance_2, host='host1',
migration_context=None),
objects.Instance(uuid=uuids.instance_3, host='host2',
migration_context=None),
]
events = [
objects.InstanceExternalEvent(
@ -3202,10 +3205,61 @@ class _ComputeAPIUnitTestMixIn(object):
self.compute_api.external_instance_event(self.context,
instances, events)
method = self.compute_api.compute_rpcapi.external_instance_event
method.assert_any_call(self.context, instances[0:2], events[0:2])
method.assert_any_call(self.context, instances[2:], events[2:])
method.assert_any_call(self.context, instances[0:2], events[0:2],
host='host1')
method.assert_any_call(self.context, instances[2:], events[2:],
host='host2')
self.assertEqual(2, method.call_count)
def test_external_instance_event_evacuating_instance(self):
# Since we're patching the db's migration_get(), use a dict here so
# that we can validate the id is making its way correctly to the db api
migrations = {}
migrations[42] = {'id': 42, 'source_compute': 'host1',
'dest_compute': 'host2', 'source_node': None,
'dest_node': None, 'dest_host': None,
'old_instance_type_id': None,
'new_instance_type_id': None,
'instance_uuid': uuids.instance_2, 'status': None,
'migration_type': 'evacuation', 'memory_total': None,
'memory_processed': None, 'memory_remaining': None,
'disk_total': None, 'disk_processed': None,
'disk_remaining': None, 'deleted': False,
'hidden': False, 'created_at': None,
'updated_at': None, 'deleted_at': None}
def migration_get(context, id):
return migrations[id]
instances = [
objects.Instance(uuid=uuids.instance_1, host='host1',
migration_context=None),
objects.Instance(uuid=uuids.instance_2, host='host1',
migration_context=objects.MigrationContext(
migration_id=42)),
objects.Instance(uuid=uuids.instance_3, host='host2',
migration_context=None)
]
events = [
objects.InstanceExternalEvent(
instance_uuid=uuids.instance_1),
objects.InstanceExternalEvent(
instance_uuid=uuids.instance_2),
objects.InstanceExternalEvent(
instance_uuid=uuids.instance_3),
]
with mock.patch('nova.db.sqlalchemy.api.migration_get', migration_get):
self.compute_api.compute_rpcapi = mock.MagicMock()
self.compute_api.external_instance_event(self.context,
instances, events)
method = self.compute_api.compute_rpcapi.external_instance_event
method.assert_any_call(self.context, instances[0:2], events[0:2],
host='host1')
method.assert_any_call(self.context, instances[1:], events[1:],
host='host2')
self.assertEqual(2, method.call_count)
def test_volume_ops_invalid_task_state(self):
instance = self._create_instance_obj()
self.assertEqual(instance.vm_state, vm_states.ACTIVE)