rabbit: more precise iterconsume timeout

The iterconsume always set the timeout of kombu to 1 second
even the requested timeout more precise or < 1 second.

This change fixes that.

Related bug: #1400268
Related bug: #1399257
Related-bug: #1338732

(cherry picked from commit 023b7f44e2)

Conflicts:
	oslo/messaging/_drivers/amqpdriver.py
	oslo/messaging/_drivers/impl_rabbit.py

Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502
This commit is contained in:
Mehdi Abaakouk 2014-12-08 10:56:52 +01:00 committed by Edward Hope-Morley
parent 51ebadfe17
commit b581802101
3 changed files with 59 additions and 17 deletions

View File

@ -184,6 +184,11 @@ class ReplyWaiter(object):
def unlisten(self, msg_id):
self.waiters.remove(msg_id)
@staticmethod
def _raise_timeout_exception(msg_id):
raise messaging.MessagingTimeout(
'Timed out waiting for a reply to message ID %s' % msg_id)
def _process_reply(self, data):
result = None
ending = False
@ -198,7 +203,7 @@ class ReplyWaiter(object):
result = data['result']
return result, ending
def _poll_connection(self, msg_id, timeout):
def _poll_connection(self, msg_id, timer):
while True:
while self.incoming:
message_data = self.incoming.pop(0)
@ -209,15 +214,15 @@ class ReplyWaiter(object):
self.waiters.put(incoming_msg_id, message_data)
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
try:
self.conn.consume(limit=1, timeout=timeout)
except rpc_common.Timeout:
raise messaging.MessagingTimeout('Timed out waiting for a '
'reply to message ID %s'
% msg_id)
self._raise_timeout_exception(msg_id)
def _poll_queue(self, msg_id, timeout):
message = self.waiters.get(msg_id, timeout)
def _poll_queue(self, msg_id, timer):
timeout = timer.check_return(self._raise_timeout_exception, msg_id)
message = self.waiters.get(msg_id, timeout=timeout)
if message is self.waiters.WAKE_UP:
return None, None, True # lock was released
@ -246,6 +251,8 @@ class ReplyWaiter(object):
# have the first thread take responsibility for passing replies not
# intended for itself to the appropriate thread.
#
timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
final_reply = None
while True:
if self.conn_lock.acquire(False):
@ -264,7 +271,7 @@ class ReplyWaiter(object):
# Now actually poll the connection
while True:
reply, ending = self._poll_connection(msg_id, timeout)
reply, ending = self._poll_connection(msg_id, timer)
if not ending:
final_reply = reply
else:
@ -277,7 +284,7 @@ class ReplyWaiter(object):
self.waiters.wake_all(msg_id)
else:
# We're going to wait for the first thread to pass us our reply
reply, ending, trylock = self._poll_queue(msg_id, timeout)
reply, ending, trylock = self._poll_queue(msg_id, timer)
if trylock:
# The first thread got its reply, let's try and take over
# the responsibility for polling

View File

@ -18,6 +18,7 @@
import copy
import logging
import sys
import time
import traceback
from oslo.config import cfg
@ -507,3 +508,28 @@ def deserialize_msg(msg):
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg
class DecayingTimer(object):
def __init__(self, duration=None):
self._duration = duration
self._ends_at = None
def start(self):
if self._duration is not None:
self._ends_at = time.time() + max(0, self._duration)
return self
def check_return(self, timeout_callback, *args, **kwargs):
if self._duration is None:
return None
if self._ends_at is None:
raise RuntimeError("Can not check/return a timeout from a timer"
" that has not been started")
maximum = kwargs.pop('maximum', None)
left = self._ends_at - time.time()
if left <= 0:
timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum)

View File

@ -689,16 +689,18 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers."""
timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
def _raise_timeout(exc):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.debug(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
str(exc))
self.do_consume = True
self.do_consume = True
timer.check_return(_raise_timeout, exc)
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
def _consume():
if self.do_consume:
@ -708,7 +710,14 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.do_consume = False
return self.connection.drain_events(timeout=timeout)
poll_timeout = 1 if timeout is None else min(timeout, 1)
while True:
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
poll_timeout = timer.check_return(_raise_timeout, exc,
maximum=1)
for iteration in itertools.count(0):
if limit and iteration >= limit: