Add an option to use rabbitmq stream for fanout queues

This is introducing the "stream" queues for fanout so all components
relying on fanout can use the same stream, lowering the number of queues
needed and leveraging the new "stream" type of queues from rabbitmq.

Closes-Bug: #2031497

Change-Id: I5056a19aada9143bcd80aaf064ced8cad441e6eb
Signed-off-by: Arnaud Morin <arnaud.morin@ovhcloud.com>
This commit is contained in:
julien.cosmao 2023-07-25 15:42:22 +02:00 committed by Arnaud Morin
parent 4614132ad0
commit e95f334459
2 changed files with 71 additions and 18 deletions

View File

@ -251,6 +251,14 @@ rabbit_opts = [
cfg.StrOpt('processname', cfg.StrOpt('processname',
default=os.path.basename(sys.argv[0]), default=os.path.basename(sys.argv[0]),
help='Process name used by queue manager'), help='Process name used by queue manager'),
cfg.BoolOpt('rabbit_stream_fanout',
default=False,
help='Use stream queues in RabbitMQ (x-queue-type: stream). '
'The stream queue is a modern queue type for RabbitMQ '
'implementing a durable, replicated FIFO queue based on the '
'Raft consensus algorithm. It is available as of '
'RabbitMQ 3.8.0. If set this option will replace all fanout '
'queues with only one stream queue.'),
] ]
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -258,7 +266,8 @@ LOG = logging.getLogger(__name__)
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue, rabbit_quorum_queue,
rabbit_quorum_queue_config): rabbit_quorum_queue_config,
rabbit_stream_fanout):
"""Construct the arguments for declaring a queue. """Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@ -295,11 +304,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue_config: rabbit_quorum_queue_config:
Quorum queues provides three options to handle message poisoning Quorum queues provides three options to handle message poisoning
and limit the resources the qourum queue can use and limit the resources the quorum queue can use
x-delivery-limit number of times the queue will try to deliver x-delivery-limit number of times the queue will try to deliver
a message before it decide to discard it a message before it decide to discard it
x-max-in-memory-length, x-max-in-memory-bytes control the size x-max-in-memory-length, x-max-in-memory-bytes control the size
of memory used by quorum queue of memory used by quorum queue
If the rabbit_stream_fanout option is set, fanout queues are going to use
stream instead of quorum queues. See here:
https://www.rabbitmq.com/streams.html
""" """
args = {} args = {}
@ -326,6 +339,12 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
if rabbit_queue_ttl > 0: if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000 args['x-expires'] = rabbit_queue_ttl * 1000
if rabbit_stream_fanout:
args = {'x-queue-type': 'stream'}
if rabbit_queue_ttl > 0:
# max-age is a string
args['x-max-age'] = f"{rabbit_queue_ttl}s"
return args return args
@ -352,7 +371,8 @@ class Consumer(object):
exchange_auto_delete, queue_auto_delete, callback, exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False, rabbit_quorum_queue=False, enable_cancel_on_failover=False, rabbit_quorum_queue=False,
rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0),
rabbit_stream_fanout=False):
"""Init the Consumer class with the exchange_name, routing_key, """Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete type, durable auto_delete
""" """
@ -368,7 +388,7 @@ class Consumer(object):
rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
self.queue_arguments = _get_queue_arguments( self.queue_arguments = _get_queue_arguments(
rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
rabbit_quorum_queue_config) rabbit_quorum_queue_config, rabbit_stream_fanout)
self.queue = None self.queue = None
self._declared_on = None self._declared_on = None
self.exchange = kombu.entity.Exchange( self.exchange = kombu.entity.Exchange(
@ -657,6 +677,7 @@ class Connection(object):
driver_conf) driver_conf)
self.rabbit_transient_quorum_queue = \ self.rabbit_transient_quorum_queue = \
driver_conf.rabbit_transient_quorum_queue driver_conf.rabbit_transient_quorum_queue
self.rabbit_stream_fanout = driver_conf.rabbit_stream_fanout
self.rabbit_transient_queues_ttl = \ self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@ -676,6 +697,17 @@ class Connection(object):
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
self.use_queue_manager = driver_conf.use_queue_manager self.use_queue_manager = driver_conf.use_queue_manager
if self.rabbit_stream_fanout and self.rabbit_qos_prefetch_count <= 0:
raise RuntimeError('Configuration Error: rabbit_stream_fanout '
'need rabbit_qos_prefetch_count to be set to '
'a value greater than 0.')
if (self.rabbit_stream_fanout and not
self.rabbit_transient_quorum_queue):
raise RuntimeError('Configuration Error: rabbit_stream_fanout '
'need rabbit_transient_quorum_queue to be set '
'to true.')
if self.heartbeat_in_pthread: if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run # NOTE(hberaud): Experimental: threading module is in use to run
# the rabbitmq health check heartbeat. in some situation like # the rabbitmq health check heartbeat. in some situation like
@ -1121,11 +1153,20 @@ class Connection(object):
"""Close/release this connection.""" """Close/release this connection."""
self._heartbeat_stop() self._heartbeat_stop()
if self.connection: if self.connection:
for consumer in filter(lambda c: c.type == 'fanout', # NOTE(jcosmao) Delete queue should be called only when queue name
self._consumers): # is randomized. When using streams, queue is shared between
LOG.debug('[connection close] Deleting fanout ' # all consumers, thus deleting fanout queue will force all other
'queue: %s ' % consumer.queue.name) # consumers to disconnect/reconnect by throwing
consumer.queue.delete() # amqp.exceptions.ConsumerCancelled.
# When using QManager, queue name is consistent accross agent
# restart, so we don't need to delete it either. Deletion must be
# handled by expiration policy.
if not self.rabbit_stream_fanout and not self.use_queue_manager:
for consumer in filter(lambda c: c.type == 'fanout',
self._consumers):
LOG.debug('[connection close] Deleting fanout '
'queue: %s ' % consumer.queue.name)
consumer.queue.delete()
self._set_current_channel(None) self._set_current_channel(None)
self.connection.release() self.connection.release()
self.connection = None self.connection = None
@ -1371,7 +1412,7 @@ class Connection(object):
queue_name=topic, queue_name=topic,
routing_key='', routing_key='',
type='direct', type='direct',
durable=False, durable=self.rabbit_transient_quorum_queue,
exchange_auto_delete=False, exchange_auto_delete=False,
queue_auto_delete=False, queue_auto_delete=False,
callback=callback, callback=callback,
@ -1405,20 +1446,26 @@ class Connection(object):
def declare_fanout_consumer(self, topic, callback): def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer.""" """Create a 'fanout' consumer."""
if self._q_manager:
unique = self._q_manager.get()
else:
unique = uuid.uuid4().hex
exchange_name = '%s_fanout' % topic exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique) if self.rabbit_stream_fanout:
queue_name = '%s_fanout' % topic
else:
if self._q_manager:
unique = self._q_manager.get()
else:
unique = uuid.uuid4().hex
queue_name = '%s_fanout_%s' % (topic, unique)
LOG.info('Creating fanout queue: %s', queue_name) LOG.info('Creating fanout queue: %s', queue_name)
is_durable = (self.rabbit_transient_quorum_queue or
self.rabbit_stream_fanout)
consumer = Consumer( consumer = Consumer(
exchange_name=exchange_name, exchange_name=exchange_name,
queue_name=queue_name, queue_name=queue_name,
routing_key=topic, routing_key=topic,
type='fanout', type='fanout',
durable=self.rabbit_transient_quorum_queue, durable=is_durable,
exchange_auto_delete=True, exchange_auto_delete=True,
queue_auto_delete=False, queue_auto_delete=False,
callback=callback, callback=callback,
@ -1426,7 +1473,8 @@ class Connection(object):
rabbit_queue_ttl=self.rabbit_transient_queues_ttl, rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover, enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_transient_quorum_queue, rabbit_quorum_queue=self.rabbit_transient_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) rabbit_quorum_queue_config=self.rabbit_quorum_queue_config,
rabbit_stream_fanout=self.rabbit_stream_fanout)
self.declare_consumer(consumer) self.declare_consumer(consumer)
@ -1533,7 +1581,8 @@ class Connection(object):
self.rabbit_ha_queues, self.rabbit_ha_queues,
0, 0,
self.rabbit_quorum_queue, self.rabbit_quorum_queue,
self.rabbit_quorum_queue_config)) self.rabbit_quorum_queue_config,
False))
log_info = {'key': routing_key, 'exchange': exchange} log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace( LOG.trace(
'Connection._publish_and_creates_default_queue: ' 'Connection._publish_and_creates_default_queue: '

View File

@ -0,0 +1,4 @@
---
features:
- |
Add an option to use stream queues for rabbitmq driver instead of fanouts.