The executor doesn't need to set the timeout

It's up to the driver to set a suitable timeout for polling the broker,
this one can be different that the one requested by the driver
caller as long as the caller timeout is respected.

This change also adds a new driver listener API, to be able
to stop it cleanly, specially in case of timeout=None.

Closes bug: #1400268
Closes bug: #1399257
Change-Id: I674c0def1efb420c293897d49683593a0b10e291
This commit is contained in:
Mehdi Abaakouk 2014-12-08 10:52:45 +01:00
parent 43a9dc1de5
commit 15aa5cbda8
9 changed files with 54 additions and 34 deletions

View File

@ -17,7 +17,6 @@ __all__ = ['AMQPDriverBase']
import logging
import threading
import time
import uuid
from six import moves
@ -94,6 +93,7 @@ class AMQPListener(base.Listener):
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
self._stopped = threading.Event()
def __call__(self, message):
# FIXME(markmc): logging isn't driver specific
@ -110,23 +110,17 @@ class AMQPListener(base.Listener):
ctxt.reply_q))
def poll(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
while True:
while not self._stopped.is_set():
if self.incoming:
return self.incoming.pop(0)
if deadline is not None:
timeout = deadline - time.time()
if timeout < 0:
return None
try:
self.conn.consume(limit=1, timeout=timeout)
except rpc_common.Timeout:
return None
else:
self.conn.consume(limit=1)
try:
self.conn.consume(limit=1, timeout=timeout)
except rpc_common.Timeout:
return None
def stop(self):
self._stopped.set()
self.conn.stop_consuming()
def cleanup(self):
# Closes listener connection

View File

@ -56,9 +56,15 @@ class Listener(object):
def poll(self, timeout=None):
"""Blocking until a message is pending and return IncomingMessage.
Return None after timeout seconds if timeout is set and no message is
ending.
ending or if the listener have been stopped.
"""
def stop(self):
"""Stop listener.
Stop the listener message polling
"""
pass
def cleanup(self):
"""Cleanup listener.
Close connection used by listener if any. For some listeners like

View File

@ -46,6 +46,7 @@ class FakeListener(base.Listener):
self._exchange_manager = exchange_manager
self._targets = targets
self._pool = pool
self._stopped = threading.Event()
# NOTE(sileht): Ensure that all needed queues exists even the listener
# have not been polled yet
@ -58,7 +59,7 @@ class FakeListener(base.Listener):
deadline = time.time() + timeout
else:
deadline = None
while True:
while not self._stopped.is_set():
for target in self._targets:
exchange = self._exchange_manager.get_exchange(target.exchange)
(ctxt, message, reply_q, requeue) = exchange.poll(target,
@ -77,6 +78,9 @@ class FakeListener(base.Listener):
time.sleep(pause)
return None
def stop(self):
self._stopped.set()
class FakeExchange(object):

View File

@ -460,6 +460,8 @@ class Connection(object):
self.consumers = {}
self.conf = conf
self._consume_loop_stopped = False
self.brokers_params = []
if url.hosts:
for host in url.hosts:
@ -651,8 +653,16 @@ class Connection(object):
LOG.exception(_('Failed to consume message from queue: %s'), exc)
def _consume():
# NOTE(sileht):
# maximun value choosen according the best practice from kombu:
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
poll_timeout = 1 if timeout is None else min(timeout, 1)
while True:
if self._consume_loop_stopped:
self._consume_loop_stopped = False
raise StopIteration
try:
nxt_receiver = self.session.next_receiver(
timeout=poll_timeout)
@ -745,6 +755,9 @@ class Connection(object):
except StopIteration:
return
def stop_consuming(self):
self._consume_loop_stopped = True
class QpidDriver(amqpdriver.AMQPDriverBase):

View File

@ -497,6 +497,7 @@ class Connection(object):
self._initial_pid = os.getpid()
self.do_consume = True
self._consume_loop_stopped = False
self.channel = None
self.connection = kombu.connection.Connection(
@ -715,8 +716,15 @@ class Connection(object):
queues_tail.consume(nowait=False)
self.do_consume = False
# NOTE(sileht):
# maximun value choosen according the best practice from kombu:
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
poll_timeout = 1 if timeout is None else min(timeout, 1)
while True:
if self._consume_loop_stopped:
self._consume_loop_stopped = False
raise StopIteration
try:
return self.connection.drain_events(timeout=poll_timeout)
except socket.timeout as exc:
@ -790,6 +798,9 @@ class Connection(object):
except StopIteration:
return
def stop_consuming(self):
self._consume_loop_stopped = True
class RabbitDriver(amqpdriver.AMQPDriverBase):

View File

@ -16,10 +16,6 @@ import abc
import six
# NOTE(sileht): value choosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
POLL_TIMEOUT = 1
@six.add_metaclass(abc.ABCMeta)
class ExecutorBase(object):

View File

@ -42,7 +42,7 @@ class BlockingExecutor(base.ExecutorBase):
self._running = True
while self._running:
try:
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
incoming = self.listener.poll()
if incoming is not None:
with self.dispatcher(incoming) as callback:
callback()
@ -51,6 +51,7 @@ class BlockingExecutor(base.ExecutorBase):
def stop(self):
self._running = False
self.listener.stop()
def wait(self):
pass

View File

@ -85,7 +85,7 @@ class EventletExecutor(base.ExecutorBase):
def _executor_thread():
try:
while self._running:
incoming = self.listener.poll(timeout=base.POLL_TIMEOUT)
incoming = self.listener.poll()
if incoming is not None:
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
@ -99,6 +99,7 @@ class EventletExecutor(base.ExecutorBase):
if self._thread is None:
return
self._running = False
self.listener.stop()
self._thread.cancel()
def wait(self):

View File

@ -39,12 +39,10 @@ class TestExecutor(test_utils.BaseTestCase):
@classmethod
def generate_scenarios(cls):
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor,
stop_before_return=True))]
impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))]
if impl_eventlet is not None:
impl.append(
('eventlet', dict(executor=impl_eventlet.EventletExecutor,
stop_before_return=False)))
('eventlet', dict(executor=impl_eventlet.EventletExecutor)))
cls.scenarios = testscenarios.multiply_scenarios(impl)
@staticmethod
@ -72,13 +70,9 @@ class TestExecutor(test_utils.BaseTestCase):
message={'payload': 'data'})
def fake_poll(timeout=None):
if self.stop_before_return:
executor.stop()
if listener.poll.call_count == 1:
return incoming_message
else:
if listener.poll.call_count == 1:
return incoming_message
executor.stop()
executor.stop()
listener.poll.side_effect = fake_poll