diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index ebae514e3..0d3dc5194 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -55,6 +55,26 @@ amqp_opts = [ UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) +# NOTE(sileht): Even if rabbit/qpid have only one Connection class, +# this connection can be used for two purposes: +# * wait and receive amqp messages (only do read stuffs on the socket) +# * send messages to the broker (only do write stuffs on the socket) +# The code inside a connection class is not concurrency safe. +# Using one Connection class instance for doing both, will result +# of eventlet complaining of multiple greenthreads that read/write the +# same fd concurrently... because 'send' and 'listen' run in different +# greenthread. +# So, a connection cannot be shared between thread/greenthread and +# this two variables permit to define the purpose of the connection +# to allow drivers to add special handling if needed (like heatbeat). +# amqp drivers create 3 kind of connections: +# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection +# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used +# * driver internally have another 'PURPOSE_LISTEN' connection dedicated +# to wait replies of rpc call +PURPOSE_LISTEN = 'listen' +PURPOSE_SEND = 'send' + class ConnectionPool(pool.Pool): """Class that implements a Pool of Connections.""" @@ -66,9 +86,11 @@ class ConnectionPool(pool.Pool): self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while - def create(self): + def create(self, purpose=None): + if purpose is None: + purpose = PURPOSE_SEND LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url) + return self.connection_cls(self.conf, self.url, purpose) def empty(self): for item in self.iter_free(): @@ -87,16 +109,18 @@ class ConnectionContext(rpc_common.Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, pooled=True): + def __init__(self, connection_pool, purpose): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool + pooled = purpose == PURPOSE_SEND if pooled: self.connection = connection_pool.get() else: # a non-pooled connection is requested, so create a new connection - self.connection = connection_pool.create() + self.connection = connection_pool.create(purpose) self.pooled = pooled + self.connection.pooled = pooled def __enter__(self): """When with ConnectionContext() is used, return self.""" diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 9e967c4da..bbc4b7828 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -69,7 +69,8 @@ class AMQPIncomingMessage(base.IncomingMessage): # NOTE(Alexei_987) not sending reply, if msg_id is empty # because reply should not be expected by caller side return - with self.listener.driver._get_connection() as conn: + with self.listener.driver._get_connection( + rpc_amqp.PURPOSE_SEND) as conn: self._send_reply(conn, reply, failure, log_failure=log_failure) self._send_reply(conn, ending=True) @@ -268,9 +269,9 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, pooled=True): + def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND): return rpc_amqp.ConnectionContext(self._connection_pool, - pooled=pooled) + purpose=purpose) def _get_reply_q(self): with self._reply_q_lock: @@ -279,7 +280,7 @@ class AMQPDriverBase(base.BaseDriver): reply_q = 'reply_' + uuid.uuid4().hex - conn = self._get_connection(pooled=False) + conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) self._waiter = ReplyWaiter(reply_q, conn, self._allowed_remote_exmods) @@ -320,7 +321,7 @@ class AMQPDriverBase(base.BaseDriver): self._waiter.listen(msg_id) try: - with self._get_connection() as conn: + with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn: if notify: conn.notify_send(self._get_exchange(target), target.topic, msg, retry=retry) @@ -353,7 +354,7 @@ class AMQPDriverBase(base.BaseDriver): envelope=(version == 2.0), notify=True, retry=retry) def listen(self, target): - conn = self._get_connection(pooled=False) + conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -369,7 +370,7 @@ class AMQPDriverBase(base.BaseDriver): return listener def listen_for_notifications(self, targets_and_priorities, pool): - conn = self._get_connection(pooled=False) + conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) listener = AMQPListener(self, conn) for target, priority in targets_and_priorities: diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 464023054..3a2b29692 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -462,7 +462,7 @@ class Connection(object): pools = {} - def __init__(self, conf, url): + def __init__(self, conf, url, purpose): if not qpid_messaging: raise ImportError("Failed to import qpid.messaging") diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b06d290aa..d10ac2247 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,19 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib import functools import itertools import logging import os import socket import ssl +import threading import time import uuid import kombu import kombu.connection import kombu.entity -import kombu.exceptions import kombu.messaging from oslo_config import cfg from oslo_utils import netutils @@ -35,6 +36,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as rpc_common from oslo_messaging._i18n import _ +from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LI from oslo_messaging._i18n import _LW from oslo_messaging import exceptions @@ -120,6 +122,15 @@ rabbit_opts = [ help='Use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the ' 'RabbitMQ database.'), + cfg.IntOpt('heartbeat_timeout_threshold', + default=60, + help="Number of seconds after which the Rabbit broker is " + "considered down if heartbeat's keep-alive fails " + "(0 disable the heartbeat)."), + cfg.IntOpt('heartbeat_rate', + default=2, + help='How often times during the heartbeat_timeout_threshold ' + 'we check the heartbeat.'), # NOTE(sileht): deprecated option since oslo_messaging 1.5.0, cfg.BoolOpt('fake_rabbit', @@ -460,12 +471,119 @@ class NotifyPublisher(TopicPublisher): queue.declare() +class DummyConnectionLock(object): + def acquire(self): + pass + + def release(self): + pass + + def heartbeat_acquire(self): + pass + + def __enter__(self): + self.acquire() + + def __exit__(self, type, value, traceback): + self.release() + + +class ConnectionLock(DummyConnectionLock): + """Lock object to protect access the the kombu connection + + This is a lock object to protect access the the kombu connection + object between the heartbeat thread and the driver thread. + + They are two way to acquire this lock: + * lock.acquire() + * lock.heartbeat_acquire() + + In both case lock.release(), release the lock. + + The goal is that the heartbeat thread always have the priority + for acquiring the lock. This ensures we have no heartbeat + starvation when the driver sends a lot of messages. + + So when lock.heartbeat_acquire() is called next time the lock + is released(), the caller unconditionnaly acquires + the lock, even someone else have asked for the lock before it. + """ + + def __init__(self): + self._workers_waiting = 0 + self._heartbeat_waiting = False + self._lock_acquired = None + self._monitor = threading.Lock() + self._workers_locks = threading.Condition(self._monitor) + self._heartbeat_lock = threading.Condition(self._monitor) + self._get_thread_id = self._fetch_current_thread_functor() + + def acquire(self): + with self._monitor: + while self._lock_acquired: + self._workers_waiting += 1 + self._workers_locks.wait() + self._workers_waiting -= 1 + self._lock_acquired = self._get_thread_id() + + def heartbeat_acquire(self): + # NOTE(sileht): must be called only one time + with self._monitor: + while self._lock_acquired is not None: + self._heartbeat_waiting = True + self._heartbeat_lock.wait() + self._heartbeat_waiting = False + self._lock_acquired = self._get_thread_id() + + def release(self): + with self._monitor: + if self._lock_acquired is None: + raise RuntimeError("We can't release a not acquired lock") + thread_id = self._get_thread_id() + if self._lock_acquired != thread_id: + raise RuntimeError("We can't release lock acquired by another " + "thread/greenthread; %s vs %s" % + (self._lock_acquired, thread_id)) + self._lock_acquired = None + if self._heartbeat_waiting: + self._heartbeat_lock.notify() + elif self._workers_waiting > 0: + self._workers_locks.notify() + + @contextlib.contextmanager + def for_heartbeat(self): + self.heartbeat_acquire() + try: + yield + finally: + self.release() + + @staticmethod + def _fetch_current_thread_functor(): + # Until https://github.com/eventlet/eventlet/issues/172 is resolved + # or addressed we have to use complicated workaround to get a object + # that will not be recycled; the usage of threading.current_thread() + # doesn't appear to currently be monkey patched and therefore isn't + # reliable to use (and breaks badly when used as all threads share + # the same current_thread() object)... + try: + import eventlet + from eventlet import patcher + green_threaded = patcher.is_monkey_patched('thread') + except ImportError: + green_threaded = False + if green_threaded: + return lambda: eventlet.getcurrent() + else: + return lambda: threading.current_thread() + + class Connection(object): """Connection object.""" pools = {} - def __init__(self, conf, url): + def __init__(self, conf, url, purpose): self.consumers = [] self.consumer_num = itertools.count(1) self.conf = conf @@ -527,18 +645,47 @@ class Connection(object): self.do_consume = True self._consume_loop_stopped = False - self.channel = None + + # NOTE(sileht): if purpose is PURPOSE_LISTEN + # we don't need the lock because we don't + # have a heartbeat thread + if purpose == rpc_amqp.PURPOSE_SEND: + self._connection_lock = ConnectionLock() + else: + self._connection_lock = DummyConnectionLock() + self.connection = kombu.connection.Connection( self._url, ssl=self._fetch_ssl_params(), login_method=self._login_method, - failover_strategy="shuffle") + failover_strategy="shuffle", + heartbeat=self.driver_conf.heartbeat_timeout_threshold) LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)d'), self.connection.info()) + + # NOTE(sileht): kombu recommend to run heartbeat_check every + # seconds, but we use a lock around the kombu connection + # so, to not lock to much this lock to most of the time do nothing + # expected waiting the events drain, we start heartbeat_check and + # retreive the server heartbeat packet only two times more than + # the minimum required for the heartbeat works + # (heatbeat_timeout/heartbeat_rate/2.0, default kombu + # heartbeat_rate is 2) + self._heartbeat_wait_timeout = ( + float(self.driver_conf.heartbeat_timeout_threshold) / + float(self.driver_conf.heartbeat_rate) / 2.0) + self._heartbeat_support_log_emitted = False + # NOTE(sileht): just ensure the connection is setuped at startup - self.ensure(error_callback=None, - method=lambda: True) + self.ensure_connection() + + # NOTE(sileht): if purpose is PURPOSE_LISTEN + # the consume code does the heartbeat stuff + # we don't need a thread + if purpose == rpc_amqp.PURPOSE_SEND: + self._heartbeat_start() + LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), self.connection.info()) @@ -602,6 +749,10 @@ class Connection(object): return ssl_params or True return False + def ensure_connection(self): + self.ensure(error_callback=None, + method=lambda: True) + def ensure(self, error_callback, method, retry=None, timeout_is_error=True): """Will retry up to retry number of times. @@ -609,6 +760,8 @@ class Connection(object): retry = -1 means to retry forever retry = 0 means no retry retry = N means N retries + + NOTE(sileht): Must be called within the connection lock """ current_pid = os.getpid() @@ -676,6 +829,7 @@ class Connection(object): recoverable_errors = (self.connection.recoverable_channel_errors + self.connection.recoverable_connection_errors) + try: autoretry_method = self.connection.autoretry( execute_method, channel=self.channel, @@ -703,12 +857,17 @@ class Connection(object): raise exceptions.MessageDeliveryFailure(msg) def _set_current_channel(self, new_channel): + """Change the channel to use. + + NOTE(sileht): Must be called within the connection lock + """ if self.channel is not None and new_channel != self.channel: self.connection.maybe_close_channel(self.channel) self.channel = new_channel def close(self): """Close/release this connection.""" + self._heartbeat_stop() if self.connection: self._set_current_channel(None) self.connection.release() @@ -716,10 +875,74 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" - self._set_current_channel(self.connection.channel()) + with self._connection_lock: + self._set_current_channel(self.connection.channel()) self.consumers = [] self.consumer_num = itertools.count(1) + def _heartbeat_supported_and_enabled(self): + if self.driver_conf.heartbeat_timeout_threshold <= 0: + return False + + if self.connection.supports_heartbeats: + return True + elif not self._heartbeat_support_log_emitted: + LOG.warn(_LW("Heartbeat support requested but it is not supported " + "by the kombu driver or the broker")) + self._heartbeat_support_log_emitted = True + return False + + def _heartbeat_start(self): + if self._heartbeat_supported_and_enabled(): + self._heartbeat_exit_event = threading.Event() + self._heartbeat_thread = threading.Thread( + target=self._heartbeat_thread_job) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + else: + self._heartbeat_thread = None + + def _heartbeat_stop(self): + if self._heartbeat_thread is not None: + self._heartbeat_exit_event.set() + self._heartbeat_thread.join() + self._heartbeat_thread = None + + def _heartbeat_thread_job(self): + """Thread that maintains inactive connections + """ + while not self._heartbeat_exit_event.is_set(): + with self._connection_lock.for_heartbeat(): + + recoverable_errors = ( + self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + + try: + try: + self.connection.heartbeat_check( + rate=self.driver_conf.heartbeat_rate) + # NOTE(sileht): We need to drain event to receive + # heartbeat from the broker but don't hold the + # connection too much times. In amqpdriver a connection + # is used exclusivly for read or for write, so we have + # to do this for connection used for write drain_events + # already do that for other connection + try: + self.connection.drain_events(timeout=0.001) + except socket.timeout: + pass + except recoverable_errors as exc: + LOG.info(_LI("A recoverable connection/channel error " + "occurs, try to reconnect: %s"), exc) + except Exception: + LOG.exception(_LE("Unexpected error during heartbeart " + "thread processing, retrying...")) + + self._heartbeat_exit_event.wait( + timeout=self._heartbeat_wait_timeout) + self._heartbeat_exit_event.clear() + def declare_consumer(self, consumer_cls, topic, callback): """Create a Consumer using the class that was passed in and add it to our list of consumers @@ -736,10 +959,14 @@ class Connection(object): self.consumers.append(consumer) return consumer - return self.ensure(_connect_error, _declare_consumer) + with self._connection_lock: + return self.ensure(_connect_error, _declare_consumer) def iterconsume(self, limit=None, timeout=None): - """Return an iterator that will consume from all queues/consumers.""" + """Return an iterator that will consume from all queues/consumers. + + NOTE(sileht): Must be called within the connection lock + """ timer = rpc_common.DecayingTimer(duration=timeout) timer.start() @@ -770,6 +997,9 @@ class Connection(object): self._consume_loop_stopped = False raise StopIteration + if self._heartbeat_supported_and_enabled(): + self.connection.heartbeat_check( + rate=self.driver_conf.heartbeat_rate) try: return self.connection.drain_events(timeout=poll_timeout) except socket.timeout as exc: @@ -795,7 +1025,8 @@ class Connection(object): **kwargs) publisher.send(msg, timeout) - self.ensure(_error_callback, _publish, retry=retry) + with self._connection_lock: + self.ensure(_error_callback, _publish, retry=retry) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -861,12 +1092,13 @@ class Connection(object): def consume(self, limit=None, timeout=None): """Consume from all queues/consumers.""" - it = self.iterconsume(limit=limit, timeout=timeout) - while True: - try: - six.next(it) - except StopIteration: - return + with self._connection_lock: + it = self.iterconsume(limit=limit, timeout=timeout) + while True: + try: + six.next(it) + except StopIteration: + return def stop_consuming(self): self._consume_loop_stopped = True diff --git a/oslo_messaging/tests/drivers/test_impl_qpid.py b/oslo_messaging/tests/drivers/test_impl_qpid.py index 2d7dd6a07..e39f72a8a 100644 --- a/oslo_messaging/tests/drivers/test_impl_qpid.py +++ b/oslo_messaging/tests/drivers/test_impl_qpid.py @@ -27,6 +27,7 @@ import testscenarios import testtools import oslo_messaging +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import impl_qpid as qpid_driver from oslo_messaging.tests import utils as test_utils @@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase): with mock.patch('qpid.messaging.Connection') as conn_mock: # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) - connection = qpid_driver.Connection(self.conf, url) + connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) # reconnect will advance to the next broker, one broker per # attempt, and then wrap to the start of the list once the end is @@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) - self.connection = qpid_driver.Connection(self.conf, url) + self.connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) self.info.update({'attempt': 0, diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index df0f3b3df..b4e691de2 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -21,6 +21,7 @@ import uuid import fixtures import kombu +import kombu.transport.memory import mock from oslo_config import cfg from oslo_serialization import jsonutils @@ -28,6 +29,7 @@ from oslotest import mockpatch import testscenarios import oslo_messaging +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver @@ -54,6 +56,52 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): self.assertEqual('memory:////', url) +class TestHeartbeat(test_utils.BaseTestCase): + + @mock.patch('oslo_messaging._drivers.impl_rabbit.LOG') + @mock.patch('kombu.connection.Connection.heartbeat_check') + @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.' + '_heartbeat_supported_and_enabled', return_value=True) + def _do_test_heartbeat_sent(self, fake_heartbeat_support, fake_heartbeat, + fake_logger, heartbeat_side_effect=None, + info=None): + + event = threading.Event() + + def heartbeat_check(rate=2): + event.set() + if heartbeat_side_effect: + raise heartbeat_side_effect + + fake_heartbeat.side_effect = heartbeat_check + + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + conn = transport._driver._get_connection() + event.wait() + conn._heartbeat_stop() + + # check heartbeat have been called + self.assertLess(0, fake_heartbeat.call_count) + + if not heartbeat_side_effect: + self.assertEqual(2, fake_logger.info.call_count) + else: + self.assertEqual(3, fake_logger.info.call_count) + self.assertIn(mock.call(info, mock.ANY), + fake_logger.info.mock_calls) + + def test_test_heartbeat_sent_default(self): + self._do_test_heartbeat_sent() + + def test_test_heartbeat_sent_connection_fail(self): + self._do_test_heartbeat_sent( + heartbeat_side_effect=kombu.exceptions.ConnectionError, + info='A recoverable connection/channel error occurs, ' + 'try to reconnect: %s') + + class TestRabbitDriverLoad(test_utils.BaseTestCase): scenarios = [ @@ -68,6 +116,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') def test_driver_load(self, fake_ensure, fake_reset): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') self.messaging_conf.transport_driver = self.transport_driver transport = oslo_messaging.get_transport(self.conf) self.addCleanup(transport.cleanup) @@ -107,8 +157,8 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): transport._driver._get_connection() connection_klass.assert_called_once_with( - 'memory:///', ssl=self.expected, - login_method='AMQPLAIN', failover_strategy="shuffle") + 'memory:///', ssl=self.expected, login_method='AMQPLAIN', + heartbeat=60, failover_strategy="shuffle") class TestRabbitIterconsume(test_utils.BaseTestCase): @@ -118,7 +168,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase): 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 3 - with transport._driver._get_connection() as conn: + with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: conn.iterconsume(timeout=3) # kombu memory transport doesn't really raise error # so just simulate a real driver behavior @@ -170,10 +220,12 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): def setUp(self): super(TestRabbitTransportURL, self).setUp() self.messaging_conf.transport_driver = 'rabbit' + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') - def test_transport_url(self, fake_ensure_connection, fake_reset): + def test_transport_url(self, fake_reset, fake_ensure): transport = oslo_messaging.get_transport(self.conf, self.url) self.addCleanup(transport.cleanup) driver = transport._driver @@ -223,6 +275,8 @@ class TestSendReceive(test_utils.BaseTestCase): cls._timeout) def test_send_receive(self): + self.config(heartbeat_timeout_threshold=0, + group="oslo_messaging_rabbit") transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) @@ -708,6 +762,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): rabbit_retry_interval=0.01, rabbit_retry_backoff=0.01, kombu_reconnect_delay=0, + heartbeat_timeout_threshold=0, group="oslo_messaging_rabbit") self.kombu_connect = mock.Mock() @@ -719,7 +774,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) - self.connection = rabbit_driver.Connection(self.conf, url) + self.connection = rabbit_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) def test_ensure_four_retry(self): @@ -745,3 +801,59 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): retry=0) self.assertEqual(1, self.kombu_connect.call_count) self.assertEqual(2, mock_callback.call_count) + + +class ConnectionLockTestCase(test_utils.BaseTestCase): + def _thread(self, lock, sleep, heartbeat=False): + def thread_task(): + if heartbeat: + with lock.for_heartbeat(): + time.sleep(sleep) + else: + with lock: + time.sleep(sleep) + + t = threading.Thread(target=thread_task) + t.daemon = True + t.start() + start = time.time() + + def get_elapsed_time(): + t.join() + return time.time() - start + + return get_elapsed_time + + def test_workers_only(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1) + t2 = self._thread(l, 1) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t2(), places=1) + + def test_worker_and_heartbeat(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1) + t2 = self._thread(l, 1, heartbeat=True) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t2(), places=1) + + def test_workers_and_heartbeat(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1) + t2 = self._thread(l, 1) + t3 = self._thread(l, 1) + t4 = self._thread(l, 1, heartbeat=True) + t5 = self._thread(l, 1) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t4(), places=1) + self.assertAlmostEqual(3, t2(), places=1) + self.assertAlmostEqual(4, t3(), places=1) + self.assertAlmostEqual(5, t5(), places=1) + + def test_heartbeat(self): + l = rabbit_driver.ConnectionLock() + t1 = self._thread(l, 1, heartbeat=True) + t2 = self._thread(l, 1) + self.assertAlmostEqual(1, t1(), places=1) + self.assertAlmostEqual(2, t2(), places=1) diff --git a/tests/drivers/test_impl_qpid.py b/tests/drivers/test_impl_qpid.py index d8cd1e7bc..2c2c0a50c 100644 --- a/tests/drivers/test_impl_qpid.py +++ b/tests/drivers/test_impl_qpid.py @@ -27,6 +27,7 @@ import testscenarios import testtools from oslo import messaging +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import impl_qpid as qpid_driver from oslo_messaging.tests import utils as test_utils @@ -564,7 +565,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase): with mock.patch('qpid.messaging.Connection') as conn_mock: # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - connection = qpid_driver.Connection(self.conf, url) + connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) # reconnect will advance to the next broker, one broker per # attempt, and then wrap to the start of the list once the end is @@ -806,7 +808,8 @@ class QPidHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - self.connection = qpid_driver.Connection(self.conf, url) + self.connection = qpid_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) self.info.update({'attempt': 0, diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 8e9b29e43..783afd855 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -27,6 +27,7 @@ import testscenarios from oslo.config import cfg from oslo import messaging from oslo.serialization import jsonutils +from oslo_messaging._drivers import amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver @@ -44,6 +45,8 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase): self.config(fake_rabbit=True, group="oslo_messaging_rabbit") def test_driver_load(self): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') transport = messaging.get_transport(self.conf) self.addCleanup(transport.cleanup) driver = transport._driver @@ -67,6 +70,8 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.reset') def test_driver_load(self, fake_ensure, fake_reset): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') self.messaging_conf.transport_driver = self.transport_driver transport = messaging.get_transport(self.conf) self.addCleanup(transport.cleanup) @@ -83,7 +88,7 @@ class TestRabbitIterconsume(test_utils.BaseTestCase): transport = messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 3 - with transport._driver._get_connection() as conn: + with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: conn.iterconsume(timeout=3) # kombu memory transport doesn't really raise error # so just simulate a real driver behavior @@ -140,6 +145,8 @@ class TestRabbitTransportURL(test_utils.BaseTestCase): def setUp(self): super(TestRabbitTransportURL, self).setUp() + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') self.messaging_conf.transport_driver = 'rabbit' @mock.patch('oslo_messaging._drivers.impl_rabbit.Connection.ensure') @@ -200,6 +207,8 @@ class TestSendReceive(test_utils.BaseTestCase): cls._timeout) def test_send_receive(self): + self.config(heartbeat_timeout_threshold=0, + group='oslo_messaging_rabbit') transport = messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) @@ -710,7 +719,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = messaging.TransportURL.parse(self.conf, None) - self.connection = rabbit_driver.Connection(self.conf, url) + self.connection = rabbit_driver.Connection(self.conf, url, + amqp.PURPOSE_SEND) self.addCleanup(self.connection.close) def test_ensure_four_retry(self):