Auto-delete the failed quorum rabbit queues
When rabbit is failing for a specific quorum queue, the only thing to
do is to delete the queue (as per rabbit doc, see [1]).
So, to avoid the RPC service staying broken until an operator eventually
fixes it manually, catch any INTERNAL ERROR (code 541) and trigger
the deletion of the failed queues under those conditions.
On the next queue declare (triggered by the various retries), the queue
will then be created again and the service will recover by itself.
Closes-Bug: #2028384
Related-bug: #2031497
[1] https://www.rabbitmq.com/quorum-queues.html#availability
Signed-off-by: Arnaud Morin <arnaud.morin@ovhcloud.com>
Change-Id: Ib8dba833542973091a4e0bf23bb593aca89c5905
(cherry picked from commit 8e3c523fd7)
This commit is contained in:
parent
29623702fc
commit
34260a4035
|
@ -28,7 +28,7 @@ import time
|
|||
from urllib import parse
|
||||
import uuid
|
||||
|
||||
from amqp import exceptions as amqp_exec
|
||||
from amqp import exceptions as amqp_ex
|
||||
import kombu
|
||||
import kombu.connection
|
||||
import kombu.entity
|
||||
|
@ -415,7 +415,7 @@ class Consumer(object):
|
|||
conn.connection_id, self.queue_name)
|
||||
try:
|
||||
self.queue.declare()
|
||||
except amqp_exec.PreconditionFailed as err:
|
||||
except amqp_ex.PreconditionFailed as err:
|
||||
# NOTE(hberaud): This kind of exception may be triggered
|
||||
# when a control exchange is shared between services and
|
||||
# when services try to create it with configs that differ
|
||||
|
@ -453,6 +453,14 @@ class Consumer(object):
|
|||
'Queue: [%(queue)s], '
|
||||
'error message: [%(err_str)s]', info)
|
||||
time.sleep(interval)
|
||||
if self.queue_arguments.get('x-queue-type') == 'quorum':
|
||||
# Before re-declare queue, try to delete it
|
||||
# This is helping with issue #2028384
|
||||
# NOTE(amorin) we need to make sure the connection is
|
||||
# established again, because when an error occurs, the
|
||||
# connection is closed.
|
||||
conn.ensure_connection()
|
||||
self.queue.delete()
|
||||
self.queue.declare()
|
||||
else:
|
||||
raise
|
||||
|
@ -495,6 +503,24 @@ class Consumer(object):
|
|||
nowait=self.nowait)
|
||||
else:
|
||||
raise
|
||||
except amqp_ex.InternalError as exc:
|
||||
if self.queue_arguments.get('x-queue-type') == 'quorum':
|
||||
# Before re-consume queue, try to delete it
|
||||
# This is helping with issue #2028384
|
||||
if exc.code == 541:
|
||||
LOG.warning('Queue %s seems broken, will try delete it '
|
||||
'before starting over.', self.queue.name)
|
||||
# NOTE(amorin) we need to make sure the connection is
|
||||
# established again, because when an error occurs, the
|
||||
# connection is closed.
|
||||
conn.ensure_connection()
|
||||
self.queue.delete()
|
||||
self.declare(conn)
|
||||
self.queue.consume(callback=self._callback,
|
||||
consumer_tag=str(tag),
|
||||
nowait=self.nowait)
|
||||
else:
|
||||
raise
|
||||
|
||||
def cancel(self, tag):
|
||||
LOG.trace('ConsumerBase.cancel: canceling %s', tag)
|
||||
|
@ -1204,7 +1230,7 @@ class Connection(object):
|
|||
ConnectionRefusedError,
|
||||
OSError,
|
||||
kombu.exceptions.OperationalError,
|
||||
amqp_exec.ConnectionForced) as exc:
|
||||
amqp_ex.ConnectionForced) as exc:
|
||||
LOG.info("A recoverable connection/channel error "
|
||||
"occurred, trying to reconnect: %s", exc)
|
||||
self.ensure_connection()
|
||||
|
@ -1406,7 +1432,7 @@ class Connection(object):
|
|||
if not (exchange.passive or exchange.name in self._declared_exchanges):
|
||||
try:
|
||||
exchange(self.channel).declare()
|
||||
except amqp_exec.PreconditionFailed as err:
|
||||
except amqp_ex.PreconditionFailed as err:
|
||||
# NOTE(hberaud): This kind of exception may be triggered
|
||||
# when a control exchange is shared between services and
|
||||
# when services try to create it with configs that differ
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
---
|
||||
fixes:
|
||||
- |
|
||||
Auto-delete the failed quorum rabbit queues.
|
||||
When rabbit is failing for a specific quorum queue, delete the queue
|
||||
before trying to recreate it.
|
||||
This may happen if the queue is not recoverable on the RabbitMQ side.
|
||||
See https://www.rabbitmq.com/quorum-queues.html#availability for more
|
||||
info on this specific case.
|
Loading…
Reference in New Issue