From c51db0fa70d0f195d6296e549e0c999aa1bdfea6 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Tue, 18 Feb 2014 19:15:59 -0800 Subject: [PATCH] Make compute manager's virtapi support waiting for events This makes compute manager's virtapi expose a context manager that lets virt drivers do something that doesn't complete until an external event is triggered. Related to blueprint admin-event-callback-api Change-Id: I00b2821200f89a04f27963153805280c0063fd57 --- nova/compute/manager.py | 59 +++++++++++++++++++ nova/tests/compute/test_virtapi.py | 95 ++++++++++++++++++++++++++++++ nova/virt/fake.py | 9 +++ nova/virt/virtapi.py | 7 +++ 4 files changed, 170 insertions(+) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 1714c16bf4df..bc4734924ae6 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -36,6 +36,7 @@ import uuid import eventlet.event from eventlet import greenthread +import eventlet.timeout from oslo.config import cfg from oslo import messaging @@ -63,6 +64,7 @@ from nova.network.security_group import openstack_driver from nova.objects import aggregate as aggregate_obj from nova.objects import base as obj_base from nova.objects import block_device as block_device_obj +from nova.objects import external_event as external_event_obj from nova.objects import flavor as flavor_obj from nova.objects import instance as instance_obj from nova.objects import migration as migration_obj @@ -477,6 +479,63 @@ class ComputeVirtAPI(virtapi.VirtAPI): return capi.block_device_mapping_get_all_by_instance(context, instance, legacy=legacy) + def _default_error_callback(self, event_name, instance): + raise exception.NovaException(_('Instance event failed')) + + @contextlib.contextmanager + def wait_for_instance_event(self, instance, event_names, deadline=300, + error_callback=None): + """Plan to wait for some events, run some code, then wait. + + This context manager will first create plans to wait for the + provided event_names, yield, and then wait for all the scheduled + events to complete. + + Note that this uses an eventlet.timeout.Timeout to bound the + operation, so callers should be prepared to catch that + failure and handle that situation appropriately. + + If the event is not received by the specified timeout deadline, + eventlet.timeout.Timeout is raised. + + If the event is received but did not have a 'completed' + status, a NovaException is raised. If an error_callback is + provided, instead of raising an exception as detailed above + for the failure case, the callback will be called with the + event_name and instance, and can return True to continue + waiting for the rest of the events, False to stop processing, + or raise an exception which will bubble up to the waiter. + + :param:instance: The instance for which an event is expected + :param:event_names: A list of event names. Each element can be a + string event name or tuple of strings to + indicate (name, tag). + :param:deadline: Maximum number of seconds we should wait for all + of the specified events to arrive. + :param:error_callback: A function to be called if an event arrives + """ + + if error_callback is None: + error_callback = self._default_error_callback + events = {} + for event_name in event_names: + if isinstance(event_name, tuple): + name, tag = event_name + event_name = external_event_obj.InstanceExternalEvent.make_key( + name, tag) + events[event_name] = ( + self._compute.instance_events.prepare_for_instance_event( + instance, event_name)) + yield + with eventlet.timeout.Timeout(deadline): + for event_name, event in events.items(): + actual_event = event.wait() + if actual_event.status == 'completed': + continue + decision = error_callback(event_name, instance) + if decision is False: + break + class ComputeManager(manager.Manager): """Manages the running instances from creation to destruction.""" diff --git a/nova/tests/compute/test_virtapi.py b/nova/tests/compute/test_virtapi.py index a4c98767ab25..227b3a590e3c 100644 --- a/nova/tests/compute/test_virtapi.py +++ b/nova/tests/compute/test_virtapi.py @@ -12,11 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. +import mock import mox from nova.compute import manager as compute_manager from nova import context from nova import db +from nova import exception +from nova.objects import external_event as external_event_obj from nova import test from nova.virt import fake from nova.virt import virtapi @@ -54,6 +57,10 @@ class VirtAPIBaseTest(test.NoDBTestCase, test.APICoverage): self.assertExpected('block_device_mapping_get_all_by_instance', {'uuid': 'fake_uuid'}, legacy=False) + def test_wait_for_instance_event(self): + self.assertExpected('wait_for_instance_event', + 'instance', ['event']) + class FakeVirtAPITest(VirtAPIBaseTest): @@ -63,6 +70,13 @@ class FakeVirtAPITest(VirtAPIBaseTest): self.virtapi = fake.FakeVirtAPI() def assertExpected(self, method, *args, **kwargs): + if method == 'wait_for_instance_event': + run = False + with self.virtapi.wait_for_instance_event(*args, **kwargs): + run = True + self.assertTrue(run) + return + if method == 'instance_update': # NOTE(danms): instance_update actually becomes the other variant # in FakeVirtAPI @@ -98,12 +112,29 @@ class FakeCompute(object): def __init__(self): self.conductor_api = mox.MockAnything() self.db = mox.MockAnything() + self._events = [] + self.instance_events = mock.MagicMock() + self.instance_events.prepare_for_instance_event.side_effect = \ + self._prepare_for_instance_event def _instance_update(self, context, instance_uuid, **kwargs): # NOTE(danms): Fake this behavior from compute/manager::ComputeManager return self.conductor_api.instance_update(context, instance_uuid, kwargs) + def _event_waiter(self): + event = mock.MagicMock() + event.status = 'completed' + return event + + def _prepare_for_instance_event(self, instance, event_name): + m = mock.MagicMock() + m.instance = instance + m.event_name = event_name + m.wait.side_effect = self._event_waiter + self._events.append(m) + return m + class ComputeVirtAPITest(VirtAPIBaseTest): @@ -120,3 +151,67 @@ class ComputeVirtAPITest(VirtAPIBaseTest): self.mox.ReplayAll() result = getattr(self.virtapi, method)(self.context, *args, **kwargs) self.assertEqual(result, 'it worked') + + def test_wait_for_instance_event(self): + and_i_ran = '' + event_1_tag = external_event_obj.InstanceExternalEvent.make_key( + 'event1') + event_2_tag = external_event_obj.InstanceExternalEvent.make_key( + 'event2', 'tag') + events = { + 'event1': event_1_tag, + ('event2', 'tag'): event_2_tag, + } + with self.virtapi.wait_for_instance_event('instance', events.keys()): + and_i_ran = 'I ran so far a-waa-y' + + self.assertEqual('I ran so far a-waa-y', and_i_ran) + self.assertEqual(2, len(self.compute._events)) + for event in self.compute._events: + self.assertEqual('instance', event.instance) + self.assertIn(event.event_name, events.values()) + event.wait.assert_called_once_with() + + def test_wait_for_instance_event_failed(self): + def _failer(): + event = mock.MagicMock() + event.status = 'failed' + return event + + @mock.patch.object(self.virtapi._compute, '_event_waiter', _failer) + def do_test(): + with self.virtapi.wait_for_instance_event('instance', ['foo']): + pass + + self.assertRaises(exception.NovaException, do_test) + + def test_wait_for_instance_event_failed_callback(self): + def _failer(): + event = mock.MagicMock() + event.status = 'failed' + return event + + @mock.patch.object(self.virtapi._compute, '_event_waiter', _failer) + def do_test(): + callback = mock.MagicMock() + with self.virtapi.wait_for_instance_event('instance', ['foo'], + error_callback=callback): + pass + callback.assert_called_with('foo', 'instance') + + do_test() + + def test_wait_for_instance_event_timeout(self): + class TestException(Exception): + pass + + def _failer(): + raise TestException() + + @mock.patch.object(self.virtapi._compute, '_event_waiter', _failer) + @mock.patch('eventlet.timeout.Timeout') + def do_test(timeout): + with self.virtapi.wait_for_instance_event('instance', ['foo']): + pass + + self.assertRaises(TestException, do_test) diff --git a/nova/virt/fake.py b/nova/virt/fake.py index a26b87946704..30ca28492887 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -23,6 +23,8 @@ semantics of real hypervisor connections. """ +import contextlib + from oslo.config import cfg from nova import block_device @@ -486,3 +488,10 @@ class FakeVirtAPI(virtapi.VirtAPI): if legacy: bdms = block_device.legacy_mapping(bdms) return bdms + + @contextlib.contextmanager + def wait_for_instance_event(self, instance, event_names, deadline=300, + error_callback=None): + # NOTE(danms): Don't actually wait for any events, just + # fall through + yield diff --git a/nova/virt/virtapi.py b/nova/virt/virtapi.py index 497133c2ce0b..145bc8821b78 100644 --- a/nova/virt/virtapi.py +++ b/nova/virt/virtapi.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + class VirtAPI(object): def instance_update(self, context, instance_uuid, updates): @@ -48,3 +50,8 @@ class VirtAPI(object): :param legacy: get bdm info in legacy format (or not) """ raise NotImplementedError() + + @contextlib.contextmanager + def wait_for_instance_event(self, instance, event_names, deadline=300, + error_callback=None): + raise NotImplementedError()