Auto-delete the failed quorum rabbit queues

When rabbit is failing for a specific quorum queue, the only thing to
do is to delete the queue (as per rabbit doc, see [1]).

So, to avoid the RPC service to be broken until an operator eventually
do a manual fix on it, catch any INTERNAL ERROR (code 541) and trigger
the deletion of the failed queues under those conditions.
So on next queue declare (triggered from various retries), the queue
will be created again and the service will recover by itself.

Closes-Bug: #2028384
Related-bug: #2031497

[1] https://www.rabbitmq.com/quorum-queues.html#availability

Signed-off-by: Arnaud Morin <arnaud.morin@ovhcloud.com>
Change-Id: Ib8dba833542973091a4e0bf23bb593aca89c5905
This commit is contained in:
Arnaud Morin 2023-07-21 16:51:51 +02:00
parent f23f3276c4
commit 8e3c523fd7
2 changed files with 39 additions and 4 deletions

View File

@ -28,7 +28,7 @@ import time
from urllib import parse
import uuid
from amqp import exceptions as amqp_exec
from amqp import exceptions as amqp_ex
import kombu
import kombu.connection
import kombu.entity
@ -419,7 +419,7 @@ class Consumer(object):
conn.connection_id, self.queue_name)
try:
self.queue.declare()
except amqp_exec.PreconditionFailed as err:
except amqp_ex.PreconditionFailed as err:
# NOTE(hberaud): This kind of exception may be triggered
# when a control exchange is shared between services and
# when services try to create it with configs that differ
@ -457,6 +457,14 @@ class Consumer(object):
'Queue: [%(queue)s], '
'error message: [%(err_str)s]', info)
time.sleep(interval)
if self.queue_arguments.get('x-queue-type') == 'quorum':
# Before re-declare queue, try to delete it
# This is helping with issue #2028384
# NOTE(amorin) we need to make sure the connection is
# established again, because when an error occur, the
# connection is closed.
conn.ensure_connection()
self.queue.delete()
self.queue.declare()
else:
raise
@ -499,6 +507,24 @@ class Consumer(object):
nowait=self.nowait)
else:
raise
except amqp_ex.InternalError as exc:
if self.queue_arguments.get('x-queue-type') == 'quorum':
# Before re-consume queue, try to delete it
# This is helping with issue #2028384
if exc.code == 541:
LOG.warning('Queue %s seems broken, will try delete it '
'before starting over.', self.queue.name)
# NOTE(amorin) we need to make sure the connection is
# established again, because when an error occur, the
# connection is closed.
conn.ensure_connection()
self.queue.delete()
self.declare(conn)
self.queue.consume(callback=self._callback,
consumer_tag=str(tag),
nowait=self.nowait)
else:
raise
def cancel(self, tag):
LOG.trace('ConsumerBase.cancel: canceling %s', tag)
@ -1208,7 +1234,7 @@ class Connection(object):
ConnectionRefusedError,
OSError,
kombu.exceptions.OperationalError,
amqp_exec.ConnectionForced) as exc:
amqp_ex.ConnectionForced) as exc:
LOG.info("A recoverable connection/channel error "
"occurred, trying to reconnect: %s", exc)
self.ensure_connection()
@ -1410,7 +1436,7 @@ class Connection(object):
if not (exchange.passive or exchange.name in self._declared_exchanges):
try:
exchange(self.channel).declare()
except amqp_exec.PreconditionFailed as err:
except amqp_ex.PreconditionFailed as err:
# NOTE(hberaud): This kind of exception may be triggered
# when a control exchange is shared between services and
# when services try to create it with configs that differ

View File

@ -0,0 +1,9 @@
---
fixes:
- |
Auto-delete the failed quorum rabbit queues.
When rabbit is failing for a specific quorum queue, delete the queue
before trying to recreate it.
This may happen if the queue is not recoverable on rabbit side.
See https://www.rabbitmq.com/quorum-queues.html#availability for more
info on this specific case.