Merge "Add functional and unit 0mq driver tests"

This commit is contained in:
Jenkins 2014-12-11 09:55:31 +00:00 committed by Gerrit Code Review
commit fa68eaa280
5 changed files with 549 additions and 37 deletions

View File

@ -32,7 +32,7 @@ from oslo.config import cfg
from oslo.messaging._drivers import base
from oslo.messaging._drivers import common as rpc_common
from oslo.messaging._executors import impl_eventlet # FIXME(markmc)
from oslo.messaging._i18n import _
from oslo.messaging._i18n import _, _LE
from oslo.serialization import jsonutils
from oslo.utils import excutils
from oslo.utils import importutils
@ -180,6 +180,10 @@ class ZmqSocket(object):
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
self.subscriptions.remove(msg_filter)
@property
def closed(self):
return self.sock is None or self.sock.closed
def close(self):
if self.sock is None or self.sock.closed:
return
@ -257,7 +261,10 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod
def marshal(self, ctx):
ctx_data = ctx.to_dict()
if not isinstance(ctx, dict):
ctx_data = ctx.to_dict()
else:
ctx_data = ctx
return _serialize(ctx_data)
@classmethod
@ -395,7 +402,7 @@ class ZmqBaseReactor(ConsumerBase):
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
while not sock.closed:
self.consume(sock)
for k in self.proxies.keys():
@ -408,12 +415,12 @@ class ZmqBaseReactor(ConsumerBase):
t.wait()
def close(self):
for s in self.sockets:
s.close()
for t in self.threads:
t.kill()
for s in self.sockets:
s.close()
class ZmqProxy(ZmqBaseReactor):
"""A consumer class implementing a topic-based proxy.
@ -612,9 +619,15 @@ class Connection(rpc_common.Connection):
self.topics.append(topic)
def close(self):
_get_matchmaker().stop_heartbeat()
mm = _get_matchmaker()
mm.stop_heartbeat()
for topic in self.topics:
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
try:
mm.unregister(topic, CONF.rpc_zmq_host)
except Exception as err:
LOG.error(_LE('Unable to unregister topic %(topic)s'
' from matchmaker: %(err)s') %
{'topic': topic, 'err': err})
self.reactor.close()
self.topics = []
@ -634,6 +647,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
payload = [RpcContext.marshal(context), msg]
with Timeout(timeout_cast, exception=rpc_common.Timeout):
conn = None
try:
conn = ZmqClient(addr)
@ -642,7 +656,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
if 'conn' in vars():
if conn is not None:
conn.close()
@ -684,12 +698,14 @@ def _call(addr, context, topic, msg, timeout=None,
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug("Sending cast")
LOG.debug("Sending cast: %s", topic)
_cast(addr, context, topic, payload, envelope=envelope)
LOG.debug("Cast sent; Waiting reply")
# Blocks until receives reply
msg = msg_waiter.recv()
if msg is None:
raise rpc_common.Timeout()
LOG.debug("Received message: %s", msg)
LOG.debug("Unpacking response")
@ -789,7 +805,7 @@ class ZmqIncomingMessage(base.IncomingMessage):
self.condition.notify()
def requeue(self):
pass
LOG.debug("WARNING: requeue not supported")
class ZmqListener(base.Listener):
@ -858,19 +874,11 @@ class ZmqDriver(base.BaseDriver):
raise NotImplementedError('The ZeroMQ driver currently only works '
'with oslo.config.cfg.CONF')
self.listeners = []
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
# FIXME(markmc): remove this temporary hack
class Context(object):
def __init__(self, d):
self.d = d
def to_dict(self):
return self.d
context = Context(ctxt)
if wait_for_reply:
method = _call
else:
@ -884,7 +892,7 @@ class ZmqDriver(base.BaseDriver):
elif target.server:
topic = '%s.%s' % (topic, target.server)
reply = _multi_send(method, context, topic, message,
reply = _multi_send(method, ctxt, topic, message,
envelope=envelope,
allowed_remote_exmods=self._allowed_remote_exmods)
@ -916,6 +924,7 @@ class ZmqDriver(base.BaseDriver):
conn.create_consumer(target.topic, listener, fanout=True)
conn.consume_in_thread()
self.listeners.append(conn)
return listener
@ -934,8 +943,11 @@ class ZmqDriver(base.BaseDriver):
conn.create_consumer('%s-%s' % (target.topic, priority),
listener)
conn.consume_in_thread()
self.listeners.append(conn)
return listener
def cleanup(self):
pass
for c in self.listeners:
c.close()
self.listeners = []

View File

@ -21,6 +21,9 @@ qpid-python
# for test_matchmaker_redis
redis>=2.10.0
# for test_impl_zmq
pyzmq>=14.3.1
# when we can require tox>= 1.4, this can go into tox.ini:
# [testenv:cover]
# deps = {[testenv]deps} coverage

View File

@ -0,0 +1,504 @@
# Copyright 2014 Canonical, Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import socket
import fixtures
import mock
import testtools
from oslo import messaging
from oslo.utils import importutils
from tests import utils as test_utils
# NOTE(jamespage) the zmq driver implementation is currently tied
# to eventlet so we have to monkey_patch to support testing
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
if eventlet:
eventlet.monkey_patch()
impl_zmq = importutils.try_import('oslo.messaging._drivers.impl_zmq')
LOG = logging.getLogger(__name__)
def get_unused_port():
"""Returns an unused port on localhost."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 0))
port = s.getsockname()[1]
s.close()
return port
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
def test_start_stop_listener(self):
target = messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
result = listener.poll(0.01)
self.assertEqual(result, None)
def test_send_receive_raises(self):
"""Call() without method."""
target = messaging.Target(topic='testtopic')
self.driver.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqIncomingMessage')
def test_send_receive_topic(self, mock_msg):
"""Call() with method."""
mock_msg.return_value = msg = mock.MagicMock()
msg.received = received = mock.MagicMock()
received.failure = False
received.reply = True
msg.condition = condition = mock.MagicMock()
condition.wait.return_value = True
target = messaging.Target(topic='testtopic')
self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_fanout(self, mock_call):
target = messaging.Target(topic='testtopic', fanout=True)
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
@mock.patch('oslo.messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
# Also verifies fix for bug http://pad.lv/1301723
target = messaging.Target(topic='testtopic', server='localhost')
self.driver.listen(target)
mock_call.__name__ = '_call'
mock_call.return_value = [True]
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
self.assertEqual(result, True)
mock_call.assert_called_once_with(
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
class TestZmqSocket(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqSocket, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
subscribe=None)
self.assertTrue(sock.can_recv)
self.assertFalse(sock.can_send)
self.assertTrue(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqSocket.subscribe')
@mock.patch('oslo.messaging._drivers.impl_zmq.zmq.Context')
def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
mock_ctxt = mock.Mock()
mock_context.return_value = mock_ctxt
mock_sock = mock.Mock()
mock_ctxt.socket = mock.Mock(return_value=mock_sock)
mock_sock.connect = mock.Mock()
mock_sock.bind = mock.Mock()
addr = '127.0.0.1'
sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
subscribe=None)
self.assertFalse(sock.can_recv)
self.assertTrue(sock.can_send)
self.assertFalse(sock.can_sub)
self.assertTrue(mock_sock.connect.called)
self.assertFalse(mock_sock.bind.called)
class TestZmqIncomingMessage(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqIncomingMessage, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
def test_zmqincomingmessage(self):
msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
msg.reply("abc")
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertIsInstance(
msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
self.assertEqual(msg.received.reply, "abc")
msg.requeue()
class TestZmqConnection(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqConnection, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
# No Fanout
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.PULL,
subscribe=None, in_bind=False)
# Reset for next bunch of checks
conn.reactor.register.reset_mock()
# Fanout
inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context, fanout='subscriber.foo')
conn.reactor.register.assert_called_with(context, inaddr,
impl_zmq.zmq.SUB,
subscribe='subscriber.foo',
in_bind=False)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
(self.internal_ipc_dir))
conn.create_consumer(topic, context)
conn.reactor.register.assert_called_with(
context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
conn.reactor.register.reset_mock()
# Call again with same topic
conn.create_consumer(topic, context)
self.assertFalse(conn.reactor.register.called)
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
self.assertTrue(conn.reactor.close.called)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@mock.patch('oslo.messaging._drivers.impl_zmq._get_matchmaker',
autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqListener, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
# Timeout = 0 should return straight away since the queue is empty
listener.poll(timeout=0)
def test_zmqlistener_w_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
kwargs = {'a': 1, 'b': 2}
m = mock.Mock()
ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
eventlet.spawn_n(listener.dispatch, ctxt, 0,
m.fake_method, 'name.space', **kwargs)
resp = listener.poll(timeout=10)
msg = {'method': m.fake_method, 'namespace': 'name.space',
'args': kwargs}
self.assertEqual(resp.message, msg)
class TestZmqDriver(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqDriver, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic'
msg = 'jeronimo'
self.driver.send(messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
allowed_remote_exmods=[],
envelope=False)
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)
def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
context = mock.Mock(autospec=impl_zmq.RpcContext)
topic = 'testtopic.foo'
topic_reformat = 'testtopic-foo'
msg = 'jeronimo'
self.driver.send_notification(messaging.Target(topic=topic), context,
msg, False, False)
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
msg, allowed_remote_exmods=[],
envelope=False)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen(self, mock_connection, mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
self.driver.listen(messaging.Target(topic=topic))
conn.create_consumer.assert_called_with(topic, listener, fanout=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq.Connection', autospec=True)
def test_zmqdriver_listen_for_notification(self, mock_connection,
mock_listener):
mock_listener.return_value = listener = mock.Mock()
mock_connection.return_value = conn = mock.Mock()
conn.create_consumer = mock.Mock()
conn.consume_in_thread = mock.Mock()
topic = 'testtopic.foo'
data = [(messaging.Target(topic=topic), 0)]
# NOTE(jamespage): Pooling not supported, just pass None for now.
self.driver.listen_for_notifications(data, None)
conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)

View File

@ -64,7 +64,7 @@ class ListenerSetupMixin(object):
def wait_for(self, expect_messages):
while expect_messages != self._received_msgs:
pass
yield
def stop(self):
for listener in self.listeners:

View File

@ -17,10 +17,6 @@ import logging
import logging.config
import os
import sys
try:
import threading
except ImportError:
threading = None
import mock
import testscenarios
@ -39,13 +35,6 @@ logging.AUDIT = logging.INFO + 1
logging.addLevelName(logging.AUDIT, 'AUDIT')
def get_thread_ident():
if threading is not None:
return threading.current_thread().ident
else:
return None
class TestLogNotifier(test_utils.BaseTestCase):
scenarios = [
@ -62,6 +51,10 @@ class TestLogNotifier(test_utils.BaseTestCase):
super(TestLogNotifier, self).setUp()
self.addCleanup(messaging.notify._impl_test.reset)
self.config(notification_driver=['test'])
# NOTE(jamespage) disable thread information logging for testing
# as this causes test failures when zmq tests monkey_patch via
# eventlet
logging.logThreads = 0
@mock.patch('oslo.utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
@ -93,7 +86,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
{'process': os.getpid(),
'funcName': None,
'name': 'foo',
'thread': get_thread_ident(),
'thread': None,
'levelno': levelno,
'processName': 'MainProcess',
'pathname': '/foo/bar',
@ -149,7 +142,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
{'process': os.getpid(),
'funcName': 'test_logging_conf',
'name': 'default',
'thread': get_thread_ident(),
'thread': None,
'levelno': levelno,
'processName': 'MainProcess',
'pathname': pathname,