From c49594a62fc62caa2dab24dac3163ee49dc775bb Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Fri, 16 Jan 2015 15:17:56 +0100 Subject: [PATCH] Remove usage of contentmanager for executors The context manager in the executor fit only for the blocking executor. Even the dispatcher needs to run code before and after the application callback, eventlet and future executors have to run the pre/post code into the main thread and can run the callback into an other thread, and that force them to run __enter__ and __exit__ manually and deal the exception path. This change adds a helper object instead of the context manager. It is designed to be explicit on what must be executed before and after the callback and what can be done in a thread or not. All the executor code is now in the impl_pooledexecutor.py and use the futures "PoolExecutor" API. This use futurist to provide a eventlet and aioeventlet futures friendly object. Change-Id: I8cd7640f36beeda47560e3c82671bad3530e38d1 --- oslo_messaging/_drivers/impl_zmq.py | 4 +- oslo_messaging/_executors/base.py | 18 +-- oslo_messaging/_executors/impl_aioeventlet.py | 4 +- oslo_messaging/_executors/impl_blocking.py | 48 ++++---- oslo_messaging/_executors/impl_eventlet.py | 83 ++----------- .../_executors/impl_pooledexecutor.py | 112 ++++++++++++++++++ oslo_messaging/_executors/impl_thread.py | 106 +---------------- oslo_messaging/_utils.py | 55 +++++++++ oslo_messaging/notify/dispatcher.py | 15 +-- oslo_messaging/opts.py | 4 +- oslo_messaging/rpc/dispatcher.py | 6 +- .../tests/executors/test_executor.py | 98 +++------------ .../tests/notify/test_dispatcher.py | 15 ++- oslo_messaging/tests/rpc/test_dispatcher.py | 5 +- requirements.txt | 1 + tests/notify/test_dispatcher.py | 15 ++- tests/rpc/test_dispatcher.py | 5 +- 17 files changed, 260 insertions(+), 334 deletions(-) create mode 100644 oslo_messaging/_executors/impl_pooledexecutor.py diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f673b9c06..6108e9519 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -35,7 +35,7 @@ from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._executors import base as executor_base # FIXME(markmc) +from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc) from oslo_messaging._i18n import _, _LE, _LW from oslo_messaging._drivers import pool @@ -1001,7 +1001,7 @@ class ZmqDriver(base.BaseDriver): if not zmq: raise ImportError("Failed to import eventlet.green.zmq") conf.register_opts(zmq_opts) - conf.register_opts(executor_base._pool_opts) + conf.register_opts(impl_pooledexecutor._pool_opts) conf.register_opts(base.base_opts) super(ZmqDriver, self).__init__(conf, url, default_exchange, diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py index 2cc667e27..6fbc15377 100644 --- a/oslo_messaging/_executors/base.py +++ b/oslo_messaging/_executors/base.py @@ -14,19 +14,15 @@ import abc -from oslo_config import cfg import six -_pool_opts = [ - cfg.IntOpt('rpc_thread_pool_size', - default=64, - help='Size of RPC thread pool.'), -] - @six.add_metaclass(abc.ABCMeta) class ExecutorBase(object): + # Executor can override how we run the application callback + _executor_callback = None + def __init__(self, conf, listener, dispatcher): self.conf = conf self.listener = listener @@ -43,11 +39,3 @@ class ExecutorBase(object): @abc.abstractmethod def wait(self): "Wait until the executor has stopped polling." - - -class PooledExecutorBase(ExecutorBase): - """An executor that uses a rpc thread pool of a given size.""" - - def __init__(self, conf, listener, callback): - super(PooledExecutorBase, self).__init__(conf, listener, callback) - self.conf.register_opts(_pool_opts) diff --git a/oslo_messaging/_executors/impl_aioeventlet.py b/oslo_messaging/_executors/impl_aioeventlet.py index d0fc4aa31..8f67d7958 100644 --- a/oslo_messaging/_executors/impl_aioeventlet.py +++ b/oslo_messaging/_executors/impl_aioeventlet.py @@ -70,6 +70,4 @@ class AsyncioEventletExecutor(impl_eventlet.EventletExecutor): result = aioeventlet.yield_future(result, loop=self._loop) return result - def _dispatch(self, incoming): - ctx = self.dispatcher(incoming, self._coroutine_wrapper) - impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool) + _executor_callback = _coroutine_wrapper diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py index 733403601..11ad81226 100644 --- a/oslo_messaging/_executors/impl_blocking.py +++ b/oslo_messaging/_executors/impl_blocking.py @@ -13,15 +13,28 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import futurist -from oslo_messaging._executors import base -from oslo_messaging._i18n import _ - -LOG = logging.getLogger(__name__) +from oslo_messaging._executors import impl_pooledexecutor -class BlockingExecutor(base.ExecutorBase): +class FakeBlockingThread(object): + def __init__(self, target): + self._target = target + + def start(self): + self._target() + + @staticmethod + def join(): + pass + + @staticmethod + def stop(): + pass + + +class BlockingExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which blocks the current thread. @@ -34,24 +47,5 @@ class BlockingExecutor(base.ExecutorBase): for simple demo programs. """ - def __init__(self, conf, listener, dispatcher): - super(BlockingExecutor, self).__init__(conf, listener, dispatcher) - self._running = False - - def start(self): - self._running = True - while self._running: - try: - incoming = self.listener.poll() - if incoming is not None: - with self.dispatcher(incoming) as callback: - callback() - except Exception: - LOG.exception(_("Unexpected exception occurred.")) - - def stop(self): - self._running = False - self.listener.stop() - - def wait(self): - pass + _executor_cls = lambda __, ___: futurist.SynchronousExecutor() + _thread_cls = FakeBlockingThread diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py index 3333fe713..48771edc7 100644 --- a/oslo_messaging/_executors/impl_eventlet.py +++ b/oslo_messaging/_executors/impl_eventlet.py @@ -14,50 +14,17 @@ # under the License. import logging -import sys -import eventlet from eventlet.green import threading as greenthreading -from eventlet import greenpool -import greenlet -from oslo_utils import excutils +import futurist -from oslo_messaging._executors import base +from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging import localcontext LOG = logging.getLogger(__name__) -def spawn_with(ctxt, pool): - """This is the equivalent of a with statement - but with the content of the BLOCK statement executed - into a greenthread - - exception path grab from: - http://www.python.org/dev/peps/pep-0343/ - """ - - def complete(thread, exit): - exc = True - try: - try: - thread.wait() - except Exception: - exc = False - if not exit(*sys.exc_info()): - raise - finally: - if exc: - exit(None, None, None) - - callback = ctxt.__enter__() - thread = pool.spawn(callback) - thread.link(complete, ctxt.__exit__) - - return thread - - -class EventletExecutor(base.PooledExecutorBase): +class EventletExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which integrates with eventlet. @@ -70,10 +37,6 @@ class EventletExecutor(base.PooledExecutorBase): def __init__(self, conf, listener, dispatcher): super(EventletExecutor, self).__init__(conf, listener, dispatcher) - self._thread = None - self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size) - self._running = False - if not isinstance(localcontext._STORE, greenthreading.local): LOG.debug('eventlet executor in use but the threading module ' 'has not been monkeypatched or has been ' @@ -82,39 +45,7 @@ class EventletExecutor(base.PooledExecutorBase): 'behavior. In the future, we will raise a ' 'RuntimeException in this case.') - def _dispatch(self, incoming): - spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool) - - def start(self): - if self._thread is not None: - return - - @excutils.forever_retry_uncaught_exceptions - def _executor_thread(): - try: - while self._running: - incoming = self.listener.poll() - if incoming is not None: - self._dispatch(incoming) - except greenlet.GreenletExit: - return - - self._running = True - self._thread = eventlet.spawn(_executor_thread) - - def stop(self): - if self._thread is None: - return - self._running = False - self.listener.stop() - self._thread.cancel() - - def wait(self): - if self._thread is None: - return - self._greenpool.waitall() - try: - self._thread.wait() - except greenlet.GreenletExit: - pass - self._thread = None + _executor_cls = futurist.GreenThreadPoolExecutor + _lock_cls = greenthreading.Lock + _event_cls = greenthreading.Event + _thread_cls = greenthreading.Thread diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py new file mode 100644 index 000000000..7689bd14f --- /dev/null +++ b/oslo_messaging/_executors/impl_pooledexecutor.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import threading + +from concurrent import futures +from oslo.config import cfg +from oslo.utils import excutils + +from oslo_messaging._executors import base + +_pool_opts = [ + cfg.IntOpt('rpc_thread_pool_size', + default=64, + help='Size of RPC thread pool.'), +] + + +class PooledExecutor(base.ExecutorBase): + """A message executor which integrates with threads. + + A message process that polls for messages from a dispatching thread and + on reception of an incoming message places the message to be processed in + a thread pool to be executed at a later time. + """ + + # NOTE(harlowja): if eventlet is being used and the thread module is monkey + # patched this should/is supposed to work the same as the eventlet based + # executor. + + # NOTE(harlowja): Make it somewhat easy to change this via + # inheritance (since there does exist other executor types that could be + # used/tried here). + _executor_cls = futures.ThreadPoolExecutor + _event_cls = threading.Event + _lock_cls = threading.Lock + _thread_cls = threading.Thread + + def __init__(self, conf, listener, dispatcher): + super(PooledExecutor, self).__init__(conf, listener, dispatcher) + self.conf.register_opts(_pool_opts) + self._poller = None + self._executor = None + self._tombstone = self._event_cls() + self._incomplete = collections.deque() + self._mutator = self._lock_cls() + + @excutils.forever_retry_uncaught_exceptions + def _runner(self): + while not self._tombstone.is_set(): + incoming = self.listener.poll() + if incoming is None: + continue + callback = self.dispatcher(incoming, self._executor_callback) + try: + fut = self._executor.submit(callback.run) + except RuntimeError: + # This is triggered when the executor has been shutdown... + # + # TODO(harlowja): should we put whatever we pulled off back + # since when this is thrown it means the executor has been + # shutdown already?? + callback.done() + return + else: + with self._mutator: + self._incomplete.append(fut) + # Run the other post processing of the callback when done... + fut.add_done_callback(lambda f: callback.done()) + + def start(self): + if self._executor is None: + self._executor = self._executor_cls(self.conf.rpc_thread_pool_size) + self._tombstone.clear() + if self._poller is None or not self._poller.is_alive(): + self._poller = self._thread_cls(target=self._runner) + self._poller.daemon = True + self._poller.start() + + def stop(self): + if self._executor is not None: + self._executor.shutdown(wait=False) + self._tombstone.set() + self.listener.stop() + + def wait(self): + # TODO(harlowja): this method really needs a timeout. + if self._poller is not None: + self._tombstone.wait() + self._poller.join() + self._poller = None + if self._executor is not None: + with self._mutator: + incomplete_fs = list(self._incomplete) + self._incomplete.clear() + if incomplete_fs: + futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED) + self._executor = None diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py index ca8ebc790..9a4651aa2 100644 --- a/oslo_messaging/_executors/impl_thread.py +++ b/oslo_messaging/_executors/impl_thread.py @@ -14,118 +14,16 @@ # License for the specific language governing permissions and limitations # under the License. -import collections -import functools -import sys -import threading - from concurrent import futures -from oslo_utils import excutils -import six -from oslo_messaging._executors import base +from oslo_messaging._executors import impl_pooledexecutor -class ThreadExecutor(base.PooledExecutorBase): +class ThreadExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which integrates with threads. A message process that polls for messages from a dispatching thread and on reception of an incoming message places the message to be processed in a thread pool to be executed at a later time. """ - - # NOTE(harlowja): if eventlet is being used and the thread module is monkey - # patched this should/is supposed to work the same as the eventlet based - # executor. - - # NOTE(harlowja): Make it somewhat easy to change this via - # inheritance (since there does exist other executor types that could be - # used/tried here). _executor_cls = futures.ThreadPoolExecutor - - def __init__(self, conf, listener, dispatcher): - super(ThreadExecutor, self).__init__(conf, listener, dispatcher) - self._poller = None - self._executor = None - self._tombstone = threading.Event() - self._incomplete = collections.deque() - self._mutator = threading.Lock() - - def _completer(self, exit_method, fut): - """Completes futures.""" - try: - exc = fut.exception() - if exc is not None: - exc_type = type(exc) - # Not available on < 3.x due to this being an added feature - # of pep-3134 (exception chaining and embedded tracebacks). - if six.PY3: - exc_tb = exc.__traceback__ - else: - exc_tb = None - if not exit_method(exc_type, exc, exc_tb): - six.reraise(exc_type, exc, tb=exc_tb) - else: - exit_method(None, None, None) - finally: - with self._mutator: - try: - self._incomplete.remove(fut) - except ValueError: - pass - - @excutils.forever_retry_uncaught_exceptions - def _runner(self): - while not self._tombstone.is_set(): - incoming = self.listener.poll() - if incoming is None: - continue - # This is hacky, needs to be fixed.... - context = self.dispatcher(incoming) - enter_method = context.__enter__() - exit_method = context.__exit__ - try: - fut = self._executor.submit(enter_method) - except RuntimeError: - # This is triggered when the executor has been shutdown... - # - # TODO(harlowja): should we put whatever we pulled off back - # since when this is thrown it means the executor has been - # shutdown already?? - exit_method(*sys.exc_info()) - return - else: - with self._mutator: - self._incomplete.append(fut) - # Run the other half (__exit__) when done... - fut.add_done_callback(functools.partial(self._completer, - exit_method)) - - def start(self): - if self._executor is None: - self._executor = self._executor_cls(self.conf.rpc_thread_pool_size) - self._tombstone.clear() - if self._poller is None or not self._poller.is_alive(): - self._poller = threading.Thread(target=self._runner) - self._poller.daemon = True - self._poller.start() - - def stop(self): - if self._executor is not None: - self._executor.shutdown(wait=False) - self._tombstone.set() - self.listener.stop() - - def wait(self): - # TODO(harlowja): this method really needs a timeout. - if self._poller is not None: - self._tombstone.wait() - self._poller.join() - self._poller = None - if self._executor is not None: - with self._mutator: - incomplete_fs = list(self._incomplete) - self._incomplete.clear() - if incomplete_fs: - futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED) - self._executor = None diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py index 532c98b8a..ddec6d7a7 100644 --- a/oslo_messaging/_utils.py +++ b/oslo_messaging/_utils.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import logging + +LOG = logging.getLogger(__name__) + def version_is_compatible(imp_version, version): """Determine whether versions are compatible. @@ -39,3 +43,54 @@ def version_is_compatible(imp_version, version): int(rev) > int(imp_rev)): # Revision return False return True + + +class DispatcherExecutorContext(object): + """Dispatcher executor context helper + + A dispatcher can have work to do before and after the dispatch of the + request in the main server thread while the dispatcher itself can be + done in its own thread. + + The executor can use the helper like this: + + callback = dispatcher(incoming) + callback.prepare() + thread = MyWhateverThread() + thread.on_done(callback.done) + thread.run(callback.run) + + """ + def __init__(self, incoming, dispatch, executor_callback=None, + post=None): + self._result = None + self._incoming = incoming + self._dispatch = dispatch + self._post = post + self._executor_callback = executor_callback + + def run(self): + """The incoming message dispath itself + + Can be run in an other thread/greenlet/corotine if the executor is + able to do it. + """ + try: + self._result = self._dispatch(self._incoming, + self._executor_callback) + except Exception: + msg = 'The dispatcher method must catches all exceptions' + LOG.exception(msg) + raise RuntimeError(msg) + + def done(self): + """Callback after the incoming message have been dispathed + + Should be runned in the main executor thread/greenlet/corotine + """ + # FIXME(sileht): this is not currently true, this works only because + # the driver connection used for polling write on the wire only to + # ack/requeue message, but what if one day, the driver do something + # else + if self._post is not None: + self._post(self._incoming, self._result) diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py index 6214b301a..46d53035e 100644 --- a/oslo_messaging/notify/dispatcher.py +++ b/oslo_messaging/notify/dispatcher.py @@ -14,11 +14,11 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib import itertools import logging import sys +from oslo_messaging import _utils as utils from oslo_messaging import localcontext from oslo_messaging import serializer as msg_serializer @@ -68,14 +68,15 @@ class NotificationDispatcher(object): return transport._listen_for_notifications(self._targets_priorities, pool=self.pool) - @contextlib.contextmanager def __call__(self, incoming, executor_callback=None): - result_wrapper = [] + return utils.DispatcherExecutorContext( + incoming, self._dispatch_and_handle_error, + executor_callback=executor_callback, + post=self._post_dispatch) - yield lambda: result_wrapper.append( - self._dispatch_and_handle_error(incoming, executor_callback)) - - if result_wrapper[0] == NotificationResult.HANDLED: + @staticmethod + def _post_dispatch(incoming, result): + if result == NotificationResult.HANDLED: incoming.acknowledge() else: incoming.requeue() diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 5911b69d7..1f065b5ea 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -29,7 +29,7 @@ from oslo_messaging._drivers import matchmaker from oslo_messaging._drivers import matchmaker_redis from oslo_messaging._drivers import matchmaker_ring from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts -from oslo_messaging._executors import base +from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging.notify import notifier from oslo_messaging.rpc import client from oslo_messaging import transport @@ -38,7 +38,7 @@ _global_opt_lists = [ drivers_base.base_opts, impl_zmq.zmq_opts, matchmaker.matchmaker_opts, - base._pool_opts, + impl_pooledexecutor._pool_opts, notifier._notifier_opts, client._client_opts, transport._transport_opts, diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py index 0277d8e3a..e679cebef 100644 --- a/oslo_messaging/rpc/dispatcher.py +++ b/oslo_messaging/rpc/dispatcher.py @@ -24,7 +24,6 @@ __all__ = [ 'ExpectedException', ] -import contextlib import logging import sys @@ -130,10 +129,11 @@ class RPCDispatcher(object): result = func(ctxt, **new_args) return self.serializer.serialize_entity(ctxt, result) - @contextlib.contextmanager def __call__(self, incoming, executor_callback=None): incoming.acknowledge() - yield lambda: self._dispatch_and_reply(incoming, executor_callback) + return utils.DispatcherExecutorContext( + incoming, self._dispatch_and_reply, + executor_callback=executor_callback) def _dispatch_and_reply(self, incoming, executor_callback): try: diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index 221f4931a..3a6b00d9c 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import contextlib import threading # eventlet 0.16 with monkey patching does not work yet on Python 3, @@ -28,7 +27,6 @@ try: except ImportError: eventlet = None import testscenarios -import testtools try: import trollius except ImportError: @@ -45,6 +43,7 @@ try: except ImportError: impl_eventlet = None from oslo_messaging._executors import impl_thread +from oslo_messaging import _utils as utils from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -106,7 +105,6 @@ class TestExecutor(test_utils.BaseTestCase): @trollius.coroutine def simple_coroutine(value): - yield None raise trollius.Return(value) endpoint = mock.MagicMock(return_value=simple_coroutine('result')) @@ -123,30 +121,29 @@ class TestExecutor(test_utils.BaseTestCase): self.endpoint = endpoint self.result = "not set" - @contextlib.contextmanager - def __call__(self, incoming, executor_callback=None): - if executor_callback is not None: - def callback(): - result = executor_callback(self.endpoint, - incoming.ctxt, - incoming.message) - self.result = result - return result - yield callback - event.send() + def callback(self, incoming, executor_callback): + if executor_callback is None: + result = self.endpoint(incoming.ctxt, + incoming.message) else: - def callback(): - result = self.endpoint(incoming.ctxt, incoming.message) - self.result = result - return result - yield callback + result = executor_callback(self.endpoint, + incoming.ctxt, + incoming.message) + if is_aioeventlet: + event.send() + self.result = result + return result - listener = mock.Mock(spec=['poll']) + def __call__(self, incoming, executor_callback=None): + return utils.DispatcherExecutorContext(incoming, + self.callback, + executor_callback) + + listener = mock.Mock(spec=['poll', 'stop']) dispatcher = Dispatcher(endpoint) executor = self.executor(self.conf, listener, dispatcher) - incoming_message = mock.MagicMock(ctxt={}, - message={'payload': 'data'}) + incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) def fake_poll(timeout=None): if is_aioeventlet: @@ -167,60 +164,3 @@ class TestExecutor(test_utils.BaseTestCase): self.assertEqual(dispatcher.result, 'result') TestExecutor.generate_scenarios() - - -class ExceptedException(Exception): - pass - - -class EventletContextManagerSpawnTest(test_utils.BaseTestCase): - @testtools.skipIf(impl_eventlet is None, "Eventlet not available") - def setUp(self): - super(EventletContextManagerSpawnTest, self).setUp() - self.before = mock.Mock() - self.callback = mock.Mock() - self.after = mock.Mock() - self.exception_call = mock.Mock() - - @contextlib.contextmanager - def context_mgr(): - self.before() - try: - yield lambda: self.callback() - except ExceptedException: - self.exception_call() - self.after() - - self.mgr = context_mgr() - - def test_normal_run(self): - thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet) - thread.wait() - self.assertEqual(1, self.before.call_count) - self.assertEqual(1, self.callback.call_count) - self.assertEqual(1, self.after.call_count) - self.assertEqual(0, self.exception_call.call_count) - - def test_excepted_exception(self): - self.callback.side_effect = ExceptedException - thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet) - try: - thread.wait() - except ExceptedException: - pass - self.assertEqual(1, self.before.call_count) - self.assertEqual(1, self.callback.call_count) - self.assertEqual(1, self.after.call_count) - self.assertEqual(1, self.exception_call.call_count) - - def test_unexcepted_exception(self): - self.callback.side_effect = Exception - thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet) - try: - thread.wait() - except Exception: - pass - self.assertEqual(1, self.before.call_count) - self.assertEqual(1, self.callback.call_count) - self.assertEqual(0, self.after.call_count) - self.assertEqual(0, self.exception_call.call_count) diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py index ee86491a0..f0da90d89 100644 --- a/oslo_messaging/tests/notify/test_dispatcher.py +++ b/oslo_messaging/tests/notify/test_dispatcher.py @@ -107,8 +107,9 @@ class TestDispatcher(test_utils.BaseTestCase): sorted(dispatcher._targets_priorities)) incoming = mock.Mock(ctxt={}, message=msg) - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): @@ -143,8 +144,9 @@ class TestDispatcher(test_utils.BaseTestCase): msg['priority'] = 'what???' dispatcher = notify_dispatcher.NotificationDispatcher( [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None) - with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback: - callback() + callback = dispatcher(mock.Mock(ctxt={}, message=msg)) + callback.run() + callback.done() mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') @@ -244,8 +246,9 @@ class TestDispatcherFilter(test_utils.BaseTestCase): 'timestamp': '2014-03-03 18:21:04.369234', 'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'} incoming = mock.Mock(ctxt=self.context, message=message) - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() if self.match: self.assertEqual(1, endpoint.info.call_count) diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py index 03181d1d0..f81be0b9c 100644 --- a/oslo_messaging/tests/rpc/test_dispatcher.py +++ b/oslo_messaging/tests/rpc/test_dispatcher.py @@ -133,8 +133,9 @@ class TestDispatcher(test_utils.BaseTestCase): incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) incoming.reply.side_effect = check_reply - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() for n, endpoint in enumerate(endpoints): for method_name in ['foo', 'bar']: diff --git a/requirements.txt b/requirements.txt index 1b29f8a28..f851551d5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ pbr<2.0,>=0.11 +futurist>=0.1.1 # Apache-2.0 oslo.config>=1.11.0 # Apache-2.0 oslo.context>=0.2.0 # Apache-2.0 oslo.utils>=1.6.0 # Apache-2.0 diff --git a/tests/notify/test_dispatcher.py b/tests/notify/test_dispatcher.py index 5c61840a8..061c29bff 100644 --- a/tests/notify/test_dispatcher.py +++ b/tests/notify/test_dispatcher.py @@ -107,8 +107,9 @@ class TestDispatcherScenario(test_utils.BaseTestCase): sorted(dispatcher._targets_priorities)) incoming = mock.Mock(ctxt={}, message=msg) - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() # check endpoint callbacks are called or not for i, endpoint_methods in enumerate(self.endpoints): @@ -146,8 +147,9 @@ class TestDispatcher(test_utils.BaseTestCase): msg['priority'] = 'what???' dispatcher = notify_dispatcher.NotificationDispatcher( [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None) - with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback: - callback() + callback = dispatcher(mock.Mock(ctxt={}, message=msg)) + callback.run() + callback.done() mylog.warning.assert_called_once_with('Unknown priority "%s"', 'what???') @@ -165,7 +167,8 @@ class TestDispatcher(test_utils.BaseTestCase): incoming = mock.Mock(ctxt={}, message=msg) executor_callback = mock.Mock() - with dispatcher(incoming, executor_callback) as callback: - callback() + callback = dispatcher(incoming, executor_callback) + callback.run() + callback.done() self.assertTrue(executor_callback.called) self.assertEqual(executor_callback.call_args[0][0], endpoint_method) diff --git a/tests/rpc/test_dispatcher.py b/tests/rpc/test_dispatcher.py index 64181f026..acd87bf5f 100644 --- a/tests/rpc/test_dispatcher.py +++ b/tests/rpc/test_dispatcher.py @@ -120,8 +120,9 @@ class TestDispatcher(test_utils.BaseTestCase): incoming = mock.Mock(ctxt=self.ctxt, message=self.msg) incoming.reply.side_effect = check_reply - with dispatcher(incoming) as callback: - callback() + callback = dispatcher(incoming) + callback.run() + callback.done() for n, endpoint in enumerate(endpoints): for method_name in ['foo', 'bar']: