Don't hold the connection when a reply fails

This change moves the reply retry code to the upper layer
so that the connection can be released while we wait between
two retries.

In the worst-case scenario, a client that is waiting for more than
30 replies dies or restarts; the server then tries to send these 30
replies to this client and can wait up to 60s per reply. During this
time, replies for other clients are just stuck.

This change fixes that.

Related-bug: #1477914
Closes-bug: #1521958

(cherry picked from commit I0d3c16ea6d2c1da143de4924b3be41d1cea159bd)

Conflicts:
	oslo_messaging/_drivers/amqpdriver.py
	oslo_messaging/_drivers/impl_rabbit.py

Change-Id: I492b82082a372763e60cf06ce0b8135ade7a6e71
This commit is contained in:
Mehdi Abaakouk 2015-12-02 11:38:27 +01:00
parent b4ec4c4e86
commit 53256e990d
2 changed files with 54 additions and 33 deletions

View File

@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
import logging
import threading
import time
import uuid
from six import moves
@ -69,10 +70,38 @@ class AMQPIncomingMessage(base.IncomingMessage):
# NOTE(Alexei_987) not sending reply, if msg_id is empty
# because reply should not be expected by caller side
return
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
# NOTE(sileht): we read the configuration value from the driver
# to be able to backport this change to previous versions that
# still have the qpid driver
duration = self.listener.driver.missing_destination_retry_timeout
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()
while True:
try:
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure,
log_failure=log_failure)
self._send_reply(conn, ending=True)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist, "
"retrying...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q})
time.sleep(0.25)
else:
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist after "
"%(duration)s sec abandoning...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q,
'duration': duration})
return
def acknowledge(self):
self.listener.msg_id_cache.add(self.unique_id)
@ -251,6 +280,7 @@ class ReplyWaiter(object):
class AMQPDriverBase(base.BaseDriver):
missing_destination_retry_timeout = 0
def __init__(self, conf, url, connection_pool,
default_exchange=None, allowed_remote_exmods=None):

View File

@ -452,35 +452,24 @@ class RetryOnMissingExchangePublisher(Publisher):
passive = True
def send(self, conn, msg, timeout=None):
# TODO(sileht):
# * use timeout parameter when available
# * use rpc_timeout if not instead of hardcoded 60
# * use @retrying
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
while True:
try:
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
timeout)
return
except conn.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher will send a message to an exchange
# that's not bound to a queue, and the message will be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': self.exchange,
'routing_key': self.routing_key})
time.sleep(1)
continue
try:
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
timeout)
return
except conn.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher will send a message to an exchange
# that's not bound to a queue, and the message will be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404:
raise rpc_amqp.AMQPDestinationNotFound(
"exchange %s doesn't exists" %
self.exchange.name)
else:
raise
@ -1198,6 +1187,8 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf.register_opts(rabbit_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
self.missing_destination_retry_timeout = 60
connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)