Merge "Cascade pause from pause-before in subworkflows"
This commit is contained in:
commit
e7bbe4cd75
|
@ -16,6 +16,7 @@
|
|||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import base
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral import expressions
|
||||
from mistral.services import scheduler
|
||||
from mistral.utils import wf_trace
|
||||
|
@ -420,8 +421,8 @@ class PauseBeforePolicy(base.TaskPolicy):
|
|||
(task_ex.name, task_ex.workflow_execution.state, states.PAUSED)
|
||||
)
|
||||
|
||||
task_ex.workflow_execution.state = states.PAUSED
|
||||
task_ex.state = states.IDLE
|
||||
wf_handler.pause_workflow(task_ex.workflow_execution)
|
||||
|
||||
|
||||
class ConcurrencyPolicy(base.TaskPolicy):
|
||||
|
|
|
@ -1983,3 +1983,305 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
|
|||
self.assertEqual(states.SUCCESS, wf_3_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state)
|
||||
|
||||
def test_pause_resume_cascade_up_from_subworkflow_pause_before(self):
|
||||
workbook = """
|
||||
version: '2.0'
|
||||
name: wb
|
||||
workflows:
|
||||
wf1:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: wf2
|
||||
on-success:
|
||||
- task3
|
||||
task2:
|
||||
workflow: wf3
|
||||
on-success:
|
||||
- task3
|
||||
task3:
|
||||
join: all
|
||||
action: std.noop
|
||||
wf2:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task2
|
||||
task2:
|
||||
pause-before: true
|
||||
action: std.async_noop
|
||||
wf3:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.async_noop
|
||||
on-success:
|
||||
- task2
|
||||
task2:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
|
||||
# Start workflow execution.
|
||||
wf_1_ex = self.engine.start_workflow('wb.wf1')
|
||||
|
||||
self.await_workflow_state(wf_1_ex.id, states.PAUSED)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_execs = db_api.get_workflow_executions()
|
||||
|
||||
# Get objects for the parent workflow execution.
|
||||
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
|
||||
|
||||
wf_1_task_execs = wf_1_ex.task_executions
|
||||
|
||||
wf_1_task_1_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_1_task_1_action_exs = wf_1_task_1_ex.executions
|
||||
|
||||
wf_1_task_2_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_1_task_2_action_exs = wf_1_task_2_ex.executions
|
||||
|
||||
# Get objects for the subworkflow executions.
|
||||
wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2')
|
||||
|
||||
wf_2_task_execs = wf_2_ex.task_executions
|
||||
|
||||
wf_2_task_1_ex = self._assert_single_item(
|
||||
wf_2_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_2_task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_2_task_1_ex.id
|
||||
)
|
||||
|
||||
wf_2_task_2_ex = self._assert_single_item(
|
||||
wf_2_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_2_task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_2_task_2_ex.id
|
||||
)
|
||||
|
||||
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
|
||||
|
||||
wf_3_task_execs = wf_3_ex.task_executions
|
||||
|
||||
wf_3_task_1_ex = self._assert_single_item(
|
||||
wf_3_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_3_task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_3_task_1_ex.id
|
||||
)
|
||||
|
||||
self.assertEqual(states.PAUSED, wf_2_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.IDLE, wf_2_task_2_ex.state)
|
||||
self.assertEqual(0, len(wf_2_task_2_action_exs))
|
||||
self.assertEqual(states.PAUSED, wf_3_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_3_task_1_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.PAUSED, wf_1_task_1_ex.state)
|
||||
self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state)
|
||||
self.assertEqual(states.PAUSED, wf_1_task_2_ex.state)
|
||||
self.assertEqual(states.PAUSED, wf_1_ex.state)
|
||||
|
||||
# Resume the main workflow.
|
||||
self.engine.resume_workflow(wf_1_ex.id)
|
||||
|
||||
self.await_workflow_running(wf_1_ex.id)
|
||||
self.await_workflow_running(wf_2_ex.id)
|
||||
self.await_workflow_running(wf_3_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_execs = db_api.get_workflow_executions()
|
||||
|
||||
# Get objects for the parent workflow execution.
|
||||
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
|
||||
|
||||
wf_1_task_execs = wf_1_ex.task_executions
|
||||
|
||||
wf_1_task_1_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_1_task_1_action_exs = wf_1_task_1_ex.executions
|
||||
|
||||
wf_1_task_2_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_1_task_2_action_exs = wf_1_task_2_ex.executions
|
||||
|
||||
# Get objects for the subworkflow executions.
|
||||
wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2')
|
||||
|
||||
wf_2_task_execs = wf_2_ex.task_executions
|
||||
|
||||
wf_2_task_1_ex = self._assert_single_item(
|
||||
wf_2_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_2_task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_2_task_1_ex.id
|
||||
)
|
||||
|
||||
wf_2_task_2_ex = self._assert_single_item(
|
||||
wf_2_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_2_task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_2_task_2_ex.id
|
||||
)
|
||||
|
||||
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
|
||||
|
||||
wf_3_task_execs = wf_3_ex.task_executions
|
||||
|
||||
wf_3_task_1_ex = self._assert_single_item(
|
||||
wf_3_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_3_task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_3_task_1_ex.id
|
||||
)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_2_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.RUNNING, wf_2_task_2_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_2_task_2_action_exs[0].state)
|
||||
self.assertEqual(states.RUNNING, wf_3_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_3_task_1_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.RUNNING, wf_1_task_1_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state)
|
||||
self.assertEqual(states.RUNNING, wf_1_task_2_ex.state)
|
||||
self.assertEqual(states.RUNNING, wf_1_ex.state)
|
||||
|
||||
# Complete action executions of the subworkflows.
|
||||
self.engine.on_action_complete(
|
||||
wf_2_task_2_action_exs[0].id,
|
||||
ml_actions.Result(data={'result': 'foobar'})
|
||||
)
|
||||
|
||||
self.engine.on_action_complete(
|
||||
wf_3_task_1_action_exs[0].id,
|
||||
ml_actions.Result(data={'result': 'foobar'})
|
||||
)
|
||||
|
||||
self.await_workflow_success(wf_2_ex.id)
|
||||
self.await_workflow_success(wf_3_ex.id)
|
||||
self.await_workflow_success(wf_1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_execs = db_api.get_workflow_executions()
|
||||
|
||||
# Get objects for the parent workflow execution.
|
||||
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
|
||||
|
||||
wf_1_task_execs = wf_1_ex.task_executions
|
||||
|
||||
wf_1_task_1_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_1_task_1_action_exs = wf_1_task_1_ex.executions
|
||||
|
||||
wf_1_task_2_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_1_task_2_action_exs = wf_1_task_2_ex.executions
|
||||
|
||||
wf_1_task_3_ex = self._assert_single_item(
|
||||
wf_1_ex.task_executions,
|
||||
name='task3'
|
||||
)
|
||||
|
||||
# Get objects for the subworkflow executions.
|
||||
wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2')
|
||||
|
||||
wf_2_task_execs = wf_2_ex.task_executions
|
||||
|
||||
wf_2_task_1_ex = self._assert_single_item(
|
||||
wf_2_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_2_task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_2_task_1_ex.id
|
||||
)
|
||||
|
||||
wf_2_task_2_ex = self._assert_single_item(
|
||||
wf_2_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_2_task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_2_task_2_ex.id
|
||||
)
|
||||
|
||||
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
|
||||
|
||||
wf_3_task_execs = wf_3_ex.task_executions
|
||||
|
||||
wf_3_task_1_ex = self._assert_single_item(
|
||||
wf_3_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
wf_3_task_1_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_3_task_1_ex.id
|
||||
)
|
||||
|
||||
wf_3_task_2_ex = self._assert_single_item(
|
||||
wf_3_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
wf_3_task_2_action_exs = db_api.get_action_executions(
|
||||
task_execution_id=wf_3_task_2_ex.id
|
||||
)
|
||||
|
||||
self.assertEqual(states.SUCCESS, wf_1_ex.state)
|
||||
self.assertEqual(3, len(wf_1_task_execs))
|
||||
self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_ex.state)
|
||||
self.assertEqual(2, len(wf_2_task_execs))
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, wf_2_task_2_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, wf_3_ex.state)
|
||||
self.assertEqual(2, len(wf_3_task_execs))
|
||||
self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state)
|
||||
self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state)
|
||||
self.assertEqual(states.SUCCESS, wf_3_task_2_action_exs[0].state)
|
||||
|
|
Loading…
Reference in New Issue