From 62d8c5edaf4a60b1da439c827d1c08af7397a3c5 Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Thu, 13 Apr 2017 18:17:53 +0000 Subject: [PATCH] Add option to run actions locally on the engine Make executor pluggable and allow option to run the executor locally on the engine or remotely over RPC. Change-Id: I7cfb13068aa1d1f88136eaa092e629c34b78adf2 Implements: blueprint mistral-actions-run-by-engine --- mistral/cmd/launch.py | 2 +- mistral/config.py | 10 + mistral/engine/action_queue.py | 12 +- mistral/engine/actions.py | 17 +- mistral/engine/base.py | 20 -- mistral/engine/rpc_backend/rpc.py | 44 ++--- mistral/executors/__init__.py | 0 mistral/executors/base.py | 67 +++++++ .../{engine => executors}/default_executor.py | 102 +++++++---- .../{engine => executors}/executor_server.py | 25 +-- mistral/executors/remote_executor.py | 29 +++ mistral/tests/unit/engine/base.py | 49 ++--- .../tests/unit/engine/test_default_engine.py | 4 +- mistral/tests/unit/engine/test_environment.py | 25 +-- mistral/tests/unit/engine/test_safe_rerun.py | 15 +- mistral/tests/unit/executors/__init__.py | 0 mistral/tests/unit/executors/base.py | 24 +++ .../unit/executors/test_local_executor.py | 171 ++++++++++++++++++ mistral/tests/unit/executors/test_plugins.py | 40 ++++ setup.cfg | 4 + 20 files changed, 520 insertions(+), 140 deletions(-) create mode 100644 mistral/executors/__init__.py create mode 100644 mistral/executors/base.py rename mistral/{engine => executors}/default_executor.py (57%) rename mistral/{engine => executors}/executor_server.py (82%) create mode 100644 mistral/executors/remote_executor.py create mode 100644 mistral/tests/unit/executors/__init__.py create mode 100644 mistral/tests/unit/executors/base.py create mode 100644 mistral/tests/unit/executors/test_local_executor.py create mode 100644 mistral/tests/unit/executors/test_plugins.py diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 595de850e..421a7ff7a 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -44,9 +44,9 @@ from oslo_service import service from mistral.api import service as api_service from mistral import config from mistral.engine import engine_server -from mistral.engine import executor_server from mistral.engine.rpc_backend import rpc from mistral.event_engine import event_engine_server +from mistral.executors import executor_server from mistral import version diff --git a/mistral/config.py b/mistral/config.py index 335a5a38c..6b9520b8b 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -169,6 +169,16 @@ engine_opts = [ ] executor_opts = [ + cfg.StrOpt( + 'type', + choices=['local', 'remote'], + default='remote', + help=( + 'Type of executor. Use local to run the executor within the ' + 'engine server. Use remote if the executor is launched as ' + 'a separate server to run action executions.' + ) + ), cfg.StrOpt( 'host', default='0.0.0.0', diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py index 8f08f90ec..9241d3a96 100644 --- a/mistral/engine/action_queue.py +++ b/mistral/engine/action_queue.py @@ -15,7 +15,9 @@ import functools -from mistral.engine.rpc_backend import rpc +from oslo_config import cfg + +from mistral.executors import base as exe from mistral import utils @@ -44,14 +46,16 @@ def _get_queue(): def _run_actions(): + executor = exe.get_executor(cfg.CONF.executor.type) + for action_ex, action_def, target in _get_queue(): - rpc.get_executor_client().run_action( + executor.run_action( action_ex.id, action_def.action_class, action_def.attributes or {}, action_ex.input, - target, - safe_rerun=action_ex.runtime_context.get('safe_rerun', False) + action_ex.runtime_context.get('safe_rerun', False), + target=target ) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index e9310d28a..d5460b12c 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -15,15 +15,16 @@ import abc from oslo_config import cfg +from oslo_log import log as logging from osprofiler import profiler import six from mistral.db.v2 import api as db_api from mistral.engine import action_queue -from mistral.engine.rpc_backend import rpc from mistral.engine import utils as engine_utils from mistral.engine import workflow_handler as wf_handler from mistral import exceptions as exc +from mistral.executors import base as exe from mistral import expressions as expr from mistral.lang import parser as spec_parser from mistral.services import action_manager as a_m @@ -34,6 +35,9 @@ from mistral.workflow import states from mistral.workflow import utils as wf_utils +LOG = logging.getLogger(__name__) + + @six.add_metaclass(abc.ABCMeta) class Action(object): """Action. @@ -251,14 +255,16 @@ class PythonAction(Action): action_ex_id=action_ex_id ) - result = rpc.get_executor_client().run_action( + executor = exe.get_executor(cfg.CONF.executor.type) + + result = executor.run_action( self.action_ex.id if self.action_ex else None, self.action_def.action_class, self.action_def.attributes or {}, input_dict, - target, - async_=False, - safe_rerun=safe_rerun + safe_rerun=safe_rerun, + target=target, + async_=False ) return self._prepare_output(result) @@ -528,6 +534,7 @@ def resolve_action_definition(action_spec_name, wf_name=None, :param wf_spec_name: Workflow name according to a spec. :return: Action definition (python or ad-hoc). """ + action_db = None if wf_name and wf_name != wf_spec_name: diff --git a/mistral/engine/base.py b/mistral/engine/base.py index 07e9ae20f..2e37f4842 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -128,26 +128,6 @@ class Engine(object): raise NotImplementedError -@six.add_metaclass(abc.ABCMeta) -class Executor(object): - """Action executor interface.""" - - @abc.abstractmethod - def run_action(self, action_ex_id, action_class_str, attributes, - action_params, safe_rerun, redelivered=False): - """Runs action. - - :param action_ex_id: Corresponding action execution id. - :param action_class_str: Path to action class in dot notation. - :param attributes: Attributes of action class which will be set to. - :param action_params: Action parameters. - :param safe_rerun: Tells if given action can be safely rerun. - :param redelivered: Tells if given action was run before on another - executor. - """ - raise NotImplementedError() - - @six.add_metaclass(abc.ABCMeta) class EventEngine(object): """Action event trigger interface.""" diff --git a/mistral/engine/rpc_backend/rpc.py b/mistral/engine/rpc_backend/rpc.py index 390771a01..28971e894 100644 --- a/mistral/engine/rpc_backend/rpc.py +++ b/mistral/engine/rpc_backend/rpc.py @@ -1,5 +1,6 @@ # Copyright 2014 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. +# Copyright 2017 - Brocade Communications Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,8 +22,9 @@ from osprofiler import profiler from stevedore import driver from mistral import context as auth_ctx -from mistral.engine import base +from mistral.engine import base as eng from mistral import exceptions as exc +from mistral.executors import base as exe LOG = logging.getLogger(__name__) @@ -149,7 +151,7 @@ def wrap_messaging_exception(method): return decorator -class EngineClient(base.Engine): +class EngineClient(eng.Engine): """RPC Engine client.""" def __init__(self, rpc_conf_dict): @@ -317,50 +319,50 @@ class EngineClient(base.Engine): ) -class ExecutorClient(base.Executor): +class ExecutorClient(exe.Executor): """RPC Executor client.""" def __init__(self, rpc_conf_dict): - """Constructs an RPC client for the Executor. - - :param rpc_conf_dict: Dict containing RPC configuration. - """ + """Constructs an RPC client for the Executor.""" self.topic = cfg.CONF.executor.topic self._client = get_rpc_client_driver()(rpc_conf_dict) @profiler.trace('executor-client-run-action') - def run_action(self, action_ex_id, action_class_str, attributes, - action_params, target=None, async_=True, safe_rerun=False): + def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, + params, safe_rerun, redelivered=False, + target=None, async_=True): """Sends a request to run action to executor. :param action_ex_id: Action execution id. - :param action_class_str: Action class name. - :param attributes: Action class attributes. - :param action_params: Action input parameters. - :param target: Target (group of action executors). - :param async: If True, run action in asynchronous mode (w/o waiting - for completion). + :param action_cls_str: Action class name. + :param action_cls_attrs: Action class attributes. + :param params: Action input parameters. :param safe_rerun: If true, action would be re-run if executor dies during execution. + :param redelivered: Tells if given action was run before on another + executor. + :param target: Target (group of action executors). + :param async_: If True, run action in asynchronous mode (w/o waiting + for completion). :return: Action result. """ - kwargs = { + rpc_kwargs = { 'action_ex_id': action_ex_id, - 'action_class_str': action_class_str, - 'attributes': attributes, - 'params': action_params, + 'action_cls_str': action_cls_str, + 'action_cls_attrs': action_cls_attrs, + 'params': params, 'safe_rerun': safe_rerun } rpc_client_method = (self._client.async_call if async_ else self._client.sync_call) - return rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) + return rpc_client_method(auth_ctx.ctx(), 'run_action', **rpc_kwargs) -class EventEngineClient(base.EventEngine): +class EventEngineClient(eng.EventEngine): """RPC EventEngine client.""" def __init__(self, rpc_conf_dict): diff --git a/mistral/executors/__init__.py b/mistral/executors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/executors/base.py b/mistral/executors/base.py new file mode 100644 index 000000000..81ebd5ca6 --- /dev/null +++ b/mistral/executors/base.py @@ -0,0 +1,67 @@ +# Copyright 2017 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + +from stevedore import driver + + +_EXECUTORS = {} + + +def cleanup(): + global _EXECUTORS + _EXECUTORS = {} + + +def get_executor(exec_type): + global _EXECUTORS + + if not _EXECUTORS.get(exec_type): + mgr = driver.DriverManager( + 'mistral.executors', + exec_type, + invoke_on_load=True + ) + + _EXECUTORS[exec_type] = mgr.driver + + return _EXECUTORS[exec_type] + + +@six.add_metaclass(abc.ABCMeta) +class Executor(object): + """Action executor interface.""" + + @abc.abstractmethod + def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, + params, safe_rerun, redelivered=False, + target=None, async_=True): + """Runs action. + + :param action_ex_id: Corresponding action execution id. + :param action_cls_str: Path to action class in dot notation. + :param action_cls_attrs: Attributes of action class which + will be set to. + :param params: Action parameters. + :param safe_rerun: Tells if given action can be safely rerun. + :param redelivered: Tells if given action was run before on another + executor. + :param target: Target (group of action executors). + :param async_: If True, run action in asynchronous mode (w/o waiting + for completion). + :return: Action result. + """ + raise NotImplementedError() diff --git a/mistral/engine/default_executor.py b/mistral/executors/default_executor.py similarity index 57% rename from mistral/engine/default_executor.py rename to mistral/executors/default_executor.py index 27b88363a..83b806dbd 100644 --- a/mistral/engine/default_executor.py +++ b/mistral/executors/default_executor.py @@ -17,9 +17,9 @@ from oslo_log import log as logging from osprofiler import profiler from mistral.actions import action_factory as a_f -from mistral.engine import base from mistral.engine.rpc_backend import rpc from mistral import exceptions as exc +from mistral.executors import base from mistral.utils import inspect_utils as i_u from mistral.workflow import utils as wf_utils @@ -31,18 +31,24 @@ class DefaultExecutor(base.Executor): def __init__(self): self._engine_client = rpc.get_engine_client() - @profiler.trace('executor-run-action', hide_args=True) - def run_action(self, action_ex_id, action_class_str, attributes, - action_params, safe_rerun, redelivered=False): + @profiler.trace('default-executor-run-action', hide_args=True) + def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, + params, safe_rerun, redelivered=False, + target=None, async_=True): """Runs action. :param action_ex_id: Action execution id. - :param action_class_str: Path to action class in dot notation. - :param attributes: Attributes of action class which will be set to. - :param action_params: Action parameters. + :param action_cls_str: Path to action class in dot notation. + :param action_cls_attrs: Attributes of action class which + will be set to. + :param params: Action parameters. :param safe_rerun: Tells if given action can be safely rerun. :param redelivered: Tells if given action was run before on another executor. + :param target: Target (group of action executors). + :param async_: If True, run action in asynchronous mode (w/o waiting + for completion). + :return: Action result. """ def send_error_back(error_msg): @@ -60,31 +66,38 @@ class DefaultExecutor(base.Executor): if redelivered and not safe_rerun: msg = ( - "Request to run action %s was redelivered, but action %s" - " cannot be re-run safely. The only safe thing to do is fail" - " action." - % (action_class_str, action_class_str) + "Request to run action %s was redelivered, but action %s " + "cannot be re-run safely. The only safe thing to do is fail " + "action." % (action_cls_str, action_cls_str) ) return send_error_back(msg) - action_cls = a_f.construct_action_class(action_class_str, attributes) + # Load action module. + action_cls = a_f.construct_action_class( + action_cls_str, + action_cls_attrs + ) # Instantiate action. - try: - action = action_cls(**action_params) + action = action_cls(**params) except Exception as e: - msg = ("Failed to initialize action %s. Action init params = %s." - " Actual init params = %s. More info: %s" - % (action_class_str, i_u.get_arg_list(action_cls.__init__), - action_params.keys(), e)) - LOG.exception(msg) + msg = ( + "Failed to initialize action %s. Action init params = %s. " + "Actual init params = %s. More info: %s" % ( + action_cls_str, + i_u.get_arg_list(action_cls.__init__), + params.keys(), + e + ) + ) + + LOG.warning(msg) return send_error_back(msg) # Run action. - try: result = action.run() @@ -95,15 +108,22 @@ class DefaultExecutor(base.Executor): result = wf_utils.Result(data=result) except Exception as e: - msg = ("Failed to run action [action_ex_id=%s, action_cls='%s'," - " attributes='%s', params='%s']\n %s" - % (action_ex_id, action_cls, attributes, action_params, e)) + msg = ( + "Failed to run action [action_ex_id=%s, action_cls='%s', " + "attributes='%s', params='%s']\n %s" % ( + action_ex_id, + action_cls, + action_cls_attrs, + params, + e + ) + ) + LOG.exception(msg) return send_error_back(msg) # Send action result. - try: if action_ex_id and (action.is_sync() or result.is_error()): self._engine_client.on_action_complete( @@ -118,22 +138,36 @@ class DefaultExecutor(base.Executor): # such as message bus or network. One known case is when the action # returns a bad result (e.g. invalid unicode) which can't be # serialized. - msg = ("Failed to call engine's on_action_complete() method due" - " to a Mistral exception" - " [action_ex_id=%s, action_cls='%s'," - " attributes='%s', params='%s']\n %s" - % (action_ex_id, action_cls, attributes, action_params, e)) + msg = ( + "Failed to complete action due to a Mistral exception " + "[action_ex_id=%s, action_cls='%s', " + "attributes='%s', params='%s']\n %s" % ( + action_ex_id, + action_cls, + action_cls_attrs, + params, + e + ) + ) + LOG.exception(msg) return send_error_back(msg) except Exception as e: # If it's not a Mistral exception all we can do is only # log the error. - msg = ("Failed to call engine's on_action_complete() method due" - " to an unexpected exception" - " [action_ex_id=%s, action_cls='%s'," - " attributes='%s', params='%s']\n %s" - % (action_ex_id, action_cls, attributes, action_params, e)) + msg = ( + "Failed to complete action due to an unexpected exception " + "[action_ex_id=%s, action_cls='%s', " + "attributes='%s', params='%s']\n %s" % ( + action_ex_id, + action_cls, + action_cls_attrs, + params, + e + ) + ) + LOG.exception(msg) return result diff --git a/mistral/engine/executor_server.py b/mistral/executors/executor_server.py similarity index 82% rename from mistral/engine/executor_server.py rename to mistral/executors/executor_server.py index 4159bb549..2ef75065b 100644 --- a/mistral/engine/executor_server.py +++ b/mistral/executors/executor_server.py @@ -15,8 +15,8 @@ from oslo_log import log as logging from mistral import config as cfg -from mistral.engine import default_executor from mistral.engine.rpc_backend import rpc +from mistral.executors import default_executor as exe from mistral.service import base as service_base from mistral import utils from mistral.utils import profiler as profiler_utils @@ -59,14 +59,14 @@ class ExecutorServer(service_base.MistralService): if self._rpc_server: self._rpc_server.stop(graceful) - def run_action(self, rpc_ctx, action_ex_id, action_class_str, - attributes, params, safe_rerun): + def run_action(self, rpc_ctx, action_ex_id, action_cls_str, + action_cls_attrs, params, safe_rerun): """Receives calls over RPC to run action on executor. :param rpc_ctx: RPC request context dictionary. :param action_ex_id: Action execution id. - :param action_class_str: Action class name. - :param attributes: Action class attributes. + :param action_cls_str: Action class name. + :param action_cls_attrs: Action class attributes. :param params: Action input parameters. :param safe_rerun: Tells if given action can be safely rerun. :return: Action result. @@ -74,17 +74,20 @@ class ExecutorServer(service_base.MistralService): LOG.info( "Received RPC request 'run_action'[action_ex_id=%s, " - "action_class=%s, attributes=%s, params=%s]" - % (action_ex_id, action_class_str, attributes, - utils.cut(params)) + "action_cls_str=%s, action_cls_attrs=%s, params=%s]" % ( + action_ex_id, + action_cls_str, + action_cls_attrs, + utils.cut(params) + ) ) redelivered = rpc_ctx.redelivered or False return self.executor.run_action( action_ex_id, - action_class_str, - attributes, + action_cls_str, + action_cls_attrs, params, safe_rerun, redelivered @@ -93,6 +96,6 @@ class ExecutorServer(service_base.MistralService): def get_oslo_service(setup_profiler=True): return ExecutorServer( - default_executor.DefaultExecutor(), + exe.DefaultExecutor(), setup_profiler=setup_profiler ) diff --git a/mistral/executors/remote_executor.py b/mistral/executors/remote_executor.py new file mode 100644 index 000000000..0f1bec890 --- /dev/null +++ b/mistral/executors/remote_executor.py @@ -0,0 +1,29 @@ +# Copyright 2017 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.engine.rpc_backend import rpc + + +LOG = logging.getLogger(__name__) + + +class RemoteExecutor(rpc.ExecutorClient): + """Executor that passes execution request to a remote executor.""" + + def __init__(self): + self.topic = cfg.CONF.executor.topic + self._client = rpc.get_rpc_client_driver()(cfg.CONF.executor) diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 0c2ef249d..95a7cc79b 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -22,8 +22,9 @@ from oslo_service import service from mistral.db.v2 import api as db_api from mistral.engine import engine_server -from mistral.engine import executor_server from mistral.engine.rpc_backend import rpc +from mistral.executors import base as exe +from mistral.executors import executor_server from mistral.tests.unit import base from mistral.workflow import states @@ -57,40 +58,42 @@ class EngineTestCase(base.DbTestCase): # Drop all RPC objects (transport, clients). rpc.cleanup() + exe.cleanup() + self.threads = [] + + # Start remote executor. + if cfg.CONF.executor.type == 'remote': + LOG.info("Starting remote executor threads...") + self.executor_client = rpc.get_executor_client() + exe_svc = executor_server.get_oslo_service(setup_profiler=False) + self.executor = exe_svc.executor + self.threads.append(eventlet.spawn(launch_service, exe_svc)) + self.addCleanup(exe_svc.stop, True) + + # Start engine. + LOG.info("Starting engine threads...") self.engine_client = rpc.get_engine_client() - self.executor_client = rpc.get_executor_client() - - LOG.info("Starting engine and executor threads...") - - engine_service = engine_server.get_oslo_service(setup_profiler=False) - executor_service = executor_server.get_oslo_service( - setup_profiler=False - ) - - self.engine = engine_service.engine - self.executor = executor_service.executor - - self.threads = [ - eventlet.spawn(launch_service, executor_service), - eventlet.spawn(launch_service, engine_service) - ] + eng_svc = engine_server.get_oslo_service(setup_profiler=False) + self.engine = eng_svc.engine + self.threads.append(eventlet.spawn(launch_service, eng_svc)) + self.addCleanup(eng_svc.stop, True) self.addOnException(self.print_executions) - - self.addCleanup(executor_service.stop, True) - self.addCleanup(engine_service.stop, True) self.addCleanup(self.kill_threads) # Make sure that both services fully started, otherwise # the test may run too early. - executor_service.wait_started() - engine_service.wait_started() + if cfg.CONF.executor.type == 'remote': + exe_svc.wait_started() + + eng_svc.wait_started() def kill_threads(self): LOG.info("Finishing engine and executor threads...") - [thread.kill() for thread in self.threads] + for thread in self.threads: + thread.kill() @staticmethod def print_executions(exc_info=None): diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index f734b13f8..7df297d1c 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -23,8 +23,8 @@ from oslo_utils import uuidutils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral.engine import default_engine as d_eng -from mistral.engine.rpc_backend import rpc from mistral import exceptions as exc +from mistral.executors import base as exe from mistral.services import workbooks as wb_service from mistral.tests.unit import base from mistral.tests.unit.engine import base as eng_test_base @@ -93,7 +93,7 @@ MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB) MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) -@mock.patch.object(rpc, 'get_executor_client', mock.Mock()) +@mock.patch.object(exe, 'get_executor', mock.Mock()) class DefaultEngineTest(base.DbTestCase): def setUp(self): super(DefaultEngineTest, self).setUp() diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 0a550c0cb..e056f33df 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -16,8 +16,8 @@ import mock from oslo_config import cfg from mistral.db.v2 import api as db_api -from mistral.engine import default_executor -from mistral.engine.rpc_backend import rpc +from mistral.executors import default_executor as d_exe +from mistral.executors import remote_executor as r_exe from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base @@ -77,16 +77,17 @@ workflows: """ -def _run_at_target(action_ex_id, action_class_str, attributes, - action_params, target=None, async_=True, safe_rerun=False): +def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, + params, safe_rerun, target=None, async_=True): + # We'll just call executor directly for testing purposes. - executor = default_executor.DefaultExecutor() + executor = d_exe.DefaultExecutor() executor.run_action( action_ex_id, - action_class_str, - attributes, - action_params, + action_cls_str, + action_cls_attrs, + params, safe_rerun ) @@ -100,7 +101,7 @@ class EnvironmentTest(base.EngineTestCase): wb_service.create_workbook_v2(WORKBOOK) - @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) + @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET) def _test_subworkflow(self, env): wf2_ex = self.engine.start_workflow('my_wb.wf2', {}, env=env) @@ -169,13 +170,13 @@ class EnvironmentTest(base.EngineTestCase): for t_ex in wf1_task_execs: a_ex = t_ex.action_executions[0] - rpc.ExecutorClient.run_action.assert_any_call( + r_exe.RemoteExecutor.run_action.assert_any_call( a_ex.id, 'mistral.actions.std_actions.EchoAction', {}, a_ex.input, - TARGET, - safe_rerun=False + False, + target=TARGET ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index b709dde77..5934d8a32 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -16,8 +16,8 @@ import mock from mistral.db.v2 import api as db_api -from mistral.engine import default_executor -from mistral.engine.rpc_backend import rpc +from mistral.executors import default_executor as d_exe +from mistral.executors import remote_executor as r_exe from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import data_flow @@ -25,9 +25,9 @@ from mistral.workflow import states def _run_at_target(action_ex_id, action_class_str, attributes, - action_params, target=None, async_=True, safe_rerun=False): + action_params, safe_rerun, target=None, async_=True): # We'll just call executor directly for testing purposes. - executor = default_executor.DefaultExecutor() + executor = d_exe.DefaultExecutor() executor.run_action( action_ex_id, @@ -43,7 +43,8 @@ MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target) class TestSafeRerun(base.EngineTestCase): - @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) + + @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET) def test_safe_rerun_true(self): wf_text = """--- version: '2.0' @@ -89,7 +90,7 @@ class TestSafeRerun(base.EngineTestCase): self.assertEqual(task1.state, states.SUCCESS) self.assertEqual(task2.state, states.SUCCESS) - @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) + @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET) def test_safe_rerun_false(self): wf_text = """--- version: '2.0' @@ -135,7 +136,7 @@ class TestSafeRerun(base.EngineTestCase): self.assertEqual(task1.state, states.ERROR) self.assertEqual(task3.state, states.SUCCESS) - @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) + @mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET) def test_safe_rerun_with_items(self): wf_text = """--- version: '2.0' diff --git a/mistral/tests/unit/executors/__init__.py b/mistral/tests/unit/executors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/executors/base.py b/mistral/tests/unit/executors/base.py new file mode 100644 index 000000000..505b8fd23 --- /dev/null +++ b/mistral/tests/unit/executors/base.py @@ -0,0 +1,24 @@ +# Copyright 2017 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral.tests.unit.engine import base as engine_test_base + + +LOG = logging.getLogger(__name__) + + +class ExecutorTestCase(engine_test_base.EngineTestCase): + pass diff --git a/mistral/tests/unit/executors/test_local_executor.py b/mistral/tests/unit/executors/test_local_executor.py new file mode 100644 index 000000000..130bb0c99 --- /dev/null +++ b/mistral/tests/unit/executors/test_local_executor.py @@ -0,0 +1,171 @@ +# Copyright 2017 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.actions import std_actions +from mistral.db.v2 import api as db_api +from mistral.executors import base as exe +from mistral.executors import remote_executor as r_exe +from mistral.services import workbooks as wb_svc +from mistral.tests.unit.executors import base +from mistral.workflow import states + + +LOG = logging.getLogger(__name__) + + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +@mock.patch.object( + r_exe.RemoteExecutor, + 'run_action', + mock.MagicMock(return_value=None) +) +class LocalExecutorTestCase(base.ExecutorTestCase): + + @classmethod + def setUpClass(cls): + super(LocalExecutorTestCase, cls).setUpClass() + cfg.CONF.set_default('type', 'local', group='executor') + + @classmethod + def tearDownClass(cls): + exe.cleanup() + cfg.CONF.set_default('type', 'remote', group='executor') + super(LocalExecutorTestCase, cls).tearDownClass() + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 1', # Mock task1 success. + 'Task 2', # Mock task2 success. + 'Task 3' # Mock task3 success. + ] + ) + ) + def test_run(self): + wb_def = """ + version: '2.0' + + name: wb1 + + workflows: + wf1: + type: direct + + tasks: + t1: + action: std.echo output="Task 1" + on-success: + - t2 + t2: + action: std.echo output="Task 2" + on-success: + - t3 + t3: + action: std.echo output="Task 3" + """ + + wb_svc.create_workbook_v2(wb_def) + wf_ex = self.engine.start_workflow('wb1.wf1', {}) + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_execs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(3, len(task_execs)) + + task_1_ex = self._assert_single_item(task_execs, name='t1') + task_2_ex = self._assert_single_item(task_execs, name='t2') + task_3_ex = self._assert_single_item(task_execs, name='t3') + + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.SUCCESS, task_2_ex.state) + self.assertEqual(states.SUCCESS, task_3_ex.state) + + # Make sure the remote executor is not called. + self.assertFalse(r_exe.RemoteExecutor.run_action.called) + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 1.0', # Mock task1 success. + 'Task 1.1', # Mock task1 success. + 'Task 1.2', # Mock task1 success. + 'Task 2' # Mock task2 success. + ] + ) + ) + def test_run_with_items(self): + wb_def = """ + version: '2.0' + + name: wb1 + + workflows: + wf1: + type: direct + + tasks: + t1: + with-items: i in <% list(range(0, 3)) %> + action: std.echo output="Task 1.<% $.i %>" + publish: + v1: <% task(t1).result %> + on-success: + - t2 + t2: + action: std.echo output="Task 2" + """ + + wb_svc.create_workbook_v2(wb_def) + wf_ex = self.engine.start_workflow('wb1.wf1', {}) + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_execs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertEqual(2, len(wf_ex.task_executions)) + + task_1_ex = self._assert_single_item(task_execs, name='t1') + task_2_ex = self._assert_single_item(task_execs, name='t2') + + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.SUCCESS, task_2_ex.state) + + with db_api.transaction(): + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id + ) + + self.assertEqual(3, len(task_1_action_exs)) + + # Make sure the remote executor is not called. + self.assertFalse(r_exe.RemoteExecutor.run_action.called) diff --git a/mistral/tests/unit/executors/test_plugins.py b/mistral/tests/unit/executors/test_plugins.py new file mode 100644 index 000000000..bed64790c --- /dev/null +++ b/mistral/tests/unit/executors/test_plugins.py @@ -0,0 +1,40 @@ +# Copyright 2017 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral.executors import base as exe +from mistral.executors import default_executor as d_exe +from mistral.executors import remote_executor as r_exe +from mistral.tests.unit.executors import base + + +LOG = logging.getLogger(__name__) + + +class PluginTestCase(base.ExecutorTestCase): + + def tearDown(self): + exe.cleanup() + super(PluginTestCase, self).tearDown() + + def test_get_local_executor(self): + executor = exe.get_executor('local') + + self.assertIsInstance(executor, d_exe.DefaultExecutor) + + def test_get_remote_executor(self): + executor = exe.get_executor('remote') + + self.assertIsInstance(executor, r_exe.RemoteExecutor) diff --git a/setup.cfg b/setup.cfg index 14f9be8ec..c34147480 100644 --- a/setup.cfg +++ b/setup.cfg @@ -70,6 +70,10 @@ mistral.actions = std.sleep = mistral.actions.std_actions:SleepAction std.test_dict = mistral.actions.std_actions:TestDictAction +mistral.executors = + local = mistral.executors.default_executor:DefaultExecutor + remote = mistral.executors.remote_executor:RemoteExecutor + mistral.expression.functions = global = mistral.utils.expression_utils:global_ json_pp = mistral.utils.expression_utils:json_pp_