zmq: Add support for ZmqClient pooling

To avoid creating a new ZMQ connection for every message sent
to a remote broker, implement pooling and re-use of ZmqClient
objects and associated ZMQ context.

A pool is created for each remote endpoint (keyed by address);
the size of each pool is configured using rpc_conn_pool_size.

All outbound message client connections are pooled.

Closes-Bug: 1384113
Change-Id: Ia55d5c310a56e51df5e2f5d39e561a4da3fe4d83
This commit is contained in:
James Page 2015-04-23 17:08:24 +01:00
parent 287a4f56f4
commit de015d5c83
8 changed files with 234 additions and 73 deletions

View File

@ -49,12 +49,6 @@ amqp_opts = [
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
# FIXME(markmc): this was toplevel in openstack.common.rpc
cfg.IntOpt('rpc_conn_pool_size',
default=30,
deprecated_group='DEFAULT',
help='Size of RPC connection pool.'),
]
UNIQUE_ID = '_unique_id'

View File

@ -17,8 +17,15 @@ import abc
import six
from oslo.config import cfg
from oslo_messaging import exceptions
base_opts = [
cfg.IntOpt('rpc_conn_pool_size',
default=30,
help='Size of RPC connection pool.'),
]
class TransportDriverError(exceptions.MessagingException):
"""Base class for transport driver specific exceptions."""

View File

