Make compute manager's virtapi support waiting for events
This makes compute manager's virtapi expose a context manager that lets virt drivers do something that doesn't complete until an external event is triggered. Related to blueprint admin-event-callback-api Change-Id: I00b2821200f89a04f27963153805280c0063fd57
This commit is contained in:
parent
388de275cb
commit
c51db0fa70
|
@ -36,6 +36,7 @@ import uuid
|
|||
|
||||
import eventlet.event
|
||||
from eventlet import greenthread
|
||||
import eventlet.timeout
|
||||
from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
|
||||
|
@ -63,6 +64,7 @@ from nova.network.security_group import openstack_driver
|
|||
from nova.objects import aggregate as aggregate_obj
|
||||
from nova.objects import base as obj_base
|
||||
from nova.objects import block_device as block_device_obj
|
||||
from nova.objects import external_event as external_event_obj
|
||||
from nova.objects import flavor as flavor_obj
|
||||
from nova.objects import instance as instance_obj
|
||||
from nova.objects import migration as migration_obj
|
||||
|
@ -477,6 +479,63 @@ class ComputeVirtAPI(virtapi.VirtAPI):
|
|||
return capi.block_device_mapping_get_all_by_instance(context, instance,
|
||||
legacy=legacy)
|
||||
|
||||
def _default_error_callback(self, event_name, instance):
|
||||
raise exception.NovaException(_('Instance event failed'))
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wait_for_instance_event(self, instance, event_names, deadline=300,
|
||||
error_callback=None):
|
||||
"""Plan to wait for some events, run some code, then wait.
|
||||
|
||||
This context manager will first create plans to wait for the
|
||||
provided event_names, yield, and then wait for all the scheduled
|
||||
events to complete.
|
||||
|
||||
Note that this uses an eventlet.timeout.Timeout to bound the
|
||||
operation, so callers should be prepared to catch that
|
||||
failure and handle that situation appropriately.
|
||||
|
||||
If the event is not received by the specified timeout deadline,
|
||||
eventlet.timeout.Timeout is raised.
|
||||
|
||||
If the event is received but did not have a 'completed'
|
||||
status, a NovaException is raised. If an error_callback is
|
||||
provided, instead of raising an exception as detailed above
|
||||
for the failure case, the callback will be called with the
|
||||
event_name and instance, and can return True to continue
|
||||
waiting for the rest of the events, False to stop processing,
|
||||
or raise an exception which will bubble up to the waiter.
|
||||
|
||||
:param:instance: The instance for which an event is expected
|
||||
:param:event_names: A list of event names. Each element can be a
|
||||
string event name or tuple of strings to
|
||||
indicate (name, tag).
|
||||
:param:deadline: Maximum number of seconds we should wait for all
|
||||
of the specified events to arrive.
|
||||
:param:error_callback: A function to be called if an event arrives
|
||||
"""
|
||||
|
||||
if error_callback is None:
|
||||
error_callback = self._default_error_callback
|
||||
events = {}
|
||||
for event_name in event_names:
|
||||
if isinstance(event_name, tuple):
|
||||
name, tag = event_name
|
||||
event_name = external_event_obj.InstanceExternalEvent.make_key(
|
||||
name, tag)
|
||||
events[event_name] = (
|
||||
self._compute.instance_events.prepare_for_instance_event(
|
||||
instance, event_name))
|
||||
yield
|
||||
with eventlet.timeout.Timeout(deadline):
|
||||
for event_name, event in events.items():
|
||||
actual_event = event.wait()
|
||||
if actual_event.status == 'completed':
|
||||
continue
|
||||
decision = error_callback(event_name, instance)
|
||||
if decision is False:
|
||||
break
|
||||
|
||||
|
||||
class ComputeManager(manager.Manager):
|
||||
"""Manages the running instances from creation to destruction."""
|
||||
|
|
|
@ -12,11 +12,14 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import mox
|
||||
|
||||
from nova.compute import manager as compute_manager
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.objects import external_event as external_event_obj
|
||||
from nova import test
|
||||
from nova.virt import fake
|
||||
from nova.virt import virtapi
|
||||
|
@ -54,6 +57,10 @@ class VirtAPIBaseTest(test.NoDBTestCase, test.APICoverage):
|
|||
self.assertExpected('block_device_mapping_get_all_by_instance',
|
||||
{'uuid': 'fake_uuid'}, legacy=False)
|
||||
|
||||
def test_wait_for_instance_event(self):
|
||||
self.assertExpected('wait_for_instance_event',
|
||||
'instance', ['event'])
|
||||
|
||||
|
||||
class FakeVirtAPITest(VirtAPIBaseTest):
|
||||
|
||||
|
@ -63,6 +70,13 @@ class FakeVirtAPITest(VirtAPIBaseTest):
|
|||
self.virtapi = fake.FakeVirtAPI()
|
||||
|
||||
def assertExpected(self, method, *args, **kwargs):
|
||||
if method == 'wait_for_instance_event':
|
||||
run = False
|
||||
with self.virtapi.wait_for_instance_event(*args, **kwargs):
|
||||
run = True
|
||||
self.assertTrue(run)
|
||||
return
|
||||
|
||||
if method == 'instance_update':
|
||||
# NOTE(danms): instance_update actually becomes the other variant
|
||||
# in FakeVirtAPI
|
||||
|
@ -98,12 +112,29 @@ class FakeCompute(object):
|
|||
def __init__(self):
|
||||
self.conductor_api = mox.MockAnything()
|
||||
self.db = mox.MockAnything()
|
||||
self._events = []
|
||||
self.instance_events = mock.MagicMock()
|
||||
self.instance_events.prepare_for_instance_event.side_effect = \
|
||||
self._prepare_for_instance_event
|
||||
|
||||
def _instance_update(self, context, instance_uuid, **kwargs):
|
||||
# NOTE(danms): Fake this behavior from compute/manager::ComputeManager
|
||||
return self.conductor_api.instance_update(context,
|
||||
instance_uuid, kwargs)
|
||||
|
||||
def _event_waiter(self):
|
||||
event = mock.MagicMock()
|
||||
event.status = 'completed'
|
||||
return event
|
||||
|
||||
def _prepare_for_instance_event(self, instance, event_name):
|
||||
m = mock.MagicMock()
|
||||
m.instance = instance
|
||||
m.event_name = event_name
|
||||
m.wait.side_effect = self._event_waiter
|
||||
self._events.append(m)
|
||||
return m
|
||||
|
||||
|
||||
class ComputeVirtAPITest(VirtAPIBaseTest):
|
||||
|
||||
|
@ -120,3 +151,67 @@ class ComputeVirtAPITest(VirtAPIBaseTest):
|
|||
self.mox.ReplayAll()
|
||||
result = getattr(self.virtapi, method)(self.context, *args, **kwargs)
|
||||
self.assertEqual(result, 'it worked')
|
||||
|
||||
def test_wait_for_instance_event(self):
|
||||
and_i_ran = ''
|
||||
event_1_tag = external_event_obj.InstanceExternalEvent.make_key(
|
||||
'event1')
|
||||
event_2_tag = external_event_obj.InstanceExternalEvent.make_key(
|
||||
'event2', 'tag')
|
||||
events = {
|
||||
'event1': event_1_tag,
|
||||
('event2', 'tag'): event_2_tag,
|
||||
}
|
||||
with self.virtapi.wait_for_instance_event('instance', events.keys()):
|
||||
and_i_ran = 'I ran so far a-waa-y'
|
||||
|
||||
self.assertEqual('I ran so far a-waa-y', and_i_ran)
|
||||
self.assertEqual(2, len(self.compute._events))
|
||||
for event in self.compute._events:
|
||||
self.assertEqual('instance', event.instance)
|
||||
self.assertIn(event.event_name, events.values())
|
||||
event.wait.assert_called_once_with()
|
||||
|
||||
def test_wait_for_instance_event_failed(self):
|
||||
def _failer():
|
||||
event = mock.MagicMock()
|
||||
event.status = 'failed'
|
||||
return event
|
||||
|
||||
@mock.patch.object(self.virtapi._compute, '_event_waiter', _failer)
|
||||
def do_test():
|
||||
with self.virtapi.wait_for_instance_event('instance', ['foo']):
|
||||
pass
|
||||
|
||||
self.assertRaises(exception.NovaException, do_test)
|
||||
|
||||
def test_wait_for_instance_event_failed_callback(self):
|
||||
def _failer():
|
||||
event = mock.MagicMock()
|
||||
event.status = 'failed'
|
||||
return event
|
||||
|
||||
@mock.patch.object(self.virtapi._compute, '_event_waiter', _failer)
|
||||
def do_test():
|
||||
callback = mock.MagicMock()
|
||||
with self.virtapi.wait_for_instance_event('instance', ['foo'],
|
||||
error_callback=callback):
|
||||
pass
|
||||
callback.assert_called_with('foo', 'instance')
|
||||
|
||||
do_test()
|
||||
|
||||
def test_wait_for_instance_event_timeout(self):
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
def _failer():
|
||||
raise TestException()
|
||||
|
||||
@mock.patch.object(self.virtapi._compute, '_event_waiter', _failer)
|
||||
@mock.patch('eventlet.timeout.Timeout')
|
||||
def do_test(timeout):
|
||||
with self.virtapi.wait_for_instance_event('instance', ['foo']):
|
||||
pass
|
||||
|
||||
self.assertRaises(TestException, do_test)
|
||||
|
|
|
@ -23,6 +23,8 @@ semantics of real hypervisor connections.
|
|||
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova import block_device
|
||||
|
@ -486,3 +488,10 @@ class FakeVirtAPI(virtapi.VirtAPI):
|
|||
if legacy:
|
||||
bdms = block_device.legacy_mapping(bdms)
|
||||
return bdms
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wait_for_instance_event(self, instance, event_names, deadline=300,
|
||||
error_callback=None):
|
||||
# NOTE(danms): Don't actually wait for any events, just
|
||||
# fall through
|
||||
yield
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
|
||||
|
||||
class VirtAPI(object):
|
||||
def instance_update(self, context, instance_uuid, updates):
|
||||
|
@ -48,3 +50,8 @@ class VirtAPI(object):
|
|||
:param legacy: get bdm info in legacy format (or not)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wait_for_instance_event(self, instance, event_names, deadline=300,
|
||||
error_callback=None):
|
||||
raise NotImplementedError()
|
||||
|
|
Loading…
Reference in New Issue