Restore read stream queues from last known offset

When an agent reconnected to a RabbitMQ server, it would start
consuming messages from the last offset available in the stream.

This could cause important messages to be lost.

With this patch, oslo_messaging will keep track of the last consumed
offset and restore reading from that point.

Related-bug: #2031497

Change-Id: I449008829b0c0a1a759c211b83f7a99d9c7f2c0d
This commit is contained in:
Guillaume Espanel 2024-01-24 12:11:56 +01:00 committed by Arnaud Morin
parent ebdc7db19e
commit 5988c7bf14
1 changed files with 39 additions and 2 deletions

View File

@ -397,6 +397,8 @@ class Consumer(object):
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover
self.rabbit_stream_fanout = rabbit_stream_fanout
self.next_stream_offset = "last"
def _declare_fallback(self, err, conn, consumer_arguments):
"""Fallback by declaring a non durable queue.
@ -431,6 +433,13 @@ class Consumer(object):
)
self.queue.declare()
def reset_stream_offset(self):
    """Reset this consumer's stream read position to ``"last"``.

    Does nothing when ``self.rabbit_stream_fanout`` is falsy. Otherwise
    the offset that is about to be discarded is logged at warning level
    and ``self.next_stream_offset`` is set back to ``"last"``.
    """
    if not self.rabbit_stream_fanout:
        return

    # ``warn`` is a deprecated alias in the logging API; ``warning`` is
    # the supported spelling and emits the same record.
    LOG.warning("Reset consumer for queue %s next offset was at %s.",
                self.queue_name, self.next_stream_offset)
    self.next_stream_offset = "last"
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
@ -439,6 +448,10 @@ class Consumer(object):
consumer_arguments = {
"x-cancel-on-ha-failover": True}
if self.rabbit_stream_fanout:
consumer_arguments = {
"x-stream-offset": self.next_stream_offset}
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@ -451,8 +464,13 @@ class Consumer(object):
)
try:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
if self.rabbit_stream_fanout:
LOG.info('[%s] Stream Queue.declare: %s after offset %s',
conn.connection_id, self.queue_name,
self.next_stream_offset)
else:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
try:
self.queue.declare()
except amqp_ex.PreconditionFailed as err:
@ -572,6 +590,12 @@ class Consumer(object):
Messages that are processed and ack'ed.
"""
offset = message.headers.get("x-stream-offset")
if offset is not None:
LOG.debug("Stream for %s current offset: %s", self.queue_name,
offset)
self.next_stream_offset = offset + 1
m2p = getattr(self.queue.channel, 'message_to_python', None)
if m2p:
message = m2p(message)
@ -1039,11 +1063,24 @@ class Connection(object):
info = {'err_str': exc, 'sleep_time': interval}
info.update(self._get_connection_info(conn_error=True))
if 'Basic.cancel' in str(exc):
# This branch allows for consumer offset reset
# in the unlikely case consumers are cancelled. This may
# happen, for example, when we delete the stream queue.
# We need to start consuming from "last" because the stream
# offset may be reset.
LOG.warn('[%s] Basic.cancel received. '
'Resetting consumers offsets to last.',
self.connection_id)
for consumer in self._consumers:
consumer.reset_stream_offset()
if 'Socket closed' in str(exc):
LOG.error('[%(connection_id)s] AMQP server'
' %(hostname)s:%(port)s closed'
' the connection. Check login credentials:'
' %(err_str)s', info)
else:
LOG.error('[%(connection_id)s] AMQP server on '
'%(hostname)s:%(port)s is unreachable: '