Fix consuming from unbound reply queue

Consumer declaration consists of the following steps:
  1) declare an exchange
  2) declare a queue
  3) bind the queue to the exchange

Because reply exchanges are auto-delete, the exchange may
already have been removed by step 3, in which case
consumer.declare() raises `queue.bind 404 Exchange not found`.

In this case the queue already exists, so AMQPListener
just calls consumer.consume() on the queue and goes on to
drain_events() despite the fact that the queue is
unbound.

This change tries to redeclare the queue/exchange proactively
each time the channel changes and just before consuming messages.

Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com>
Closes-Bug: #1609766
Change-Id: Id8b48df3d26675d72955d417ce7622b1e8aa6195
(cherry picked from commit 3f4ce9470b)
This commit is contained in:
kbespalov 2016-08-04 15:18:25 +03:00 committed by Mehdi Abaakouk
parent 70c2a484f9
commit b90580653a
1 changed files with 38 additions and 34 deletions

View File

@ -265,6 +265,7 @@ class Consumer(object):
rabbit_queue_ttl)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
name=exchange_name,
type=type,
@ -273,6 +274,7 @@ class Consumer(object):
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@ -296,17 +298,41 @@ class Consumer(object):
self.queue.declare()
else:
raise
self._declared_on = conn.channel
def consume(self, tag):
def consume(self, conn, tag):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
Connection.consume() will process the messages,
calling the appropriate callback.
"""
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
# Ensure we are on the correct channel before consuming
if conn.channel != self._declared_on:
self.declare(conn)
try:
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
except conn.connection.channel_errors as exc:
# We retries once because of some races that we can
# recover before informing the deployer
# bugs.launchpad.net/oslo.messaging/+bug/1581148
# bugs.launchpad.net/oslo.messaging/+bug/1609766
# bugs.launchpad.net/neutron/+bug/1318721
# At any channel error, the RabbitMQ closes
# the channel, but the amqp-lib quietly re-open
# it. So, we must reset all tags and declare
# all consumers again.
conn._new_tags = set(conn._consumers.values())
if exc.code == 404:
self.declare(conn)
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
else:
raise
def cancel(self, tag):
LOG.trace('ConsumerBase.cancel: canceling %s', tag)
@ -722,8 +748,6 @@ class Connection(object):
self.set_transport_socket_timeout()
self._set_current_channel(new_channel)
for consumer in self._consumers:
consumer.declare(self)
LOG.info(_LI('Reconnected to AMQP server on '
'%(hostname)s:%(port)s via [%(transport)s] client'),
@ -794,9 +818,11 @@ class Connection(object):
self.channel = new_channel
if (new_channel is not None and
self.purpose == rpc_common.PURPOSE_LISTEN):
self._set_qos(new_channel)
if new_channel is not None:
if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel)
for consumer in self._consumers:
consumer.declare(self)
def _set_qos(self, channel):
"""Set QoS prefetch count on the channel"""
@ -989,33 +1015,11 @@ class Connection(object):
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
consume_max_retries = 1
while self._new_tags and consume_max_retries >= 0:
while self._new_tags:
for consumer, tag in self._consumers.items():
if tag in self._new_tags:
try:
consumer.consume(tag=tag)
self._new_tags.remove(tag)
except self.connection.channel_errors as exc:
# NOTE(kbespalov): during the interval between
# a queue declaration and consumer declaration
# the queue can disappear. In this case
# we must redeclare queue and try to re-consume.
# More details is here:
# bugs.launchpad.net/oslo.messaging/+bug/1581148
LOG.debug("Failed to declare consumer: a queue is "
"not exists. Trying to create queue...")
if exc.code == 404 and consume_max_retries:
consumer.declare(self)
# NOTE(kbespalov): the broker closes a channel
# at any channel error. The py-amqp catches
# this situation and re-open a new channel.
# So, we must re-declare all consumers again.
self._new_tags = set(self._consumers.values())
consume_max_retries -= 1
break
else:
raise
consumer.consume(self, tag=tag)
self._new_tags.remove(tag)
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))