rabbit: Set timeout on the underlying socket

--
NOTE(mriedem): This is two commits squashed to address a problem with
the rabbitmq heartbeat patch in stable/kilo (since oslo.messaging 1.8.1).
--

rabbit: Remove unused stuff from publisher

The publisher code is over-engineered: it allows overriding
everything, but this capability is never used.

None of the child classes have the same signature; sometimes a
constructor uses the same parameter name as the parent class but for
a different purpose, which makes the code hard to read.

It was never clear which options are passed to the queue and the
exchange, and in the end to kombu.

This change removes all of that and uses only the kombu
terminology for publisher parameters.

(cherry picked from commit cca84f66d4)

--------------------------------------------

rabbit: Set timeout on the underlying socket

There are some cases where the underlying socket can be stuck
until the system socket timeout is reached, but in oslo.messaging
we very often know that it is not necessary to wait forever because
the upper layer (usually the application) expects to return after
a certain period.

So this change sets the timeout on the underlying socket whenever we
can determine that it is not necessary to wait longer.

Closes-bug: #1436788
Change-Id: Ie71ab8147c56eaf672585da107bec8b22af9da6c
(cherry picked from commit 77f952a1f7)
This commit is contained in:
Mehdi Abaakouk 2015-04-30 16:13:14 +02:00 committed by Matt Riedemann
parent 562c41bb78
commit 2daf4dccc3
1 changed files with 174 additions and 151 deletions

View File

