From 16b54d8766f229349770cd1967f5726a4256a6a2 Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Wed, 19 Jul 2017 00:00:10 +0000 Subject: [PATCH] Cascade pause from pause-before in subworkflows When a workflow is paused by pause-before, the state will cascade down to other subworkflows and up to parent workflow. Change-Id: Ied178fe08f8308455bf05b3168635a3b69799cec Closes-Bug: #1700196 --- mistral/engine/policies.py | 3 +- .../engine/test_subworkflows_pause_resume.py | 302 ++++++++++++++++++ 2 files changed, 304 insertions(+), 1 deletion(-) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index e632db1a3..b603d6ba7 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -16,6 +16,7 @@ from mistral.db.v2 import api as db_api from mistral.engine import action_queue from mistral.engine import base +from mistral.engine import workflow_handler as wf_handler from mistral import expressions from mistral.services import scheduler from mistral.utils import wf_trace @@ -420,8 +421,8 @@ class PauseBeforePolicy(base.TaskPolicy): (task_ex.name, task_ex.workflow_execution.state, states.PAUSED) ) - task_ex.workflow_execution.state = states.PAUSED task_ex.state = states.IDLE + wf_handler.pause_workflow(task_ex.workflow_execution) class ConcurrencyPolicy(base.TaskPolicy): diff --git a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py index dbf788515..78522d0e5 100644 --- a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py +++ b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py @@ -1983,3 +1983,305 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, wf_3_ex.state) self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) + + def test_pause_resume_cascade_up_from_subworkflow_pause_before(self): + workbook = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + task1: + workflow: wf2 + on-success: + - task3 + task2: + workflow: wf3 + on-success: + - task3 + task3: + join: all + action: std.noop + wf2: + tasks: + task1: + action: std.noop + on-success: + - task2 + task2: + pause-before: true + action: std.async_noop + wf3: + tasks: + task1: + action: std.async_noop + on-success: + - task2 + task2: + action: std.noop + """ + + wb_service.create_workbook_v2(workbook) + + # Start workflow execution. + wf_1_ex = self.engine.start_workflow('wb.wf1') + + self.await_workflow_state(wf_1_ex.id, states.PAUSED) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_2_task_2_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task2' + ) + + wf_2_task_2_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_2_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.PAUSED, wf_2_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.IDLE, wf_2_task_2_ex.state) + self.assertEqual(0, len(wf_2_task_2_action_exs)) + self.assertEqual(states.PAUSED, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) + self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) + self.assertEqual(states.PAUSED, wf_1_ex.state) + + # Resume the main workflow. + self.engine.resume_workflow(wf_1_ex.id) + + self.await_workflow_running(wf_1_ex.id) + self.await_workflow_running(wf_2_ex.id) + self.await_workflow_running(wf_3_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_2_task_2_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task2' + ) + + wf_2_task_2_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_2_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + self.assertEqual(states.RUNNING, wf_2_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_2_task_2_ex.state) + self.assertEqual(states.RUNNING, wf_2_task_2_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_3_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) + self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) + self.assertEqual(states.RUNNING, wf_1_ex.state) + + # Complete action executions of the subworkflows. + self.engine.on_action_complete( + wf_2_task_2_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.engine.on_action_complete( + wf_3_task_1_action_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_success(wf_2_ex.id) + self.await_workflow_success(wf_3_ex.id) + self.await_workflow_success(wf_1_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + # Get objects for the parent workflow execution. + wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf_1_task_execs = wf_1_ex.task_executions + + wf_1_task_1_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task1' + ) + + wf_1_task_1_action_exs = wf_1_task_1_ex.executions + + wf_1_task_2_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task2' + ) + + wf_1_task_2_action_exs = wf_1_task_2_ex.executions + + wf_1_task_3_ex = self._assert_single_item( + wf_1_ex.task_executions, + name='task3' + ) + + # Get objects for the subworkflow executions. + wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + wf_2_task_execs = wf_2_ex.task_executions + + wf_2_task_1_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task1' + ) + + wf_2_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_1_ex.id + ) + + wf_2_task_2_ex = self._assert_single_item( + wf_2_ex.task_executions, + name='task2' + ) + + wf_2_task_2_action_exs = db_api.get_action_executions( + task_execution_id=wf_2_task_2_ex.id + ) + + wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + + wf_3_task_execs = wf_3_ex.task_executions + + wf_3_task_1_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task1' + ) + + wf_3_task_1_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_1_ex.id + ) + + wf_3_task_2_ex = self._assert_single_item( + wf_3_ex.task_executions, + name='task2' + ) + + wf_3_task_2_action_exs = db_api.get_action_executions( + task_execution_id=wf_3_task_2_ex.id + ) + + self.assertEqual(states.SUCCESS, wf_1_ex.state) + self.assertEqual(3, len(wf_1_task_execs)) + self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state) + self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_ex.state) + self.assertEqual(2, len(wf_2_task_execs)) + self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_2_task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_3_ex.state) + self.assertEqual(2, len(wf_3_task_execs)) + self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state) + self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf_3_task_2_action_exs[0].state)