Don't hold the connection when a reply fails
This change moves the reply retry code to the upper layer so that the connection can be released while we wait between two retries. In the worst-case scenario, a client waits for more than 30 replies and then dies/restarts; the server tries to send these 30 replies to this client and can wait up to 60s per reply. During this time, replies for other clients are just stuck. This change fixes that. Related-bug: #1477914 Closes-bug: #1521958 (cherry picked from commit I0d3c16ea6d2c1da143de4924b3be41d1cea159bd) Conflicts: oslo_messaging/_drivers/amqpdriver.py oslo_messaging/_drivers/impl_rabbit.py Change-Id: I492b82082a372763e60cf06ce0b8135ade7a6e71
This commit is contained in:
parent
b4ec4c4e86
commit
53256e990d
|
@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
|
|||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from six import moves
|
||||
|
@ -69,10 +70,38 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||
# NOTE(Alexei_987) not sending reply, if msg_id is empty
|
||||
# because reply should not be expected by caller side
|
||||
return
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_amqp.PURPOSE_SEND) as conn:
|
||||
self._send_reply(conn, reply, failure, log_failure=log_failure)
|
||||
self._send_reply(conn, ending=True)
|
||||
|
||||
# NOTE(sileht): we read the configuration value from the driver
|
||||
# to be able to backport this change in previous version that
|
||||
# still have the qpid driver
|
||||
duration = self.listener.driver.missing_destination_retry_timeout
|
||||
timer = rpc_common.DecayingTimer(duration=duration)
|
||||
timer.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_amqp.PURPOSE_SEND) as conn:
|
||||
self._send_reply(conn, reply, failure,
|
||||
log_failure=log_failure)
|
||||
self._send_reply(conn, ending=True)
|
||||
return
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
if timer.check_return() > 0:
|
||||
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
|
||||
"%(reply_q)s reply queue don't exist, "
|
||||
"retrying...") % {
|
||||
'msg_id': self.msg_id,
|
||||
'reply_q': self.reply_q})
|
||||
time.sleep(0.25)
|
||||
else:
|
||||
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
|
||||
"%(reply_q)s reply queue don't exist after "
|
||||
"%(duration)s sec abandoning...") % {
|
||||
'msg_id': self.msg_id,
|
||||
'reply_q': self.reply_q,
|
||||
'duration': duration})
|
||||
return
|
||||
|
||||
def acknowledge(self):
|
||||
self.listener.msg_id_cache.add(self.unique_id)
|
||||
|
@ -251,6 +280,7 @@ class ReplyWaiter(object):
|
|||
|
||||
|
||||
class AMQPDriverBase(base.BaseDriver):
|
||||
missing_destination_retry_timeout = 0
|
||||
|
||||
def __init__(self, conf, url, connection_pool,
|
||||
default_exchange=None, allowed_remote_exmods=None):
|
||||
|
|
|
@ -452,35 +452,24 @@ class RetryOnMissingExchangePublisher(Publisher):
|
|||
passive = True
|
||||
|
||||
def send(self, conn, msg, timeout=None):
|
||||
# TODO(sileht):
|
||||
# * use timeout parameter when available
|
||||
# * use rpc_timeout if not instead of hardcoded 60
|
||||
# * use @retrying
|
||||
timer = rpc_common.DecayingTimer(duration=60)
|
||||
timer.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
|
||||
timeout)
|
||||
return
|
||||
except conn.connection.channel_errors as exc:
|
||||
# NOTE(noelbk/sileht):
|
||||
# If rabbit dies, the consumer can be disconnected before the
|
||||
# publisher sends, and if the consumer hasn't declared the
|
||||
# queue, the publisher's will send a message to an exchange
|
||||
# that's not bound to a queue, and the message wll be lost.
|
||||
# So we set passive=True to the publisher exchange and catch
|
||||
# the 404 kombu ChannelError and retry until the exchange
|
||||
# appears
|
||||
if exc.code == 404 and timer.check_return() > 0:
|
||||
LOG.info(_LI("The exchange %(exchange)s to send to "
|
||||
"%(routing_key)s doesn't exist yet, "
|
||||
"retrying...") % {
|
||||
'exchange': self.exchange,
|
||||
'routing_key': self.routing_key})
|
||||
time.sleep(1)
|
||||
continue
|
||||
try:
|
||||
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
|
||||
timeout)
|
||||
return
|
||||
except conn.connection.channel_errors as exc:
|
||||
# NOTE(noelbk/sileht):
|
||||
# If rabbit dies, the consumer can be disconnected before the
|
||||
# publisher sends, and if the consumer hasn't declared the
|
||||
# queue, the publisher's will send a message to an exchange
|
||||
# that's not bound to a queue, and the message wll be lost.
|
||||
# So we set passive=True to the publisher exchange and catch
|
||||
# the 404 kombu ChannelError and retry until the exchange
|
||||
# appears
|
||||
if exc.code == 404:
|
||||
raise rpc_amqp.AMQPDestinationNotFound(
|
||||
"exchange %s doesn't exists" %
|
||||
self.exchange.name)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
|
@ -1198,6 +1187,8 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
|||
conf.register_opts(rabbit_opts, group=opt_group)
|
||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||
|
||||
self.missing_destination_retry_timeout = 60
|
||||
|
||||
connection_pool = rpc_amqp.ConnectionPool(
|
||||
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
|
||||
url, Connection)
|
||||
|
|
Loading…
Reference in New Issue