rabbit: restore synchronous ack/requeue

Note this change also contains the fix for the regression it
introduced.

In https://review.openstack.org/#/c/436958, we fixed a thread safety
issue, but in doing so we made the ack/requeue of messages asynchronous.
In the nominal case this works, but if a network/rabbit connection issue
occurs, it can result in an rpc call being handled twice. Fortunately, we
already double check processed message ids and drop duplicates, but if
the message goes to another node, that mitigation won't work.

This restores the previous behavior, to ensure we run the application
callback of rpc.call/rpc.cast only when the message has been
successfully acked.

(cherry picked from commit da02bc2169)

Fix rabbitmq driver with blocking executor

We recently moved the ack/requeue of messages into the main/polling
thread of the rabbitmq drivers, and this broke the blocking executor.

The blocking executor is not covered by any tests and is now deprecated.

This change works around the issue until we completely remove the
blocking executor.

Closes-bug: #1694728
(cherry picked from commit 8ee5ae135a)

Change-Id: I62b9e09513e3ebfebc64a941d4b21b6c053b511d
This commit is contained in:
Mehdi Abaakouk 2017-05-10 09:19:38 +02:00 committed by Mehdi Abaakouk
parent e869159278
commit c41f0ce709
2 changed files with 107 additions and 22 deletions

View File

@ -42,20 +42,72 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0
def do_pending_tasks(tasks):
while True:
try:
task = tasks.get(block=False)
except moves.queue.Empty:
break
else:
class MessageOperationsHandler(object):
    """Queue used by message operations to ensure that all tasks are
    serialized and run in the same thread, since underlying drivers like kombu
    are not thread safe.

    Callers hand a zero-argument callable to do(); whichever thread calls
    process() executes the queued callables one after another and wakes
    each caller once its callable has finished.
    """

    def __init__(self, name):
        """Create an idle handler.

        :param name: human readable prefix; the object's id is appended so
                     that distinct handlers can be told apart.
        """
        self.name = "%s (%s)" % (name, hex(id(self)))
        # Entries are (task, event) tuples: ``event`` is set once ``task``
        # has been run by process().
        self._tasks = moves.queue.Queue()

        self._shutdown = threading.Event()
        # The thread is created here but only started by
        # process_in_background().
        self._shutdown_thread = threading.Thread(
            target=self._process_in_background)
        self._shutdown_thread.daemon = True

        # HACK(sileht): this is set temporarily by server.Server so that we
        # do not have to rewrite the entire internal API to pass the
        # executor everywhere just to make the listener aware of the server
        # executor. This hack exists only for the blocking executor, which
        # is deprecated.
        self._executor = None

    def stop(self):
        """Ask the background processing loop to exit."""
        self._shutdown.set()

    def process_in_background(self):
        """Run all pending tasks queued by do() in a thread during the
        shutdown process.
        """
        self._shutdown_thread.start()

    def _process_in_background(self):
        """Drain the queue repeatedly until stop() is called."""
        while not self._shutdown.is_set():
            try:
                self.process()
            except Exception:
                # A failing task must not kill this thread: nothing else
                # drains the queue at this point, so every later do()
                # caller would block forever on its event.
                LOG.exception("Unexpected error while processing a "
                              "message operation in background")
            time.sleep(ACK_REQUEUE_EVERY_SECONDS_MIN)

    def process(self):
        """Run all pending tasks queued by do().

        An exception raised by a task propagates to the caller of
        process(); the task's waiter is woken first, and the tasks still
        queued are left for the next call.
        """
        while True:
            try:
                task, event = self._tasks.get(block=False)
            except moves.queue.Empty:
                break
            try:
                task()
            finally:
                # Always release the do() caller, even if the task failed.
                event.set()

    def do(self, task):
        """Put the task in the queue and wait until the task is completed.

        With the "blocking" executor the task is run inline instead of
        being queued.

        :param task: zero-argument callable to execute.
        :raises RuntimeError: if no executor has been set on the handler.
        """
        if self._executor is None:
            raise RuntimeError("Unexpected error, no executor is setuped")
        elif self._executor == "blocking":
            # NOTE(sileht): Blocking will hang forever if we wait for the
            # polling thread
            task()
        else:
            event = threading.Event()
            self._tasks.put((task, event))
            event.wait()
class AMQPIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
obsolete_reply_queues, pending_message_actions):
obsolete_reply_queues, message_operations_handler):
super(AMQPIncomingMessage, self).__init__(ctxt, message)
self.listener = listener
@ -63,7 +115,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
self.msg_id = msg_id
self.reply_q = reply_q
self._obsolete_reply_queues = obsolete_reply_queues
self._pending_tasks = pending_message_actions
self._message_operations_handler = message_operations_handler
self.stopwatch = timeutils.StopWatch()
self.stopwatch.start()
@ -133,7 +185,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
return
def acknowledge(self):
self._pending_tasks.put(self.message.acknowledge)
self._message_operations_handler.do(self.message.acknowledge)
self.listener.msg_id_cache.add(self.unique_id)
def requeue(self):
@ -143,7 +195,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
self._pending_tasks.put(self.message.requeue)
self._message_operations_handler.do(self.message.requeue)
class ObsoleteReplyQueuesCache(object):
@ -199,9 +251,11 @@ class AMQPListener(base.PollStyleListener):
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
self._stopped = threading.Event()
self._shutdown = threading.Event()
self._shutoff = threading.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
self._pending_tasks = moves.queue.Queue()
self._message_operations_handler = MessageOperationsHandler(
"AMQPListener")
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
def __call__(self, message):
@ -222,14 +276,14 @@ class AMQPListener(base.PollStyleListener):
ctxt.msg_id,
ctxt.reply_q,
self._obsolete_reply_queues,
self._pending_tasks))
self._message_operations_handler))
@base.batch_poll_helper
def poll(self, timeout=None):
stopwatch = timeutils.StopWatch(duration=timeout).start()
while not self._stopped.is_set():
do_pending_tasks(self._pending_tasks)
while not self._shutdown.is_set():
self._message_operations_handler.process()
if self.incoming:
return self.incoming.pop(0)
@ -248,12 +302,30 @@ class AMQPListener(base.PollStyleListener):
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
# NOTE(sileht): listener is stopped, just processes remaining messages
# and operations
self._message_operations_handler.process()
if self.incoming:
return self.incoming.pop(0)
self._shutoff.set()
def stop(self):
self._stopped.set()
self._shutdown.set()
self.conn.stop_consuming()
do_pending_tasks(self._pending_tasks)
self._shutoff.wait()
# NOTE(sileht): Here, the listener is stopped, but some incoming
# messages may still live on server side, because callback is still
# running and message is not yet ack/requeue. It's safe to do the ack
# in another thread, since the polling thread is now terminated.
self._message_operations_handler.process_in_background()
def cleanup(self):
# NOTE(sileht): server executor is now stopped, we are sure that no
# more incoming messages are alive, we can acknowledge
# remaining messages and stop the thread
self._message_operations_handler.stop()
# Closes listener connection
self.conn.close()
@ -303,7 +375,6 @@ class ReplyWaiter(object):
self.allowed_remote_exmods = allowed_remote_exmods
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.waiters = ReplyWaiters()
self._pending_tasks = moves.queue.Queue()
self.conn.declare_direct_consumer(reply_q, self)
@ -318,12 +389,10 @@ class ReplyWaiter(object):
self.conn.stop_consuming()
self._thread.join()
self._thread = None
do_pending_tasks(self._pending_tasks)
def poll(self):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
while not self._thread_exit_event.is_set():
do_pending_tasks(self._pending_tasks)
try:
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
@ -337,7 +406,11 @@ class ReplyWaiter(object):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
def __call__(self, message):
self._pending_tasks.put(message.acknowledge)
# NOTE(sileht): __call__ is running within the polling thread,
# (conn.consume -> conn.conn.drain_events() -> __call__ callback)
# it's threadsafe to acknowledge the message here, no need to wait
# the next polling
message.acknowledge()
incoming_msg_id = message.pop('_msg_id', None)
if message.get('ending'):
LOG.debug("received reply msg_id: %s", incoming_msg_id)

View File

@ -421,6 +421,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# HACK(sileht): We temporarily pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because it is needed only for 'synchronous'
# executors like blocking, and that one is deprecated. Making the
# driver work in both a sync and an async way is complicated,
# and blocking has 0% test coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)
self.listener.start(self._on_incoming)
@ordered(after='start')