diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 415c9e511..fb7c8e921 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -82,6 +82,14 @@ def run_task(wf_cmd): _schedule_refresh_task_state(task.task_ex, 1) +def rerun_task(task_ex, wf_spec): + task = _build_task_from_execution(wf_spec, task_ex) + + old_task_state = task_ex.state + task.set_state(states.RUNNING, None, False) + task.notify(old_task_state, states.RUNNING) + + @profiler.trace('task-handler-on-action-complete', hide_args=True) def _on_action_complete(action_ex): """Handles action completion event. diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 4577a9a0f..f1919f1c1 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -222,11 +222,6 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): wf.rerun(task_ex, reset=reset, env=env) - _schedule_check_and_complete(wf_ex) - - if wf_ex.task_execution_id: - _schedule_check_and_complete(wf_ex.task_execution.workflow_execution) - def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 0e8bef3e7..0653f8733 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -233,7 +233,7 @@ class Workflow(object): wf_service.update_workflow_execution_env(self.wf_ex, env) - self.set_state(states.RUNNING, recursive=True) + self._recursive_rerun() wf_ctrl = wf_base.get_controller(self.wf_ex) @@ -248,6 +248,31 @@ class Workflow(object): self._continue_workflow(cmds) + def _recursive_rerun(self): + """Rerun all parent workflow executions recursively. + + If there is a parent execution that it reruns as well. + """ + + from mistral.engine import workflow_handler + + self.set_state(states.RUNNING) + workflow_handler._schedule_check_and_complete(self.wf_ex) + + if self.wf_ex.task_execution_id: + parent_task_ex = db_api.get_task_execution( + self.wf_ex.task_execution_id + ) + + parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution) + + parent_wf.lock() + + parent_wf._recursive_rerun() + + from mistral.engine import task_handler + task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec) + def _get_backlog(self): return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY) @@ -330,7 +355,7 @@ class Workflow(object): ) @profiler.trace('workflow-set-state') - def set_state(self, state, state_info=None, recursive=False): + def set_state(self, state, state_info=None): assert self.wf_ex cur_state = self.wf_ex.state @@ -376,22 +401,6 @@ class Workflow(object): triggers.on_workflow_complete(self.wf_ex) - if recursive and self.wf_ex.task_execution_id: - parent_task_ex = db_api.get_task_execution( - self.wf_ex.task_execution_id - ) - - parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution) - - parent_wf.lock() - - parent_wf.set_state(state, recursive=recursive) - - # TODO(rakhmerov): It'd be better to use instance of Task here. - parent_task_ex.state = state - parent_task_ex.state_info = None - parent_task_ex.processed = False - @profiler.trace('workflow-check-and-complete') def check_and_complete(self): """Completes the workflow if it needs to be completed. diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index c022e54fd..27a5016cd 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -1480,3 +1480,74 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(6, len(action_executions)) self.assertTrue(all(a.state == states.ERROR for a in action_executions)) + + @mock.patch.object( + std_actions.NoOpAction, + 'run', + mock.MagicMock( + side_effect=[ + exc.ActionException(), + 'Success' + ] + ) + ) + def test_rerun_sub_workflow(self): + wf_service.create_workflows("""--- + version: '2.0' + wf1: + tasks: + task1: + workflow: wf2 + wf2: + tasks: + task2: + workflow: wf3 + wf3: + tasks: + task3: + action: std.noop""") + + # Run workflow and fail task. + wf1_ex = self.engine.start_workflow('wf1') + + self.await_workflow_error(wf1_ex.id) + + with db_api.transaction(): + wf_exs = db_api.get_workflow_executions() + task_exs = db_api.get_task_executions() + + self.assertEqual(3, len(wf_exs), + 'The number of workflow executions') + self.assertEqual(3, len(task_exs), + 'The number of task executions') + + for wf_ex in wf_exs: + self.assertEqual(states.ERROR, wf_ex.state, + 'The executions must fail the first time') + for task_ex in task_exs: + self.assertEqual(states.ERROR, task_ex.state, + 'The tasks must fail the first time') + + wf3_ex = self._assert_single_item(wf_exs, name='wf3') + task3_ex = self._assert_single_item(wf3_ex.task_executions, + name="task3") + + self.engine.rerun_workflow(task3_ex.id) + + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf_exs = db_api.get_workflow_executions() + task_exs = db_api.get_task_executions() + + self.assertEqual(3, len(wf_exs), + 'The number of workflow executions') + self.assertEqual(3, len(task_exs), + 'The number of task executions') + + for wf_ex in wf_exs: + self.assertEqual(states.SUCCESS, wf_ex.state, + 'The executions must success the second time') + for task_ex in task_exs: + self.assertEqual(states.SUCCESS, task_ex.state, + 'The tasks must success the second time')