qpid: honor iterconsume timeout

The qpid driver must always honor the timeout passed the iterconsume
method, this change fixes that.

Related bug: #1400268
Related bug: #1399257

Change-Id: I8f267fc8b5a7abc852f0caf84d1e7c2c342ba951
This commit is contained in:
Mehdi Abaakouk 2014-12-08 10:28:12 +01:00
parent 023b7f44e2
commit 43a9dc1de5
1 changed files with 21 additions and 8 deletions

View File

@ -640,20 +640,33 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers."""
timer = rpc_common.DecayingTimer(duration=timeout).start()
def _raise_timeout(exc):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
timer.check_return(_raise_timeout, exc)
LOG.exception(_('Failed to consume message from queue: %s'), exc)
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
poll_timeout = 1 if timeout is None else min(timeout, 1)
while True:
try:
nxt_receiver = self.session.next_receiver(
timeout=poll_timeout)
except qpid_exceptions.Empty as exc:
poll_timeout = timer.check_return(_raise_timeout, exc,
maximum=1)
else:
break
try:
self._lookup_consumer(nxt_receiver).consume()
except Exception:
LOG.exception(_("Error processing message. Skipping it."))
LOG.exception(_("Error processing message. "
"Skipping it."))
for iteration in itertools.count(0):
if limit and iteration >= limit: