diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b31fb03f9..74a8ee187 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -397,6 +397,8 @@ class Consumer(object): durable=self.durable, auto_delete=self.exchange_auto_delete) self.enable_cancel_on_failover = enable_cancel_on_failover + self.rabbit_stream_fanout = rabbit_stream_fanout + self.next_stream_offset = "last" def _declare_fallback(self, err, conn, consumer_arguments): """Fallback by declaring a non durable queue. @@ -431,6 +433,13 @@ class Consumer(object): ) self.queue.declare() + def reset_stream_offset(self): + if not self.rabbit_stream_fanout: + return + LOG.warn("Reset consumer for queue %s next offset was at %s.", + self.queue_name, self.next_stream_offset) + self.next_stream_offset = "last" + def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" @@ -439,6 +448,10 @@ class Consumer(object): consumer_arguments = { "x-cancel-on-ha-failover": True} + if self.rabbit_stream_fanout: + consumer_arguments = { + "x-stream-offset": self.next_stream_offset} + self.queue = kombu.entity.Queue( name=self.queue_name, channel=conn.channel, @@ -451,8 +464,13 @@ class Consumer(object): ) try: - LOG.debug('[%s] Queue.declare: %s', - conn.connection_id, self.queue_name) + if self.rabbit_stream_fanout: + LOG.info('[%s] Stream Queue.declare: %s after offset %s', + conn.connection_id, self.queue_name, + self.next_stream_offset) + else: + LOG.debug('[%s] Queue.declare: %s', + conn.connection_id, self.queue_name) try: self.queue.declare() except amqp_ex.PreconditionFailed as err: @@ -572,6 +590,12 @@ class Consumer(object): Messages that are processed and ack'ed. """ + offset = message.headers.get("x-stream-offset") + if offset is not None: + LOG.debug("Stream for %s current offset: %s", self.queue_name, + offset) + self.next_stream_offset = offset + 1 + m2p = getattr(self.queue.channel, 'message_to_python', None) if m2p: message = m2p(message) @@ -1039,11 +1063,24 @@ class Connection(object): info = {'err_str': exc, 'sleep_time': interval} info.update(self._get_connection_info(conn_error=True)) + if 'Basic.cancel' in str(exc): + # This branch allows for consumer offset reset + # in the unlikely case consumers are cancelled. This may + # happen, for example, when we delete the stream queue. + # We need to start consuming from "last" because the stream + # offset maybe reset. + LOG.warn('[%s] Basic.cancel received. ' + 'Resetting consumers offsets to last.', + self.connection_id) + for consumer in self._consumers: + consumer.reset_stream_offset() + if 'Socket closed' in str(exc): LOG.error('[%(connection_id)s] AMQP server' ' %(hostname)s:%(port)s closed' ' the connection. Check login credentials:' ' %(err_str)s', info) + else: LOG.error('[%(connection_id)s] AMQP server on ' '%(hostname)s:%(port)s is unreachable: '