Add unit tests for zmq_async

Change option from boolean zmq_native to string zmq_concurrency. This
eliminates ambiguity by requiring the user to explicitly name
the mechanism they want to use for concurrency.

Change-Id: I341a3eee73a0449716d3ee0df690bbe6af39bdf0
This commit is contained in:
Doug Royal 2015-07-22 14:29:37 -05:00
parent c90525bfea
commit dec09ae5ff
3 changed files with 210 additions and 30 deletions

View File

@ -48,10 +48,8 @@ zmq_opts = [
default=True,
help='Use REQ/REP pattern for all methods CALL/CAST/FANOUT.'),
cfg.BoolOpt('rpc_zmq_native',
default=False,
help='Switches ZeroMQ eventlet/threading way of usage.'
'Affects pollers, executors etc.'),
cfg.StrOpt('rpc_zmq_concurrency', default='eventlet',
help='Type of concurrency used. Either "native" or "eventlet"'),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,

View File

@ -14,20 +14,25 @@
import logging
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._i18n import _, _LE
from oslo_utils import importutils
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
green_zmq = importutils.try_import('eventlet.green.zmq')
# Map zmq_concurrency config option names to the actual module name.
ZMQ_MODULES = {
'native': 'zmq',
'eventlet': 'eventlet.green.zmq',
}
def import_zmq(native_zmq=False):
if native_zmq:
imported_zmq = importutils.try_import('zmq')
else:
imported_zmq = green_zmq or importutils.try_import('zmq')
def import_zmq(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency],
default='zmq')
if imported_zmq is None:
errmsg = _LE("ZeroMQ not found!")
@ -36,28 +41,35 @@ def import_zmq(native_zmq=False):
return imported_zmq
def get_poller(native_zmq=False):
if native_zmq or green_zmq is None:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
else:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
def get_poller(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.GreenPoller()
return threading_poller.ThreadingPoller()
def get_reply_poller(native_zmq=False):
if native_zmq or green_zmq is None:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingPoller()
else:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
def get_reply_poller(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.HoldReplyPoller()
return threading_poller.ThreadingPoller()
def get_executor(method, native_zmq=False):
if native_zmq or green_zmq is None:
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
return threading_poller.ThreadingExecutor(method)
else:
from oslo_messaging._drivers.zmq_driver.poller import green_poller
def get_executor(method, zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
return green_poller.GreenExecutor(method)
return threading_poller.ThreadingExecutor(method)
def _is_eventlet_zmq_available():
return importutils.try_import('eventlet.green.zmq')
def _raise_error_if_invalid_config_value(zmq_concurrency):
if zmq_concurrency not in ZMQ_MODULES:
errmsg = _('Invalid zmq_concurrency value: %s')
raise ValueError(errmsg % zmq_concurrency)

View File

@ -0,0 +1,170 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
class TestImportZmq(test_utils.BaseTestCase):
def setUp(self):
super(TestImportZmq, self).setUp()
def test_config_short_names_are_converted_to_correct_module_names(self):
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.importutils.try_import.return_value = 'mock zmq module'
self.assertEqual('mock zmq module', zmq_async.import_zmq('native'))
mock_try_import.assert_called_with('zmq', default='zmq')
zmq_async.importutils.try_import.return_value = 'mock eventlet module'
self.assertEqual('mock eventlet module',
zmq_async.import_zmq('eventlet'))
mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
def test_when_no_args_then_default_zmq_module_is_loaded(self):
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
def test_when_import_fails_then_raise_ImportError(self):
zmq_async.importutils.try_import = mock.Mock()
zmq_async.importutils.try_import.return_value = None
with self.assertRaisesRegexp(ImportError, "ZeroMQ not found!"):
zmq_async.import_zmq('native')
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.import_zmq(invalid_opt)
class TestGetPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestGetPoller, self).setUp()
def test_when_no_arg_to_get_poller_then_return_default_poller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_poller()
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_when_native_poller_requested_then_return_ThreadingPoller(self):
actual = zmq_async.get_poller('native')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: False
actual = zmq_async.get_poller('eventlet')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_poller('eventlet')
self.assertTrue(isinstance(actual, green_poller.GreenPoller))
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_poller(invalid_opt)
class TestGetReplyPoller(test_utils.BaseTestCase):
def setUp(self):
super(TestGetReplyPoller, self).setUp()
def test_default_reply_poller_is_HoldReplyPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_reply_poller()
self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))
def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: True
actual = zmq_async.get_reply_poller('eventlet')
self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async._is_eventlet_zmq_available = lambda: False
actual = zmq_async.get_reply_poller('eventlet')
self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_reply_poller(invalid_opt)
class TestGetExecutor(test_utils.BaseTestCase):
def setUp(self):
super(TestGetExecutor, self).setUp()
def test_default_executor_is_GreenExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: True
executor = zmq_async.get_executor('any method')
self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
self.assertEqual('any method', executor._method)
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: True
executor = zmq_async.get_executor('any method', 'eventlet')
self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
self.assertEqual('any method', executor._method)
def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
zmq_async._is_eventlet_zmq_available = lambda: False
executor = zmq_async.get_executor('any method', 'eventlet')
self.assertTrue(isinstance(executor,
threading_poller.ThreadingExecutor))
self.assertEqual('any method', executor._method)
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
errmsg = 'Invalid zmq_concurrency value: x'
with self.assertRaisesRegexp(ValueError, errmsg):
zmq_async.get_executor('any method', invalid_opt)