Refresh a number of retry a task when task was rerun
Change-Id: If0a8219bb54ee0d01084dbaf5c9ed5b2041c2bc4 Closes-Bug: #1772265 Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
This commit is contained in:
parent
5691d36383
commit
78b542c4c5
|
@ -405,6 +405,15 @@ class RetryPolicy(base.TaskPolicy):
|
|||
task_ex_id=task_ex.id,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def refresh_runtime_context(task_ex):
|
||||
runtime_context = task_ex.runtime_context or {}
|
||||
retry_task_policy = runtime_context.get('retry_task_policy')
|
||||
|
||||
if retry_task_policy:
|
||||
retry_task_policy['retry_no'] = 0
|
||||
task_ex.runtime_context['retry_task_policy'] = retry_task_policy
|
||||
|
||||
|
||||
class TimeoutPolicy(base.TaskPolicy):
|
||||
_schema = {
|
||||
|
|
|
@ -240,6 +240,12 @@ class Workflow(object):
|
|||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.rerun_tasks([task_ex], reset=reset)
|
||||
|
||||
if cmds:
|
||||
# Import the task_handler module here to avoid circular reference.
|
||||
from mistral.engine import policies
|
||||
|
||||
policies.RetryPolicy.refresh_runtime_context(task_ex)
|
||||
|
||||
self._continue_workflow(cmds)
|
||||
|
||||
def _get_backlog(self):
|
||||
|
|
|
@ -21,6 +21,7 @@ from mistral.actions import std_actions
|
|||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
|
||||
|
@ -1430,3 +1431,52 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
|||
|
||||
self.assertEqual(1, len(task_3_action_exs))
|
||||
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
|
||||
|
||||
def test_rerun_task_with_retry_policy(self):
|
||||
wf_service.create_workflows("""---
|
||||
version: '2.0'
|
||||
wf_fail:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.fail
|
||||
retry:
|
||||
delay: 0
|
||||
count: 2""")
|
||||
|
||||
wf_ex = self.engine.start_workflow("wf_fail")
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions,
|
||||
name="task1")
|
||||
action_executions = task_ex.executions
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(3, len(action_executions))
|
||||
self.assertTrue(all(a.state == states.ERROR
|
||||
for a in action_executions))
|
||||
|
||||
self.engine.rerun_workflow(task_ex.id)
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions,
|
||||
name="task1")
|
||||
action_executions = task_ex.executions
|
||||
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(6, len(action_executions))
|
||||
self.assertTrue(all(a.state == states.ERROR
|
||||
for a in action_executions))
|
||||
|
|
Loading…
Reference in New Issue