From a3ff23cfffb14331bb6304f30bb53666e2351943 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 28 Mar 2016 13:44:54 +0600 Subject: [PATCH] Fixing engine transaction model and error handling * Transaction in on_action_complete() must not be splitted into 2 parts, it caused the bug with after task completion logic * Fix executor behavior so that it doesn't send an error back to engine if a error came from engine itself. It should report back only errors occurred with an action itself. * YAQL and other expected Mistral exceptions in transitions should not lead to transaction rollback and rollback of action result. For example if action result came and it's valid but while evaluating transition conditions we got a YAQL exception then action result should be stored normally w/o transaction rollback and corresponding task and workflow should fail with corresponding state_info. * Fixed all tests * Minor cosmetic changes Closes-Bug: #1524477 Change-Id: I09086e40a5902bbb6c977bf195cb035e31f21246 (cherry picked from commit ad07ba0d68836ab0409fb51d235def653143cd2c) --- AUTHORS | 28 ++ mistral/engine/action_handler.py | 2 +- mistral/engine/default_engine.py | 113 ++--- mistral/engine/default_executor.py | 55 ++- mistral/engine/rpc.py | 3 + mistral/engine/task_handler.py | 89 +++- mistral/tests/unit/engine/base.py | 16 +- .../tests/unit/engine/test_direct_workflow.py | 80 ++-- .../test_execution_fields_size_limitation.py | 7 +- mistral/tests/unit/engine/test_with_items.py | 400 +++++++++--------- mistral/workflow/commands.py | 12 +- 11 files changed, 474 insertions(+), 331 deletions(-) diff --git a/AUTHORS b/AUTHORS index 75174db2b..b2a764611 100644 --- a/AUTHORS +++ b/AUTHORS @@ -1,43 +1,71 @@ Abhishek Chanda Alexander Kuznetsov Anastasia Kuznetsova +Andreas Jaeger Angus Salkeld Ankita Wagh Antoine Musso +Bertrand Lallau +Bhaskar Duvvuri Boris Pavlovic Bryan Havenstein +Chaozhe.Chen Christian Berendt Claudiu Belu +Dan Prince +Daryl Mowrer David C Kennedy David Charles Kennedy +Dawid Deja Dmitri Zimine Doug Hellmann Ed Cranford +Gal Margalit Guy Paz Jeremy Stanley +Jiri Tomasek +Kevin Pouget Kirill Izotov Lakshmi Kannan +Limor Limor Stotland Lingxian Kong Liu Sheng +LiuNanke Manas Kelshikar +Michael Krotscheck +Michal Gershenzon Monty Taylor Moshe Elisha Nikolay Mahotkin Noa Koffman +Oleksii Chuprykov Pierre-Arthur MATHIEU Ray Chen Renat Akhmerov +Renat Akhmerov Rico Lin Rinat Sabitov Sergey Kolekonov Sergey Murashov +Shuquan Huang +Thierry Carrez +Thomas Herve Timur Nurlygayanov Venkata Mahesh Kotha Winson Chan Yaroslav Lobankov Zhao Lei Zhenguo Niu +ZhiQiang Fan ZhiQiang Fan Zhu Rong +caoyue +cheneydc hardik +hparekh +keliang +syed ahsan shamim zaidi +tengqm +wangzhh +zhangguoqing diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index 7f62a0b6a..aa5e152f7 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -29,7 +29,7 @@ from mistral.workflow import utils as wf_utils def create_action_execution(action_def, action_input, task_ex=None, index=0, description=''): - # TODO(rakhmerov): We can avoid hitting DB at all when calling something + # TODO(rakhmerov): We can avoid hitting DB at all when calling things like # create_action_execution(), these operations can be just done using # SQLAlchemy session (1-level cache) and session flush (on TX commit) would # send necessary SQL queries to DB. Currently, session flush happens diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 6c70c0504..f71adc1ac 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -24,6 +24,7 @@ from mistral.engine import action_handler from mistral.engine import base from mistral.engine import task_handler from mistral.engine import workflow_handler as wf_handler +from mistral import exceptions as exc from mistral.services import action_manager as a_m from mistral.services import executions as wf_ex_service from mistral.services import workflows as wf_service @@ -55,6 +56,9 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ex_id = None try: + # Create a persistent workflow execution in a separate transaction + # so that we can return it even in case of unexpected errors that + # lead to transaction rollback. with db_api.transaction(): # The new workflow execution will be in an IDLE # state on initial record creation. @@ -65,10 +69,6 @@ class DefaultEngine(base.Engine, coordination.Service): params ) - # Separate workflow execution creation and dispatching command - # transactions in order to be able to return workflow execution - # with corresponding error message in state_info when error occurs - # at dispatching commands. with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex_id) wf_handler.set_execution_state(wf_ex, states.RUNNING) @@ -161,14 +161,10 @@ class DefaultEngine(base.Engine, coordination.Service): self._on_task_state_change(task_ex, wf_ex, wf_spec) - def _on_task_state_change(self, task_ex, wf_ex, wf_spec, - task_state=states.SUCCESS): + def _on_task_state_change(self, task_ex, wf_ex, wf_spec): task_spec = wf_spec.get_tasks()[task_ex.name] - # We must be sure that if task is completed, - # it was also completed in previous transaction. - if (task_handler.is_task_completed(task_ex, task_spec) - and states.is_completed(task_state)): + if task_handler.is_task_completed(task_ex, task_spec): task_handler.after_task_complete(task_ex, task_spec, wf_spec) # Ignore DELAYED state. @@ -178,8 +174,21 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ctrl = wf_base.get_controller(wf_ex, wf_spec) # Calculate commands to process next. - cmds = wf_ctrl.continue_workflow() + try: + cmds = wf_ctrl.continue_workflow() + except exc.YaqlEvaluationException as e: + LOG.error( + 'YAQL error occurred while calculating next workflow ' + 'commands [wf_ex_id=%s, task_ex_id=%s]: %s', + wf_ex.id, task_ex.id, e + ) + wf_handler.fail_workflow(wf_ex, str(e)) + + return + + # Mark task as processed after all decisions have been made + # upon its completion. task_ex.processed = True self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) @@ -235,6 +244,7 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ex_id = action_ex.task_execution.workflow_execution_id wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) task_ex = task_handler.on_action_complete( @@ -248,30 +258,13 @@ class DefaultEngine(base.Engine, coordination.Service): if states.is_paused_or_completed(wf_ex.state): return action_ex.get_clone() - prev_task_state = task_ex.state - - # Separate the task transition in a separate transaction. The task - # has already completed for better or worst. The task state should - # not be affected by errors during transition on conditions such as - # on-success and on-error. - with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - action_ex = db_api.get_action_execution(action_ex_id) - task_ex = action_ex.task_execution - - self._on_task_state_change( - task_ex, - wf_ex, - wf_spec, - task_state=prev_task_state - ) + self._on_task_state_change(task_ex, wf_ex, wf_spec) return action_ex.get_clone() except Exception as e: - # TODO(dzimine): try to find out which command caused failure. # TODO(rakhmerov): Need to refactor logging in a more elegant way. LOG.error( - "Failed to handle action execution result [id=%s]: %s\n%s", + 'Failed to handle action execution result [id=%s]: %s\n%s', action_ex_id, e, traceback.format_exc() ) @@ -301,12 +294,13 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ctrl = wf_base.get_controller(wf_ex) + # TODO(rakhmerov): Add YAQL error handling. # Calculate commands to process next. cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that - # completed within the period when the workflow was pause. + # completed within the period when the workflow was paused. cmds = list( filter( lambda c: not isinstance(c, commands.PauseWorkflow), @@ -323,6 +317,7 @@ class DefaultEngine(base.Engine, coordination.Service): t_ex.processed = True wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + self._dispatch_workflow_commands(wf_ex, cmds, wf_spec) if not cmds: @@ -378,9 +373,9 @@ class DefaultEngine(base.Engine, coordination.Service): raise e @u.log_exec(LOG) - def stop_workflow(self, execution_id, state, message=None): + def stop_workflow(self, wf_ex_id, state, message=None): with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(execution_id) + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) return self._stop_workflow(wf_ex, state, message) @@ -390,13 +385,16 @@ class DefaultEngine(base.Engine, coordination.Service): wf_ctrl = wf_base.get_controller(wf_ex) final_context = {} + try: final_context = wf_ctrl.evaluate_workflow_final_context() except Exception as e: LOG.warning( - "Failed to get final context for %s: %s" % (wf_ex, e) + 'Failed to get final context for %s: %s' % (wf_ex, e) ) + wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) + return wf_handler.succeed_workflow( wf_ex, final_context, @@ -409,7 +407,7 @@ class DefaultEngine(base.Engine, coordination.Service): return wf_ex @u.log_exec(LOG) - def rollback_workflow(self, execution_id): + def rollback_workflow(self, wf_ex_id): # TODO(rakhmerov): Implement. raise NotImplementedError @@ -421,12 +419,26 @@ class DefaultEngine(base.Engine, coordination.Service): if isinstance(cmd, commands.RunTask) and cmd.is_waiting(): task_handler.defer_task(cmd) elif isinstance(cmd, commands.RunTask): - task_handler.run_new_task(cmd, wf_spec) + task_ex = task_handler.run_new_task(cmd, wf_spec) + + if task_ex.state == states.ERROR: + wf_handler.fail_workflow( + wf_ex, + 'Failed to start task [task_ex=%s]: %s' % + (task_ex, task_ex.state_info) + ) elif isinstance(cmd, commands.RunExistingTask): - task_handler.run_existing_task( + task_ex = task_handler.run_existing_task( cmd.task_ex.id, reset=cmd.reset ) + + if task_ex.state == states.ERROR: + wf_handler.fail_workflow( + wf_ex, + 'Failed to start task [task_ex=%s]: %s' % + (task_ex, task_ex.state_info) + ) elif isinstance(cmd, commands.SetWorkflowState): if states.is_completed(cmd.new_state): self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg) @@ -441,33 +453,28 @@ class DefaultEngine(base.Engine, coordination.Service): if wf_ex.state != states.RUNNING: break + # TODO(rakhmerov): This method may not be needed at all because error + # handling is now implemented too roughly w/o distinguishing different + # errors. On most errors (like YAQLException) we shouldn't rollback + # transactions, we just need to fail corresponding execution objects + # where a problem happened (action, task or workflow). @staticmethod - def _fail_workflow(wf_ex_id, err, action_ex_id=None): + def _fail_workflow(wf_ex_id, exc): """Private helper to fail workflow on exceptions.""" - err_msg = str(err) with db_api.transaction(): wf_ex = db_api.load_workflow_execution(wf_ex_id) if wf_ex is None: LOG.error( - "Cant fail workflow execution with id='%s': not found.", + "Can't fail workflow execution with id='%s': not found.", wf_ex_id ) - return + return None - wf_handler.set_execution_state(wf_ex, states.ERROR, err_msg) + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) - if action_ex_id: - # Note(dzimine): Don't call self.engine_client: - # 1) to avoid computing and triggering next tasks - # 2) to avoid a loop in case of error in transport - action_ex = db_api.get_action_execution(action_ex_id) - - task_handler.on_action_complete( - action_ex, - spec_parser.get_workflow_spec(wf_ex.spec), - wf_utils.Result(error=err_msg) - ) + if not states.is_paused_or_completed(wf_ex.state): + wf_handler.set_execution_state(wf_ex, states.ERROR, str(exc)) return wf_ex diff --git a/mistral/engine/default_executor.py b/mistral/engine/default_executor.py index 93882b853..6ec244573 100644 --- a/mistral/engine/default_executor.py +++ b/mistral/engine/default_executor.py @@ -19,7 +19,6 @@ from oslo_log import log as logging from mistral.actions import action_factory as a_f from mistral import coordination from mistral.engine import base -from mistral import exceptions as exc from mistral.utils import inspect_utils as i_u from mistral.workflow import utils as wf_utils @@ -37,7 +36,7 @@ class DefaultExecutor(base.Executor, coordination.Service): action_params): """Runs action. - :param action_ex_id: Corresponding task id. + :param action_ex_id: Action execution id. :param action_class_str: Path to action class in dot notation. :param attributes: Attributes of action class which will be set to. :param action_params: Action parameters. @@ -51,14 +50,29 @@ class DefaultExecutor(base.Executor, coordination.Service): action_ex_id, error_result ) - else: - return error_result + + return None + + return error_result action_cls = a_f.construct_action_class(action_class_str, attributes) + # Instantiate action. + try: action = action_cls(**action_params) + except Exception as e: + msg = ("Failed to initialize action %s. Action init params = %s." + " Actual init params = %s. More info: %s" + % (action_class_str, i_u.get_arg_list(action_cls.__init__), + action_params.keys(), e)) + LOG.warning(msg) + return send_error_back(msg) + + # Run action. + + try: result = action.run() # Note: it's made for backwards compatibility with already @@ -67,24 +81,25 @@ class DefaultExecutor(base.Executor, coordination.Service): if not isinstance(result, wf_utils.Result): result = wf_utils.Result(data=result) - if action_ex_id and (action.is_sync() or result.is_error()): - self._engine_client.on_action_complete(action_ex_id, result) - - return result - except TypeError as e: - msg = ("Failed to initialize action %s. Action init params = %s." - " Actual init params = %s. More info: %s" - % (action_class_str, i_u.get_arg_list(action_cls.__init__), - action_params.keys(), e)) - LOG.warning(msg) - - except exc.ActionException as e: + except Exception as e: msg = ("Failed to run action [action_ex_id=%s, action_cls='%s'," " attributes='%s', params='%s']\n %s" % (action_ex_id, action_cls, attributes, action_params, e)) LOG.exception(msg) - except Exception as e: - msg = str(e) - # Send error info to engine. - return send_error_back(msg) + return send_error_back(msg) + + # Send action result. + + try: + if action_ex_id and (action.is_sync() or result.is_error()): + self._engine_client.on_action_complete(action_ex_id, result) + + except Exception as e: + msg = ("Exception occurred when calling engine on_action_complete" + " [action_ex_id=%s, action_cls='%s'," + " attributes='%s', params='%s']\n %s" + % (action_ex_id, action_cls, attributes, action_params, e)) + LOG.exception(msg) + + return result diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index f04feb835..59bf3b799 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -24,6 +24,7 @@ from mistral.engine import base from mistral import exceptions as exc from mistral.workflow import utils as wf_utils + LOG = logging.getLogger(__name__) @@ -164,6 +165,8 @@ class EngineServer(object): :param rpc_ctx: RPC request context. :param action_ex_id: Action execution id. + :param result_data: Action result data. + :param result_error: Action result error. :return: Action execution. """ diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 7985f245a..7d136b08a 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -46,6 +46,9 @@ def run_existing_task(task_ex_id, reset=True): """This function runs existing task execution. It is needed mostly by scheduler. + + :param task_ex_id: Task execution id. + :param reset: Reset action executions for the task. """ task_ex = db_api.get_task_execution(task_ex_id) task_spec = spec_parser.get_task_spec(task_ex.spec) @@ -54,15 +57,16 @@ def run_existing_task(task_ex_id, reset=True): # Throw exception if the existing task already succeeded. if task_ex.state == states.SUCCESS: - raise exc.EngineException('Reruning existing task that already ' - 'succeeded is not supported.') + raise exc.EngineException( + 'Rerunning existing task that already succeeded is not supported.' + ) # Exit if the existing task failed and reset is not instructed. # For a with-items task without reset, re-running the existing # task will re-run the failed and unstarted items. if (task_ex.state == states.ERROR and not reset and not task_spec.get_with_items()): - return + return task_ex # Reset nested executions only if task is not already RUNNING. if task_ex.state != states.RUNNING: @@ -84,14 +88,27 @@ def run_existing_task(task_ex_id, reset=True): _run_existing_task(task_ex, task_spec, wf_spec) + return task_ex + def _run_existing_task(task_ex, task_spec, wf_spec): - input_dicts = _get_input_dictionaries( - wf_spec, - task_ex, - task_spec, - task_ex.in_context - ) + try: + input_dicts = _get_input_dictionaries( + wf_spec, + task_ex, + task_spec, + task_ex.in_context + ) + except exc.MistralException as e: + LOG.error( + 'An error while calculating task action inputs' + ' [task_execution_id=%s]: %s', + task_ex.id, e + ) + + set_task_state(task_ex, states.ERROR, str(e)) + + return # In some cases we can have no input, e.g. in case of 'with-items'. if input_dicts: @@ -113,8 +130,15 @@ def defer_task(wf_cmd): wf_ex = wf_cmd.wf_ex task_spec = wf_cmd.task_spec - if not wf_utils.find_task_executions_by_spec(wf_ex, task_spec): - _create_task_execution(wf_ex, task_spec, ctx, state=states.WAITING) + if wf_utils.find_task_executions_by_spec(wf_ex, task_spec): + return None + + return _create_task_execution( + wf_ex, + task_spec, + ctx, + state=states.WAITING + ) def run_new_task(wf_cmd, wf_spec): @@ -149,23 +173,25 @@ def run_new_task(wf_cmd, wf_spec): # Policies could possibly change task state. if task_ex.state != states.RUNNING: - return + return task_ex _run_existing_task(task_ex, task_spec, wf_spec) + return task_ex + def on_action_complete(action_ex, wf_spec, result): """Handles event of action result arrival. - Given action result this method performs analysis of the workflow - execution and identifies commands (including tasks) that can be - scheduled for execution. + Given action result this method changes corresponding task execution + object. This method must never be called for the case of individual + action which is not associated with any tasks. :param action_ex: Action execution objects the result belongs to. :param wf_spec: Workflow specification. :param result: Task action/workflow output wrapped into mistral.workflow.utils.Result instance. - :return List of engine commands that need to be performed. + :return Task execution object. """ task_ex = action_ex.task_execution @@ -177,7 +203,18 @@ def on_action_complete(action_ex, wf_spec, result): task_spec = wf_spec.get_tasks()[task_ex.name] - result = action_handler.transform_result(result, task_ex, task_spec) + try: + result = action_handler.transform_result(result, task_ex, task_spec) + except exc.YaqlEvaluationException as e: + err_msg = str(e) + + LOG.error( + 'YAQL error while transforming action result' + ' [action_execution_id=%s, result=%s]: %s', + action_ex.id, result, err_msg + ) + + result = wf_utils.Result(error=err_msg) # Ignore workflow executions because they're handled during # workflow completion. @@ -195,6 +232,7 @@ def on_action_complete(action_ex, wf_spec, result): _complete_task(task_ex, task_spec, task_state, task_state_info) else: with_items.increase_capacity(task_ex) + if with_items.is_completed(task_ex): _complete_task( task_ex, @@ -405,7 +443,10 @@ def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec): ) action_ex = action_handler.create_action_execution( - action_def, action_input, task_ex, index + action_def, + action_input, + task_ex, + index ) target = expr.evaluate_recursively( @@ -506,11 +547,14 @@ def _complete_task(task_ex, task_spec, state, state_info=None): set_task_state(task_ex, state, state_info) try: - data_flow.publish_variables( - task_ex, - task_spec + data_flow.publish_variables(task_ex, task_spec) + except exc.MistralException as e: + LOG.error( + 'An error while publishing task variables' + ' [task_execution_id=%s]: %s', + task_ex.id, str(e) ) - except Exception as e: + set_task_state(task_ex, states.ERROR, str(e)) if not task_spec.get_keep_result(): @@ -518,7 +562,6 @@ def _complete_task(task_ex, task_spec, state, state_info=None): def set_task_state(task_ex, state, state_info, processed=None): - # TODO(rakhmerov): How do we log task result? wf_trace.info( task_ex.workflow_execution, "Task execution '%s' [%s -> %s]" % diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 89c553669..eb3e01d68 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -146,17 +146,17 @@ class EngineTestCase(base.DbTestCase): def is_task_in_state(self, task_ex_id, state): return db_api.get_task_execution(task_ex_id).state == state - def is_execution_in_state(self, wf_ex_id, state): - return db_api.get_workflow_execution(wf_ex_id).state == state + def is_execution_in_state(self, ex_id, state): + return db_api.get_workflow_execution(ex_id).state == state - def is_execution_success(self, wf_ex_id): - return self.is_execution_in_state(wf_ex_id, states.SUCCESS) + def is_execution_success(self, ex_id): + return self.is_execution_in_state(ex_id, states.SUCCESS) - def is_execution_error(self, wf_ex_id): - return self.is_execution_in_state(wf_ex_id, states.ERROR) + def is_execution_error(self, ex_id): + return self.is_execution_in_state(ex_id, states.ERROR) - def is_execution_paused(self, wf_ex_id): - return self.is_execution_in_state(wf_ex_id, states.PAUSED) + def is_execution_paused(self, ex_id): + return self.is_execution_in_state(ex_id, states.PAUSED) def is_task_success(self, task_ex_id): return self.is_task_in_state(task_ex_id, states.SUCCESS) diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index acffcd9b4..0601cc468 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -29,12 +29,14 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') class DirectWorkflowEngineTest(base.EngineTestCase): - def _run_workflow(self, workflow_yaml, state=states.ERROR): - wf_service.create_workflows(workflow_yaml) + def _run_workflow(self, wf_text, expected_state=states.ERROR): + wf_service.create_workflows(wf_text) wf_ex = self.engine.start_workflow('wf', {}) - self._await(lambda: self.is_execution_in_state(wf_ex.id, state)) + self._await( + lambda: self.is_execution_in_state(wf_ex.id, expected_state) + ) return db_api.get_workflow_execution(wf_ex.id) @@ -274,17 +276,18 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertEqual(states.ERROR, wf_ex.state) self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info) - # Assert that there is only one task execution and it's SUCCESS. - self.assertEqual(1, len(wf_ex.task_executions)) + task_execs = wf_ex.task_executions + self.assertEqual(2, len(task_execs)) + + # 'task1' should be in SUCCESS. task_1_ex = self._assert_single_item( - wf_ex.task_executions, - name='task1' + task_execs, + name='task1', + state=states.SUCCESS ) - self.assertEqual(states.SUCCESS, task_1_ex.state) - - # Assert that there is only one action execution and it's SUCCESS. + # 'task1' should have exactly one action execution (in SUCCESS). task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id ) @@ -292,6 +295,19 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertEqual(1, len(task_1_action_exs)) self.assertEqual(states.SUCCESS, task_1_action_exs[0].state) + # 'task2' should exist but in ERROR. + task_2_ex = self._assert_single_item( + task_execs, + name='task2', + state=states.ERROR + ) + + # 'task2' must not have action executions. + self.assertEqual( + 0, + len(db_api.get_action_executions(task_execution_id=task_2_ex.id)) + ) + def test_async_next_task_with_input_yaql_error(self): wf_text = """ version: '2.0' @@ -331,29 +347,28 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertEqual(states.RUNNING, task_1_action_exs[0].state) # Update async action execution result. - result = wf_utils.Result(data='foobar') - - self.assertRaises( - exc.YaqlEvaluationException, - self.engine.on_action_complete, + self.engine.on_action_complete( task_1_action_exs[0].id, - result + wf_utils.Result(data='foobar') ) - # Assert that task1 is SUCCESS and workflow is ERROR. wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) self.assertIn('Can not evaluate YAQL expression', wf_ex.state_info) - self.assertEqual(1, len(wf_ex.task_executions)) + task_execs = wf_ex.task_executions + + self.assertEqual(2, len(task_execs)) + + # 'task1' must be in SUCCESS. task_1_ex = self._assert_single_item( - wf_ex.task_executions, - name='task1' + task_execs, + name='task1', + state=states.SUCCESS ) - self.assertEqual(states.SUCCESS, task_1_ex.state) - + # 'task1' must have exactly one action execution (in SUCCESS). task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id ) @@ -361,6 +376,19 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertEqual(1, len(task_1_action_exs)) self.assertEqual(states.SUCCESS, task_1_action_exs[0].state) + # 'task2' must be in ERROR. + task_2_ex = self._assert_single_item( + task_execs, + name='task2', + state=states.ERROR + ) + + # 'task2' must not have action executions. + self.assertEqual( + 0, + len(db_api.get_action_executions(task_execution_id=task_2_ex.id)) + ) + def test_messed_yaql_in_first_task(self): wf_text = """ version: '2.0' @@ -511,13 +539,9 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertEqual(states.RUNNING, task_1_action_exs[0].state) # Update async action execution result. - result = wf_utils.Result(data='foobar') - - self.assertRaises( - exc.YaqlEvaluationException, - self.engine.on_action_complete, + self.engine.on_action_complete( task_1_action_exs[0].id, - result + wf_utils.Result(data='foobar') ) # Assert that task1 is SUCCESS and workflow is ERROR. diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py index 608327740..5c2487570 100644 --- a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -37,6 +37,7 @@ wf: input: - workflow_input: '__WORKFLOW_INPUT__' - action_output_length: 0 + tasks: task1: action: my_action @@ -79,8 +80,10 @@ def expect_size_limit_exception(field_name): def generate_workflow(tokens): new_wf = WF long_string = ''.join('A' for _ in range(1024)) + for token in tokens: new_wf = new_wf.replace(token, long_string) + return new_wf @@ -136,11 +139,11 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): # Start workflow. wf_ex = self.engine.start_workflow('wf', {}) + self.assertEqual(states.ERROR, wf_ex.state) self.assertIn( "Size of 'input' is 1KB which exceeds the limit of 0KB", wf_ex.state_info ) - self.assertEqual(states.ERROR, wf_ex.state) def test_action_output_limit(self): wf_service.create_workflows(WF) @@ -175,7 +178,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertIn( - "Failure caused by error in tasks: task1", + 'Failure caused by error in tasks: task1', wf_ex.state_info ) diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 511e00c47..260eaffaa 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -33,7 +33,7 @@ from mistral.workflow import utils as wf_utils # the change in value is not permanent. cfg.CONF.set_default('auth_enable', False, group='pecan') -WORKBOOK = """ +WB = """ --- version: "2.0" @@ -55,7 +55,7 @@ workflows: """ -WORKBOOK_WITH_STATIC_VAR = """ +WB_WITH_STATIC_VAR = """ --- version: "2.0" @@ -78,7 +78,7 @@ workflows: """ -WORKBOOK_MULTI_ARRAY = """ +WB_MULTI_ARRAY = """ --- version: "2.0" @@ -104,7 +104,7 @@ workflows: """ -WORKBOOK_ACTION_CONTEXT = """ +WB_ACTION_CONTEXT = """ --- version: "2.0" name: wb1 @@ -123,7 +123,7 @@ workflows: """ -WORKFLOW_INPUT = { +WF_INPUT = { 'names_info': [ {'name': 'John'}, {'name': 'Ivan'}, @@ -140,7 +140,7 @@ WF_INPUT_URLS = { ] } -WORKFLOW_INPUT_ONE_ITEM = { +WF_INPUT_ONE_ITEM = { 'names_info': [ {'name': 'Guy'} ] @@ -153,6 +153,7 @@ class RandomSleepEchoAction(action_base.Action): def run(self): utils.random_sleep(1) + return self.output def test(self): @@ -176,10 +177,10 @@ class WithItemsEngineTest(base.EngineTestCase): if ex.state == states.RUNNING]) def test_with_items_simple(self): - wb_service.create_workbook_v2(WORKBOOK) + wb_service.create_workbook_v2(WB) # Start workflow. - wf_ex = self.engine.start_workflow('wb1.with_items', WORKFLOW_INPUT) + wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT) self._await( lambda: self.is_execution_success(wf_ex.id), @@ -188,15 +189,17 @@ class WithItemsEngineTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') - with_items_context = task1.runtime_context['with_items_context'] + task_execs = wf_ex.task_executions - self.assertEqual(3, with_items_context['count']) + task1_ex = self._assert_single_item(task_execs, name='task1') + + with_items_ctx = task1_ex.runtime_context['with_items_context'] + + self.assertEqual(3, with_items_ctx['count']) # Since we know that we can receive results in random order, # check is not depend on order of items. - result = data_flow.get_task_execution_result(task1) + result = data_flow.get_task_execution_result(task1_ex) self.assertIsInstance(result, list) @@ -204,15 +207,15 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn('Ivan', result) self.assertIn('Mistral', result) - published = task1.published + published = task1_ex.published self.assertIn(published['result'], ['John', 'Ivan', 'Mistral']) - self.assertEqual(1, len(tasks)) - self.assertEqual(states.SUCCESS, task1.state) + self.assertEqual(1, len(task_execs)) + self.assertEqual(states.SUCCESS, task1_ex.state) def test_with_items_fail(self): - workflow = """--- + wf_text = """--- version: "2.0" with_items: @@ -227,23 +230,21 @@ class WithItemsEngineTest(base.EngineTestCase): task2: action: std.echo output="With-items failed" """ - wf_service.create_workflows(workflow) + + wf_service.create_workflows(wf_text) # Start workflow. wf_ex = self.engine.start_workflow('with_items', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - self.assertEqual(2, len(tasks)) + self.assertEqual(2, len(wf_ex.task_executions)) def test_with_items_sub_workflow_fail(self): - workbook = """--- + wb_text = """--- version: "2.0" name: wb1 @@ -263,36 +264,34 @@ class WithItemsEngineTest(base.EngineTestCase): subworkflow: type: direct + tasks: fail: action: std.fail """ - wb_service.create_workbook_v2(workbook) + + wb_service.create_workbook_v2(wb_text) # Start workflow. wf_ex = self.engine.start_workflow('wb1.with_items', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - self.assertEqual(2, len(tasks)) + self.assertEqual(2, len(wf_ex.task_executions)) def test_with_items_static_var(self): - wb_service.create_workbook_v2(WORKBOOK_WITH_STATIC_VAR) + wb_service.create_workbook_v2(WB_WITH_STATIC_VAR) - wf_input = copy.deepcopy(WORKFLOW_INPUT) + wf_input = copy.deepcopy(WF_INPUT) wf_input.update({'greeting': 'Hello'}) + # Start workflow. wf_ex = self.engine.start_workflow('wb1.with_items', wf_input) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -311,26 +310,25 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task1.state) def test_with_items_multi_array(self): - wb_service.create_workbook_v2(WORKBOOK_MULTI_ARRAY) + wb_service.create_workbook_v2(WB_MULTI_ARRAY) wf_input = {'arrayI': ['a', 'b', 'c'], 'arrayJ': [1, 2, 3]} # Start workflow. wf_ex = self.engine.start_workflow('wb1.with_items', wf_input) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + task_execs = wf_ex.task_executions + + task1_ex = self._assert_single_item(task_execs, name='task1') # Since we know that we can receive results in random order, # check is not depend on order of items. - result = data_flow.get_task_execution_result(task1) + result = data_flow.get_task_execution_result(task1_ex) self.assertIsInstance(result, list) @@ -338,30 +336,28 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn('b 2', result) self.assertIn('c 3', result) - self.assertEqual(1, len(tasks)) - self.assertEqual(states.SUCCESS, task1.state) + self.assertEqual(1, len(task_execs)) + self.assertEqual(states.SUCCESS, task1_ex.state) def test_with_items_action_context(self): - wb_service.create_workbook_v2(WORKBOOK_ACTION_CONTEXT) + wb_service.create_workbook_v2(WB_ACTION_CONTEXT) # Start workflow. - wf_ex = self.engine.start_workflow( - 'wb1.wf1_with_items', WF_INPUT_URLS - ) + wf_ex = self.engine.start_workflow('wb1.wf1_with_items', WF_INPUT_URLS) wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] act_exs = task_ex.executions + self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan")) self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John")) self.engine.on_action_complete( - act_exs[2].id, wf_utils.Result("Mistral") + act_exs[2].id, + wf_utils.Result("Mistral") ) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -378,7 +374,7 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_ex.state) def test_with_items_empty_list(self): - workbook = """--- + wb_text = """--- version: "2.0" name: wb1 @@ -400,29 +396,29 @@ class WithItemsEngineTest(base.EngineTestCase): task2: action: std.echo output="Hi!" """ - wb_service.create_workbook_v2(workbook) + + wb_service.create_workbook_v2(wb_text) # Start workflow. wf_input = {'names_info': []} wf_ex = self.engine.start_workflow('wb1.with_items', wf_input) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') - task2 = self._assert_single_item(tasks, name='task2') + task_execs = wf_ex.task_executions - self.assertEqual(2, len(tasks)) - self.assertEqual(states.SUCCESS, task1.state) - self.assertEqual(states.SUCCESS, task2.state) + task1_ex = self._assert_single_item(task_execs, name='task1') + task2_ex = self._assert_single_item(task_execs, name='task2') + + self.assertEqual(2, len(task_execs)) + self.assertEqual(states.SUCCESS, task1_ex.state) + self.assertEqual(states.SUCCESS, task2_ex.state) def test_with_items_plain_list(self): - workbook = """--- + wb_text = """--- version: "2.0" name: wb1 @@ -436,7 +432,8 @@ class WithItemsEngineTest(base.EngineTestCase): with-items: i in [1, 2, 3] action: std.echo output=<% $.i %> """ - wb_service.create_workbook_v2(workbook) + + wb_service.create_workbook_v2(wb_text) # Start workflow. wf_ex = self.engine.start_workflow('wb1.with_items', {}) @@ -446,11 +443,13 @@ class WithItemsEngineTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') - self.assertEqual(states.SUCCESS, task1.state) + task1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) - result = data_flow.get_task_execution_result(task1) + result = data_flow.get_task_execution_result(task1_ex) # Since we know that we can receive results in random order, # check is not depend on order of items. @@ -459,7 +458,7 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn(3, result) def test_with_items_plain_list_wrong(self): - workbook = """--- + wb_text = """--- version: "2.0" name: wb1 @@ -477,13 +476,13 @@ class WithItemsEngineTest(base.EngineTestCase): exception = self.assertRaises( exc.InvalidModelException, - wb_service.create_workbook_v2, workbook + wb_service.create_workbook_v2, wb_text ) self.assertIn("Invalid array in 'with-items'", exception.message) def test_with_items_results_order(self): - workbook = """--- + wb_text = """--- version: "2.0" name: wb1 @@ -502,60 +501,57 @@ class WithItemsEngineTest(base.EngineTestCase): # Register random sleep action in the DB. test_base.register_action_class('sleep_echo', RandomSleepEchoAction) - wb_service.create_workbook_v2(workbook) + wb_service.create_workbook_v2(wb_text) # Start workflow. wf_ex = self.engine.start_workflow('wb1.with_items', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + task1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) - self.assertEqual(states.SUCCESS, task1.state) - - published = task1.published + published = task1_ex.published # Now we can check order of results explicitly. self.assertEqual([1, 2, 3], published['one_two_three']) def test_with_items_results_one_item_as_list(self): - wb_service.create_workbook_v2(WORKBOOK) + wb_service.create_workbook_v2(WB) # Start workflow. - wf_ex = self.engine.start_workflow('wb1.with_items', - WORKFLOW_INPUT_ONE_ITEM) + wf_ex = self.engine.start_workflow('wb1.with_items', WF_INPUT_ONE_ITEM) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + task_execs = wf_ex.task_executions - result = data_flow.get_task_execution_result(task1) + self.assertEqual(1, len(task_execs)) + + task1_ex = self._assert_single_item( + task_execs, + name='task1', + state=states.SUCCESS + ) + + result = data_flow.get_task_execution_result(task1_ex) self.assertIsInstance(result, list) - self.assertIn('Guy', result) - published = task1.published - - self.assertIn(published['result'], ['Guy']) - - self.assertEqual(1, len(tasks)) - self.assertEqual(states.SUCCESS, task1.state) + self.assertIn(task1_ex.published['result'], ['Guy']) def test_with_items_concurrency_1(self): - workflow_with_concurrency_1 = """--- + wf_with_concurrency_1 = """--- version: "2.0" concurrency_test: @@ -569,16 +565,18 @@ class WithItemsEngineTest(base.EngineTestCase): action: std.async_noop with-items: name in <% $.names %> concurrency: 1 - """ - wf_service.create_workflows(workflow_with_concurrency_1) + + wf_service.create_workflows(wf_with_concurrency_1) # Start workflow. wf_ex = self.engine.start_workflow('concurrency_test', {}) - wf_ex = db_api.get_execution(wf_ex.id) - task_ex = wf_ex.task_executions[0] + wf_ex = db_api.get_execution(wf_ex.id) + + task_ex = wf_ex.task_executions[0] task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(0, task_ex) self.assertEqual(1, self.get_running_action_exs_number(task_ex)) @@ -589,6 +587,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(0, task_ex) self.assertEqual(1, self.get_running_action_exs_number(task_ex)) @@ -599,6 +598,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(0, task_ex) self.assertEqual(1, self.get_running_action_exs_number(task_ex)) @@ -609,13 +609,13 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(1, task_ex) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) task_ex = db_api.get_task_execution(task_ex.id) + # Since we know that we can receive results in random order, # check is not depend on order of items. result = data_flow.get_task_execution_result(task_ex) @@ -629,7 +629,7 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_ex.state) def test_with_items_concurrency_yaql(self): - workflow_with_concurrency_yaql = """--- + wf_with_concurrency_yaql = """--- version: "2.0" concurrency_test: @@ -644,9 +644,9 @@ class WithItemsEngineTest(base.EngineTestCase): action: std.echo output=<% $.name %> with-items: name in <% $.names %> concurrency: <% $.concurrency %> - """ - wf_service.create_workflows(workflow_with_concurrency_yaql) + + wf_service.create_workflows(wf_with_concurrency_yaql) # Start workflow. wf_ex = self.engine.start_workflow( @@ -654,13 +654,14 @@ class WithItemsEngineTest(base.EngineTestCase): {'concurrency': 2} ) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] + + self.assertEqual(states.SUCCESS, task_ex.state) + # Since we know that we can receive results in random order, # check is not depend on order of items. result = data_flow.get_task_execution_result(task_ex) @@ -671,10 +672,8 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn('Ivan', result) self.assertIn('Mistral', result) - self.assertEqual(states.SUCCESS, task_ex.state) - def test_with_items_concurrency_yaql_wrong_type(self): - workflow_with_concurrency_yaql = """--- + wf_with_concurrency_yaql = """--- version: "2.0" concurrency_test: @@ -689,9 +688,9 @@ class WithItemsEngineTest(base.EngineTestCase): action: std.echo output=<% $.name %> with-items: name in <% $.names %> concurrency: <% $.concurrency %> - """ - wf_service.create_workflows(workflow_with_concurrency_yaql) + + wf_service.create_workflows(wf_with_concurrency_yaql) # Start workflow. wf_ex = self.engine.start_workflow( @@ -700,13 +699,13 @@ class WithItemsEngineTest(base.EngineTestCase): ) self.assertIn( - "Invalid data type in ConcurrencyPolicy", + 'Invalid data type in ConcurrencyPolicy', wf_ex.state_info ) self.assertEqual(states.ERROR, wf_ex.state) def test_with_items_concurrency_2(self): - workflow_with_concurrency_2 = """--- + wf_with_concurrency_2 = """--- version: "2.0" concurrency_test: @@ -722,10 +721,11 @@ class WithItemsEngineTest(base.EngineTestCase): concurrency: 2 """ - wf_service.create_workflows(workflow_with_concurrency_2) + wf_service.create_workflows(wf_with_concurrency_2) # Start workflow. wf_ex = self.engine.start_workflow('concurrency_test', {}) + wf_ex = db_api.get_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] @@ -739,6 +739,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(0, task_ex) self.assertEqual(2, self.get_running_action_exs_number(task_ex)) @@ -749,6 +750,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(0, task_ex) self.assertEqual(2, self.get_running_action_exs_number(task_ex)) @@ -759,6 +761,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(1, task_ex) # 4th iteration complete. @@ -768,16 +771,17 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(2, task_ex) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) task_ex = db_api.get_task_execution(task_ex.id) + # Since we know that we can receive results in random order, # check is not depend on order of items. result = data_flow.get_task_execution_result(task_ex) + self.assertIsInstance(result, list) self.assertIn('John', result) @@ -788,7 +792,7 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_ex.state) def test_with_items_concurrency_2_fail(self): - workflow_with_concurrency_2_fail = """--- + wf_with_concurrency_2_fail = """--- version: "2.0" concurrency_test_fail: @@ -805,14 +809,13 @@ class WithItemsEngineTest(base.EngineTestCase): action: std.echo output="With-items failed" """ - wf_service.create_workflows(workflow_with_concurrency_2_fail) + wf_service.create_workflows(wf_with_concurrency_2_fail) # Start workflow. wf_ex = self.engine.start_workflow('concurrency_test_fail', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) + wf_ex = db_api.get_execution(wf_ex.id) task_exs = wf_ex.task_executions @@ -822,12 +825,12 @@ class WithItemsEngineTest(base.EngineTestCase): task_2 = self._assert_single_item(task_exs, name='task2') self.assertEqual( - "With-items failed", + 'With-items failed', data_flow.get_task_execution_result(task_2) ) def test_with_items_concurrency_3(self): - workflow_with_concurrency_3 = """--- + wf_with_concurrency_3 = """--- version: "2.0" concurrency_test: @@ -843,10 +846,12 @@ class WithItemsEngineTest(base.EngineTestCase): concurrency: 3 """ - wf_service.create_workflows(workflow_with_concurrency_3) + + wf_service.create_workflows(wf_with_concurrency_3) # Start workflow. wf_ex = self.engine.start_workflow('concurrency_test', {}) + wf_ex = db_api.get_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] @@ -860,6 +865,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(1, task_ex) # 2nd iteration complete. @@ -869,6 +875,7 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(2, task_ex) # 3rd iteration complete. @@ -878,26 +885,27 @@ class WithItemsEngineTest(base.EngineTestCase): ) task_ex = db_api.get_task_execution(task_ex.id) + self.assert_capacity(3, task_ex) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) task_ex = db_api.get_task_execution(task_ex.id) + + self.assertEqual(states.SUCCESS, task_ex.state) + # Since we know that we can receive results in random order, # check is not depend on order of items. result = data_flow.get_task_execution_result(task_ex) + self.assertIsInstance(result, list) self.assertIn('John', result) self.assertIn('Ivan', result) self.assertIn('Mistral', result) - self.assertEqual(states.SUCCESS, task_ex.state) - def test_with_items_concurrency_gt_list_length(self): - workflow_definition = """--- + wf_definition = """--- version: "2.0" concurrency_test: @@ -913,26 +921,29 @@ class WithItemsEngineTest(base.EngineTestCase): concurrency: 3 """ - wf_service.create_workflows(workflow_definition) + wf_service.create_workflows(wf_definition) # Start workflow. wf_ex = self.engine.start_workflow('concurrency_test', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_execution(wf_ex.id) - task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') + + task_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) + result = data_flow.get_task_execution_result(task_ex) - self.assertEqual(states.SUCCESS, task_ex.state) self.assertIsInstance(result, list) self.assertIn('John', result) self.assertIn('Ivan', result) def test_with_items_retry_policy(self): - workflow = """--- + wf_text = """--- version: "2.0" with_items_retry: @@ -948,32 +959,32 @@ class WithItemsEngineTest(base.EngineTestCase): task2: action: std.echo output="With-items failed" """ - wf_service.create_workflows(workflow) + + wf_service.create_workflows(wf_text) # Start workflow. wf_ex = self.engine.start_workflow('with_items_retry', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id) - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - self.assertEqual(2, len(tasks)) + task_execs = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + self.assertEqual(2, len(task_execs)) + + task1_ex = self._assert_single_item(task_execs, name='task1') self.assertEqual( 2, - task1.runtime_context['retry_task_policy']['retry_no'] + task1_ex.runtime_context['retry_task_policy']['retry_no'] ) - self.assertEqual(9, len(task1.executions)) - self._assert_multiple_items(task1.executions, 3, accepted=True) + self.assertEqual(9, len(task1_ex.executions)) + self._assert_multiple_items(task1_ex.executions, 3, accepted=True) def test_with_items_retry_policy_concurrency(self): - workflow = """--- + wf_text = """--- version: "2.0" with_items_retry_concurrency: @@ -990,31 +1001,28 @@ class WithItemsEngineTest(base.EngineTestCase): task2: action: std.echo output="With-items failed" """ - wf_service.create_workflows(workflow) + + wf_service.create_workflows(wf_text) # Start workflow. - wf_ex = self.engine.start_workflow( - 'with_items_retry_concurrency', - {} - ) + wf_ex = self.engine.start_workflow('with_items_retry_concurrency', {}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - self.assertEqual(2, len(tasks)) + task_execs = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') + self.assertEqual(2, len(task_execs)) - self.assertEqual(12, len(task1.executions)) - self._assert_multiple_items(task1.executions, 4, accepted=True) + task1_ex = self._assert_single_item(task_execs, name='task1') + + self.assertEqual(12, len(task1_ex.executions)) + self._assert_multiple_items(task1_ex.executions, 4, accepted=True) def test_with_items_env(self): - workflow = """--- + wf_text = """--- version: "2.0" with_items_env: @@ -1023,19 +1031,17 @@ class WithItemsEngineTest(base.EngineTestCase): with-items: i in [1, 2, 3, 4] action: std.echo output="<% $.i %>.<% env().name %>" """ - wf_service.create_workflows(workflow) - env = {'name': 'Mistral'} + + wf_service.create_workflows(wf_text) # Start workflow. wf_ex = self.engine.start_workflow( 'with_items_env', {}, - env=env + env={'name': 'Mistral'} ) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) @@ -1058,7 +1064,7 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task1.state) def test_with_items_two_tasks_second_starts_on_success(self): - workbook = """--- + wb_text = """--- version: "2.0" name: wb1 @@ -1076,7 +1082,8 @@ class WithItemsEngineTest(base.EngineTestCase): with-items: i in [3, 4] action: std.echo output=<% $.i %> """ - wb_service.create_workbook_v2(workbook) + + wb_service.create_workbook_v2(wb_text) # Start workflow. wf_ex = self.engine.start_workflow('wb1.with_items', {}) @@ -1086,14 +1093,21 @@ class WithItemsEngineTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - tasks = wf_ex.task_executions - task1 = self._assert_single_item(tasks, name='task1') - task2 = self._assert_single_item(tasks, name='task2') - self.assertEqual(states.SUCCESS, task1.state) - self.assertEqual(states.SUCCESS, task2.state) + task_execs = wf_ex.task_executions - result_task1 = data_flow.get_task_execution_result(task1) - result_task2 = data_flow.get_task_execution_result(task2) + task1_ex = self._assert_single_item( + task_execs, + name='task1', + state=states.SUCCESS + ) + task2_ex = self._assert_single_item( + task_execs, + name='task2', + state=states.SUCCESS + ) + + result_task1 = data_flow.get_task_execution_result(task1_ex) + result_task2 = data_flow.get_task_execution_result(task2_ex) # Since we know that we can receive results in random order, # check is not depend on order of items. @@ -1103,45 +1117,51 @@ class WithItemsEngineTest(base.EngineTestCase): self.assertIn(4, result_task2) def test_with_items_subflow_concurrency_gt_list_length(self): - workbook_definition = """--- + wb_text = """--- version: "2.0" name: wb1 workflows: main: type: direct + input: - names + tasks: task1: with-items: name in <% $.names %> workflow: subflow1 name=<% $.name %> concurrency: 3 + subflow1: type: direct + input: - name output: result: <% task(task1).result %> + tasks: task1: action: std.echo output=<% $.name %> """ - wb_service.create_workbook_v2(workbook_definition) + wb_service.create_workbook_v2(wb_text) # Start workflow. names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"] wf_ex = self.engine.start_workflow('wb1.main', {'names': names}) - self._await( - lambda: self.is_execution_success(wf_ex.id), - ) + self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_execution(wf_ex.id) - task_ex = self._assert_single_item(wf_ex.task_executions, name='task1') - self.assertEqual(states.SUCCESS, task_ex.state) + task_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1', + state=states.SUCCESS + ) result = [ item['result'] diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index 5fa8f1e0d..ad7b9727e 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -63,15 +63,15 @@ class RunExistingTask(WorkflowCommand): """Command for running already existent task.""" def __init__(self, task_ex, reset=True): - wf_ex = task_ex.workflow_execution - task_spec = spec_parser.get_task_spec(task_ex.spec) + super(RunExistingTask, self).__init__( + task_ex.workflow_execution, + spec_parser.get_task_spec(task_ex.spec), + task_ex.in_context + ) + self.task_ex = task_ex self.reset = reset - super(RunExistingTask, self).__init__( - wf_ex, task_spec, task_ex.in_context - ) - class SetWorkflowState(WorkflowCommand): """Instruction to change a workflow state."""