@ -28,6 +28,7 @@ import six
from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging import exceptions
@ -783,6 +784,7 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
conf.register_group(opt_group)
conf.register_opts(qpid_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
conf.register_opts(base.base_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,

View File

@ -34,6 +34,7 @@ from six.moves.urllib import parse
from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LE
@ -1161,6 +1162,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf.register_group(opt_group)
conf.register_opts(rabbit_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
conf.register_opts(base.base_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,

View File

@ -37,6 +37,7 @@ from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._executors import base as executor_base # FIXME(markmc)
from oslo_messaging._i18n import _, _LE, _LW
from oslo_messaging._drivers import pool
zmq = importutils.try_import('eventlet.green.zmq')
@ -117,8 +118,8 @@ class ZmqSocket(object):
Can be used as a Context (supports the 'with' statement).
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.ctxt = zmq.Context(CONF.rpc_zmq_contexts)
def __init__(self, addr, zmq_type, bind=True, subscribe=None, ctxt=None):
self.ctxt = ctxt or zmq.Context(CONF.rpc_zmq_contexts)
self.sock = self.ctxt.socket(zmq_type)
# Enable IPv6-support in libzmq.
@ -236,8 +237,9 @@ class ZmqSocket(object):
class ZmqClient(object):
"""Client for ZMQ sockets."""
def __init__(self, addr):
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
def __init__(self, addr, ctxt=None):
self.address = addr
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False, ctxt=ctxt)
def cast(self, msg_id, topic, data, envelope):
msg_id = msg_id or 0
@ -259,6 +261,67 @@ class ZmqClient(object):
self.outq.close()
class ZmqClientContext(object):
"""This is essentially a wrapper around ZmqClient that supports 'with'.
It can also return a new ZmqClient, or one from a pool.
The function will also catch when an instance of this class is to be
deleted. With that we can return ZmqClients to the pool on exceptions
and so forth without making the caller be responsible for catching them.
If possible the function makes sure to return a client to the pool.
Based on amqp.ConnectionContext.
"""
def __init__(self, address, connection_pool=None, pooled=False):
self.connection = None
self.connection_pool = connection_pool
self.pooled = pooled
if self.pooled and self.connection_pool is not None:
self.connection = self.connection_pool.get(address)
else:
self.connection = ZmqClient(address)
def __enter__(self):
"""When with ZmqClientContext() is used, return self."""
return self
def _done(self):
"""If the client came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled and self.connection_pool is not None:
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
pass
self.connection = None
def __exit__(self, exc_type, exc_value, tb):
"""End of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this client. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this client."""
self._done()
def __getattr__(self, key):
"""Proxy all other calls to the ZmqClient instance."""
if self.connection:
return getattr(self.connection, key)
else:
raise rpc_common.InvalidRPCConnectionReuse()
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""
def __init__(self, **kwargs):
@ -320,7 +383,7 @@ class InternalContext(object):
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
def reply(self, ctx, proxy,
def reply(self, driver, ctx, proxy,
msg_id=None, context=None, topic=None, msg=None):
"""Reply to a casted call."""
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
@ -336,19 +399,20 @@ class InternalContext(object):
ctx.replies)
LOG.debug("Sending reply")
_multi_send(_cast, ctx, topic, {
_multi_send(driver, _cast, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
}, _msg_id=msg_id)
}, _msg_id=msg_id, pooled=True)
class ConsumerBase(object):
"""Base Consumer."""
def __init__(self):
def __init__(self, driver):
self.driver = driver
self.private_ctx = InternalContext(None)
@classmethod
@ -371,7 +435,7 @@ class ConsumerBase(object):
# Internal method
# uses internal context for safety.
if method == '-reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
self.private_ctx.reply(self.driver, ctx, proxy, **data['args'])
return
proxy.dispatch(ctx, data)
@ -383,9 +447,10 @@ class ZmqBaseReactor(ConsumerBase):
Used for RoundRobin requests.
"""
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
def __init__(self, conf, driver=None):
super(ZmqBaseReactor, self).__init__(driver)
self.driver = driver
self.proxies = {}
self.threads = []
self.sockets = []
@ -564,8 +629,8 @@ class ZmqReactor(ZmqBaseReactor):
Can also be used as a 1:1 proxy
"""
def __init__(self, conf):
super(ZmqReactor, self).__init__(conf)
def __init__(self, conf, driver):
super(ZmqReactor, self).__init__(conf, driver)
def consume(self, sock):
# TODO(ewindisch): use zero-copy (i.e. references, not copying)
@ -598,9 +663,9 @@ class ZmqReactor(ZmqBaseReactor):
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
def __init__(self, conf, driver):
self.topics = []
self.reactor = ZmqReactor(conf)
self.reactor = ZmqReactor(conf, driver)
def create_consumer(self, topic, proxy, fanout=False):
# Register with matchmaker.
@ -653,8 +718,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None, allowed_remote_exmods=None):
def _cast(driver, addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None, allowed_remote_exmods=None, pooled=False):
allowed_remote_exmods = allowed_remote_exmods or []
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -662,21 +727,16 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
topic = topic.encode('utf-8')
with Timeout(timeout_cast, exception=rpc_common.Timeout):
conn = None
try:
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
if conn is not None:
conn.close()
with driver.get_connection(addr, pooled) as conn:
try:
# assumes cast can't return an exception
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
def _call(addr, context, topic, msg, timeout=None,
envelope=False, allowed_remote_exmods=None):
def _call(driver, addr, context, topic, msg, timeout=None,
envelope=False, allowed_remote_exmods=None, pooled=False):
allowed_remote_exmods = allowed_remote_exmods or []
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -714,7 +774,8 @@ def _call(addr, context, topic, msg, timeout=None,
)
LOG.debug("Sending cast: %s", topic)
_cast(addr, context, topic, payload, envelope=envelope)
_cast(driver, addr, context, topic, payload, envelope=envelope,
pooled=pooled)
LOG.debug("Cast sent; Waiting reply")
# Blocks until receives reply
@ -755,8 +816,9 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None, allowed_remote_exmods=None):
def _multi_send(driver, method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None, allowed_remote_exmods=None,
pooled=False):
"""Wraps the sending of messages.
Dispatches to the matchmaker and sends message to all relevant hosts.
@ -787,11 +849,12 @@ def _multi_send(method, context, topic, msg, timeout=None,
_addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, msg, timeout, envelope, _msg_id)
eventlet.spawn_n(method, driver, _addr, context,
_topic, msg, timeout, envelope, _msg_id,
None, pooled)
else:
return_val = method(_addr, context, _topic, msg, timeout,
envelope, allowed_remote_exmods)
return_val = method(driver, _addr, context, _topic, msg, timeout,
envelope, allowed_remote_exmods, pooled)
return return_val
@ -871,6 +934,50 @@ class ZmqListener(base.Listener):
return None
class ZmqClientPool(pool.Pool):
"""Class that implements a pool of Zmq Clients for a single endpoint"""
def __init__(self, conf, address, connection_cls, ctxt):
self.connection_cls = connection_cls
self.ctxt = ctxt
self.address = address
super(ZmqClientPool, self).__init__(conf.rpc_conn_pool_size)
def create(self):
LOG.debug('Pool creating new ZMQ connection for %s' % self.address)
return self.connection_cls(self.address, self.ctxt)
def empty(self):
for item in self.iter_free():
item.close()
class ZmqClientPoolManager(object):
"""Class that manages pools of clients for Zmq endpoints"""
def __init__(self, conf, ctxt=None):
self._pools = {}
self._lock = threading.Lock()
self.conf = conf
self.ctxt = ctxt
def get(self, address):
if address not in self._pools:
with self._lock:
if address not in self._pools:
self._pools[address] = ZmqClientPool(self.conf,
address,
ZmqClient,
self.ctxt)
return self._pools[address].get()
def put(self, item):
self._pools[item.address].put(item)
def empty(self):
for p in self._pools:
self._pools[p].empty()
class ZmqDriver(base.BaseDriver):
# FIXME(markmc): allow this driver to be used without eventlet
@ -881,6 +988,7 @@ class ZmqDriver(base.BaseDriver):
raise ImportError("Failed to import eventlet.green.zmq")
conf.register_opts(zmq_opts)
conf.register_opts(executor_base._pool_opts)
conf.register_opts(base.base_opts)
super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
@ -899,6 +1007,33 @@ class ZmqDriver(base.BaseDriver):
self.listeners = []
# NOTE(jamespage): Create pool manager on first use to deal with
# os.fork calls in openstack daemons.
self._pool = None
self._pid = None
self._lock = threading.Lock()
def _configure_pool_manager(func):
"""Causes a new pool manager to be created when the messaging service
is first used by the current process. This is important as all
connections in the pools manager by the pool manager will share the
same ZMQ context, which must not be shared across OS processes.
"""
def wrap(self, *args, **kws):
with self._lock:
old_pid = self._pid
self._pid = os.getpid()
if old_pid != self._pid:
# Create fresh pool manager for the current process
# along with a new ZMQ context.
self._pool = ZmqClientPoolManager(
self.conf,
zmq.Context(self.conf.rpc_zmq_contexts)
)
return func(self, *args, **kws)
return wrap
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, envelope=False):
@ -915,19 +1050,22 @@ class ZmqDriver(base.BaseDriver):
elif target.server:
topic = '%s.%s' % (topic, target.server)
reply = _multi_send(method, ctxt, topic, message,
reply = _multi_send(self, method, ctxt, topic, message,
envelope=envelope,
allowed_remote_exmods=self._allowed_remote_exmods)
allowed_remote_exmods=self._allowed_remote_exmods,
pooled=True)
if wait_for_reply:
return reply[-1]
@_configure_pool_manager
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
# NOTE(sileht): retry is not implemented because this driver never
# retry anything
return self._send(target, ctxt, message, wait_for_reply, timeout)
@_configure_pool_manager
def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
@ -936,8 +1074,9 @@ class ZmqDriver(base.BaseDriver):
target = target(topic=target.topic.replace('.', '-'))
return self._send(target, ctxt, message, envelope=(version == 2.0))
@_configure_pool_manager
def listen(self, target):
conn = Connection(self.conf)
conn = Connection(self.conf, self)
listener = ZmqListener(self)
@ -951,12 +1090,13 @@ class ZmqDriver(base.BaseDriver):
return listener
@_configure_pool_manager
def listen_for_notifications(self, targets_and_priorities, pool):
# NOTE(sileht): this listener implementation is limited
# because zeromq doesn't support:
# * requeing message
# * pool
conn = Connection(self.conf)
conn = Connection(self.conf, self)
listener = ZmqListener(self)
for target, priority in targets_and_priorities:
@ -974,3 +1114,8 @@ class ZmqDriver(base.BaseDriver):
for c in self.listeners:
c.close()
self.listeners = []
if self._pool:
self._pool.empty()
def get_connection(self, address, pooled=False):
return ZmqClientContext(address, self._pool, pooled)

View File

@ -21,6 +21,7 @@ import copy
import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_qpid
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers import impl_zmq
@ -34,6 +35,7 @@ from oslo_messaging.rpc import client
from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
impl_zmq.zmq_opts,
matchmaker.matchmaker_opts,
base._pool_opts,

View File

@ -150,10 +150,11 @@ class TestZmqBasics(ZmqBaseTestCase):
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
None, False, [], True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
@ -171,10 +172,11 @@ class TestZmqBasics(ZmqBaseTestCase):
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
None, False, [], True)
class TestZmqSocket(test_utils.BaseTestCase):
@ -291,7 +293,7 @@ class TestZmqConnection(ZmqBaseTestCase):
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
@ -317,7 +319,7 @@ class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
@ -335,7 +337,7 @@ class TestZmqConnection(ZmqBaseTestCase):
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
@ -344,7 +346,7 @@ class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver, self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@ -355,7 +357,7 @@ class TestZmqConnection(ZmqBaseTestCase):
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver, self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
@ -397,7 +399,8 @@ class TestZmqDriver(ZmqBaseTestCase):
with mock.patch.object(impl_zmq.LOG, 'warn') as flog:
mock_queues.return_value = None
impl_zmq._multi_send(mock_cast, context, topic, msg)
impl_zmq._multi_send(self.driver, mock_cast,
context, topic, msg)
self.assertEqual(1, flog.call_count)
args, kwargs = flog.call_args
self.assertIn('No matchmaker results', args[0])
@ -414,7 +417,7 @@ class TestZmqDriver(ZmqBaseTestCase):
mock_queues.return_value = None
self.assertRaises(rpc_common.Timeout,
impl_zmq._multi_send,
impl_zmq._multi_send, self.driver,
mock_call, context, topic, msg)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@ -425,9 +428,10 @@ class TestZmqDriver(ZmqBaseTestCase):
msg = 'jeronimo'
self.driver.send(oslo_messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic, msg,
allowed_remote_exmods=[],
envelope=False)
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
@ -438,9 +442,10 @@ class TestZmqDriver(ZmqBaseTestCase):
msg = 'jeronimo'
self.driver.send_notification(oslo_messaging.Target(topic=topic),
context, msg, False, False)
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
msg, allowed_remote_exmods=[],
envelope=False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic_reformat, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)

