Merge "Cascade pause from pause-before in subworkflows"

This commit is contained in:
Jenkins 2017-08-11 12:03:28 +00:00 committed by Gerrit Code Review
commit e7bbe4cd75
2 changed files with 304 additions and 1 deletions

View File

@ -16,6 +16,7 @@
from mistral.db.v2 import api as db_api
from mistral.engine import action_queue
from mistral.engine import base
from mistral.engine import workflow_handler as wf_handler
from mistral import expressions
from mistral.services import scheduler
from mistral.utils import wf_trace
@ -420,8 +421,8 @@ class PauseBeforePolicy(base.TaskPolicy):
(task_ex.name, task_ex.workflow_execution.state, states.PAUSED)
)
task_ex.workflow_execution.state = states.PAUSED
task_ex.state = states.IDLE
wf_handler.pause_workflow(task_ex.workflow_execution)
class ConcurrencyPolicy(base.TaskPolicy):

View File

@ -1983,3 +1983,305 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_3_ex.state)
self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state)
self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state)
def test_pause_resume_cascade_up_from_subworkflow_pause_before(self):
workbook = """
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
workflow: wf2
on-success:
- task3
task2:
workflow: wf3
on-success:
- task3
task3:
join: all
action: std.noop
wf2:
tasks:
task1:
action: std.noop
on-success:
- task2
task2:
pause-before: true
action: std.async_noop
wf3:
tasks:
task1:
action: std.async_noop
on-success:
- task2
task2:
action: std.noop
"""
wb_service.create_workbook_v2(workbook)
# Start workflow execution.
wf_1_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_state(wf_1_ex.id, states.PAUSED)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
wf_1_task_1_action_exs = wf_1_task_1_ex.executions
wf_1_task_2_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task2'
)
wf_1_task_2_action_exs = wf_1_task_2_ex.executions
# Get objects for the subworkflow executions.
wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2')
wf_2_task_execs = wf_2_ex.task_executions
wf_2_task_1_ex = self._assert_single_item(
wf_2_ex.task_executions,
name='task1'
)
wf_2_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_task_1_ex.id
)
wf_2_task_2_ex = self._assert_single_item(
wf_2_ex.task_executions,
name='task2'
)
wf_2_task_2_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_task_2_ex.id
)
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
wf_3_task_execs = wf_3_ex.task_executions
wf_3_task_1_ex = self._assert_single_item(
wf_3_ex.task_executions,
name='task1'
)
wf_3_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_3_task_1_ex.id
)
self.assertEqual(states.PAUSED, wf_2_ex.state)
self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state)
self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state)
self.assertEqual(states.IDLE, wf_2_task_2_ex.state)
self.assertEqual(0, len(wf_2_task_2_action_exs))
self.assertEqual(states.PAUSED, wf_3_ex.state)
self.assertEqual(states.RUNNING, wf_3_task_1_ex.state)
self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state)
self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state)
self.assertEqual(states.PAUSED, wf_1_task_1_ex.state)
self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state)
self.assertEqual(states.PAUSED, wf_1_task_2_ex.state)
self.assertEqual(states.PAUSED, wf_1_ex.state)
# Resume the main workflow.
self.engine.resume_workflow(wf_1_ex.id)
self.await_workflow_running(wf_1_ex.id)
self.await_workflow_running(wf_2_ex.id)
self.await_workflow_running(wf_3_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
wf_1_task_1_action_exs = wf_1_task_1_ex.executions
wf_1_task_2_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task2'
)
wf_1_task_2_action_exs = wf_1_task_2_ex.executions
# Get objects for the subworkflow executions.
wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2')
wf_2_task_execs = wf_2_ex.task_executions
wf_2_task_1_ex = self._assert_single_item(
wf_2_ex.task_executions,
name='task1'
)
wf_2_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_task_1_ex.id
)
wf_2_task_2_ex = self._assert_single_item(
wf_2_ex.task_executions,
name='task2'
)
wf_2_task_2_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_task_2_ex.id
)
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
wf_3_task_execs = wf_3_ex.task_executions
wf_3_task_1_ex = self._assert_single_item(
wf_3_ex.task_executions,
name='task1'
)
wf_3_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_3_task_1_ex.id
)
self.assertEqual(states.RUNNING, wf_2_ex.state)
self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state)
self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state)
self.assertEqual(states.RUNNING, wf_2_task_2_ex.state)
self.assertEqual(states.RUNNING, wf_2_task_2_action_exs[0].state)
self.assertEqual(states.RUNNING, wf_3_ex.state)
self.assertEqual(states.RUNNING, wf_3_task_1_ex.state)
self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state)
self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state)
self.assertEqual(states.RUNNING, wf_1_task_1_ex.state)
self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state)
self.assertEqual(states.RUNNING, wf_1_task_2_ex.state)
self.assertEqual(states.RUNNING, wf_1_ex.state)
# Complete action executions of the subworkflows.
self.engine.on_action_complete(
wf_2_task_2_action_exs[0].id,
ml_actions.Result(data={'result': 'foobar'})
)
self.engine.on_action_complete(
wf_3_task_1_action_exs[0].id,
ml_actions.Result(data={'result': 'foobar'})
)
self.await_workflow_success(wf_2_ex.id)
self.await_workflow_success(wf_3_ex.id)
self.await_workflow_success(wf_1_ex.id)
with db_api.transaction():
wf_execs = db_api.get_workflow_executions()
# Get objects for the parent workflow execution.
wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1')
wf_1_task_execs = wf_1_ex.task_executions
wf_1_task_1_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task1'
)
wf_1_task_1_action_exs = wf_1_task_1_ex.executions
wf_1_task_2_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task2'
)
wf_1_task_2_action_exs = wf_1_task_2_ex.executions
wf_1_task_3_ex = self._assert_single_item(
wf_1_ex.task_executions,
name='task3'
)
# Get objects for the subworkflow executions.
wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2')
wf_2_task_execs = wf_2_ex.task_executions
wf_2_task_1_ex = self._assert_single_item(
wf_2_ex.task_executions,
name='task1'
)
wf_2_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_task_1_ex.id
)
wf_2_task_2_ex = self._assert_single_item(
wf_2_ex.task_executions,
name='task2'
)
wf_2_task_2_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_task_2_ex.id
)
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
wf_3_task_execs = wf_3_ex.task_executions
wf_3_task_1_ex = self._assert_single_item(
wf_3_ex.task_executions,
name='task1'
)
wf_3_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_3_task_1_ex.id
)
wf_3_task_2_ex = self._assert_single_item(
wf_3_ex.task_executions,
name='task2'
)
wf_3_task_2_action_exs = db_api.get_action_executions(
task_execution_id=wf_3_task_2_ex.id
)
self.assertEqual(states.SUCCESS, wf_1_ex.state)
self.assertEqual(3, len(wf_1_task_execs))
self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state)
self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state)
self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state)
self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state)
self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state)
self.assertEqual(states.SUCCESS, wf_2_ex.state)
self.assertEqual(2, len(wf_2_task_execs))
self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state)
self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state)
self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state)
self.assertEqual(states.SUCCESS, wf_2_task_2_action_exs[0].state)
self.assertEqual(states.SUCCESS, wf_3_ex.state)
self.assertEqual(2, len(wf_3_task_execs))
self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state)
self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state)
self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state)
self.assertEqual(states.SUCCESS, wf_3_task_2_action_exs[0].state)