rabbit: redeclare consumers when ack/requeue fail
In case the acknowledgement or requeue of a message fail,
the kombu transport can be disconnected
In this case, we must redeclare our consumers.
This changes fixes that.
This have no tests because the kombu memory transport we use in our tests
cannot be in disconnected state.
Closes-bug: #1448650
(cherry picked from commit 415db68b67
)
Conflicts are due to the refactoring to oslo_messaging namespace.
Conflicts:
oslo_messaging/_drivers/impl_rabbit.py
oslo_messaging/tests/drivers/test_impl_rabbit.py
Change-Id: I5991a4cf827411bc27c857561d97461212a17f40
This commit is contained in:
parent
3ac14ae548
commit
b6b6edca46
|
@ -733,6 +733,13 @@ class Connection(object):
|
|||
self.do_consume = True
|
||||
|
||||
def _consume():
|
||||
# NOTE(sileht): in case the acknowledgement or requeue of a
|
||||
# message fail, the kombu transport can be disconnected
|
||||
# In this case, we must redeclare our consumers, so raise
|
||||
# a recoverable error to trigger the reconnection code.
|
||||
if not self.connection.connected:
|
||||
raise self.connection.recoverable_connection_errors[0]
|
||||
|
||||
if self.do_consume:
|
||||
queues_head = self.consumers[:-1] # not fanout.
|
||||
queues_tail = self.consumers[-1] # fanout
|
||||
|
|
|
@ -45,6 +45,27 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
|
|||
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
||||
|
||||
|
||||
class TestRabbitConsume(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRabbitConsume, self).setUp()
|
||||
self.messaging_conf.transport_driver = 'rabbit'
|
||||
self.messaging_conf.in_memory = True
|
||||
|
||||
def test_connection_ack_have_disconnected_kombu_connection(self):
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.addCleanup(transport.cleanup)
|
||||
conn = transport._driver._get_connection().connection
|
||||
channel = conn.channel
|
||||
with mock.patch('kombu.connection.Connection.connected',
|
||||
new_callable=mock.PropertyMock,
|
||||
return_value=False):
|
||||
self.assertRaises(driver_common.Timeout,
|
||||
conn.consume, timeout=0.01)
|
||||
# Ensure a new channel have been setuped
|
||||
self.assertNotEqual(channel, conn.channel)
|
||||
|
||||
|
||||
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
|
|
Loading…
Reference in New Issue