diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 5cd17d0bd..8c3db9c93 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -31,6 +31,30 @@ from oslo.messaging._i18n import _LI LOG = logging.getLogger(__name__) +class _DecayingTimer(object): + def __init__(self, duration=None): + self._duration = duration + self._ends_at = None + + def start(self): + if self._duration is not None: + self._ends_at = time.time() + max(0, self._duration) + return self + + def check_return(self, msg_id): + if self._duration is None: + return None + if self._ends_at is None: + raise RuntimeError("Can not check/return a timeout from a timer" + " that has not been started") + left = self._ends_at - time.time() + if left <= 0: + raise messaging.MessagingTimeout('Timed out waiting for a ' + 'reply to message ID %s' + % msg_id) + return left + + class AMQPIncomingMessage(base.IncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q): @@ -221,7 +245,7 @@ class ReplyWaiter(object): result = data['result'] return result, ending - def _poll_connection(self, msg_id, timeout): + def _poll_connection(self, msg_id, timer): while True: while self.incoming: message_data = self.incoming.pop(0) @@ -233,14 +257,14 @@ class ReplyWaiter(object): self.waiters.put(incoming_msg_id, message_data) try: - self.conn.consume(limit=1, timeout=timeout) + self.conn.consume(limit=1, timeout=timer.check_return(msg_id)) except rpc_common.Timeout: raise messaging.MessagingTimeout('Timed out waiting for a ' 'reply to message ID %s' % msg_id) - def _poll_queue(self, msg_id, timeout): - message = self.waiters.get(msg_id, timeout) + def _poll_queue(self, msg_id, timer): + message = self.waiters.get(msg_id, timeout=timer.check_return(msg_id)) if message is self.waiters.WAKE_UP: return None, None, True # lock was released @@ -269,6 +293,7 @@ class ReplyWaiter(object): # have the first thread take responsibility for passing replies not # intended for itself to the appropriate thread. # + timer = _DecayingTimer(duration=timeout).start() final_reply = None while True: if self.conn_lock.acquire(False): @@ -287,7 +312,7 @@ class ReplyWaiter(object): # Now actually poll the connection while True: - reply, ending = self._poll_connection(msg_id, timeout) + reply, ending = self._poll_connection(msg_id, timer) if not ending: final_reply = reply else: @@ -300,7 +325,7 @@ class ReplyWaiter(object): self.waiters.wake_all(msg_id) else: # We're going to wait for the first thread to pass us our reply - reply, ending, trylock = self._poll_queue(msg_id, timeout) + reply, ending, trylock = self._poll_queue(msg_id, timer) if trylock: # The first thread got its reply, let's try and take over # the responsibility for polling