rabbit: Set timeout on the underlying socket

They are some case where the underlying can be stuck
until the system socket timeout is reached, but in oslo.messaging
we very often known that is not needed to wait for ever because
the upper layer (usualy the application) expect to return after
a certain period.

So this change set the timeout on the underlying socket each we can
determine that is not needed to wait more.

Closes-bug: #1436788
Change-Id: Ie71ab8147c56eaf672585da107bec8b22af9da6c
This commit is contained in:
Mehdi Abaakouk 2015-05-01 13:12:38 +02:00 committed by Mehdi Abaakouk
parent 8af6b2ff82
commit 77f952a1f7
1 changed files with 47 additions and 8 deletions

View File

@ -282,7 +282,21 @@ class Publisher(object):
# NOTE(sileht): this amqp header doesn't exists ... LP#1444854
headers['ttl'] = timeout * 1000
producer.publish(msg, headers=headers)
# NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached
transport_timeout = timeout
heartbeat_timeout = conn.driver_conf.heartbeat_timeout_threshold
if (conn._heartbeat_supported_and_enabled() and (
transport_timeout is None or
transport_timeout > heartbeat_timeout)):
# NOTE(sileht): we are supposed to send heartbeat every
# heartbeat_timeout, no need to wait more otherwise will
# disconnect us, so raise timeout earlier ourself
transport_timeout = heartbeat_timeout
with conn._transport_socket_timeout(transport_timeout):
producer.publish(msg, headers=headers)
class DeclareQueuePublisher(Publisher):
@ -580,10 +594,14 @@ class Connection(object):
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)s'),
self.connection.info())
# NOTE(sileht):
# value choosen according the best practice from kombu:
# NOTE(sileht): value choosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
self._poll_timeout = 1
# For heatbeat, we can set a bigger timeout, and check we receive the
# heartbeat packets regulary
if self._heartbeat_supported_and_enabled():
self._poll_timeout = self._heartbeat_wait_timeout
else:
self._poll_timeout = 1
if self._url.startswith('memory://'):
# Kludge to speed up tests.
@ -814,6 +832,28 @@ class Connection(object):
self._heartbeat_support_log_emitted = True
return False
@contextlib.contextmanager
def _transport_socket_timeout(self, timeout):
# NOTE(sileht): they are some case where the heartbeat check
# or the producer.send return only when the system socket
# timeout if reach. kombu doesn't allow use to customise this
# timeout so for py-amqp we tweak ourself
sock = getattr(self.connection.transport, 'sock', None)
if sock:
orig_timeout = sock.gettimeout()
sock.settimeout(timeout)
yield
if sock:
sock.settimeout(orig_timeout)
def _heartbeat_check(self):
# NOTE(sileht): we are suposed to send at least one heartbeat
# every heartbeat_timeout_threshold, so no need to way more
with self._transport_socket_timeout(
self.driver_conf.heartbeat_timeout_threshold):
self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate)
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
self._heartbeat_exit_event = threading.Event()
@ -842,8 +882,7 @@ class Connection(object):
try:
try:
self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate)
self._heartbeat_check()
# NOTE(sileht): We need to drain event to receive
# heartbeat from the broker but don't hold the
# connection too much times. In amqpdriver a connection
@ -927,8 +966,8 @@ class Connection(object):
return
if self._heartbeat_supported_and_enabled():
self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate)
self._heartbeat_check()
try:
self.connection.drain_events(timeout=poll_timeout)
return