rabbit: redeclare consumers when ack/requeue fail

In case the acknowledgement or requeue of a message fail,
the kombu transport can be disconnected

In this case, we must redeclare our consumers.

This changes fixes that.

This have no tests because the kombu memory transport we use in our tests
cannot be in disconnected state.

Closes-bug: #1448650

Change-Id: I5991a4cf827411bc27c857561d97461212a17f40
This commit is contained in:
Mehdi Abaakouk 2015-05-05 10:29:22 +02:00 committed by Mehdi Abaakouk
parent 0c954cffa2
commit 415db68b67
2 changed files with 22 additions and 0 deletions

View File

@ -885,6 +885,13 @@ class Connection(object):
exc)
def _consume():
# NOTE(sileht): in case the acknowledgement or requeue of a
# message fail, the kombu transport can be disconnected
# In this case, we must redeclare our consumers, so raise
# a recoverable error to trigger the reconnection code.
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
if self._new_consumers:
for tag, consumer in enumerate(self._consumers):
if consumer in self._new_consumers:

View File

@ -208,6 +208,21 @@ class TestRabbitConsume(test_utils.BaseTestCase):
conn.reset()
self.assertEqual(channel, conn.channel)
def test_connection_ack_have_disconnected_kombu_connection(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
).connection
channel = conn.channel
with mock.patch('kombu.connection.Connection.connected',
new_callable=mock.PropertyMock,
return_value=False):
self.assertRaises(driver_common.Timeout,
conn.consume, timeout=0.01)
# Ensure a new channel have been setuped
self.assertNotEqual(channel, conn.channel)
class TestRabbitTransportURL(test_utils.BaseTestCase):