Merge "Ack message after processing (oslo.messaging)"

This commit is contained in:
Jenkins 2016-03-16 16:42:45 +00:00 committed by Gerrit Code Review
commit 85b7b9c1fa
3 changed files with 56 additions and 3 deletions

View File

@ -75,7 +75,9 @@ def launch_executor(transport):
endpoints = [rpc.ExecutorServer(executor_v2)]
server = messaging.get_rpc_server(
get_rpc_server = get_rpc_server_function()
server = get_rpc_server(
transport,
target,
endpoints,
@ -114,7 +116,9 @@ def launch_engine(transport):
# Setup expiration policy
expiration_policy.setup()
server = messaging.get_rpc_server(
get_rpc_server = get_rpc_server_function()
server = get_rpc_server(
transport,
target,
endpoints,
@ -140,6 +144,13 @@ class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
pass
def get_rpc_server_function():
if CONF.use_mistral_rpc:
return rpc.get_rpc_server
else:
return messaging.get_rpc_server
def launch_api(transport):
host = cfg.CONF.api.host
port = cfg.CONF.api.port

View File

@ -93,6 +93,14 @@ executor_opts = [
help='The version of the executor.')
]
rpc_option = cfg.BoolOpt(
'use_mistral_rpc',
default=False,
help='Specifies whether Mistral uses modified oslo.messaging (if True)'
' or original oslo.messaging. Modified oslo.messaging is done for'
' acknowledgement a message after processing.'
)
execution_expiration_policy_opts = [
cfg.IntOpt('evaluation_interval',
help='How often will the executions be evaluated '
@ -139,6 +147,7 @@ CONF.register_opts(executor_opts, group=EXECUTOR_GROUP)
CONF.register_opts(execution_expiration_policy_opts,
group=EXECUTION_EXPIRATION_POLICY_GROUP)
CONF.register_opt(wf_trace_log_name_opt)
CONF.register_opt(rpc_option)
CONF.register_opts(coordination_opts, group=COORDINATION_GROUP)
CLI_OPTS = [
@ -172,7 +181,10 @@ def list_opts():
(EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts),
(None, itertools.chain(
CLI_OPTS,
[wf_trace_log_name_opt]
[
wf_trace_log_name_opt,
rpc_option
]
))
]

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import client
from oslo_messaging.rpc import dispatcher
from mistral import context as auth_ctx
from mistral.engine import base
@ -32,6 +33,35 @@ _ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
class RPCDispatcherPostAck(dispatcher.RPCDispatcher):
def __call__(self, incoming, executor_callback=None):
return messaging.rpc.dispatcher.dispatcher.DispatcherExecutorContext(
incoming,
self._dispatch_and_reply,
executor_callback=executor_callback
)
def _dispatch_and_reply(self, incoming, executor_callback):
incoming = incoming[0]
super(RPCDispatcherPostAck, self)._dispatch_and_reply(
incoming,
executor_callback
)
incoming.acknowledge()
def get_rpc_server(transport, target, endpoints, executor='blocking',
serializer=None):
dispatcher = RPCDispatcherPostAck(target, endpoints, serializer)
return messaging.server.MessageHandlingServer(
transport,
dispatcher,
executor
)
def cleanup():
"""Intended to be used by tests to recreate all RPC related objects."""