From dec09ae5ffbe2de415fede0947ad1cb887574e2e Mon Sep 17 00:00:00 2001 From: Doug Royal Date: Wed, 22 Jul 2015 14:29:37 -0500 Subject: [PATCH] Add unit tests for zmq_async Change option from boolean zmq_native to string zmq_concurrency. This eliminates ambiguity by requiring the user to explicitly name the mechanism they want to use for concurrency. Change-Id: I341a3eee73a0449716d3ee0df690bbe6af39bdf0 --- oslo_messaging/_drivers/impl_zmq.py | 6 +- .../_drivers/zmq_driver/zmq_async.py | 64 ++++--- .../tests/drivers/zmq/test_zmq_async.py | 170 ++++++++++++++++++ 3 files changed, 210 insertions(+), 30 deletions(-) create mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_async.py diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 064fe7c60..18086eb43 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -48,10 +48,8 @@ zmq_opts = [ default=True, help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'), - cfg.BoolOpt('rpc_zmq_native', - default=False, - help='Switches ZeroMQ eventlet/threading way of usage.' - 'Affects pollers, executors etc.'), + cfg.StrOpt('rpc_zmq_concurrency', default='eventlet', + help='Type of concurrency used. Either "native" or "eventlet"'), # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py index 261746392..7f437fd82 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py @@ -14,20 +14,25 @@ import logging +from oslo_messaging._drivers.zmq_driver.poller import green_poller +from oslo_messaging._drivers.zmq_driver.poller import threading_poller +from oslo_messaging._i18n import _, _LE from oslo_utils import importutils -from oslo_messaging._i18n import _LE - LOG = logging.getLogger(__name__) -green_zmq = importutils.try_import('eventlet.green.zmq') +# Map zmq_concurrency config option names to the actual module name. +ZMQ_MODULES = { + 'native': 'zmq', + 'eventlet': 'eventlet.green.zmq', +} -def import_zmq(native_zmq=False): - if native_zmq: - imported_zmq = importutils.try_import('zmq') - else: - imported_zmq = green_zmq or importutils.try_import('zmq') +def import_zmq(zmq_concurrency='eventlet'): + _raise_error_if_invalid_config_value(zmq_concurrency) + + imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency], + default='zmq') if imported_zmq is None: errmsg = _LE("ZeroMQ not found!") @@ -36,28 +41,35 @@ def import_zmq(native_zmq=False): return imported_zmq -def get_poller(native_zmq=False): - if native_zmq or green_zmq is None: - from oslo_messaging._drivers.zmq_driver.poller import threading_poller - return threading_poller.ThreadingPoller() - else: - from oslo_messaging._drivers.zmq_driver.poller import green_poller +def get_poller(zmq_concurrency='eventlet'): + _raise_error_if_invalid_config_value(zmq_concurrency) + + if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): return green_poller.GreenPoller() + return threading_poller.ThreadingPoller() -def get_reply_poller(native_zmq=False): - if native_zmq or green_zmq is None: - from oslo_messaging._drivers.zmq_driver.poller import threading_poller - return threading_poller.ThreadingPoller() - else: - from oslo_messaging._drivers.zmq_driver.poller import green_poller +def get_reply_poller(zmq_concurrency='eventlet'): + _raise_error_if_invalid_config_value(zmq_concurrency) + + if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): return green_poller.HoldReplyPoller() + return threading_poller.ThreadingPoller() -def get_executor(method, native_zmq=False): - if native_zmq or green_zmq is None: - from oslo_messaging._drivers.zmq_driver.poller import threading_poller - return threading_poller.ThreadingExecutor(method) - else: - from oslo_messaging._drivers.zmq_driver.poller import green_poller +def get_executor(method, zmq_concurrency='eventlet'): + _raise_error_if_invalid_config_value(zmq_concurrency) + + if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available(): return green_poller.GreenExecutor(method) + return threading_poller.ThreadingExecutor(method) + + +def _is_eventlet_zmq_available(): + return importutils.try_import('eventlet.green.zmq') + + +def _raise_error_if_invalid_config_value(zmq_concurrency): + if zmq_concurrency not in ZMQ_MODULES: + errmsg = _('Invalid zmq_concurrency value: %s') + raise ValueError(errmsg % zmq_concurrency) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py new file mode 100644 index 000000000..28e091a0e --- /dev/null +++ b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py @@ -0,0 +1,170 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from oslo_messaging._drivers.zmq_driver.poller import green_poller +from oslo_messaging._drivers.zmq_driver.poller import threading_poller +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging.tests import utils as test_utils + + +class TestImportZmq(test_utils.BaseTestCase): + + def setUp(self): + super(TestImportZmq, self).setUp() + + def test_config_short_names_are_converted_to_correct_module_names(self): + mock_try_import = mock.Mock() + zmq_async.importutils.try_import = mock_try_import + + zmq_async.importutils.try_import.return_value = 'mock zmq module' + self.assertEqual('mock zmq module', zmq_async.import_zmq('native')) + mock_try_import.assert_called_with('zmq', default='zmq') + + zmq_async.importutils.try_import.return_value = 'mock eventlet module' + self.assertEqual('mock eventlet module', + zmq_async.import_zmq('eventlet')) + mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq') + + def test_when_no_args_then_default_zmq_module_is_loaded(self): + mock_try_import = mock.Mock() + zmq_async.importutils.try_import = mock_try_import + + zmq_async.import_zmq() + + mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq') + + def test_when_import_fails_then_raise_ImportError(self): + zmq_async.importutils.try_import = mock.Mock() + zmq_async.importutils.try_import.return_value = None + + with self.assertRaisesRegexp(ImportError, "ZeroMQ not found!"): + zmq_async.import_zmq('native') + + def test_invalid_config_value_raise_ValueError(self): + invalid_opt = 'x' + + errmsg = 'Invalid zmq_concurrency value: x' + with self.assertRaisesRegexp(ValueError, errmsg): + zmq_async.import_zmq(invalid_opt) + + +class TestGetPoller(test_utils.BaseTestCase): + + def setUp(self): + super(TestGetPoller, self).setUp() + + def test_when_no_arg_to_get_poller_then_return_default_poller(self): + zmq_async._is_eventlet_zmq_available = lambda: True + + actual = zmq_async.get_poller() + + self.assertTrue(isinstance(actual, green_poller.GreenPoller)) + + def test_when_native_poller_requested_then_return_ThreadingPoller(self): + actual = zmq_async.get_poller('native') + + self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) + + def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): + zmq_async._is_eventlet_zmq_available = lambda: False + + actual = zmq_async.get_poller('eventlet') + + self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) + + def test_when_eventlet_is_available_then_return_GreenPoller(self): + zmq_async._is_eventlet_zmq_available = lambda: True + + actual = zmq_async.get_poller('eventlet') + + self.assertTrue(isinstance(actual, green_poller.GreenPoller)) + + def test_invalid_config_value_raise_ValueError(self): + invalid_opt = 'x' + + errmsg = 'Invalid zmq_concurrency value: x' + with self.assertRaisesRegexp(ValueError, errmsg): + zmq_async.get_poller(invalid_opt) + + +class TestGetReplyPoller(test_utils.BaseTestCase): + + def setUp(self): + super(TestGetReplyPoller, self).setUp() + + def test_default_reply_poller_is_HoldReplyPoller(self): + zmq_async._is_eventlet_zmq_available = lambda: True + + actual = zmq_async.get_reply_poller() + + self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller)) + + def test_when_eventlet_is_available_then_return_HoldReplyPoller(self): + zmq_async._is_eventlet_zmq_available = lambda: True + + actual = zmq_async.get_reply_poller('eventlet') + + self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller)) + + def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): + zmq_async._is_eventlet_zmq_available = lambda: False + + actual = zmq_async.get_reply_poller('eventlet') + + self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller)) + + def test_invalid_config_value_raise_ValueError(self): + invalid_opt = 'x' + + errmsg = 'Invalid zmq_concurrency value: x' + with self.assertRaisesRegexp(ValueError, errmsg): + zmq_async.get_reply_poller(invalid_opt) + + +class TestGetExecutor(test_utils.BaseTestCase): + + def setUp(self): + super(TestGetExecutor, self).setUp() + + def test_default_executor_is_GreenExecutor(self): + zmq_async._is_eventlet_zmq_available = lambda: True + + executor = zmq_async.get_executor('any method') + + self.assertTrue(isinstance(executor, green_poller.GreenExecutor)) + self.assertEqual('any method', executor._method) + + def test_when_eventlet_module_is_available_then_return_GreenExecutor(self): + zmq_async._is_eventlet_zmq_available = lambda: True + + executor = zmq_async.get_executor('any method', 'eventlet') + + self.assertTrue(isinstance(executor, green_poller.GreenExecutor)) + self.assertEqual('any method', executor._method) + + def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self): + zmq_async._is_eventlet_zmq_available = lambda: False + + executor = zmq_async.get_executor('any method', 'eventlet') + + self.assertTrue(isinstance(executor, + threading_poller.ThreadingExecutor)) + self.assertEqual('any method', executor._method) + + def test_invalid_config_value_raise_ValueError(self): + invalid_opt = 'x' + + errmsg = 'Invalid zmq_concurrency value: x' + with self.assertRaisesRegexp(ValueError, errmsg): + zmq_async.get_executor('any method', invalid_opt)