From b562989e78d8833cfcf15900da63e21d216b96d6 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Thu, 11 Feb 2016 15:26:38 +0300 Subject: [PATCH] Ack message after processing (oslo.messaging) This patch fixes the HA gap when an executor dies. Now if an executor dies, the next executor picks up the previous executor's task. * Currently it is almost impossible to write a unit test for this bug. For now, I created a new config option that selects whether to use this fix or the original oslo.messaging. By default it is False. * For tests, we need to wait for the HA gate to be created. Closes-Bug: #1502120 Change-Id: Ia6d25d039b1e8210b7e544540e4b527d28f6d394 --- mistral/cmd/launch.py | 15 +++++++++++++-- mistral/config.py | 14 +++++++++++++- mistral/engine/rpc.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index b17d2398a..571c2cc74 100755 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -75,7 +75,9 @@ def launch_executor(transport): endpoints = [rpc.ExecutorServer(executor_v2)] - server = messaging.get_rpc_server( + get_rpc_server = get_rpc_server_function() + + server = get_rpc_server( transport, target, endpoints, @@ -114,7 +116,9 @@ def launch_engine(transport): # Setup expiration policy expiration_policy.setup() - server = messaging.get_rpc_server( + get_rpc_server = get_rpc_server_function() + + server = get_rpc_server( transport, target, endpoints, @@ -140,6 +144,13 @@ class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer): pass +def get_rpc_server_function(): + if CONF.use_mistral_rpc: + return rpc.get_rpc_server + else: + return messaging.get_rpc_server + + def launch_api(transport): host = cfg.CONF.api.host port = cfg.CONF.api.port diff --git a/mistral/config.py b/mistral/config.py index 240f34386..6c61305ec 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -93,6 +93,14 @@ executor_opts = [ help='The version of the executor.') ] +rpc_option = cfg.BoolOpt( + 
'use_mistral_rpc', + default=False, + help='Specifies whether Mistral uses modified oslo.messaging (if True)' + ' or original oslo.messaging. Modified oslo.messaging is done for' + ' acknowledgement a message after processing.' +) + execution_expiration_policy_opts = [ cfg.IntOpt('evaluation_interval', help='How often will the executions be evaluated ' @@ -139,6 +147,7 @@ CONF.register_opts(executor_opts, group=EXECUTOR_GROUP) CONF.register_opts(execution_expiration_policy_opts, group=EXECUTION_EXPIRATION_POLICY_GROUP) CONF.register_opt(wf_trace_log_name_opt) +CONF.register_opt(rpc_option) CONF.register_opts(coordination_opts, group=COORDINATION_GROUP) CLI_OPTS = [ @@ -172,7 +181,10 @@ def list_opts(): (EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts), (None, itertools.chain( CLI_OPTS, - [wf_trace_log_name_opt] + [ + wf_trace_log_name_opt, + rpc_option + ] )) ] diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index 103ed1cfe..f04feb835 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -17,6 +17,7 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging.rpc import client +from oslo_messaging.rpc import dispatcher from mistral import context as auth_ctx from mistral.engine import base @@ -32,6 +33,35 @@ _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None +class RPCDispatcherPostAck(dispatcher.RPCDispatcher): + def __call__(self, incoming, executor_callback=None): + return messaging.rpc.dispatcher.dispatcher.DispatcherExecutorContext( + incoming, + self._dispatch_and_reply, + executor_callback=executor_callback + ) + + def _dispatch_and_reply(self, incoming, executor_callback): + incoming = incoming[0] + + super(RPCDispatcherPostAck, self)._dispatch_and_reply( + incoming, + executor_callback + ) + + incoming.acknowledge() + + +def get_rpc_server(transport, target, endpoints, executor='blocking', + serializer=None): + dispatcher = RPCDispatcherPostAck(target, 
endpoints, serializer) + return messaging.server.MessageHandlingServer( + transport, + dispatcher, + executor + ) + + def cleanup(): """Intended to be used by tests to recreate all RPC related objects."""