From 023b7f44e2ccd77a7e9ee9ee78431a4646c88f13 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 8 Dec 2014 10:56:52 +0100 Subject: [PATCH] rabbit: more precise iterconsume timeout The iterconsume always set the timeout of kombu to 1 second even the requested timeout more precise or < 1 second. This change fixes that. Related bug: #1400268 Related bug: #1399257 Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502 --- oslo/messaging/_drivers/amqpdriver.py | 41 +++++++------------------- oslo/messaging/_drivers/common.py | 26 ++++++++++++++++ oslo/messaging/_drivers/impl_rabbit.py | 21 +++++++------ 3 files changed, 47 insertions(+), 41 deletions(-) diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index b9c25970a..41129e7c2 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -31,30 +31,6 @@ from oslo.messaging._i18n import _LI LOG = logging.getLogger(__name__) -class _DecayingTimer(object): - def __init__(self, duration=None): - self._duration = duration - self._ends_at = None - - def start(self): - if self._duration is not None: - self._ends_at = time.time() + max(0, self._duration) - return self - - def check_return(self, msg_id): - if self._duration is None: - return None - if self._ends_at is None: - raise RuntimeError("Can not check/return a timeout from a timer" - " that has not been started") - left = self._ends_at - time.time() - if left <= 0: - raise messaging.MessagingTimeout('Timed out waiting for a ' - 'reply to message ID %s' - % msg_id) - return left - - class AMQPIncomingMessage(base.IncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): @@ -231,6 +207,11 @@ class ReplyWaiter(object): def unlisten(self, msg_id): self.waiters.remove(msg_id) + @staticmethod + def _raise_timeout_exception(msg_id): + raise messaging.MessagingTimeout( + 'Timed out waiting for a reply to message ID %s' % msg_id) + def _process_reply(self, data): result = None ending = False @@ -256,15 +237,15 @@ class ReplyWaiter(object): self.waiters.put(incoming_msg_id, message_data) + timeout = timer.check_return(self._raise_timeout_exception, msg_id) try: - self.conn.consume(limit=1, timeout=timer.check_return(msg_id)) + self.conn.consume(limit=1, timeout=timeout) except rpc_common.Timeout: - raise messaging.MessagingTimeout('Timed out waiting for a ' - 'reply to message ID %s' - % msg_id) + self._raise_timeout_exception(msg_id) def _poll_queue(self, msg_id, timer): - message = self.waiters.get(msg_id, timeout=timer.check_return(msg_id)) + timeout = timer.check_return(self._raise_timeout_exception, msg_id) + message = self.waiters.get(msg_id, timeout=timeout) if message is self.waiters.WAKE_UP: return None, None, True # lock was released @@ -293,7 +274,7 @@ class ReplyWaiter(object): # have the first thread take responsibility for passing replies not # intended for itself to the appropriate thread. # - timer = _DecayingTimer(duration=timeout).start() + timer = rpc_common.DecayingTimer(duration=timeout).start() final_reply = None while True: if self.conn_lock.acquire(False): diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py index 2645fbd39..4d0b7a1e7 100644 --- a/oslo/messaging/_drivers/common.py +++ b/oslo/messaging/_drivers/common.py @@ -18,6 +18,7 @@ import copy import logging import sys +import time import traceback import six @@ -349,3 +350,28 @@ def deserialize_msg(msg): raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) return raw_msg + + +class DecayingTimer(object): + def __init__(self, duration=None): + self._duration = duration + self._ends_at = None + + def start(self): + if self._duration is not None: + self._ends_at = time.time() + max(0, self._duration) + return self + + def check_return(self, timeout_callback, *args, **kwargs): + if self._duration is None: + return None + if self._ends_at is None: + raise RuntimeError("Can not check/return a timeout from a timer" + " that has not been started") + + maximum = kwargs.pop('maximum', None) + left = self._ends_at - time.time() + if left <= 0: + timeout_callback(*args, **kwargs) + + return left if maximum is None else min(left, maximum) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 2ae5c8374..05219ffcd 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -694,19 +694,15 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" - if timeout is None: - deadline = None - else: - deadline = time.time() + timeout + timer = rpc_common.DecayingTimer(duration=timeout).start() - def _raise_timeout_if_deadline_is_reached(exc): - if deadline is not None and deadline - time.time() < 0: - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() + def _raise_timeout(exc): + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() def _error_callback(exc): self.do_consume = True - _raise_timeout_if_deadline_is_reached(exc) + timer.check_return(_raise_timeout, exc) LOG.exception(_('Failed to consume message from queue: %s'), exc) @@ -718,11 +714,14 @@ class Connection(object): queue.consume(nowait=True) queues_tail.consume(nowait=False) self.do_consume = False + + poll_timeout = 1 if timeout is None else min(timeout, 1) while True: try: - return self.connection.drain_events(timeout=1) + return self.connection.drain_events(timeout=poll_timeout) except socket.timeout as exc: - _raise_timeout_if_deadline_is_reached(exc) + poll_timeout = timer.check_return(_raise_timeout, exc, + maximum=1) for iteration in itertools.count(0): if limit and iteration >= limit: