Fix rabbitmq driver with blocking executor

We recently move ack/requeue of messages in main/polling thread
of rabbitmq drivers. And break the blocking executor.

This one is not tested by any tests and now deprecated.

This change workaround the issue until we completely remove the
blocking executor.

Change-Id: Id479100f6ff364cf67a199e9b70f9f0c7bf7e1a9
Closes-bug: #1694728
This commit is contained in:
Mehdi Abaakouk 2017-06-01 10:28:23 +02:00
parent 1a036f5b55
commit 8ee5ae135a
2 changed files with 29 additions and 3 deletions

View File

@ -56,6 +56,13 @@ class MessageOperationsHandler(object):
target=self._process_in_background)
self._shutdown_thread.daemon = True
# HACK(sileht): this is set by the server.Server temporary
# to not have to rewrite the entire internal API to pass
# executor everywhere to make Listener aware of the server
# executor. All this hack is only for the blocking executor.
# And it's deprecated so...
self._executor = None
def stop(self):
self._shutdown.set()
@ -85,9 +92,16 @@ class MessageOperationsHandler(object):
def do(self, task):
"Put the task in the queue and waits until the task is completed."
event = threading.Event()
self._tasks.put((task, event))
event.wait()
if self._executor is None:
raise RuntimeError("Unexpected error, no executor is setuped")
elif self._executor == "blocking":
# NOTE(sileht): Blocking will hang forever if we waiting the
# polling thread
task()
else:
event = threading.Event()
self._tasks.put((task, event))
event.wait()
class AMQPIncomingMessage(base.RpcIncomingMessage):

View File

@ -417,6 +417,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
# HACK(sileht): We temporary pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because this is need only for 'synchronous'
# executor like blocking. And this one is deprecated. Making
# driver working in an sync and an async way is complicated
# and blocking have 0% tests coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)
self.listener.start(self._on_incoming)
@ordered(after='start')