Merge "Add the config option for Oslo Messaging executor type"

This commit is contained in:
Zuul 2018-07-18 13:39:02 +00:00 committed by Gerrit Code Review
commit 910948abaa
6 changed files with 15 additions and 8 deletions

View File

@ -111,6 +111,14 @@ rpc_response_timeout_opt = cfg.IntOpt(
help=_('Seconds to wait for a response from a call.')
)
oslo_rpc_executor = cfg.StrOpt(
'oslo_rpc_executor',
default='eventlet',
choices=['eventlet', 'blocking', 'threading'],
help=_('Executor type used by Oslo Messaging framework. Defines how '
'Oslo Messaging based RPC subsystem processes incoming calls.')
)
expiration_token_duration = cfg.IntOpt(
'expiration_token_duration',
default=30,
@ -559,6 +567,7 @@ CONF.register_opt(auth_type_opt)
CONF.register_opt(js_impl_opt)
CONF.register_opt(rpc_impl_opt)
CONF.register_opt(rpc_response_timeout_opt)
CONF.register_opt(oslo_rpc_executor)
CONF.register_opt(expiration_token_duration)
CONF.register_opts(api_opts, group=API_GROUP)
@ -596,6 +605,7 @@ default_group_opts = itertools.chain(
js_impl_opt,
rpc_impl_opt,
rpc_response_timeout_opt,
oslo_rpc_executor,
expiration_token_duration
]
)

View File

@ -61,10 +61,7 @@ class EngineServer(service_base.MistralService):
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine)
self._rpc_server.register_endpoint(self)
# Note(ddeja): Engine needs to be run in default (blocking) mode
# since using another mode may lead to a deadlock.
# See https://review.openstack.org/#/c/356343 for more info.
self._rpc_server.run(executor='blocking')
self._rpc_server.run(executor=cfg.CONF.oslo_rpc_executor)
self._notify_started('Engine server started.')

View File

@ -112,7 +112,7 @@ def _check_and_complete(wf_ex_id):
else 4
)
# Rescheduling this check may not happen if erros are
# Rescheduling this check may not happen if errors are
# raised in the business logic. If the error is DB related
# and not considered fatal (e.g. disconnect, deadlock), the
# retry annotation around the method will ensure that the

View File

@ -168,7 +168,7 @@ class RPCServer(object):
raise NotImplementedError
@abc.abstractmethod
def run(self, executor='blocking'):
def run(self, executor='eventlet'):
"""Runs the RPC server.
:param executor: Executor used to process incoming requests. Different

View File

@ -83,7 +83,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
"""Return whether server is running."""
return self._running.is_set()
def run(self, executor='blocking'):
def run(self, executor='eventlet'):
if self._thread is None:
self._thread = threading.Thread(target=self._run, args=(executor,))
self._thread.daemon = True

View File

@ -35,7 +35,7 @@ class OsloRPCServer(rpc.RPCServer):
def register_endpoint(self, endpoint):
self.endpoints.append(endpoint)
def run(self, executor='blocking'):
def run(self, executor='eventlet'):
target = messaging.Target(
topic=self.topic,
server=self.server_id