Merge "An execution hangs in the RUNNING state after rerun"

This commit is contained in:
Zuul 2018-10-15 22:42:44 +00:00 committed by Gerrit Code Review
commit cdd1539ada
4 changed files with 106 additions and 23 deletions

View File

@ -82,6 +82,14 @@ def run_task(wf_cmd):
_schedule_refresh_task_state(task.task_ex, 1)
def rerun_task(task_ex, wf_spec):
task = _build_task_from_execution(wf_spec, task_ex)
old_task_state = task_ex.state
task.set_state(states.RUNNING, None, False)
task.notify(old_task_state, states.RUNNING)
@profiler.trace('task-handler-on-action-complete', hide_args=True)
def _on_action_complete(action_ex):
"""Handles action completion event.

View File

@ -222,11 +222,6 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
wf.rerun(task_ex, reset=reset, env=env)
_schedule_check_and_complete(wf_ex)
if wf_ex.task_execution_id:
_schedule_check_and_complete(wf_ex.task_execution.workflow_execution)
def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state):

View File

@ -233,7 +233,7 @@ class Workflow(object):
wf_service.update_workflow_execution_env(self.wf_ex, env)
self.set_state(states.RUNNING, recursive=True)
self._recursive_rerun()
wf_ctrl = wf_base.get_controller(self.wf_ex)
@ -248,6 +248,31 @@ class Workflow(object):
self._continue_workflow(cmds)
def _recursive_rerun(self):
"""Rerun all parent workflow executions recursively.
If there is a parent execution that it reruns as well.
"""
from mistral.engine import workflow_handler
self.set_state(states.RUNNING)
workflow_handler._schedule_check_and_complete(self.wf_ex)
if self.wf_ex.task_execution_id:
parent_task_ex = db_api.get_task_execution(
self.wf_ex.task_execution_id
)
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
parent_wf.lock()
parent_wf._recursive_rerun()
from mistral.engine import task_handler
task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec)
def _get_backlog(self):
return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)
@ -330,7 +355,7 @@ class Workflow(object):
)
@profiler.trace('workflow-set-state')
def set_state(self, state, state_info=None, recursive=False):
def set_state(self, state, state_info=None):
assert self.wf_ex
cur_state = self.wf_ex.state
@ -376,22 +401,6 @@ class Workflow(object):
triggers.on_workflow_complete(self.wf_ex)
if recursive and self.wf_ex.task_execution_id:
parent_task_ex = db_api.get_task_execution(
self.wf_ex.task_execution_id
)
parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
parent_wf.lock()
parent_wf.set_state(state, recursive=recursive)
# TODO(rakhmerov): It'd be better to use instance of Task here.
parent_task_ex.state = state
parent_task_ex.state_info = None
parent_task_ex.processed = False
@profiler.trace('workflow-check-and-complete')
def check_and_complete(self):
"""Completes the workflow if it needs to be completed.

View File

@ -1480,3 +1480,74 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(6, len(action_executions))
self.assertTrue(all(a.state == states.ERROR
for a in action_executions))
@mock.patch.object(
std_actions.NoOpAction,
'run',
mock.MagicMock(
side_effect=[
exc.ActionException(),
'Success'
]
)
)
def test_rerun_sub_workflow(self):
wf_service.create_workflows("""---
version: '2.0'
wf1:
tasks:
task1:
workflow: wf2
wf2:
tasks:
task2:
workflow: wf3
wf3:
tasks:
task3:
action: std.noop""")
# Run workflow and fail task.
wf1_ex = self.engine.start_workflow('wf1')
self.await_workflow_error(wf1_ex.id)
with db_api.transaction():
wf_exs = db_api.get_workflow_executions()
task_exs = db_api.get_task_executions()
self.assertEqual(3, len(wf_exs),
'The number of workflow executions')
self.assertEqual(3, len(task_exs),
'The number of task executions')
for wf_ex in wf_exs:
self.assertEqual(states.ERROR, wf_ex.state,
'The executions must fail the first time')
for task_ex in task_exs:
self.assertEqual(states.ERROR, task_ex.state,
'The tasks must fail the first time')
wf3_ex = self._assert_single_item(wf_exs, name='wf3')
task3_ex = self._assert_single_item(wf3_ex.task_executions,
name="task3")
self.engine.rerun_workflow(task3_ex.id)
self.await_workflow_success(wf1_ex.id)
with db_api.transaction():
wf_exs = db_api.get_workflow_executions()
task_exs = db_api.get_task_executions()
self.assertEqual(3, len(wf_exs),
'The number of workflow executions')
self.assertEqual(3, len(task_exs),
'The number of task executions')
for wf_ex in wf_exs:
self.assertEqual(states.SUCCESS, wf_ex.state,
'The executions must success the second time')
for task_ex in task_exs:
self.assertEqual(states.SUCCESS, task_ex.state,
'The tasks must success the second time')