Merge "Enable use of quorum queues for transient messages"

This commit is contained in:
Zuul 2024-01-11 22:19:53 +00:00 committed by Gerrit Code Review
commit 875506fff0
2 changed files with 61 additions and 15 deletions

View File

@ -165,9 +165,16 @@ rabbit_opts = [
'Raft consensus algorithm. It is available as of '
'RabbitMQ 3.8.0. If set this option will conflict with '
'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
'in other words the HA queues should be disabled, quorum '
'queues durable by default so the amqp_durable_queues '
'opion is ignored when this option enabled.'),
'in other words the HA queues should be disabled. '
'Quorum queues are also durable by default so the '
'amqp_durable_queues option is ignored when this option is '
'enabled.'),
cfg.BoolOpt('rabbit_transient_quorum_queue',
default=False,
help='Use quorum queues for transients queues in RabbitMQ. '
'Enabling this option will then make sure those queues are '
'also using quorum kind of rabbit queues, which are HA by '
'default.'),
cfg.IntOpt('rabbit_quorum_delivery_limit',
default=0,
help='Each time a message is redelivered to a consumer, '
@ -639,6 +646,8 @@ class Connection(object):
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
self.rabbit_quorum_queue_config = self._get_quorum_configurations(
driver_conf)
self.rabbit_transient_quorum_queue = \
driver_conf.rabbit_transient_quorum_queue
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@ -1351,7 +1360,9 @@ class Connection(object):
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_transient_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer)
@ -1386,13 +1397,15 @@ class Connection(object):
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=False,
durable=self.rabbit_transient_quorum_queue,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_transient_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
self.declare_consumer(consumer)
@ -1538,11 +1551,12 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
exchange = kombu.entity.Exchange(name='', # using default exchange
type='direct',
durable=False,
auto_delete=True,
passive=True)
exchange = kombu.entity.Exchange(
name='', # using default exchange
type='direct',
durable=self.rabbit_transient_quorum_queue,
auto_delete=True,
passive=True)
options = oslo_messaging.TransportOptions(
at_least_once=self.direct_mandatory_flag)
@ -1569,10 +1583,11 @@ class Connection(object):
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
exchange = kombu.entity.Exchange(name='%s_fanout' % topic,
type='fanout',
durable=False,
auto_delete=True)
exchange = kombu.entity.Exchange(
name='%s_fanout' % topic,
type='fanout',
durable=self.rabbit_transient_quorum_queue,
auto_delete=True)
LOG.debug('Sending fanout to %s_fanout', topic)
self._ensure_publishing(self._publish, exchange, msg, retry=retry)

View File

@ -0,0 +1,31 @@
---
features:
- |
Add an option to enable transient queues to use quorum.
Transient queues in OpenStack are not so transient, they live the whole
process lifetime (e.g. until you restart a service, like nova-compute).
Transient here means they belong to a specific process, compared to
regular queues which may be used by more processes.
Usually, transients queues are the "fanout" and "reply" queues.
By default, without any rabbitmq policy tuning, they are not durable
neither highly available.
By enabling quorum for transients, oslo.messaging will declare quorum
queues instead of classic on rabbitmq. As a result, those queues will
automatically become HA and durable.
Note that this may have an impact on your cluster, as rabbit will need
more cpu, ram and network bandwith to manage the queues. This was tested
at pretty large scale (2k hypervisors) with a cluster of 5 nodes.
Also note that the current rabbitmq implementation rely on a fixed number
of "erlang atom" (5M by default), and one atom is consumed each time a
quorum queue is created with a different name. If your deployment is doing
a lot of queue deletion/creation, you may consume all your atoms quicker.
When enabling quorum for transients, you may also want to update your
rabbitmq policies accordingly (e.g. make sure they apply on quorum).
This option will stay disabled by default for now but may become the
default in the future.