From c41f0ce709ea39717eb75748a6c5b7e9a69628cf Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 10 May 2017 09:19:38 +0200 Subject: [PATCH] rabbit: restore synchronous ack/requeue Note this change also contains the fix for the regression it introduced. In https://review.openstack.org/#/c/436958, we fix a thread safety issue. But we make the ack/requeue of message asynchronous. In nominal case, it works, but if network/rabbit connection issue occurs this can result to rpc call handle twice. By chance we double check already processed message ids, and drop duplicates, but that if the message goes to another node, the mitigation won't work. This restore the previous behavior, to ensure we run application callback of rpc.call/rpc.cast only when the message have been successfully ack. (cherry picked from commit da02bc2169b09959d857c605961ead1bbba1019d) Fix rabbitmq driver with blocking executor We recently move ack/requeue of messages in main/polling thread of rabbitmq drivers. And break the blocking executor. This one is not tested by any tests and now deprecated. This change workaround the issue until we completely remove the blocking executor. Closes-bug: #1694728 (cherry picked from commit 8ee5ae135a6ecb918f40619982e3dc7e38ed0bbf) Change-Id: I62b9e09513e3ebfebc64a941d4b21b6c053b511d --- oslo_messaging/_drivers/amqpdriver.py | 117 +++++++++++++++++++++----- oslo_messaging/server.py | 12 +++ 2 files changed, 107 insertions(+), 22 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 64289adf3..f41edf119 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -42,20 +42,72 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001 ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0 -def do_pending_tasks(tasks): - while True: - try: - task = tasks.get(block=False) - except moves.queue.Empty: - break - else: +class MessageOperationsHandler(object): + """Queue used by message operations to ensure that all tasks are + serialized and run in the same thread, since underlying drivers like kombu + are not thread safe. + """ + def __init__(self, name): + self.name = "%s (%s)" % (name, hex(id(self))) + self._tasks = moves.queue.Queue() + + self._shutdown = threading.Event() + self._shutdown_thread = threading.Thread( + target=self._process_in_background) + self._shutdown_thread.daemon = True + + # HACK(sileht): this is set by the server.Server temporary + # to not have to rewrite the entire internal API to pass + # executor everywhere to make Listener aware of the server + # executor. All this hack is only for the blocking executor. + # And it's deprecated so... + self._executor = None + + def stop(self): + self._shutdown.set() + + def process_in_background(self): + """Run all pending tasks queued by do() in an thread during the + shutdown process. + """ + self._shutdown_thread.start() + + def _process_in_background(self): + while not self._shutdown.is_set(): + self.process() + time.sleep(ACK_REQUEUE_EVERY_SECONDS_MIN) + + def process(self): + "Run all pending tasks queued by do()." + + while True: + try: + task, event = self._tasks.get(block=False) + except moves.queue.Empty: + break + try: + task() + finally: + event.set() + + def do(self, task): + "Put the task in the queue and waits until the task is completed." + if self._executor is None: + raise RuntimeError("Unexpected error, no executor is setuped") + elif self._executor == "blocking": + # NOTE(sileht): Blocking will hang forever if we waiting the + # polling thread task() + else: + event = threading.Event() + self._tasks.put((task, event)) + event.wait() class AMQPIncomingMessage(base.RpcIncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q, - obsolete_reply_queues, pending_message_actions): + obsolete_reply_queues, message_operations_handler): super(AMQPIncomingMessage, self).__init__(ctxt, message) self.listener = listener @@ -63,7 +115,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): self.msg_id = msg_id self.reply_q = reply_q self._obsolete_reply_queues = obsolete_reply_queues - self._pending_tasks = pending_message_actions + self._message_operations_handler = message_operations_handler self.stopwatch = timeutils.StopWatch() self.stopwatch.start() @@ -133,7 +185,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): return def acknowledge(self): - self._pending_tasks.put(self.message.acknowledge) + self._message_operations_handler.do(self.message.acknowledge) self.listener.msg_id_cache.add(self.unique_id) def requeue(self): @@ -143,7 +195,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # msg_id_cache, the message will be reconsumed, the only difference is # the message stay at the beginning of the queue instead of moving to # the end. - self._pending_tasks.put(self.message.requeue) + self._message_operations_handler.do(self.message.requeue) class ObsoleteReplyQueuesCache(object): @@ -199,9 +251,11 @@ class AMQPListener(base.PollStyleListener): self.conn = conn self.msg_id_cache = rpc_amqp._MsgIdCache() self.incoming = [] - self._stopped = threading.Event() + self._shutdown = threading.Event() + self._shutoff = threading.Event() self._obsolete_reply_queues = ObsoleteReplyQueuesCache() - self._pending_tasks = moves.queue.Queue() + self._message_operations_handler = MessageOperationsHandler( + "AMQPListener") self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN def __call__(self, message): @@ -222,14 +276,14 @@ class AMQPListener(base.PollStyleListener): ctxt.msg_id, ctxt.reply_q, self._obsolete_reply_queues, - self._pending_tasks)) + self._message_operations_handler)) @base.batch_poll_helper def poll(self, timeout=None): stopwatch = timeutils.StopWatch(duration=timeout).start() - while not self._stopped.is_set(): - do_pending_tasks(self._pending_tasks) + while not self._shutdown.is_set(): + self._message_operations_handler.process() if self.incoming: return self.incoming.pop(0) @@ -248,12 +302,30 @@ class AMQPListener(base.PollStyleListener): else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN + # NOTE(sileht): listener is stopped, just processes remaining messages + # and operations + self._message_operations_handler.process() + if self.incoming: + return self.incoming.pop(0) + + self._shutoff.set() + def stop(self): - self._stopped.set() + self._shutdown.set() self.conn.stop_consuming() - do_pending_tasks(self._pending_tasks) + self._shutoff.wait() + + # NOTE(sileht): Here, the listener is stopped, but some incoming + # messages may still live on server side, because callback is still + # running and message is not yet ack/requeue. It's safe to do the ack + # into another thread, side the polling thread is now terminated. + self._message_operations_handler.process_in_background() def cleanup(self): + # NOTE(sileht): server executor is now stopped, we are sure that no + # more incoming messages in live, we can acknowledge + # remaining messages and stop the thread + self._message_operations_handler.stop() # Closes listener connection self.conn.close() @@ -303,7 +375,6 @@ class ReplyWaiter(object): self.allowed_remote_exmods = allowed_remote_exmods self.msg_id_cache = rpc_amqp._MsgIdCache() self.waiters = ReplyWaiters() - self._pending_tasks = moves.queue.Queue() self.conn.declare_direct_consumer(reply_q, self) @@ -318,12 +389,10 @@ class ReplyWaiter(object): self.conn.stop_consuming() self._thread.join() self._thread = None - do_pending_tasks(self._pending_tasks) def poll(self): current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN while not self._thread_exit_event.is_set(): - do_pending_tasks(self._pending_tasks) try: # ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds self.conn.consume(timeout=current_timeout) @@ -337,7 +406,11 @@ class ReplyWaiter(object): current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN def __call__(self, message): - self._pending_tasks.put(message.acknowledge) + # NOTE(sileht): __call__ is running within the polling thread, + # (conn.consume -> conn.conn.drain_events() -> __call__ callback) + # it's threadsafe to acknowledge the message here, no need to wait + # the next polling + message.acknowledge() incoming_msg_id = message.pop('_msg_id', None) if message.get('ending'): LOG.debug("received reply msg_id: %s", incoming_msg_id) diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index ac4a96474..0bc5b851a 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -421,6 +421,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) + # HACK(sileht): We temporary pass the executor to the rabbit + # listener to fix a race with the deprecated blocking executor. + # We do this hack because this is need only for 'synchronous' + # executor like blocking. And this one is deprecated. Making + # driver working in an sync and an async way is complicated + # and blocking have 0% tests coverage. + if hasattr(self.listener, '_poll_style_listener'): + l = self.listener._poll_style_listener + if hasattr(l, "_message_operations_handler"): + l._message_operations_handler._executor = ( + self.executor_type) + self.listener.start(self._on_incoming) @ordered(after='start')