Add external_instance_event() method to compute manager

This would allow external services to send notifications in to
the compute manager, which could have one or more threads waiting
to be unblocked. It would also provide a way for us to process
generic events like "update your instance cache for $instance".

Related to blueprint admin-event-callback-api

Change-Id: If87cc7fdb1ebcfa2a1d33f5e864c43ed0290beb6
This commit is contained in:
Dan Smith 2014-02-18 16:11:45 -08:00
parent 8aa55829c5
commit a7b5b975a4
8 changed files with 302 additions and 1 deletions

View File

@ -3027,6 +3027,30 @@ class API(base.Base):
self.compute_rpcapi.volume_snapshot_delete(context, bdm.instance,
volume_id, snapshot_id, delete_info)
def external_instance_event(self, context, instances, events):
# NOTE(danms): The external API consumer just provides events,
# but doesn't know where they go. We need to collate lists
# by the host the affected instance is on and dispatch them
# according to host
instances_by_host = {}
events_by_host = {}
hosts_by_instance = {}
for instance in instances:
instances_on_host = instances_by_host.get(instance.host, [])
instances_on_host.append(instance)
instances_by_host[instance.host] = instances_on_host
hosts_by_instance[instance.uuid] = instance.host
for event in events:
host = hosts_by_instance[event.instance_uuid]
events_on_host = events_by_host.get(host, [])
events_on_host.append(event)
events_by_host[host] = events_on_host
for host in instances_by_host:
self.compute_rpcapi.external_instance_event(
context, instances_by_host[host], events_by_host[host])
class HostAPI(base.Base):
"""Sub-set of the Compute Manager API for managing host operations."""

View File

@ -34,6 +34,7 @@ import time
import traceback
import uuid
import eventlet.event
from eventlet import greenthread
from oslo.config import cfg
from oslo import messaging
@ -384,6 +385,75 @@ def _get_image_meta(context, image_ref):
return image_service.show(context, image_id)
class InstanceEvents(object):
def __init__(self):
self._events = {}
@staticmethod
def _lock_name(instance):
return '%s-%s' % (instance.uuid, 'events')
def prepare_for_instance_event(self, instance, event_name):
"""Prepare to receive an event for an instance.
This will register an event for the given instance that we will
wait on later. This should be called before initiating whatever
action will trigger the event. The resulting eventlet.event.Event
object should be wait()'d on to ensure completion.
:param instance: the instance for which the event will be generated
:param event_name: the name of the event we're expecting
:returns: an event object that should be wait()'d on
"""
@utils.synchronized(self._lock_name)
def _create_or_get_event():
if instance.uuid not in self._events:
self._events.setdefault(instance.uuid, {})
return self._events[instance.uuid].setdefault(
event_name, eventlet.event.Event())
LOG.debug(_('Preparing to wait for external event %(event)s '
'for instance %(uuid)s'), {'event': event_name,
'uuid': instance.uuid})
return _create_or_get_event()
def pop_instance_event(self, instance, event):
"""Remove a pending event from the wait list.
This will remove a pending event from the wait list so that it
can be used to signal the waiters to wake up.
:param instance: the instance for which the event was generated
:param event: the nova.objects.external_event.InstanceExternalEvent
that describes the event
:returns: the eventlet.event.Event object on which the waiters
are blocked
"""
@utils.synchronized(self._lock_name)
def _pop_event():
events = self._events.get(instance.uuid)
if not events:
return None
_event = events.pop(event.key, None)
if not events:
del self._events[instance.uuid]
return _event
return _pop_event()
def clear_events_for_instance(self, instance):
"""Remove all pending events for an instance.
This will remove all events currently pending for an instance
and return them (indexed by event name).
:param instance: the instance for which events should be purged
:returns: a dictionary of {event_name: eventlet.event.Event}
"""
@utils.synchronized(self._lock_name)
def _clear_events():
return self._events.pop(instance.uuid, {})
return _clear_events()
class ComputeVirtAPI(virtapi.VirtAPI):
def __init__(self, compute):
super(ComputeVirtAPI, self).__init__()
@ -411,7 +481,7 @@ class ComputeVirtAPI(virtapi.VirtAPI):
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
target = messaging.Target(version='3.22')
target = messaging.Target(version='3.23')
def __init__(self, compute_driver=None, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
@ -434,6 +504,7 @@ class ComputeManager(manager.Manager):
self.cells_rpcapi = cells_rpcapi.CellsAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self._resource_tracker_dict = {}
self.instance_events = InstanceEvents()
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
@ -5302,6 +5373,27 @@ class ComputeManager(manager.Manager):
aggregate, host,
isinstance(e, exception.AggregateError))
def _process_instance_event(self, instance, event):
_event = self.instance_events.pop_instance_event(instance, event)
if _event:
LOG.debug(_('Processing event %(event)s'),
{'event': event.key, 'instance': instance})
_event.send(event)
@wrap_exception()
def external_instance_event(self, context, instances, events):
# NOTE(danms): Some event types are handled by the manager, such
# as when we're asked to update the instance's info_cache. If it's
# not one of those, look for some thread(s) waiting for the event and
# unblock them if so.
for event in events:
instance = [inst for inst in instances
if inst.uuid == event.instance_uuid][0]
if event.name == 'network-changed':
self.network_api.get_instance_nw_info(context, instance)
else:
self._process_instance_event(instance, event)
@periodic_task.periodic_task(spacing=CONF.image_cache_manager_interval,
external_process_ok=True)
def _run_image_cache_manager_pass(self, context):

