From 041f3bd35c4f438c27087f5b48784055ae75f1c8 Mon Sep 17 00:00:00 2001 From: Vitalii Solodilov Date: Sat, 15 Sep 2018 00:57:24 +0400 Subject: [PATCH] An execution hangs in the RUNNING state after rerun When we rerun an execution we must create the "_check_and_complete" delayed calls for all parent workflows. The problem was that we created the delayed call only for the rerun execution and its parent. Recursive rerun was extracted in the separated function. Because we need to execute some additional operations, for example, create delayed call for every a rerun execution. Change-Id: I530094e916daf25bb9c672c445afa980ad4311ae Closes-Bug: #1792451 Signed-off-by: Vitalii Solodilov --- mistral/engine/task_handler.py | 8 +++ mistral/engine/workflow_handler.py | 5 -- mistral/engine/workflows.py | 45 +++++++----- .../unit/engine/test_direct_workflow_rerun.py | 71 +++++++++++++++++++ 4 files changed, 106 insertions(+), 23 deletions(-) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 415c9e511..fb7c8e921 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -82,6 +82,14 @@ def run_task(wf_cmd): _schedule_refresh_task_state(task.task_ex, 1) +def rerun_task(task_ex, wf_spec): + task = _build_task_from_execution(wf_spec, task_ex) + + old_task_state = task_ex.state + task.set_state(states.RUNNING, None, False) + task.notify(old_task_state, states.RUNNING) + + @profiler.trace('task-handler-on-action-complete', hide_args=True) def _on_action_complete(action_ex): """Handles action completion event. diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 4577a9a0f..f1919f1c1 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -222,11 +222,6 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None): wf.rerun(task_ex, reset=reset, env=env) - _schedule_check_and_complete(wf_ex) - - if wf_ex.task_execution_id: - _schedule_check_and_complete(wf_ex.task_execution.workflow_execution) - def resume_workflow(wf_ex, env=None): if not states.is_paused_or_idle(wf_ex.state): diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 0e8bef3e7..0653f8733 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -233,7 +233,7 @@ class Workflow(object): wf_service.update_workflow_execution_env(self.wf_ex, env) - self.set_state(states.RUNNING, recursive=True) + self._recursive_rerun() wf_ctrl = wf_base.get_controller(self.wf_ex) @@ -248,6 +248,31 @@ class Workflow(object): self._continue_workflow(cmds) + def _recursive_rerun(self): + """Rerun all parent workflow executions recursively. + + If there is a parent execution that it reruns as well. + """ + + from mistral.engine import workflow_handler + + self.set_state(states.RUNNING) + workflow_handler._schedule_check_and_complete(self.wf_ex) + + if self.wf_ex.task_execution_id: + parent_task_ex = db_api.get_task_execution( + self.wf_ex.task_execution_id + ) + + parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution) + + parent_wf.lock() + + parent_wf._recursive_rerun() + + from mistral.engine import task_handler + task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec) + def _get_backlog(self): return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY) @@ -330,7 +355,7 @@ class Workflow(object): ) @profiler.trace('workflow-set-state') - def set_state(self, state, state_info=None, recursive=False): + def set_state(self, state, state_info=None): assert self.wf_ex cur_state = self.wf_ex.state @@ -376,22 +401,6 @@ class Workflow(object): triggers.on_workflow_complete(self.wf_ex) - if recursive and self.wf_ex.task_execution_id: - parent_task_ex = db_api.get_task_execution( - self.wf_ex.task_execution_id - ) - - parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution) - - parent_wf.lock() - - parent_wf.set_state(state, recursive=recursive) - - # TODO(rakhmerov): It'd be better to use instance of Task here. - parent_task_ex.state = state - parent_task_ex.state_info = None - parent_task_ex.processed = False - @profiler.trace('workflow-check-and-complete') def check_and_complete(self): """Completes the workflow if it needs to be completed. diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index c022e54fd..27a5016cd 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -1480,3 +1480,74 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(6, len(action_executions)) self.assertTrue(all(a.state == states.ERROR for a in action_executions)) + + @mock.patch.object( + std_actions.NoOpAction, + 'run', + mock.MagicMock( + side_effect=[ + exc.ActionException(), + 'Success' + ] + ) + ) + def test_rerun_sub_workflow(self): + wf_service.create_workflows("""--- + version: '2.0' + wf1: + tasks: + task1: + workflow: wf2 + wf2: + tasks: + task2: + workflow: wf3 + wf3: + tasks: + task3: + action: std.noop""") + + # Run workflow and fail task. + wf1_ex = self.engine.start_workflow('wf1') + + self.await_workflow_error(wf1_ex.id) + + with db_api.transaction(): + wf_exs = db_api.get_workflow_executions() + task_exs = db_api.get_task_executions() + + self.assertEqual(3, len(wf_exs), + 'The number of workflow executions') + self.assertEqual(3, len(task_exs), + 'The number of task executions') + + for wf_ex in wf_exs: + self.assertEqual(states.ERROR, wf_ex.state, + 'The executions must fail the first time') + for task_ex in task_exs: + self.assertEqual(states.ERROR, task_ex.state, + 'The tasks must fail the first time') + + wf3_ex = self._assert_single_item(wf_exs, name='wf3') + task3_ex = self._assert_single_item(wf3_ex.task_executions, + name="task3") + + self.engine.rerun_workflow(task3_ex.id) + + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf_exs = db_api.get_workflow_executions() + task_exs = db_api.get_task_executions() + + self.assertEqual(3, len(wf_exs), + 'The number of workflow executions') + self.assertEqual(3, len(task_exs), + 'The number of task executions') + + for wf_ex in wf_exs: + self.assertEqual(states.SUCCESS, wf_ex.state, + 'The executions must success the second time') + for task_ex in task_exs: + self.assertEqual(states.SUCCESS, task_ex.state, + 'The tasks must success the second time')