Merge "Restore read stream queues from last known offset"

This commit is contained in:
Zuul 2024-03-19 19:38:41 +00:00 committed by Gerrit Code Review
commit 9b39870819
1 changed files with 39 additions and 2 deletions

View File

@ -397,6 +397,8 @@ class Consumer(object):
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover
self.rabbit_stream_fanout = rabbit_stream_fanout
self.next_stream_offset = "last"
def _declare_fallback(self, err, conn, consumer_arguments):
"""Fallback by declaring a non durable queue.
@ -431,6 +433,13 @@ class Consumer(object):
)
self.queue.declare()
def reset_stream_offset(self):
if not self.rabbit_stream_fanout:
return
LOG.warn("Reset consumer for queue %s next offset was at %s.",
self.queue_name, self.next_stream_offset)
self.next_stream_offset = "last"
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
@ -439,6 +448,10 @@ class Consumer(object):
consumer_arguments = {
"x-cancel-on-ha-failover": True}
if self.rabbit_stream_fanout:
consumer_arguments = {
"x-stream-offset": self.next_stream_offset}
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@ -451,8 +464,13 @@ class Consumer(object):
)
try:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
if self.rabbit_stream_fanout:
LOG.info('[%s] Stream Queue.declare: %s after offset %s',
conn.connection_id, self.queue_name,
self.next_stream_offset)
else:
LOG.debug('[%s] Queue.declare: %s',
conn.connection_id, self.queue_name)
try:
self.queue.declare()
except amqp_ex.PreconditionFailed as err:
@ -572,6 +590,12 @@ class Consumer(object):
Messages that are processed and ack'ed.
"""
offset = message.headers.get("x-stream-offset")
if offset is not None:
LOG.debug("Stream for %s current offset: %s", self.queue_name,
offset)
self.next_stream_offset = offset + 1
m2p = getattr(self.queue.channel, 'message_to_python', None)
if m2p:
message = m2p(message)
@ -1039,11 +1063,24 @@ class Connection(object):
info = {'err_str': exc, 'sleep_time': interval}
info.update(self._get_connection_info(conn_error=True))
if 'Basic.cancel' in str(exc):
# This branch allows for consumer offset reset
# in the unlikely case consumers are cancelled. This may
# happen, for example, when we delete the stream queue.
# We need to start consuming from "last" because the stream
# offset maybe reset.
LOG.warn('[%s] Basic.cancel received. '
'Resetting consumers offsets to last.',
self.connection_id)
for consumer in self._consumers:
consumer.reset_stream_offset()
if 'Socket closed' in str(exc):
LOG.error('[%(connection_id)s] AMQP server'
' %(hostname)s:%(port)s closed'
' the connection. Check login credentials:'
' %(err_str)s', info)
else:
LOG.error('[%(connection_id)s] AMQP server on '
'%(hostname)s:%(port)s is unreachable: '