An execution hangs in the RUNNING state after rerun
When we rerun an execution we must create the "_check_and_complete" delayed calls for all parent workflows. The problem was that we created the delayed call only for the rerun execution and its parent. Recursive rerun was extracted in the separated function. Because we need to execute some additional operations, for example, create delayed call for every a rerun execution. Change-Id: I530094e916daf25bb9c672c445afa980ad4311ae Closes-Bug: #1792451 Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
This commit is contained in:
parent
9be7e928d6
commit
041f3bd35c
|
@ -82,6 +82,14 @@ def run_task(wf_cmd):
|
|||
_schedule_refresh_task_state(task.task_ex, 1)
|
||||
|
||||
|
||||
def rerun_task(task_ex, wf_spec):
|
||||
task = _build_task_from_execution(wf_spec, task_ex)
|
||||
|
||||
old_task_state = task_ex.state
|
||||
task.set_state(states.RUNNING, None, False)
|
||||
task.notify(old_task_state, states.RUNNING)
|
||||
|
||||
|
||||
@profiler.trace('task-handler-on-action-complete', hide_args=True)
|
||||
def _on_action_complete(action_ex):
|
||||
"""Handles action completion event.
|
||||
|
|
|
@ -222,11 +222,6 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
|
|||
|
||||
wf.rerun(task_ex, reset=reset, env=env)
|
||||
|
||||
_schedule_check_and_complete(wf_ex)
|
||||
|
||||
if wf_ex.task_execution_id:
|
||||
_schedule_check_and_complete(wf_ex.task_execution.workflow_execution)
|
||||
|
||||
|
||||
def resume_workflow(wf_ex, env=None):
|
||||
if not states.is_paused_or_idle(wf_ex.state):
|
||||
|
|
|
@ -233,7 +233,7 @@ class Workflow(object):
|
|||
|
||||
wf_service.update_workflow_execution_env(self.wf_ex, env)
|
||||
|
||||
self.set_state(states.RUNNING, recursive=True)
|
||||
self._recursive_rerun()
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||
|
||||
|
@ -248,6 +248,31 @@ class Workflow(object):
|
|||
|
||||
self._continue_workflow(cmds)
|
||||
|
||||
def _recursive_rerun(self):
|
||||
"""Rerun all parent workflow executions recursively.
|
||||
|
||||
If there is a parent execution that it reruns as well.
|
||||
"""
|
||||
|
||||
from mistral.engine import workflow_handler
|
||||
|
||||
self.set_state(states.RUNNING)
|
||||
workflow_handler._schedule_check_and_complete(self.wf_ex)
|
||||
|
||||
if self.wf_ex.task_execution_id:
|
||||
parent_task_ex = db_api.get_task_execution(
|
||||
self.wf_ex.task_execution_id
|
||||
)
|
||||
|
||||
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
|
||||
|
||||
parent_wf.lock()
|
||||
|
||||
parent_wf._recursive_rerun()
|
||||
|
||||
from mistral.engine import task_handler
|
||||
task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec)
|
||||
|
||||
def _get_backlog(self):
|
||||
return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)
|
||||
|
||||
|
@ -330,7 +355,7 @@ class Workflow(object):
|
|||
)
|
||||
|
||||
@profiler.trace('workflow-set-state')
|
||||
def set_state(self, state, state_info=None, recursive=False):
|
||||
def set_state(self, state, state_info=None):
|
||||
assert self.wf_ex
|
||||
|
||||
cur_state = self.wf_ex.state
|
||||
|
@ -376,22 +401,6 @@ class Workflow(object):
|
|||
|
||||
triggers.on_workflow_complete(self.wf_ex)
|
||||
|
||||
if recursive and self.wf_ex.task_execution_id:
|
||||
parent_task_ex = db_api.get_task_execution(
|
||||
self.wf_ex.task_execution_id
|
||||
)
|
||||
|
||||
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
|
||||
|
||||
parent_wf.lock()
|
||||
|
||||
parent_wf.set_state(state, recursive=recursive)
|
||||
|
||||
# TODO(rakhmerov): It'd be better to use instance of Task here.
|
||||
parent_task_ex.state = state
|
||||
parent_task_ex.state_info = None
|
||||
parent_task_ex.processed = False
|
||||
|
||||
@profiler.trace('workflow-check-and-complete')
|
||||
def check_and_complete(self):
|
||||
"""Completes the workflow if it needs to be completed.
|
||||
|
|
|
@ -1480,3 +1480,74 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
|||
self.assertEqual(6, len(action_executions))
|
||||
self.assertTrue(all(a.state == states.ERROR
|
||||
for a in action_executions))
|
||||
|
||||
@mock.patch.object(
|
||||
std_actions.NoOpAction,
|
||||
'run',
|
||||
mock.MagicMock(
|
||||
side_effect=[
|
||||
exc.ActionException(),
|
||||
'Success'
|
||||
]
|
||||
)
|
||||
)
|
||||
def test_rerun_sub_workflow(self):
|
||||
wf_service.create_workflows("""---
|
||||
version: '2.0'
|
||||
wf1:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: wf2
|
||||
wf2:
|
||||
tasks:
|
||||
task2:
|
||||
workflow: wf3
|
||||
wf3:
|
||||
tasks:
|
||||
task3:
|
||||
action: std.noop""")
|
||||
|
||||
# Run workflow and fail task.
|
||||
wf1_ex = self.engine.start_workflow('wf1')
|
||||
|
||||
self.await_workflow_error(wf1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_exs = db_api.get_workflow_executions()
|
||||
task_exs = db_api.get_task_executions()
|
||||
|
||||
self.assertEqual(3, len(wf_exs),
|
||||
'The number of workflow executions')
|
||||
self.assertEqual(3, len(task_exs),
|
||||
'The number of task executions')
|
||||
|
||||
for wf_ex in wf_exs:
|
||||
self.assertEqual(states.ERROR, wf_ex.state,
|
||||
'The executions must fail the first time')
|
||||
for task_ex in task_exs:
|
||||
self.assertEqual(states.ERROR, task_ex.state,
|
||||
'The tasks must fail the first time')
|
||||
|
||||
wf3_ex = self._assert_single_item(wf_exs, name='wf3')
|
||||
task3_ex = self._assert_single_item(wf3_ex.task_executions,
|
||||
name="task3")
|
||||
|
||||
self.engine.rerun_workflow(task3_ex.id)
|
||||
|
||||
self.await_workflow_success(wf1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_exs = db_api.get_workflow_executions()
|
||||
task_exs = db_api.get_task_executions()
|
||||
|
||||
self.assertEqual(3, len(wf_exs),
|
||||
'The number of workflow executions')
|
||||
self.assertEqual(3, len(task_exs),
|
||||
'The number of task executions')
|
||||
|
||||
for wf_ex in wf_exs:
|
||||
self.assertEqual(states.SUCCESS, wf_ex.state,
|
||||
'The executions must success the second time')
|
||||
for task_ex in task_exs:
|
||||
self.assertEqual(states.SUCCESS, task_ex.state,
|
||||
'The tasks must success the second time')
|
||||
|
|
Loading…
Reference in New Issue