View File

@ -150,10 +150,11 @@ class TestZmqBasics(ZmqBaseTestCase):
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
{}, 'fanout~testtopic.127.0.0.1',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
None, False, [], True)
@mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
def test_send_receive_direct(self, mock_call):
@ -171,10 +172,11 @@ class TestZmqBasics(ZmqBaseTestCase):
self.assertEqual(result, True)
mock_call.assert_called_once_with(
self.driver,
'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
{}, 'testtopic.localhost',
{'tx_id': 1, 'method': 'hello-world'},
None, False, [])
None, False, [], True)
class TestZmqSocket(test_utils.BaseTestCase):
@ -291,7 +293,7 @@ class TestZmqConnection(ZmqBaseTestCase):
def test_zmqconnection_create_consumer(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
@ -317,7 +319,7 @@ class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
mock_reactor.register = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
topic = 'topic.foo'
context = mock.Mock()
inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
@ -335,7 +337,7 @@ class TestZmqConnection(ZmqBaseTestCase):
autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.close = mock.Mock()
mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
conn.close()
@ -344,7 +346,7 @@ class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_wait(self, mock_reactor):
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.wait = mock.Mock()
conn.wait()
self.assertTrue(conn.reactor.wait.called)
@ -355,7 +357,7 @@ class TestZmqConnection(ZmqBaseTestCase):
def test_zmqconnection_consume_in_thread(self, mock_reactor,
mock_getmatchmaker):
mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
conn = impl_zmq.Connection(self.driver)
conn = impl_zmq.Connection(self.driver.conf, self.driver)
conn.reactor.consume_in_thread = mock.Mock()
conn.consume_in_thread()
self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
@ -393,9 +395,10 @@ class TestZmqDriver(ZmqBaseTestCase):
msg = 'jeronimo'
self.driver.send(messaging.Target(topic=topic), context, msg,
False, 0, False)
mock_multi_send.assert_called_with(mock_cast, context, topic, msg,
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic, msg,
allowed_remote_exmods=[],
envelope=False)
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
@ -406,9 +409,10 @@ class TestZmqDriver(ZmqBaseTestCase):
msg = 'jeronimo'
self.driver.send_notification(messaging.Target(topic=topic), context,
msg, False, False)
mock_multi_send.assert_called_with(mock_cast, context, topic_reformat,
msg, allowed_remote_exmods=[],
envelope=False)
mock_multi_send.assert_called_with(self.driver, mock_cast, context,
topic_reformat, msg,
allowed_remote_exmods=[],
envelope=False, pooled=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)