Rabbit: iterconsume must honor timeout

The iterconsume method of the rabbit driver must
honor timeout parameter when reconnection to the broker
occurs.

Change-Id: I666d818449750c6bae9dde02f519842687a8e4fa
This commit is contained in:
Mehdi Abaakouk 2014-12-01 23:27:46 +01:00
parent bcb3b23b8f
commit 2dd7de989f
2 changed files with 61 additions and 22 deletions

View File

@ -536,14 +536,6 @@ class Connection(object):
# Return the extended behavior or just have the default behavior
return ssl_params or None
def _setup_new_channel(self, new_channel):
"""Callback invoked when the kombu connection have created
a new channel, we use it the reconfigure our consumers.
"""
self.consumer_num = itertools.count(1)
for consumer in self.consumers:
consumer.reconnect(new_channel)
def ensure(self, error_callback, method, retry=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
@ -559,18 +551,23 @@ class Connection(object):
retry = None
def on_error(exc, interval):
self.channel = None
error_callback and error_callback(exc)
interval = (self.conf.kombu_reconnect_delay + interval
if self.conf.kombu_reconnect_delay > 0 else interval)
info = {'hostname': self.connection.hostname,
'port': self.connection.port,
'err_str': exc, 'sleep_time': interval}
if 'Socket closed' in six.text_type(exc):
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
@ -585,10 +582,16 @@ class Connection(object):
# should sufficient, because the underlying kombu transport
# connection object freed.
if self.conf.kombu_reconnect_delay > 0:
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
self.conf.kombu_reconnect_delay)
time.sleep(self.conf.kombu_reconnect_delay)
def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates
a new channel, we use it the reconfigure our consumers.
"""
self.consumer_num = itertools.count(1)
for consumer in self.consumers:
consumer.reconnect(new_channel)
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
@ -598,12 +601,13 @@ class Connection(object):
errback=on_error,
interval_start=self.interval_start or 1,
interval_step=self.interval_stepping,
on_revive=self._setup_new_channel,
on_revive=on_reconnection,
)
ret, channel = autoretry_method()
self.channel = channel
return ret
except recoverable_errors as exc:
self.channel = None
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
msg = _('Unable to connect to AMQP server on '
@ -624,10 +628,11 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
self.channel.close()
self.channel = self.connection.channel()
if self.channel is not None:
self.channel.close()
self.channel = self.connection.channel()
self.consumers = []
self._setup_new_channel(self.channel)
self.consumer_num = itertools.count(1)
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
@ -650,10 +655,21 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers."""
if timeout is None:
deadline = None
else:
deadline = time.time() + timeout
def _raise_timeout_if_deadline_is_reached(exc):
if deadline is not None and deadline - time.time() < 0:
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
def _error_callback(exc):
self.do_consume = True
_raise_timeout_if_deadline_is_reached(exc)
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
self.do_consume = True
def _consume(channel):
if self.do_consume:
@ -663,11 +679,11 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.do_consume = False
try:
return self.connection.drain_events(timeout=timeout)
except socket.timeout as exc:
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
while True:
try:
return self.connection.drain_events(timeout=1)
except socket.timeout as exc:
_raise_timeout_if_deadline_is_reached(exc)
for iteration in itertools.count(0):
if limit and iteration >= limit:

View File

@ -15,6 +15,7 @@
import datetime
import sys
import threading
import time
import uuid
import fixtures
@ -47,6 +48,28 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
class TestRabbitIterconsume(test_utils.BaseTestCase):
def test_iterconsume_timeout(self):
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 3
with transport._driver._get_connection() as conn:
conn.iterconsume(timeout=3)
# kombu memory transport doesn't really raise error
# so just simulate a real driver behavior
conn.connection.connection.recoverable_channel_errors = (IOError,)
conn.declare_fanout_consumer("notif.info", lambda msg: True)
with mock.patch('kombu.connection.Connection.drain_events',
side_effect=IOError):
try:
conn.consume(timeout=3)
except driver_common.Timeout:
pass
self.assertEqual(0, int(deadline - time.time()))
class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [