Merge "An execution hangs in the RUNNING state after rerun"
This commit is contained in:
commit
cdd1539ada
|
@ -82,6 +82,14 @@ def run_task(wf_cmd):
|
|||
_schedule_refresh_task_state(task.task_ex, 1)
|
||||
|
||||
|
||||
def rerun_task(task_ex, wf_spec):
|
||||
task = _build_task_from_execution(wf_spec, task_ex)
|
||||
|
||||
old_task_state = task_ex.state
|
||||
task.set_state(states.RUNNING, None, False)
|
||||
task.notify(old_task_state, states.RUNNING)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-on-action-complete', hide_args=True)
|
||||
def _on_action_complete(action_ex):
|
||||
"""Handles action completion event.
|
||||
|
|
|
@ -222,11 +222,6 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
|
|||
|
||||
wf.rerun(task_ex, reset=reset, env=env)
|
||||
|
||||
_schedule_check_and_complete(wf_ex)
|
||||
|
||||
if wf_ex.task_execution_id:
|
||||
_schedule_check_and_complete(wf_ex.task_execution.workflow_execution)
|
||||
|
||||
|
||||
def resume_workflow(wf_ex, env=None):
|
||||
if not states.is_paused_or_idle(wf_ex.state):
|
||||
|
|
|
@ -233,7 +233,7 @@ class Workflow(object):
|
|||
|
||||
wf_service.update_workflow_execution_env(self.wf_ex, env)
|
||||
|
||||
self.set_state(states.RUNNING, recursive=True)
|
||||
self._recursive_rerun()
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||
|
||||
|
@ -248,6 +248,31 @@ class Workflow(object):
|
|||
|
||||
self._continue_workflow(cmds)
|
||||
|
||||
def _recursive_rerun(self):
|
||||
"""Rerun all parent workflow executions recursively.
|
||||
|
||||
If there is a parent execution that it reruns as well.
|
||||
"""
|
||||
|
||||
from mistral.engine import workflow_handler
|
||||
|
||||
self.set_state(states.RUNNING)
|
||||
workflow_handler._schedule_check_and_complete(self.wf_ex)
|
||||
|
||||
if self.wf_ex.task_execution_id:
|
||||
parent_task_ex = db_api.get_task_execution(
|
||||
self.wf_ex.task_execution_id
|
||||
)
|
||||
|
||||
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
|
||||
|
||||
parent_wf.lock()
|
||||
|
||||
parent_wf._recursive_rerun()
|
||||
|
||||
from mistral.engine import task_handler
|
||||
task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec)
|
||||
|
||||
def _get_backlog(self):
|
||||
return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)
|
||||
|
||||
|
@ -330,7 +355,7 @@ class Workflow(object):
|
|||
)
|
||||
|
||||
@profiler.trace('workflow-set-state')
|
||||
def set_state(self, state, state_info=None, recursive=False):
|
||||
def set_state(self, state, state_info=None):
|
||||
assert self.wf_ex
|
||||
|
||||
cur_state = self.wf_ex.state
|
||||
|
@ -376,22 +401,6 @@ class Workflow(object):
|
|||
|
||||
triggers.on_workflow_complete(self.wf_ex)
|
||||
|
||||
if recursive and self.wf_ex.task_execution_id:
|
||||
parent_task_ex = db_api.get_task_execution(
|
||||
self.wf_ex.task_execution_id
|
||||
)
|
||||
|
||||
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
|
||||
|
||||
parent_wf.lock()
|
||||
|
||||
parent_wf.set_state(state, recursive=recursive)
|
||||
|
||||
# TODO(rakhmerov): It'd be better to use instance of Task here.
|
||||
parent_task_ex.state = state
|
||||
parent_task_ex.state_info = None
|
||||
parent_task_ex.processed = False
|
||||
|
||||
@profiler.trace('workflow-check-and-complete')
|
||||
def check_and_complete(self):
|
||||
"""Completes the workflow if it needs to be completed.
|
||||
|
|
|
@ -1480,3 +1480,74 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
|||
self.assertEqual(6, len(action_executions))
|
||||
self.assertTrue(all(a.state == states.ERROR
|
||||
for a in action_executions))
|
||||
|
||||
@mock.patch.object(
|
||||
std_actions.NoOpAction,
|
||||
'run',
|
||||
mock.MagicMock(
|
||||
side_effect=[
|
||||
exc.ActionException(),
|
||||
'Success'
|
||||
]
|
||||
)
|
||||
)
|
||||
def test_rerun_sub_workflow(self):
|
||||
wf_service.create_workflows("""---
|
||||
version: '2.0'
|
||||
wf1:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: wf2
|
||||
wf2:
|
||||
tasks:
|
||||
task2:
|
||||
workflow: wf3
|
||||
wf3:
|
||||
tasks:
|
||||
task3:
|
||||
action: std.noop""")
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf1_ex = self.engine.start_workflow('wf1')
|
||||
|
||||
self.await_workflow_error(wf1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_exs = db_api.get_workflow_executions()
|
||||
task_exs = db_api.get_task_executions()
|
||||
|
||||
self.assertEqual(3, len(wf_exs),
|
||||
'The number of workflow executions')
|
||||
self.assertEqual(3, len(task_exs),
|
||||
'The number of task executions')
|
||||
|
||||
for wf_ex in wf_exs:
|
||||
self.assertEqual(states.ERROR, wf_ex.state,
|
||||
'The executions must fail the first time')
|
||||
for task_ex in task_exs:
|
||||
self.assertEqual(states.ERROR, task_ex.state,
|
||||
'The tasks must fail the first time')
|
||||
|
||||
wf3_ex = self._assert_single_item(wf_exs, name='wf3')
|
||||
task3_ex = self._assert_single_item(wf3_ex.task_executions,
|
||||
name="task3")
|
||||
|
||||
self.engine.rerun_workflow(task3_ex.id)
|
||||
|
||||
self.await_workflow_success(wf1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_exs = db_api.get_workflow_executions()
|
||||
task_exs = db_api.get_task_executions()
|
||||
|
||||
self.assertEqual(3, len(wf_exs),
|
||||
'The number of workflow executions')
|
||||
self.assertEqual(3, len(task_exs),
|
||||
'The number of task executions')
|
||||
|
||||
for wf_ex in wf_exs:
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state,
|
||||
'The executions must success the second time')
|
||||
for task_ex in task_exs:
|
||||
self.assertEqual(states.SUCCESS, task_ex.state,
|
||||
'The tasks must success the second time')
|
||||
|
|
Loading…
Reference in New Issue