diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f673b9c06..6108e9519 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -35,7 +35,7 @@ from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._executors import base as executor_base # FIXME(markmc) +from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc) from oslo_messaging._i18n import _, _LE, _LW from oslo_messaging._drivers import pool @@ -1001,7 +1001,7 @@ class ZmqDriver(base.BaseDriver): if not zmq: raise ImportError("Failed to import eventlet.green.zmq") conf.register_opts(zmq_opts) - conf.register_opts(executor_base._pool_opts) + conf.register_opts(impl_pooledexecutor._pool_opts) conf.register_opts(base.base_opts) super(ZmqDriver, self).__init__(conf, url, default_exchange, diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py index 2cc667e27..6fbc15377 100644 --- a/oslo_messaging/_executors/base.py +++ b/oslo_messaging/_executors/base.py @@ -14,19 +14,15 @@ import abc -from oslo_config import cfg import six -_pool_opts = [ - cfg.IntOpt('rpc_thread_pool_size', - default=64, - help='Size of RPC thread pool.'), -] - @six.add_metaclass(abc.ABCMeta) class ExecutorBase(object): + # Executor can override how we run the application callback + _executor_callback = None + def __init__(self, conf, listener, dispatcher): self.conf = conf self.listener = listener @@ -43,11 +39,3 @@ class ExecutorBase(object): @abc.abstractmethod def wait(self): "Wait until the executor has stopped polling." - - -class PooledExecutorBase(ExecutorBase): - """An executor that uses a rpc thread pool of a given size.""" - - def __init__(self, conf, listener, callback): - super(PooledExecutorBase, self).__init__(conf, listener, callback) - self.conf.register_opts(_pool_opts) diff --git a/oslo_messaging/_executors/impl_aioeventlet.py b/oslo_messaging/_executors/impl_aioeventlet.py index d0fc4aa31..8f67d7958 100644 --- a/oslo_messaging/_executors/impl_aioeventlet.py +++ b/oslo_messaging/_executors/impl_aioeventlet.py @@ -70,6 +70,4 @@ class AsyncioEventletExecutor(impl_eventlet.EventletExecutor): result = aioeventlet.yield_future(result, loop=self._loop) return result - def _dispatch(self, incoming): - ctx = self.dispatcher(incoming, self._coroutine_wrapper) - impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool) + _executor_callback = _coroutine_wrapper diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py index 733403601..11ad81226 100644 --- a/oslo_messaging/_executors/impl_blocking.py +++ b/oslo_messaging/_executors/impl_blocking.py @@ -13,15 +13,28 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import futurist -from oslo_messaging._executors import base -from oslo_messaging._i18n import _ - -LOG = logging.getLogger(__name__) +from oslo_messaging._executors import impl_pooledexecutor -class BlockingExecutor(base.ExecutorBase): +class FakeBlockingThread(object): + def __init__(self, target): + self._target = target + + def start(self): + self._target() + + @staticmethod + def join(): + pass + + @staticmethod + def stop(): + pass + + +class BlockingExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which blocks the current thread. @@ -34,24 +47,5 @@ class BlockingExecutor(base.ExecutorBase): for simple demo programs. """ - def __init__(self, conf, listener, dispatcher): - super(BlockingExecutor, self).__init__(conf, listener, dispatcher) - self._running = False - - def start(self): - self._running = True - while self._running: - try: - incoming = self.listener.poll() - if incoming is not None: - with self.dispatcher(incoming) as callback: - callback() - except Exception: - LOG.exception(_("Unexpected exception occurred.")) - - def stop(self): - self._running = False - self.listener.stop() - - def wait(self): - pass + _executor_cls = lambda __, ___: futurist.SynchronousExecutor() + _thread_cls = FakeBlockingThread diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py index 3333fe713..48771edc7 100644 --- a/oslo_messaging/_executors/impl_eventlet.py +++ b/oslo_messaging/_executors/impl_eventlet.py @@ -14,50 +14,17 @@ # under the License. import logging -import sys -import eventlet from eventlet.green import threading as greenthreading -from eventlet import greenpool -import greenlet -from oslo_utils import excutils +import futurist -from oslo_messaging._executors import base +from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging import localcontext LOG = logging.getLogger(__name__) -def spawn_with(ctxt, pool): - """This is the equivalent of a with statement - but with the content of the BLOCK statement executed - into a greenthread - - exception path grab from: - http://www.python.org/dev/peps/pep-0343/ - """ - - def complete(thread, exit): - exc = True - try: - try: - thread.wait() - except Exception: - exc = False - if not exit(*sys.exc_info()): - raise - finally: - if exc: - exit(None, None, None) - - callback = ctxt.__enter__() - thread = pool.spawn(callback) - thread.link(complete, ctxt.__exit__) - - return thread - - -class EventletExecutor(base.PooledExecutorBase): +class EventletExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which integrates with eventlet. @@ -70,10 +37,6 @@ class EventletExecutor(base.PooledExecutorBase): def __init__(self, conf, listener, dispatcher): super(EventletExecutor, self).__init__(conf, listener, dispatcher) - self._thread = None - self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size) - self._running = False - if not isinstance(localcontext._STORE, greenthreading.local): LOG.debug('eventlet executor in use but the threading module ' 'has not been monkeypatched or has been ' @@ -82,39 +45,7 @@ class EventletExecutor(base.PooledExecutorBase): 'behavior. In the future, we will raise a ' 'RuntimeException in this case.') - def _dispatch(self, incoming): - spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool) - - def start(self): - if self._thread is not None: - return - - @excutils.forever_retry_uncaught_exceptions - def _executor_thread(): - try: - while self._running: - incoming = self.listener.poll() - if incoming is not None: - self._dispatch(incoming) - except greenlet.GreenletExit: - return - - self._running = True - self._thread = eventlet.spawn(_executor_thread) - - def stop(self): - if self._thread is None: - return - self._running = False - self.listener.stop() - self._thread.cancel() - - def wait(self): - if self._thread is None: - return - self._greenpool.waitall() - try: - self._thread.wait() - except greenlet.GreenletExit: - pass - self._thread = None + _executor_cls = futurist.GreenThreadPoolExecutor + _lock_cls = greenthreading.Lock + _event_cls = greenthreading.Event + _thread_cls = greenthreading.Thread diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py new file mode 100644 index 000000000..7689bd14f --- /dev/null +++ b/oslo_messaging/_executors/impl_pooledexecutor.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import threading + +from concurrent import futures +from oslo.config import cfg +from oslo.utils import excutils + +from oslo_messaging._executors import base + +_pool_opts = [ + cfg.IntOpt('rpc_thread_pool_size', + default=64, + help='Size of RPC thread pool.'), +] + + +class PooledExecutor(base.ExecutorBase): + """A message executor which integrates with threads. + + A message process that polls for messages from a dispatching thread and + on reception of an incoming message places the message to be processed in + a thread pool to be executed at a later time. + """ + + # NOTE(harlowja): if eventlet is being used and the thread module is monkey + # patched this should/is supposed to work the same as the eventlet based + # executor. + + # NOTE(harlowja): Make it somewhat easy to change this via + # inheritance (since there does exist other executor types that could be + # used/tried here). + _executor_cls = futures.ThreadPoolExecutor + _event_cls = threading.Event + _lock_cls = threading.Lock + _thread_cls = threading.Thread + + def __init__(self, conf, listener, dispatcher): + super(PooledExecutor, self).__init__(conf, listener, dispatcher) + self.conf.register_opts(_pool_opts) + self._poller = None + self._executor = None + self._tombstone = self._event_cls() + self._incomplete = collections.deque() + self._mutator = self._lock_cls() + + @excutils.forever_retry_uncaught_exceptions + def _runner(self): + while not self._tombstone.is_set(): + incoming = self.listener.poll() + if incoming is None: + continue + callback = self.dispatcher(incoming, self._executor_callback) + try: + fut = self._executor.submit(callback.run) + except RuntimeError: + # This is triggered when the executor has been shutdown... + # + # TODO(harlowja): should we put whatever we pulled off back + # since when this is thrown it means the executor has been + # shutdown already?? + callback.done() + return + else: + with self._mutator: + self._incomplete.append(fut) + # Run the other post processing of the callback when done... + fut.add_done_callback(lambda f: callback.done()) + + def start(self): + if self._executor is None: + self._executor = self._executor_cls(self.conf.rpc_thread_pool_size) + self._tombstone.clear() + if self._poller is None or not self._poller.is_alive(): + self._poller = self._thread_cls(target=self._runner) + self._poller.daemon = True + self._poller.start() + + def stop(self): + if self._executor is not None: + self._executor.shutdown(wait=False) + self._tombstone.set() + self.listener.stop() + + def wait(self): + # TODO(harlowja): this method really needs a timeout. + if self._poller is not None: + self._tombstone.wait() + self._poller.join() + self._poller = None + if self._executor is not None: + with self._mutator: + incomplete_fs = list(self._incomplete) + self._incomplete.clear() + if incomplete_fs: + futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED) + self._executor = None diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py index ca8ebc790..9a4651aa2 100644 --- a/oslo_messaging/_executors/impl_thread.py +++ b/oslo_messaging/_executors/impl_thread.py @@ -14,118 +14,16 @@ # License for the specific language governing permissions and limitations # under the License. -import collections -import functools -import sys -import threading - from concurrent import futures -from oslo_utils import excutils -import six -from oslo_messaging._executors import base +from oslo_messaging._executors import impl_pooledexecutor -class ThreadExecutor(base.PooledExecutorBase): +class ThreadExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which integrates with threads. A message process that polls for messages from a dispatching thread and on reception of an incoming message places the message to be processed in a thread pool to be executed at a later time. """ - - # NOTE(harlowja): if eventlet is being used and the thread module is monkey - # patched this should/is supposed to work the same as the eventlet based - # executor. - - # NOTE(harlowja): Make it somewhat easy to change this via - # inheritance (since there does exist other executor types that could be - # used/tried here). _executor_cls = futures.ThreadPoolExecutor - - def __init__(self, conf, listener, dispatcher): - super(ThreadExecutor, self).__init__(conf, listener, dispatcher) - self._poller = None - self._executor = None - self._tombstone = threading.Event() - self._incomplete = collections.deque() - self._mutator = threading.Lock() - - def _completer(self, exit_method, fut): - """Completes futures.""" - try: - exc = fut.exception() - if exc is not None: - exc_type = type(exc) - # Not available on < 3.x due to this being an added feature - # of pep-3134 (exception chaining and embedded tracebacks). - if six.PY3: - exc_tb = exc.__traceback__ - else: - exc_tb = None - if not exit_method(exc_type, exc, exc_tb): - six.reraise(exc_type, exc, tb=exc_tb) - else: - exit_method(None, None, None) - finally: - with self._mutator: - try: - self._incomplete.remove(fut) - except ValueError: - pass - - @excutils.forever_retry_uncaught_exceptions - def _runner(self): - while not self._tombstone.is_set(): - incoming = self.listener.poll() - if incoming is None: - continue - # This is hacky, needs to be fixed.... - context = self.dispatcher(incoming) - enter_method = context.__enter__() - exit_method = context.__exit__ - try: - fut = self._executor.submit(enter_method) - except RuntimeError: - # This is triggered when the executor has been shutdown... - # - # TODO(harlowja): should we put whatever we pulled off back - # since when this is thrown it means the executor has been - # shutdown already?? - exit_method(*sys.exc_info()) - return - else: - with self._mutator: - self._incomplete.append(fut) - # Run the other half (__exit__) when done... - fut.add_done_callback(functools.partial(self._completer, - exit_method)) - - def start(self): - if self._executor is None: - self._executor = self._executor_cls(self.conf.rpc_thread_pool_size) - self._tombstone.clear() - if self._poller is None or not self._poller.is_alive(): - self._poller = threading.Thread(target=self._runner) - self._poller.daemon = True - self._poller.start() - - def stop(self): - if self._executor is not None: - self._executor.shutdown(wait=False) - self._tombstone.set() - self.listener.stop() - - def wait(self): - # TODO(harlowja): this method really needs a timeout. - if self._poller is not None: - self._tombstone.wait() - self._poller.join() - self._poller = None - if self._executor is not None: - with self._mutator: - incomplete_fs = list(self._incomplete) - self._incomplete.clear() - if incomplete_fs: - futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED) - self._executor = None diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py index 532c98b8a..ddec6d7a7 100644 --- a/oslo_messaging/_utils.py +++ b/oslo_messaging/_utils.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import logging + +LOG = logging.getLogger(__name__) + def version_is_compatible(imp_version, version): """Determine whether versions are compatible. @@ -39,3 +43,54 @@ def version_is_compatible(imp_version, version): int(rev) > int(imp_rev)): # Revision return False return True + + +class DispatcherExecutorContext(object): + """Dispatcher executor context helper + + A dispatcher can have work to do before and after the dispatch of the + request in the main server thread while the dispatcher itself can be + done in its own thread. + + The executor can use the helper like this: + + callback = dispatcher(incoming) + callback.prepare() + thread = MyWhateverThread() + thread.on_done(callback.done) + thread.run(callback.run) + + """ + def __init__(self, incoming, dispatch, executor_callback=None, + post=None): + self._result = None + self._incoming = incoming + self._dispatch = dispatch + self._post = post + self._executor_callback = executor_callback + + def run(self): + """The incoming message dispath itself + + Can be run in an other thread/greenlet/corotine if the executor is + able to do it. + """ + try: + self._result = self._dispatch(self._incoming, + self._executor_callback) + except Exception: + msg = 'The dispatcher method must catches all exceptions' + LOG.exception(msg) + raise RuntimeError(msg) + + def done(self): + """Callback after the incoming message have been dispathed + + Should be runned in the main executor thread/greenlet/corotine + """ + # FIXME(sileht): this is not currently true, this works only because + # the driver connection used for polling write on the wire only to + # ack/requeue message, but what if one day, the driver do something + # else + if self._post is not None: + self._post(self._incoming, self._result) diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 6214b301a..46d53035e 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -14,11 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib import itertools import logging import sys +from oslo_messaging import _utils as utils from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer @@ -68,14 +68,15 @@ class NotificationDispatcher(object): return transport._listen_for_notifications(self._targets_priorities, pool=self.pool) - @contextlib.contextmanager def __call__(self, incoming, executor_callback=None): - result_wrapper = [] + return utils.DispatcherExecutorContext( + incoming, self._dispatch_and_handle_error, + executor_callback=executor_callback, + post=self._post_dispatch) - yield lambda: result_wrapper.append( - self._dispatch_and_handle_error(incoming, executor_callback)) - - if result_wrapper[0] == NotificationResult.HANDLED: + @staticmethod + def _post_dispatch(incoming, result): + if result == NotificationResult.HANDLED: incoming.acknowledge() else: incoming.requeue() diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 5911b69d7..1f065b5ea 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -29,7 +29,7 @@ from oslo_messaging._drivers import matchmaker from oslo_messaging._drivers import matchmaker_redis from oslo_messaging._drivers import matchmaker_ring from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts -from oslo_messaging._executors import base +from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging.notify import notifier from oslo_messaging.rpc import client from oslo_messaging import transport @@ -38,7 +38,7 @@ _global_opt_lists = [ drivers_base.base_opts, impl_zmq.zmq_opts, matchmaker.matchmaker_opts, - base._pool_opts, + impl_pooledexecutor._pool_opts, notifier._notifier_opts, client._client_opts, transport._transport_opts, diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 0277d8e3a..e679cebef 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -24,7 +24,6 @@ __all__ = [ 'ExpectedException', ] -import contextlib import logging import sys @@ -130,10 +129,11 @@ class RPCDispatcher(object): result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) - @contextlib.contextmanager def __call__(self, incoming, executor_callback=None): incoming.acknowledge() - yield lambda: self._dispatch_and_reply(incoming, executor_callback) + return utils.DispatcherExecutorContext( + incoming, self._dispatch_and_reply, + executor_callback=executor_callback) def _dispatch_and_reply(self, incoming, executor_callback): try: diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index 221f4931a..3a6b00d9c 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib import threading # eventlet 0.16 with monkey patching does not work yet on Python 3, @@ -28,7 +27,6 @@ try: except ImportError: eventlet = None import testscenarios -import testtools try: import trollius except ImportError: @@ -45,6 +43,7 @@ try: except ImportError: impl_eventlet = None from oslo_messaging._executors import impl_thread +from oslo_messaging import _utils as utils from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -106,7 +105,6 @@ class TestExecutor(test_utils.BaseTestCase): @trollius.coroutine def simple_coroutine(value): - yield None raise trollius.Return(value) endpoint = mock.MagicMock(return_value=simple_coroutine('result')) @@ -123,30 +121,29 @@ class TestExecutor(test_utils.BaseTestCase): self.endpoint = endpoint self.result = "not set" - @contextlib.contextmanager - def __call__(self, incoming, executor_callback=None): - if executor_callback is not None: - def callback(): - result = executor_callback(self.endpoint, - incoming.ctxt, - incoming.message) - self.result = result - return result - yield callback - event.send() + def callback(self, incoming, executor_callback): + if executor_callback is None: + result = self.endpoint(incoming.ctxt, + incoming.message) else: - def callback(): - result = self.endpoint(incoming.ctxt, incoming.message) - self.result = result - return result - yield callback + result = executor_callback(self.endpoint, + incoming.ctxt, + incoming.message) + if is_aioeventlet: + event.send() + self.result = result + return result - listener = mock.Mock(spec=['poll']) + def __call__(self, incoming, executor_callback=None): + return utils.DispatcherExecutorContext(incoming, + self.callback, + executor_callback) + + listener = mock.Mock(spec=['poll', 'stop']) dispatcher = Dispatcher(endpoint) executor = self.executor(self.conf, listener, dispatcher) - incoming_message = mock.MagicMock(ctxt={}, - message={'payload': 'data'}) + incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) def fake_poll(timeout=None): if is_aioeventlet: @@ -167,60 +164,3 @@ class TestExecutor(test_utils.BaseTestCase): self.assertEqual(dispatcher.result, 'result') TestExecutor.generate_scenarios() - - -class ExceptedException(Exception): - pass - - -class EventletContextManagerSpawnTest(test_utils.BaseTestCase): - @testtools.skipIf(impl_eventlet is None, "Eventlet not available") - def setUp(self): - super(EventletContextManagerSpawnTest, self).setUp() - self.before = mock.Mock() - self.callback = mock.Mock() - self.after = mock.Mock() - self.exception_call = mock.Mock() - - @contextlib.contextmanager - def context_mgr(): - self.before() - try: - yield lambda: self.callback() - except ExceptedException: - self.exception_call() - self.after() - - self.mgr = context_mgr() - - def test_normal_run(self): - thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet) - thread.wait() - self.assertEqual(1, self.before.call_count) - self.assertEqual(1, self.callback.call_count) - self.assertEqual(1, self.after.call_count) - self.assertEqual(0, self.exception_call.call_count) - - def test_excepted_exception(self): - self.callback.side_effect = ExceptedException - thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet) - try: - thread.wait() - except ExceptedException: - pass - self.assertEqual(1, self.before.call_count) - self.assertEqual(1, self.callback.call_count) - self.assertEqual(1, self.after.call_count) - self.assertEqual(1, self.exception_call.call_count) - - def test_unexcepted_exception(self): - self.callback.side_effect = Exception - thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet) - try: - thread.wait() - except Exception: - pass - self.assertEqual(1, self.before.call_count) - self.assertEqual(1, self.callback.call_count) - self.assertEqual(0, self.after.call_count) - self.assertEqual(0, self.exception_call.call_count) diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py index ee86491a0..f0da90d89 100644 --- a/oslo_messaging/tests/notify/test_dispatcher.py +++ b/oslo_messaging/tests/notify/test_dispatcher.py @@ -107,8 +107,9 @@ class TestDispatcher(test_utils.BaseTestCase): sorted(dispatcher._targets_priorities)) incoming = mock.Mock(ctxt={}, message=msg) - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): @@ -143,8 +144,9 @@ class TestDispatcher(test_utils.BaseTestCase): msg['priority'] = 'what???' dispatcher = notify_dispatcher.NotificationDispatcher( [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None) - with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback: - callback() + callback = dispatcher(mock.Mock(ctxt={}, message=msg)) + callback.run() + callback.done() mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') @@ -244,8 +246,9 @@ class TestDispatcherFilter(test_utils.BaseTestCase): 'timestamp': '2014-03-03 18:21:04.369234', 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'} incoming = mock.Mock(ctxt=self.context, message=message) - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() if self.match: self.assertEqual(1, endpoint.info.call_count) diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py index 03181d1d0..f81be0b9c 100644 --- a/oslo_messaging/tests/rpc/test_dispatcher.py +++ b/oslo_messaging/tests/rpc/test_dispatcher.py @@ -133,8 +133,9 @@ class TestDispatcher(test_utils.BaseTestCase): incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) incoming.reply.side_effect = check_reply - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() for n, endpoint in enumerate(endpoints): for method_name in ['foo', 'bar']: diff --git a/requirements.txt b/requirements.txt index 1b29f8a28..f851551d5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ pbr<2.0,>=0.11 +futurist>=0.1.1 # Apache-2.0 oslo.config>=1.11.0 # Apache-2.0 oslo.context>=0.2.0 # Apache-2.0 oslo.utils>=1.6.0 # Apache-2.0 diff --git a/tests/notify/test_dispatcher.py b/tests/notify/test_dispatcher.py index 5c61840a8..061c29bff 100644 --- a/tests/notify/test_dispatcher.py +++ b/tests/notify/test_dispatcher.py @@ -107,8 +107,9 @@ class TestDispatcherScenario(test_utils.BaseTestCase): sorted(dispatcher._targets_priorities)) incoming = mock.Mock(ctxt={}, message=msg) - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): @@ -146,8 +147,9 @@ class TestDispatcher(test_utils.BaseTestCase): msg['priority'] = 'what???' dispatcher = notify_dispatcher.NotificationDispatcher( [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None) - with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback: - callback() + callback = dispatcher(mock.Mock(ctxt={}, message=msg)) + callback.run() + callback.done() mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') @@ -165,7 +167,8 @@ class TestDispatcher(test_utils.BaseTestCase): incoming = mock.Mock(ctxt={}, message=msg) executor_callback = mock.Mock() - with dispatcher(incoming, executor_callback) as callback: - callback() + callback = dispatcher(incoming, executor_callback) + callback.run() + callback.done() self.assertTrue(executor_callback.called) self.assertEqual(executor_callback.call_args[0][0], endpoint_method) diff --git a/tests/rpc/test_dispatcher.py b/tests/rpc/test_dispatcher.py index 64181f026..acd87bf5f 100644 --- a/tests/rpc/test_dispatcher.py +++ b/tests/rpc/test_dispatcher.py @@ -120,8 +120,9 @@ class TestDispatcher(test_utils.BaseTestCase): incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) incoming.reply.side_effect = check_reply - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() for n, endpoint in enumerate(endpoints): for method_name in ['foo', 'bar']: