An execution hangs in the RUNNING state after rerun

When we rerun an execution we must create the "_check_and_complete"
delayed calls for all parent workflows. The problem was that we created
the delayed call only for the rerun execution and its parent.

Recursive rerun was extracted in the separated function. Because we
need to execute some additional operations, for example, create delayed
call for every a rerun execution.

Change-Id: I530094e916daf25bb9c672c445afa980ad4311ae
Closes-Bug: #1792451
Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
This commit is contained in:
Vitalii Solodilov 2018-09-15 00:57:24 +04:00 committed by Renat Akhmerov
parent 9be7e928d6
commit 041f3bd35c
4 changed files with 106 additions and 23 deletions

View File

@ -82,6 +82,14 @@ def run_task(wf_cmd):
_schedule_refresh_task_state(task.task_ex, 1)
def rerun_task(task_ex, wf_spec):
task = _build_task_from_execution(wf_spec, task_ex)
old_task_state = task_ex.state
task.set_state(states.RUNNING, None, False)
task.notify(old_task_state, states.RUNNING)
@profiler.trace('task-handler-on-action-complete', hide_args=True)
def _on_action_complete(action_ex):
"""Handles action completion event.

View File

@ -222,11 +222,6 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
wf.rerun(task_ex, reset=reset, env=env)
_schedule_check_and_complete(wf_ex)
if wf_ex.task_execution_id:
_schedule_check_and_complete(wf_ex.task_execution.workflow_execution)
def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state):

View File

@ -233,7 +233,7 @@ class Workflow(object):
wf_service.update_workflow_execution_env(self.wf_ex, env)
self.set_state(states.RUNNING, recursive=True)
self._recursive_rerun()
wf_ctrl = wf_base.get_controller(self.wf_ex)
@ -248,6 +248,31 @@ class Workflow(object):
self._continue_workflow(cmds)
def _recursive_rerun(self):
"""Rerun all parent workflow executions recursively.
If there is a parent execution that it reruns as well.
"""
from mistral.engine import workflow_handler
self.set_state(states.RUNNING)
workflow_handler._schedule_check_and_complete(self.wf_ex)
if self.wf_ex.task_execution_id:
parent_task_ex = db_api.get_task_execution(
self.wf_ex.task_execution_id
)
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
parent_wf.lock()
parent_wf._recursive_rerun()
from mistral.engine import task_handler
task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec)
def _get_backlog(self):
return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)
@ -330,7 +355,7 @@ class Workflow(object):
)
@profiler.trace('workflow-set-state')
def set_state(self, state, state_info=None, recursive=False):
def set_state(self, state, state_info=None):
assert self.wf_ex
cur_state = self.wf_ex.state
@ -376,22 +401,6 @@ class Workflow(object):
triggers.on_workflow_complete(self.wf_ex)
if recursive and self.wf_ex.task_execution_id:
parent_task_ex = db_api.get_task_execution(
self.wf_ex.task_execution_id
)
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
parent_wf.lock()
parent_wf.set_state(state, recursive=recursive)
# TODO(rakhmerov): It'd be better to use instance of Task here.
parent_task_ex.state = state
parent_task_ex.state_info = None
parent_task_ex.processed = False
@profiler.trace('workflow-check-and-complete')
def check_and_complete(self):
"""Completes the workflow if it needs to be completed.

View File

@ -1480,3 +1480,74 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(6, len(action_executions))
self.assertTrue(all(a.state == states.ERROR
for a in action_executions))
@mock.patch.object(
std_actions.NoOpAction,
'run',
mock.MagicMock(
side_effect=[
exc.ActionException(),
'Success'
]
)
)
def test_rerun_sub_workflow(self):
wf_service.create_workflows("""---
version: '2.0'
wf1:
tasks:
task1:
workflow: wf2
wf2:
tasks:
task2:
workflow: wf3
wf3:
tasks:
task3:
action: std.noop""")
# Run workflow and fail task.
wf1_ex = self.engine.start_workflow('wf1')
self.await_workflow_error(wf1_ex.id)
with db_api.transaction():
wf_exs = db_api.get_workflow_executions()
task_exs = db_api.get_task_executions()
self.assertEqual(3, len(wf_exs),
'The number of workflow executions')
self.assertEqual(3, len(task_exs),
'The number of task executions')
for wf_ex in wf_exs:
self.assertEqual(states.ERROR, wf_ex.state,
'The executions must fail the first time')
for task_ex in task_exs:
self.assertEqual(states.ERROR, task_ex.state,
'The tasks must fail the first time')
wf3_ex = self._assert_single_item(wf_exs, name='wf3')
task3_ex = self._assert_single_item(wf3_ex.task_executions,
name="task3")
self.engine.rerun_workflow(task3_ex.id)
self.await_workflow_success(wf1_ex.id)
with db_api.transaction():
wf_exs = db_api.get_workflow_executions()
task_exs = db_api.get_task_executions()
self.assertEqual(3, len(wf_exs),
'The number of workflow executions')
self.assertEqual(3, len(task_exs),
'The number of task executions')
for wf_ex in wf_exs:
self.assertEqual(states.SUCCESS, wf_ex.state,
'The executions must success the second time')
for task_ex in task_exs:
self.assertEqual(states.SUCCESS, task_ex.state,
'The tasks must success the second time')