From 7fe2ef7334ce14c2255683c28b01e7d70a4e761f Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 4 Feb 2014 16:06:42 +0100 Subject: [PATCH] Add an optional timeout parameter to Listener.poll For asynchronous programming, a timeout parameter is required on the listener to allow to stop it at exit. poll() returns None on timeout. It plan to use it in my new asyncio (Trollius) executor: https://review.openstack.org/#/c/70983/ See also the related blueprint for the rationale: https://wiki.openstack.org/wiki/Oslo/blueprints/asyncio Change-Id: I918ae3c267743a0eaed1d6a210c79fb4a0eb8733 --- oslo/messaging/_drivers/amqpdriver.py | 15 +++++++++++++-- oslo/messaging/_drivers/base.py | 7 +++++-- oslo/messaging/_drivers/impl_fake.py | 16 ++++++++++++++-- oslo/messaging/_drivers/impl_zmq.py | 9 ++++++--- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index d990e90a7..1f2bf7c5b 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase'] import logging import threading +import time import uuid from six import moves @@ -103,11 +104,21 @@ class AMQPListener(base.Listener): ctxt.msg_id, ctxt.reply_q)) - def poll(self): + def poll(self, timeout=None): + if timeout is not None: + deadline = time.time() + timeout + else: + deadline = None while True: if self.incoming: return self.incoming.pop(0) - self.conn.consume(limit=1) + if deadline is not None: + timeout = deadline - time.time() + if timeout < 0: + return None + self.conn.consume(limit=1, timeout=timeout) + else: + self.conn.consume(limit=1) class ReplyWaiters(object): diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 82b36412e..a35f776bf 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -53,8 +53,11 @@ class Listener(object): self.driver = driver @abc.abstractmethod - def poll(self): - "Blocking until a message is pending and return IncomingMessage." + def poll(self, timeout=None): + """Blocking until a message is pending and return IncomingMessage. + Return None after timeout seconds if timeout is set and no message is + ending. + """ @six.add_metaclass(abc.ABCMeta) diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 2fa0bc494..98a0a2488 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -45,7 +45,11 @@ class FakeListener(base.Listener): self._exchange_manager = exchange_manager self._targets = targets - def poll(self): + def poll(self, timeout=None): + if timeout is not None: + deadline = time.time() + timeout + else: + deadline = None while True: for target in self._targets: exchange = self._exchange_manager.get_exchange(target.exchange) @@ -54,7 +58,15 @@ class FakeListener(base.Listener): message = FakeIncomingMessage(self, ctxt, message, reply_q, requeue) return message - time.sleep(.05) + if deadline is not None: + pause = deadline - time.time() + if pause < 0: + break + pause = min(pause, 0.050) + else: + pause = 0.050 + time.sleep(pause) + return None class FakeExchange(object): diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 210380afb..3d49427c1 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -879,9 +879,12 @@ class ZmqListener(base.Listener): else: return incoming.received.reply - def poll(self): - while True: - return self.incoming_queue.get() + def poll(self, timeout=None): + try: + return self.incoming_queue.get(timeout=timeout) + except six.moves.queue.Empty: + # timeout + return None class ZmqDriver(base.BaseDriver):