From c38857e1101027a734a35f4e80bc4084fabc034b Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 28 Nov 2017 15:47:54 +0100 Subject: [PATCH] rabbitmq: don't wait for message ack/requeue I don't see any obvious reason why we should wait ack/requeue is done. This waiter have already be removed from amqp1. https://git.openstack.org/cgit/openstack/oslo.messaging/tree/oslo_messaging/_drivers/amqp1_driver/controller.py#n242 So, this change remove it from rabbitmq driver too. Closes-bug: #1734788 Change-Id: I5ecedc762596181be19410b863851a0054fd6579 --- oslo_messaging/_drivers/amqpdriver.py | 27 +++---------------- oslo_messaging/server.py | 12 --------- ...bbit-no-wait-for-ack-9e5de3e1320d7660.yaml | 12 +++++++++ 3 files changed, 16 insertions(+), 35 deletions(-) create mode 100644 releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 539e48b0b..deeaba29b 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -56,13 +56,6 @@ class MessageOperationsHandler(object): target=self._process_in_background) self._shutdown_thread.daemon = True - # HACK(sileht): this is set by the server.Server temporary - # to not have to rewrite the entire internal API to pass - # executor everywhere to make Listener aware of the server - # executor. All this hack is only for the blocking executor. - # And it's deprecated so... - self._executor = None - def stop(self): self._shutdown.set() @@ -82,26 +75,14 @@ class MessageOperationsHandler(object): while True: try: - task, event = self._tasks.get(block=False) + task = self._tasks.get(block=False) except moves.queue.Empty: break - try: - task() - finally: - event.set() + task() def do(self, task): - "Put the task in the queue and waits until the task is completed." - if self._executor is None: - raise RuntimeError("Unexpected error, no executor is setuped") - elif self._executor == "blocking": - # NOTE(sileht): Blocking will hang forever if we waiting the - # polling thread - task() - else: - event = threading.Event() - self._tasks.put((task, event)) - event.wait() + "Put the task in the queue." + self._tasks.put(task) class AMQPIncomingMessage(base.RpcIncomingMessage): diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index c8e77a673..d2e50ac46 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -417,18 +417,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) - # HACK(sileht): We temporary pass the executor to the rabbit - # listener to fix a race with the deprecated blocking executor. - # We do this hack because this is need only for 'synchronous' - # executor like blocking. And this one is deprecated. Making - # driver working in an sync and an async way is complicated - # and blocking have 0% tests coverage. - if hasattr(self.listener, '_poll_style_listener'): - l = self.listener._poll_style_listener - if hasattr(l, "_message_operations_handler"): - l._message_operations_handler._executor = ( - self.executor_type) - self.listener.start(self._on_incoming) @ordered(after='start') diff --git a/releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml b/releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml new file mode 100644 index 000000000..4b9d47af3 --- /dev/null +++ b/releasenotes/notes/rabbit-no-wait-for-ack-9e5de3e1320d7660.yaml @@ -0,0 +1,12 @@ +--- +other: + - | + On rabbitmq, in the past, acknownlegement of messages was done within the + application callback thread/greenlet. This thread was blocked until the + message was ack. In newton, we rewrote the message acknownlegement to + ensure we haven't two threads writting the the socket at the same times. + Now all pendings ack are done by the main thread. They are no more reason + to block the application callback thread until the message is ack. Other + driver already release the application callback threads before the message + is acknownleged. This is also the case for rabbitmq, now. +