Merge "Add an optional timeout parameter to Listener.poll"

This commit is contained in:
Jenkins 2014-06-20 18:50:16 +00:00 committed by Gerrit Code Review
commit a64692689a
4 changed files with 38 additions and 9 deletions

View File

@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
import logging
import threading
import time
import uuid
from six import moves
@ -103,11 +104,21 @@ class AMQPListener(base.Listener):
ctxt.msg_id,
ctxt.reply_q))
def poll(self):
def poll(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
while True:
if self.incoming:
return self.incoming.pop(0)
self.conn.consume(limit=1)
if deadline is not None:
timeout = deadline - time.time()
if timeout < 0:
return None
self.conn.consume(limit=1, timeout=timeout)
else:
self.conn.consume(limit=1)
class ReplyWaiters(object):

View File

@ -53,8 +53,11 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
def poll(self):
"Blocking until a message is pending and return IncomingMessage."
def poll(self, timeout=None):
"""Blocking until a message is pending and return IncomingMessage.
Return None after timeout seconds if timeout is set and no message is
ending.
"""
@six.add_metaclass(abc.ABCMeta)

View File

@ -45,7 +45,11 @@ class FakeListener(base.Listener):
self._exchange_manager = exchange_manager
self._targets = targets
def poll(self):
def poll(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
while True:
for target in self._targets:
exchange = self._exchange_manager.get_exchange(target.exchange)
@ -54,7 +58,15 @@ class FakeListener(base.Listener):
message = FakeIncomingMessage(self, ctxt, message,
reply_q, requeue)
return message
time.sleep(.05)
if deadline is not None:
pause = deadline - time.time()
if pause < 0:
break
pause = min(pause, 0.050)
else:
pause = 0.050
time.sleep(pause)
return None
class FakeExchange(object):

View File

@ -879,9 +879,12 @@ class ZmqListener(base.Listener):
else:
return incoming.received.reply
def poll(self):
while True:
return self.incoming_queue.get()
def poll(self, timeout=None):
try:
return self.incoming_queue.get(timeout=timeout)
except six.moves.queue.Empty:
# timeout
return None
class ZmqDriver(base.BaseDriver):