View File

@ -240,6 +240,7 @@ class ComputeAPI(object):
3.20 - Make restore_instance take an instance object
3.21 - Made rebuild take new-world BDM objects
3.22 - Made terminate_instance take new-world BDM objects
3.23 - Added external_instance_event()
'''
VERSION_ALIASES = {
@ -960,6 +961,13 @@ class ComputeAPI(object):
volume_id=volume_id, snapshot_id=snapshot_id,
delete_info=delete_info)
def external_instance_event(self, ctxt, instances, events):
cctxt = self.client.prepare(
server=_compute_host(None, instances[0]),
version='3.23')
cctxt.cast(ctxt, 'external_instance_event', instances=instances,
events=events)
class SecurityGroupAPI(object):
'''Client side of the security group rpc API.

View File

@ -0,0 +1,53 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.objects import base as obj_base
from nova.objects import fields
EVENT_NAMES = [
# Network has changed for this instance, rebuild info_cache
'network-changed',
# VIF plugging notifications, tag is port_id
'network-vif-plugged',
'network-vif-unplugged',
]
EVENT_STATUSES = ['failed', 'completed', 'in-progress']
class InstanceExternalEvent(obj_base.NovaObject):
# Version 1.0: Initial version
# Supports network-changed and vif-plugged
VERSION = '1.0'
fields = {
'instance_uuid': fields.UUIDField(),
'name': fields.StringField(),
'status': fields.StringField(),
'tag': fields.StringField(nullable=True),
'data': fields.DictOfStringsField(),
}
@staticmethod
def make_key(name, tag=None):
if tag is not None:
return '%s-%s' % (name, tag)
else:
return name
@property
def key(self):
return self.make_key(self.name, self.tag)

View File

@ -31,6 +31,7 @@ from nova import db
from nova import exception
from nova.objects import base as obj_base
from nova.objects import block_device as block_device_obj
from nova.objects import external_event as external_event_obj
from nova.objects import instance as instance_obj
from nova.objects import instance_info_cache
from nova.objects import migration as migration_obj
@ -1712,6 +1713,25 @@ class _ComputeAPIUnitTestMixIn(object):
self.assertEqual(instance.task_state, task_states.RESTORING)
self.assertEqual(1, quota_commit.call_count)
def test_external_instance_event(self):
instances = [
instance_obj.Instance(uuid='uuid1', host='host1'),
instance_obj.Instance(uuid='uuid2', host='host1'),
instance_obj.Instance(uuid='uuid3', host='host2'),
]
events = [
external_event_obj.InstanceExternalEvent(instance_uuid='uuid1'),
external_event_obj.InstanceExternalEvent(instance_uuid='uuid2'),
external_event_obj.InstanceExternalEvent(instance_uuid='uuid3'),
]
self.compute_api.compute_rpcapi = mock.MagicMock()
self.compute_api.external_instance_event(self.context,
instances, events)
method = self.compute_api.compute_rpcapi.external_instance_event
method.assert_any_call(self.context, instances[0:2], events[0:2])
method.assert_any_call(self.context, instances[2:], events[2:])
self.assertEqual(2, method.call_count)
class ComputeAPIUnitTestCase(_ComputeAPIUnitTestMixIn, test.NoDBTestCase):
def setUp(self):

View File

@ -15,6 +15,7 @@
import contextlib
import time
from eventlet import event as eventlet_event
import mock
import mox
from oslo.config import cfg
@ -30,6 +31,7 @@ from nova import exception
from nova.network import model as network_model
from nova.objects import base as obj_base
from nova.objects import block_device as block_device_obj
from nova.objects import external_event as external_event_obj
from nova.objects import instance as instance_obj
from nova.openstack.common import importutils
from nova.openstack.common import uuidutils
@ -858,6 +860,54 @@ class ComputeManagerUnitTestCase(test.NoDBTestCase):
self._test_check_can_live_migrate_destination,
do_raise=True)
def test_prepare_for_instance_event(self):
inst_obj = instance_obj.Instance(uuid='foo')
result = self.compute.instance_events.prepare_for_instance_event(
inst_obj, 'test-event')
self.assertIn('foo', self.compute.instance_events._events)
self.assertIn('test-event',
self.compute.instance_events._events['foo'])
self.assertEqual(
result,
self.compute.instance_events._events['foo']['test-event'])
self.assertTrue(hasattr(result, 'send'))
def test_process_instance_event(self):
event = eventlet_event.Event()
self.compute.instance_events._events = {
'foo': {
'test-event': event,
}
}
inst_obj = instance_obj.Instance(uuid='foo')
event_obj = external_event_obj.InstanceExternalEvent(name='test-event',
tag=None)
self.compute._process_instance_event(inst_obj, event_obj)
self.assertTrue(event.ready())
self.assertEqual(event_obj, event.wait())
self.assertEqual({}, self.compute.instance_events._events)
def test_external_instance_event(self):
instances = [
instance_obj.Instance(uuid='uuid1'),
instance_obj.Instance(uuid='uuid2')]
events = [
external_event_obj.InstanceExternalEvent(name='network-changed',
instance_uuid='uuid1'),
external_event_obj.InstanceExternalEvent(name='foo',
instance_uuid='uuid2')]
@mock.patch.object(self.compute.network_api, 'get_instance_nw_info')
@mock.patch.object(self.compute, '_process_instance_event')
def do_test(_process_instance_event, get_instance_nw_info):
self.compute.external_instance_event(self.context,
instances, events)
get_instance_nw_info.assert_called_once_with(self.context,
instances[0])
_process_instance_event.assert_called_once_with(instances[1],
events[1])
do_test()
class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
def setUp(self):

View File

@ -75,6 +75,8 @@ class ComputeRpcAPITestCase(test.TestCase):
host = kwargs['host']
elif 'destination' in kwargs:
host = kwargs['destination']
elif 'instances' in kwargs:
host = kwargs['instances'][0]['host']
else:
host = kwargs['instance']['host']
@ -808,3 +810,9 @@ class ComputeRpcAPITestCase(test.TestCase):
self._test_compute_api('volume_snapshot_delete', 'cast',
instance=self.fake_instance, volume_id='fake_id',
snapshot_id='fake_id2', delete_info={}, version='2.44')
def test_external_instance_event(self):
self._test_compute_api('external_instance_event', 'cast',
instances=[self.fake_instance],
events=['event'],
version='3.23')

View File

@ -0,0 +1,46 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova.objects import external_event as external_event_obj
from nova.tests.objects import test_objects
class _TestInstanceExternalEventObject(object):
def test_make_key(self):
key = external_event_obj.InstanceExternalEvent.make_key('foo', 'bar')
self.assertEqual('foo-bar', key)
def test_make_key_no_tag(self):
key = external_event_obj.InstanceExternalEvent.make_key('foo')
self.assertEqual('foo', key)
def test_key(self):
event = external_event_obj.InstanceExternalEvent(name='foo',
tag='bar')
with mock.patch.object(event, 'make_key') as make_key:
make_key.return_value = 'key'
self.assertEqual('key', event.key)
make_key.assert_called_once_with('foo', 'bar')
class TestInstanceExternalEventObject(test_objects._LocalTest,
_TestInstanceExternalEventObject):
pass
class TestRemoteInstanceExternalEventObject(test_objects._RemoteTest,
_TestInstanceExternalEventObject):
pass