Reformat rerun logic for tasks with join

Change-Id: I055bc2d5a4bdf839f1e262e49563616d8deff92f
Closes-Bug: #1833262
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2019-06-18 20:00:05 +03:00
parent 74b2fffec2
commit bdbfb82301
3 changed files with 62 additions and 19 deletions

View File

@ -395,8 +395,8 @@ class RetryPolicy(base.TaskPolicy):
runtime_context[context_key] = policy_context
# NOTE(vgvoleg): join tasks in direct workflows can't be
# retried as is, because this tasks can't start without
# the correct logical state.
# retried as-is, because these tasks can't start without
# a correct logical state.
if hasattr(task_spec, "get_join") and task_spec.get_join():
from mistral.engine import task_handler as t_h
_log_task_delay(task_ex, self.delay, states.WAITING)

View File

@ -39,7 +39,6 @@ from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import states
LOG = logging.getLogger(__name__)
@ -520,6 +519,16 @@ class RegularTask(Task):
@profiler.trace('task-run-existing')
def _run_existing(self):
# NOTE(vgvoleg): join tasks in direct workflows can't be
# rerun as-is, because these tasks can't start without
# a correct logical state.
if self.rerun and hasattr(self.task_spec, "get_join") \
and self.task_spec.get_join():
from mistral.engine import task_handler as t_h
self.set_state(states.WAITING, 'Task is waiting.')
t_h._schedule_refresh_task_state(self.task_ex.id)
return
if self.waiting:
return

View File

@ -161,6 +161,10 @@ workflows:
type: direct
tasks:
t0:
action: std.noop
on-success:
- t3
t1:
action: std.echo output="Task 1"
on-success:
@ -818,12 +822,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.SUCCESS, task_0_ex.state)
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertEqual(states.ERROR, task_3_ex.state)
@ -847,12 +853,23 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 0.
self.assertEqual(states.SUCCESS, task_0_ex.state)
task_0_action_exs = db_api.get_action_executions(
task_execution_id=task_0_ex.id
)
self.assertEqual(1, len(task_0_action_exs))
self.assertEqual(states.SUCCESS, task_0_action_exs[0].state)
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
@ -906,16 +923,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1')
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
self.await_task_error(task_1_ex.id)
self.await_task_error(task_2_ex.id)
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
@ -924,7 +931,11 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
self.assertEqual(states.SUCCESS, task_0_ex.state)
task_1_ex = self._assert_single_item(task_execs, name='t1')
@ -936,6 +947,11 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task_2_ex.state)
self.assertIsNotNone(task_2_ex.state_info)
task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.ERROR, task_3_ex.state)
self.assertIsNotNone(task_3_ex.state_info)
# Resume workflow and re-run failed task.
wf_ex = self.engine.rerun_workflow(task_1_ex.id)
@ -959,16 +975,24 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
self.assertEqual(states.SUCCESS, task_0_ex.state)
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
self.assertEqual(states.ERROR, task_3_ex.state)
# Check that join task did not start any action execution
task_3_action_exs = db_api.get_action_executions(
task_execution_id=task_3_ex.id
)
self.assertEqual(0, len(task_3_action_exs))
# Resume workflow and re-run failed task.
wf_ex = self.engine.rerun_workflow(task_2_ex.id)
@ -987,12 +1011,22 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertEqual(4, len(task_execs))
task_0_ex = self._assert_single_item(task_execs, name='t0')
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
task_3_ex = self._assert_single_item(task_execs, name='t3')
# Check action executions of task 0.
self.assertEqual(states.SUCCESS, task_0_ex.state)
self.assertIsNone(task_0_ex.state_info)
task_0_action_exs = db_api.get_action_executions(
task_execution_id=task_0_ex.id
)
self.assertEqual(1, len(task_0_action_exs))
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertIsNone(task_1_ex.state_info)
@ -1021,7 +1055,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self._assert_single_item(task_2_action_exs, state=states.SUCCESS)
self._assert_single_item(task_2_action_exs, state=states.ERROR)
# Check action executions of task 3.
# Check there is exactly 1 action execution of task 3.
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_action_exs = db_api.get_action_executions(