Merge "rabbitmq: don't wait for message ack/requeue"

This commit is contained in:
Zuul 2017-11-30 08:01:37 +00:00 committed by Gerrit Code Review
commit 8ac97450c8
3 changed files with 16 additions and 35 deletions

View File

@ -56,13 +56,6 @@ class MessageOperationsHandler(object):
target=self._process_in_background)
self._shutdown_thread.daemon = True
# HACK(sileht): this is set by the server.Server temporary
# to not have to rewrite the entire internal API to pass
# executor everywhere to make Listener aware of the server
# executor. All this hack is only for the blocking executor.
# And it's deprecated so...
self._executor = None
def stop(self):
self._shutdown.set()
@ -82,26 +75,14 @@ class MessageOperationsHandler(object):
while True:
try:
task, event = self._tasks.get(block=False)
task = self._tasks.get(block=False)
except moves.queue.Empty:
break
try:
task()
finally:
event.set()
task()
def do(self, task):
"Put the task in the queue and waits until the task is completed."
if self._executor is None:
raise RuntimeError("Unexpected error, no executor is setuped")
elif self._executor == "blocking":
# NOTE(sileht): Blocking will hang forever if we waiting the
# polling thread
task()
else:
event = threading.Event()
self._tasks.put((task, event))
event.wait()
"Put the task in the queue."
self._tasks.put(task)
class AMQPIncomingMessage(base.RpcIncomingMessage):

View File

@ -417,18 +417,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# HACK(sileht): We temporary pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because this is need only for 'synchronous'
# executor like blocking. And this one is deprecated. Making
# driver working in an sync and an async way is complicated
# and blocking have 0% tests coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)
self.listener.start(self._on_incoming)
@ordered(after='start')

View File

@ -0,0 +1,12 @@
---
other:
- |
On rabbitmq, in the past, acknownlegement of messages was done within the
application callback thread/greenlet. This thread was blocked until the
message was ack. In newton, we rewrote the message acknownlegement to
ensure we haven't two threads writting the the socket at the same times.
Now all pendings ack are done by the main thread. They are no more reason
to block the application callback thread until the message is ack. Other
driver already release the application callback threads before the message
is acknownleged. This is also the case for rabbitmq, now.