@ -365,111 +365,122 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object):
"""Base Publisher class."""
"""Publisher that silently creates exchange but no queues."""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
passive = False
def __init__(self, conf, exchange_name, routing_key, type, durable,
auto_delete):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
type, durable auto_delete
"""
self.queue_arguments = _get_queue_arguments(conf)
self.exchange_name = exchange_name
self.routing_key = routing_key
self.kwargs = kwargs
self.reconnect(channel)
def reconnect(self, channel):
"""Re-establish the Producer after a rabbit reconnection."""
self.auto_delete = auto_delete
self.durable = durable
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
channel=channel,
routing_key=self.routing_key)
type=type,
exclusive=False,
durable=durable,
auto_delete=auto_delete,
passive=self.passive)
def send(self, msg, timeout=None):
"""Send a message."""
def send(self, conn, msg, timeout=None):
"""Send a message on an channel."""
producer = kombu.messaging.Producer(exchange=self.exchange,
channel=conn.channel,
routing_key=self.routing_key)
headers = {}
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
#
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
else:
self.producer.publish(msg)
# AMQP TTL is in milliseconds when set in the property.
# Details: http://www.rabbitmq.com/ttl.html#per-message-ttl
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
headers['ttl'] = timeout * 1000
# NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached
transport_timeout = timeout
heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold
if (conn._heartbeat_supported_and_enabled() and (
transport_timeout is None or
transport_timeout > heartbeat_timeout)):
# NOTE(sileht): we are supposed to send heartbeat every
# heartbeat_timeout, no need to wait more otherwise will
# disconnect us, so raise timeout earlier ourself
transport_timeout = heartbeat_timeout
with conn._transport_socket_timeout(transport_timeout):
producer.publish(msg, headers=headers)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'."""
def __init__(self, conf, channel, topic, **kwargs):
"""Init a 'direct' publisher.
class DeclareQueuePublisher(Publisher):
"""Publisher that declares a default queue
Kombu options may be passed as keyword args to override defaults
"""
When the exchange is missing instead of silently creating an exchange
not binded to a queue, this publisher creates a default queue
named with the routing_key.
options = {'durable': False,
'auto_delete': True,
'exclusive': False,
'passive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, topic, topic,
type='direct', **options)
This is mainly used to not miss notifications in case of nobody consumes
them yet. If the future consumer binds the default queue it can retrieve
missing messages.
"""
# FIXME(sileht): The side effect of this is that we declare again and
# again the same queue, and generate a lot of useless rabbit traffic.
# https://bugs.launchpad.net/oslo.messaging/+bug/1437902
class TopicPublisher(Publisher):
"""Publisher class for 'topic'."""
def __init__(self, conf, channel, exchange_name, topic, **kwargs):
"""Init a 'topic' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
options.update(kwargs)
super(TopicPublisher, self).__init__(channel,
exchange_name,
topic,
type='topic',
**options)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'."""
def __init__(self, conf, channel, topic, **kwargs):
"""Init a 'fanout' publisher.
Kombu options may be passed as keyword args to override defaults
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'."""
def __init__(self, conf, channel, exchange_name, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.auto_delete = kwargs.pop('auto_delete', conf.amqp_auto_delete)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, exchange_name,
topic, **kwargs)
def reconnect(self, channel):
super(NotifyPublisher, self).reconnect(channel)
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
def send(self, conn, msg, timeout=None):
queue = kombu.entity.Queue(
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
name=self.routing_key,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare()
super(DeclareQueuePublisher, self).send(
conn, msg, timeout)
class RetryOnMissingExchangePublisher(Publisher):
"""Publisher that retry during 60 seconds if the exchange is missing."""
passive = True
def send(self, conn, msg, timeout=None):
# TODO(sileht):
# * use timeout parameter when available
# * use rpc_timeout if not instead of hardcoded 60
# * use @retrying
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
while True:
try:
super(RetryOnMissingExchangePublisher, self).send(conn, msg,
timeout)
return
except conn.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': self.exchange,
'routing_key': self.routing_key})
time.sleep(1)
continue
raise
class DummyConnectionLock(object):
@ -691,10 +702,14 @@ class Connection(object):
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
self.connection.info())
# NOTE(sileht):
# value choosen according the best practice from kombu:
# NOTE(sileht): value choosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
self._poll_timeout = 1
# For heatbeat, we can set a bigger timeout, and check we receive the
# heartbeat packets regulary
if self._heartbeat_supported_and_enabled():
self._poll_timeout = self._heartbeat_wait_timeout
else:
self._poll_timeout = 1
if self._url.startswith('memory://'):
# Kludge to speed up tests.
@ -916,6 +931,28 @@ class Connection(object):
self._heartbeat_support_log_emitted = True
return False
@contextlib.contextmanager
def _transport_socket_timeout(self, timeout):
# NOTE(sileht): they are some case where the heartbeat check
# or the producer.send return only when the system socket
# timeout if reach. kombu doesn't allow use to customise this
# timeout so for py-amqp we tweak ourself
sock = getattr(self.connection.transport, 'sock', None)
if sock:
orig_timeout = sock.gettimeout()
sock.settimeout(timeout)
yield
if sock:
sock.settimeout(orig_timeout)
def _heartbeat_check(self):
# NOTE(sileht): we are suposed to send at least one heartbeat
# every heartbeat_timeout_threshold, so no need to way more
with self._transport_socket_timeout(
self.driver_conf.heartbeat_timeout_threshold):
self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate)
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
self._heartbeat_exit_event = threading.Event()
@ -944,8 +981,7 @@ class Connection(object):
try:
try:
self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate)
self._heartbeat_check()
# NOTE(sileht): We need to drain event to receive
# heartbeat from the broker but don't hold the
# connection too much times. In amqpdriver a connection
@ -1028,8 +1064,8 @@ class Connection(object):
raise StopIteration
if self._heartbeat_supported_and_enabled():
self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate)
self._heartbeat_check()
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
@ -1044,32 +1080,20 @@ class Connection(object):
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
@staticmethod
def _log_publisher_send_error(topic, exc):
log_info = {'topic': topic, 'err_str': exc}
LOG.error(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
default_marker = object()
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
error_callback=default_marker, **kwargs):
def publisher_send(self, publisher, msg, timeout=None, retry=None):
"""Send to a publisher based on the publisher class."""
def _default_error_callback(exc):
self._log_publisher_send_error(topic, exc)
if error_callback is self.default_marker:
error_callback = _default_error_callback
def _error_callback(exc):
log_info = {'topic': publisher.exchange_name, 'err_str': exc}
LOG.error(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
LOG.debug('Exception', exc_info=exc)
def _publish():
publisher = cls(self.driver_conf, self.channel, topic=topic,
**kwargs)
publisher.send(msg, timeout)
publisher.send(self, msg, timeout)
with self._connection_lock:
self.ensure(_publish, retry=retry, error_callback=error_callback)
self.ensure(_publish, retry=retry, error_callback=_error_callback)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@ -1094,49 +1118,48 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
# NOTE(sileht): retry at least 60sec, after we have a good change
# that the caller is really dead too...
p = RetryOnMissingExchangePublisher(self.driver_conf,
exchange_name=msg_id,
routing_key=msg_id,
type='direct',
durable=False,
auto_delete=True)
while True:
try:
self.publisher_send(DirectPublisher, msg_id, msg,
error_callback=None)
return
except self.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange to reply to %s doesn't "
"exist yet, retrying...") % msg_id)
time.sleep(1)
continue
self._log_publisher_send_error(msg_id, exc)
raise
except Exception as exc:
self._log_publisher_send_error(msg_id, exc)
raise
self.publisher_send(p, msg)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
self.publisher_send(TopicPublisher, topic, msg, timeout,
exchange_name=exchange_name, retry=retry)
p = Publisher(self.driver_conf,
exchange_name=exchange_name,
routing_key=topic,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self.publisher_send(p, msg, timeout, retry=retry)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
p = Publisher(self.driver_conf,
exchange_name='%s_fanout' % topic,
routing_key=None,
type='fanout',
durable=False,
auto_delete=True)
self.publisher_send(p, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
exchange_name=exchange_name, retry=retry, **kwargs)
p = DeclareQueuePublisher(
self.driver_conf,
exchange_name=exchange_name,
routing_key=topic,
type='topic',
durable=self.driver_conf.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete)
self.publisher_send(p, msg, timeout=None, retry=retry)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""