Merge "Reformat retry logic for tasks with join"

This commit is contained in:
Zuul 2019-05-07 09:27:18 +00:00 committed by Gerrit Code Review
commit 2c430f4242
2 changed files with 54 additions and 32 deletions

View File

@ -33,11 +33,11 @@ _FAIL_IF_INCOMPLETE_TASK_PATH = (
)
def _log_task_delay(task_ex, delay_sec):
def _log_task_delay(task_ex, delay_sec, state=states.RUNNING_DELAYED):
wf_trace.info(
task_ex,
"Task '%s' [%s -> %s, delay = %s sec]" %
(task_ex.name, task_ex.state, states.RUNNING_DELAYED, delay_sec)
(task_ex.name, task_ex.state, state, delay_sec)
)
@ -160,15 +160,6 @@ def _ensure_context_has_key(runtime_context, key):
return runtime_context
def _has_incomplete_inbound_tasks(task_ex):
if "triggered_by" not in task_ex.runtime_context:
return False
for trigger in task_ex.runtime_context["triggered_by"]:
if trigger["event"] == "not triggered":
return True
return False
class WaitBeforePolicy(base.TaskPolicy):
_schema = {
"properties": {
@ -390,11 +381,6 @@ class RetryPolicy(base.TaskPolicy):
(self._continue_on_clause and not continue_on_evaluation)
)
stop_continue_flag = (
stop_continue_flag or
_has_incomplete_inbound_tasks(task_ex)
)
break_triggered = (
task_ex.state == states.ERROR and
break_on_evaluation
@ -403,15 +389,24 @@ class RetryPolicy(base.TaskPolicy):
if not retries_remain or break_triggered or stop_continue_flag:
return
_log_task_delay(task_ex, self.delay)
data_flow.invalidate_task_execution_result(task_ex)
task_ex.state = states.RUNNING_DELAYED
policy_context['retry_no'] = retry_no + 1
runtime_context[context_key] = policy_context
# NOTE(vgvoleg): join tasks in direct workflows can't be
# retried as is, because this tasks can't start without
# the correct logical state.
if hasattr(task_spec, "get_join") and task_spec.get_join():
from mistral.engine import task_handler as t_h
_log_task_delay(task_ex, self.delay, states.WAITING)
task_ex.state = states.WAITING
t_h._schedule_refresh_task_state(task_ex.id, self.delay)
return
_log_task_delay(task_ex, self.delay)
task_ex.state = states.RUNNING_DELAYED
scheduler.schedule_call(
None,
_CONTINUE_TASK_PATH,

View File

@ -1227,12 +1227,10 @@ class PoliciesTest(base.EngineTestCase):
self.assertDictEqual({'result': 'mocked result'}, wf_output)
def test_retry_failed_join_task(self):
def test_retry_join_task_after_failed_task(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
task-defaults:
@ -1241,13 +1239,11 @@ class PoliciesTest(base.EngineTestCase):
delay: 0
tasks:
task1:
action: std.noop
on-success: join_task
task2:
action: std.fail
on-success: join_task
join_task:
action: std.noop
join: all
"""
wb_service.create_workbook_v2(retry_wb)
@ -1260,15 +1256,46 @@ class PoliciesTest(base.EngineTestCase):
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self._assert_single_item(
tasks, name="task2", state=states.ERROR
)
self._assert_single_item(
tasks, name="join_task", state=states.ERROR
)
self._assert_single_item(tasks, name="task2", state=states.ERROR)
self._assert_single_item(tasks, name="join_task", state=states.ERROR)
def test_retry_join_task_after_idle_task(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
task-defaults:
retry:
count: 1
delay: 0
tasks:
task1:
on-success: join_task
task2:
action: std.fail
on-success: task3
task3:
on-success: join_task
join_task:
join: all
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
self._assert_single_item(tasks, name="task2", state=states.ERROR)
self._assert_single_item(tasks, name="join_task", state=states.ERROR)
@mock.patch.object(
std_actions.EchoAction,