diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 48e026f75..d602f0070 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -536,14 +536,6 @@ class Connection(object): # Return the extended behavior or just have the default behavior return ssl_params or None - def _setup_new_channel(self, new_channel): - """Callback invoked when the kombu connection have created - a new channel, we use it the reconfigure our consumers. - """ - self.consumer_num = itertools.count(1) - for consumer in self.consumers: - consumer.reconnect(new_channel) - def ensure(self, error_callback, method, retry=None, timeout_is_error=True): """Will retry up to retry number of times. @@ -559,18 +551,23 @@ class Connection(object): retry = None def on_error(exc, interval): + self.channel = None + error_callback and error_callback(exc) + interval = (self.conf.kombu_reconnect_delay + interval + if self.conf.kombu_reconnect_delay > 0 else interval) + info = {'hostname': self.connection.hostname, 'port': self.connection.port, 'err_str': exc, 'sleep_time': interval} if 'Socket closed' in six.text_type(exc): - LOG.error(_('AMQP server %(hostname)s:%(port)d closed' + LOG.error(_('AMQP server %(hostname)s:%(port)s closed' ' the connection. Check login credentials:' ' %(err_str)s'), info) else: - LOG.error(_('AMQP server on %(hostname)s:%(port)d is ' + LOG.error(_('AMQP server on %(hostname)s:%(port)s is ' 'unreachable: %(err_str)s. Trying again in ' '%(sleep_time)d seconds.'), info) @@ -585,10 +582,16 @@ class Connection(object): # should sufficient, because the underlying kombu transport # connection object freed. if self.conf.kombu_reconnect_delay > 0: - LOG.info(_("Delaying reconnect for %1.1f seconds...") % - self.conf.kombu_reconnect_delay) time.sleep(self.conf.kombu_reconnect_delay) + def on_reconnection(new_channel): + """Callback invoked when the kombu reconnects and creates + a new channel, we use it the reconfigure our consumers. + """ + self.consumer_num = itertools.count(1) + for consumer in self.consumers: + consumer.reconnect(new_channel) + recoverable_errors = (self.connection.recoverable_channel_errors + self.connection.recoverable_connection_errors) try: @@ -598,12 +601,13 @@ class Connection(object): errback=on_error, interval_start=self.interval_start or 1, interval_step=self.interval_stepping, - on_revive=self._setup_new_channel, + on_revive=on_reconnection, ) ret, channel = autoretry_method() self.channel = channel return ret except recoverable_errors as exc: + self.channel = None # NOTE(sileht): number of retry exceeded and the connection # is still broken msg = _('Unable to connect to AMQP server on ' @@ -624,10 +628,11 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" - self.channel.close() - self.channel = self.connection.channel() + if self.channel is not None: + self.channel.close() + self.channel = self.connection.channel() self.consumers = [] - self._setup_new_channel(self.channel) + self.consumer_num = itertools.count(1) def declare_consumer(self, consumer_cls, topic, callback): """Create a Consumer using the class that was passed in and @@ -650,10 +655,21 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" + if timeout is None: + deadline = None + else: + deadline = time.time() + timeout + + def _raise_timeout_if_deadline_is_reached(exc): + if deadline is not None and deadline - time.time() < 0: + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() + def _error_callback(exc): + self.do_consume = True + _raise_timeout_if_deadline_is_reached(exc) LOG.exception(_('Failed to consume message from queue: %s'), exc) - self.do_consume = True def _consume(channel): if self.do_consume: @@ -663,11 +679,11 @@ class Connection(object): queue.consume(nowait=True) queues_tail.consume(nowait=False) self.do_consume = False - try: - return self.connection.drain_events(timeout=timeout) - except socket.timeout as exc: - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() + while True: + try: + return self.connection.drain_events(timeout=1) + except socket.timeout as exc: + _raise_timeout_if_deadline_is_reached(exc) for iteration in itertools.count(0): if limit and iteration >= limit: diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index fb3f90b9d..c15a3c956 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -15,6 +15,7 @@ import datetime import sys import threading +import time import uuid import fixtures @@ -47,6 +48,28 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver) +class TestRabbitIterconsume(test_utils.BaseTestCase): + + def test_iterconsume_timeout(self): + transport = messaging.get_transport(self.conf, 'kombu+memory:////') + self.addCleanup(transport.cleanup) + deadline = time.time() + 3 + with transport._driver._get_connection() as conn: + conn.iterconsume(timeout=3) + # kombu memory transport doesn't really raise error + # so just simulate a real driver behavior + conn.connection.connection.recoverable_channel_errors = (IOError,) + conn.declare_fanout_consumer("notif.info", lambda msg: True) + with mock.patch('kombu.connection.Connection.drain_events', + side_effect=IOError): + try: + conn.consume(timeout=3) + except driver_common.Timeout: + pass + + self.assertEqual(0, int(deadline - time.time())) + + class TestRabbitTransportURL(test_utils.BaseTestCase): scenarios = [