diff --git a/mistral/config.py b/mistral/config.py index d437f9eb5..ad4aab494 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -111,6 +111,14 @@ rpc_response_timeout_opt = cfg.IntOpt( help=_('Seconds to wait for a response from a call.') ) +oslo_rpc_executor = cfg.StrOpt( + 'oslo_rpc_executor', + default='eventlet', + choices=['eventlet', 'blocking', 'threading'], + help=_('Executor type used by Oslo Messaging framework. Defines how ' + 'Oslo Messaging based RPC subsystem processes incoming calls.') +) + expiration_token_duration = cfg.IntOpt( 'expiration_token_duration', default=30, @@ -559,6 +567,7 @@ CONF.register_opt(auth_type_opt) CONF.register_opt(js_impl_opt) CONF.register_opt(rpc_impl_opt) CONF.register_opt(rpc_response_timeout_opt) +CONF.register_opt(oslo_rpc_executor) CONF.register_opt(expiration_token_duration) CONF.register_opts(api_opts, group=API_GROUP) @@ -596,6 +605,7 @@ default_group_opts = itertools.chain( js_impl_opt, rpc_impl_opt, rpc_response_timeout_opt, + oslo_rpc_executor, expiration_token_duration ] ) diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index 7cf88aad3..c93836898 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -61,10 +61,7 @@ class EngineServer(service_base.MistralService): self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.engine) self._rpc_server.register_endpoint(self) - # Note(ddeja): Engine needs to be run in default (blocking) mode - # since using another mode may lead to a deadlock. - # See https://review.openstack.org/#/c/356343 for more info. - self._rpc_server.run(executor='blocking') + self._rpc_server.run(executor=cfg.CONF.oslo_rpc_executor) self._notify_started('Engine server started.') diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 7a9e3bfae..4577a9a0f 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -112,7 +112,7 @@ def _check_and_complete(wf_ex_id): else 4 ) - # Rescheduling this check may not happen if erros are + # Rescheduling this check may not happen if errors are # raised in the business logic. If the error is DB related # and not considered fatal (e.g. disconnect, deadlock), the # retry annotation around the method will ensure that the diff --git a/mistral/rpc/base.py b/mistral/rpc/base.py index 69e5d64b7..1139cb550 100644 --- a/mistral/rpc/base.py +++ b/mistral/rpc/base.py @@ -168,7 +168,7 @@ class RPCServer(object): raise NotImplementedError @abc.abstractmethod - def run(self, executor='blocking'): + def run(self, executor='eventlet'): """Runs the RPC server. :param executor: Executor used to process incoming requests. Different diff --git a/mistral/rpc/kombu/kombu_server.py b/mistral/rpc/kombu/kombu_server.py index aeed649b6..51c556d02 100644 --- a/mistral/rpc/kombu/kombu_server.py +++ b/mistral/rpc/kombu/kombu_server.py @@ -83,7 +83,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): """Return whether server is running.""" return self._running.is_set() - def run(self, executor='blocking'): + def run(self, executor='eventlet'): if self._thread is None: self._thread = threading.Thread(target=self._run, args=(executor,)) self._thread.daemon = True diff --git a/mistral/rpc/oslo/oslo_server.py b/mistral/rpc/oslo/oslo_server.py index a78f2c2fd..fb6ee297e 100644 --- a/mistral/rpc/oslo/oslo_server.py +++ b/mistral/rpc/oslo/oslo_server.py @@ -35,7 +35,7 @@ class OsloRPCServer(rpc.RPCServer): def register_endpoint(self, endpoint): self.endpoints.append(endpoint) - def run(self, executor='blocking'): + def run(self, executor='eventlet'): target = messaging.Target( topic=self.topic, server=self.server_id