Cascade pause and resume to and from subworkflows

When a workflow is paused or resumed, the state change cascades down
to its subworkflows. When a subworkflow is paused, the parent workflow
is also paused. When a subworkflow is resumed, the parent workflow
resumes only if none of its other subworkflows are still in the
paused state.

Change-Id: I4ae8cfb08d17f326fc2ef0864376ed594cd0b3cf
Closes-Bug: #1700196
This commit is contained in:
Winson Chan 2017-07-12 23:36:31 +00:00
parent 3de1918c30
commit 8b642e9856
8 changed files with 2082 additions and 18 deletions

View File

@ -147,6 +147,21 @@ def _on_action_update(action_ex):
try:
task.on_action_update(action_ex)
if states.is_paused(action_ex.state):
wf_handler.pause_workflow(wf_ex)
if states.is_running(action_ex.state):
# If any subworkflow of the parent workflow is paused,
# then keep the parent workflow execution paused.
for task_ex in wf_ex.task_executions:
if states.is_paused(task_ex.state):
return
# Otherwise if no other subworkflow is paused,
# then resume the parent workflow execution.
wf_handler.resume_workflow(wf_ex)
except exc.MistralException as e:
wf_ex = task_ex.workflow_execution

View File

@ -123,9 +123,21 @@ def _check_and_complete(wf_ex_id):
def pause_workflow(wf_ex, msg=None):
wf = workflows.Workflow(wf_ex=wf_ex)
# Pause subworkflows first.
for task_ex in wf_ex.task_executions:
sub_wf_exs = db_api.get_workflow_executions(
task_execution_id=task_ex.id
)
wf.set_state(states.PAUSED, msg)
for sub_wf_ex in sub_wf_exs:
if not states.is_completed(sub_wf_ex.state):
pause_workflow(sub_wf_ex, msg=msg)
# If all subworkflows paused successfully, pause the main workflow.
# If any subworkflow failed to pause for a temporary reason, this
# allows pause to be executed again on the main workflow.
wf = workflows.Workflow(wf_ex=wf_ex)
wf.pause(msg=msg)
def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
@ -146,8 +158,19 @@ def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state):
return wf_ex.get_clone()
wf = workflows.Workflow(wf_ex=wf_ex)
# Resume subworkflows first.
for task_ex in wf_ex.task_executions:
sub_wf_exs = db_api.get_workflow_executions(
task_execution_id=task_ex.id
)
for sub_wf_ex in sub_wf_exs:
if not states.is_completed(sub_wf_ex.state):
resume_workflow(sub_wf_ex)
# Resume the current workflow last, so that continuing the workflow is
# triggered only after all other subworkflows are back in running state.
wf = workflows.Workflow(wf_ex=wf_ex)
wf.resume(env=env)

View File

@ -125,6 +125,29 @@ class Workflow(object):
return self._cancel_workflow(msg)
def pause(self, msg=None):
"""Pause workflow.
:param msg: Additional explaining message.
"""
assert self.wf_ex
if states.is_paused(self.wf_ex.state):
return
# Set the state of this workflow to paused.
self.set_state(states.PAUSED, state_info=msg)
# If workflow execution is a subworkflow,
# schedule update to the task execution.
if self.wf_ex.task_execution_id:
# Import the task_handler module here to avoid circular reference.
from mistral.engine import task_handler
task_handler.schedule_on_action_update(self.wf_ex)
return
def resume(self, env=None):
"""Resume workflow.
@ -135,7 +158,7 @@ class Workflow(object):
wf_service.update_workflow_execution_env(self.wf_ex, env)
self.set_state(states.RUNNING, recursive=True)
self.set_state(states.RUNNING)
wf_ctrl = wf_base.get_controller(self.wf_ex)
@ -144,6 +167,13 @@ class Workflow(object):
self._continue_workflow(cmds)
# If workflow execution is a subworkflow,
# schedule update to the task execution.
if self.wf_ex.task_execution_id:
# Import the task_handler module here to avoid circular reference.
from mistral.engine import task_handler
task_handler.schedule_on_action_update(self.wf_ex)
def prepare_input(self, input_dict):
for k, v in self.wf_spec.get_input().items():
if k not in input_dict or input_dict[k] is utils.NotDefined:

View File

@ -249,6 +249,10 @@ class EngineTestCase(base.DbTestCase):
timeout
)
def await_workflow_running(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_workflow_state(ex_id, states.RUNNING, delay, timeout)
def await_workflow_success(self, ex_id, delay=DEFAULT_DELAY,
timeout=DEFAULT_TIMEOUT):
self.await_workflow_state(ex_id, states.SUCCESS, delay, timeout)

View File

@ -361,7 +361,7 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(1, len(task_execs))
self.assertEqual(states.PAUSED, task_execs[0].state)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.PAUSED, wf_ex.state)
action_execs = db_api.get_action_executions(
task_execution_id=task1_ex.id

View File

@ -155,10 +155,12 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
states.SUCCESS,
self.engine.resume_workflow(wf_ex.id).state
)
self.assertRaises(
exc.WorkflowException,
self.engine.pause_workflow, wf_ex.id
)
self.assertEqual(
states.SUCCESS,
self.engine.stop_workflow(wf_ex.id, states.ERROR).state

File diff suppressed because it is too large Load Diff

View File

@ -83,7 +83,7 @@ class TaskPauseResumeTest(base.EngineTestCase):
task_execution_id=task_1_ex.id
)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(1, len(task_execs))
self.assertEqual(states.PAUSED, task_1_ex.state)
self.assertEqual(1, len(task_1_action_exs))
@ -174,8 +174,9 @@ class TaskPauseResumeTest(base.EngineTestCase):
name='task1'
)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
task_1_action_exs = sorted(
db_api.get_action_executions(task_execution_id=task_1_ex.id),
key=lambda x: x['runtime_context']['index']
)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -201,11 +202,12 @@ class TaskPauseResumeTest(base.EngineTestCase):
name='task1'
)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
task_1_action_exs = sorted(
db_api.get_action_executions(task_execution_id=task_1_ex.id),
key=lambda x: x['runtime_context']['index']
)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(1, len(task_execs))
self.assertEqual(states.PAUSED, task_1_ex.state)
self.assertEqual(3, len(task_1_action_exs))
@ -234,11 +236,12 @@ class TaskPauseResumeTest(base.EngineTestCase):
name='task1'
)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
task_1_action_exs = sorted(
db_api.get_action_executions(task_execution_id=task_1_ex.id),
key=lambda x: x['runtime_context']['index']
)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(1, len(task_execs))
self.assertEqual(states.PAUSED, task_1_ex.state)
self.assertEqual(3, len(task_1_action_exs))
@ -259,8 +262,9 @@ class TaskPauseResumeTest(base.EngineTestCase):
name='task1'
)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
task_1_action_exs = sorted(
db_api.get_action_executions(task_execution_id=task_1_ex.id),
key=lambda x: x['runtime_context']['index']
)
self.assertEqual(states.RUNNING, wf_ex.state)
@ -287,8 +291,9 @@ class TaskPauseResumeTest(base.EngineTestCase):
task_1_ex = self._assert_single_item(task_execs, name='task1')
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id
task_1_action_exs = sorted(
db_api.get_action_executions(task_execution_id=task_1_ex.id),
key=lambda x: x['runtime_context']['index']
)
task_2_ex = self._assert_single_item(task_execs, name='task2')