From 53256e990d3632e0120e9a10ede1de9b3b2c9a0a Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 2 Dec 2015 11:38:27 +0100 Subject: [PATCH] Don't hold the connection when reply fail This change moves the reply retry code to upper layer to be able to release the connection while we wait between two retries. In the worse scenario, a client waits for more than 30 replies and died/restart, the server tries to send this 30 replies to this this client and can wait too 60s per replies. During this replies for other clients are just stuck. This change fixes that. Related-bug: #1477914 Closes-bug: #1521958 (cherry picked from commit I0d3c16ea6d2c1da143de4924b3be41d1cea159bd) Conflicts: oslo_messaging/_drivers/amqpdriver.py oslo_messaging/_drivers/impl_rabbit.py Change-Id: I492b82082a372763e60cf06ce0b8135ade7a6e71 --- oslo_messaging/_drivers/amqpdriver.py | 38 +++++++++++++++++--- oslo_messaging/_drivers/impl_rabbit.py | 49 +++++++++++--------------- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index bbc4b7828..36a1eb1d2 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase'] import logging import threading +import time import uuid from six import moves @@ -69,10 +70,38 @@ class AMQPIncomingMessage(base.IncomingMessage): # NOTE(Alexei_987) not sending reply, if msg_id is empty # because reply should not be expected by caller side return - with self.listener.driver._get_connection( - rpc_amqp.PURPOSE_SEND) as conn: - self._send_reply(conn, reply, failure, log_failure=log_failure) - self._send_reply(conn, ending=True) + + # NOTE(sileht): we read the configuration value from the driver + # to be able to backport this change in previous version that + # still have the qpid driver + duration = self.listener.driver.missing_destination_retry_timeout + timer = rpc_common.DecayingTimer(duration=duration) + timer.start() + + while True: + try: + with self.listener.driver._get_connection( + rpc_amqp.PURPOSE_SEND) as conn: + self._send_reply(conn, reply, failure, + log_failure=log_failure) + self._send_reply(conn, ending=True) + return + except rpc_amqp.AMQPDestinationNotFound: + if timer.check_return() > 0: + LOG.info(_LI("The reply %(msg_id)s cannot be sent " + "%(reply_q)s reply queue don't exist, " + "retrying...") % { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q}) + time.sleep(0.25) + else: + LOG.info(_LI("The reply %(msg_id)s cannot be sent " + "%(reply_q)s reply queue don't exist after " + "%(duration)s sec abandoning...") % { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q, + 'duration': duration}) + return def acknowledge(self): self.listener.msg_id_cache.add(self.unique_id) @@ -251,6 +280,7 @@ class ReplyWaiter(object): class AMQPDriverBase(base.BaseDriver): + missing_destination_retry_timeout = 0 def __init__(self, conf, url, connection_pool, default_exchange=None, allowed_remote_exmods=None): diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9716b9cee..8e16a2854 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -452,35 +452,24 @@ class RetryOnMissingExchangePublisher(Publisher): passive = True def send(self, conn, msg, timeout=None): - # TODO(sileht): - # * use timeout parameter when available - # * use rpc_timeout if not instead of hardcoded 60 - # * use @retrying - timer = rpc_common.DecayingTimer(duration=60) - timer.start() - - while True: - try: - super(RetryOnMissingExchangePublisher, self).send(conn, msg, - timeout) - return - except conn.connection.channel_errors as exc: - # NOTE(noelbk/sileht): - # If rabbit dies, the consumer can be disconnected before the - # publisher sends, and if the consumer hasn't declared the - # queue, the publisher's will send a message to an exchange - # that's not bound to a queue, and the message wll be lost. - # So we set passive=True to the publisher exchange and catch - # the 404 kombu ChannelError and retry until the exchange - # appears - if exc.code == 404 and timer.check_return() > 0: - LOG.info(_LI("The exchange %(exchange)s to send to " - "%(routing_key)s doesn't exist yet, " - "retrying...") % { - 'exchange': self.exchange, - 'routing_key': self.routing_key}) - time.sleep(1) - continue + try: + super(RetryOnMissingExchangePublisher, self).send(conn, msg, + timeout) + return + except conn.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404: + raise rpc_amqp.AMQPDestinationNotFound( + "exchange %s doesn't exists" % + self.exchange.name) + else: raise @@ -1198,6 +1187,8 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf.register_opts(rabbit_opts, group=opt_group) conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) + self.missing_destination_retry_timeout = 60 + connection_pool = rpc_amqp.ConnectionPool( conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, url, Connection)