Add the mandatory flag for direct send

With this feature, the server will raise and log a Message Undeliverable
exception. So it is possible to log immediately an error in case the
reply queue does not exist for some reason.

This is part of blueprint transport-options
The blueprint link is [1]
Please follow the link [2] to use and test the feature.

1-
https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
2- https://github.com/Gsantomaggio/rabbitmq-utils/
tree/master/openstack/mandatory_test

Change-Id: Iac7474c06ef425a2afe5bcd912e51510ba1c8fb3
This commit is contained in:
Gabriele 2019-07-22 17:27:21 +02:00
parent b56380654a
commit b7e9faf659
No known key found for this signature in database
GPG Key ID: 2DEE5B9E783BBCFA
2 changed files with 25 additions and 7 deletions

View File

@ -168,6 +168,12 @@ rabbit_opts = [
default=2,
help='How often times during the heartbeat_timeout_threshold '
'we check the heartbeat.'),
cfg.IntOpt('direct_mandatory_flag',
default=True,
help='Enable/Disable the RabbitMQ mandatory flag '
'for direct send. The direct send is used as reply,'
'so the MessageUndeliverable exception is raised'
' in case the client queue does not exist.'),
]
LOG = logging.getLogger(__name__)
@ -492,6 +498,7 @@ class Connection(object):
# if it was already monkey patched by eventlet/greenlet.
global threading
threading = stdlib_threading
self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
if self.ssl:
self.ssl_version = driver_conf.ssl_version
@ -1291,9 +1298,11 @@ class Connection(object):
durable=False,
auto_delete=True,
passive=True)
options = oslo_messaging.TransportOptions(
at_least_once=self.direct_mandatory_flag)
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)
exchange, msg, routing_key=msg_id,
transport_options=options)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
transport_options=None):

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -124,6 +123,7 @@ A simple example of an RPC server with multiple endpoints might be::
import logging
import sys
from oslo_messaging import exceptions
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_messaging import server as msg_server
from oslo_messaging import transport as msg_transport
@ -178,13 +178,19 @@ class RPCServer(msg_server.MessageHandlingServer):
message.reply(res)
else:
message.reply(failure=failure)
except exceptions.MessageUndeliverable as e:
LOG.exception(
"MessageUndeliverable error, "
"source exception: %s, routing_key: %s, exchange: %s: ",
e.exception, e.routing_key, e.exchange
)
except Exception:
LOG.exception("Can not send reply for message")
finally:
# NOTE(dhellmann): Remove circular object reference
# between the current stack frame and the traceback in
# exc_info.
del failure
# NOTE(dhellmann): Remove circular object reference
# between the current stack frame and the traceback in
# exc_info.
del failure
def get_rpc_server(transport, target, endpoints,
@ -222,6 +228,7 @@ def expected_exceptions(*exceptions):
ExpectedException, which is used internally by the RPC sever. The RPC
client will see the original exception type.
"""
def outer(func):
def inner(*args, **kwargs):
try:
@ -234,7 +241,9 @@ def expected_exceptions(*exceptions):
# ignored and thrown as normal.
except exceptions:
raise rpc_dispatcher.ExpectedException()
return inner
return outer