Refactor rerun of joins

* This patch moves logic that schedules a task state refreshing
  periodic job in case of rerun from the Task class to
  task_handler.run_task() so that Task doesn't have to know any
  language specific details and call task handler back. It is
  more architecturally clean.

Change-Id: If7a054bbf77f9ed761d8f3ac36b6d329544f5ff5
This commit is contained in:
Renat Akhmerov 2019-11-11 16:41:39 +07:00
parent bdbfb82301
commit 59bf2509eb
5 changed files with 31 additions and 18 deletions

View File

@ -60,6 +60,11 @@ def run_task(wf_cmd):
task = _build_task_from_command(wf_cmd)
try:
if task.waiting and task.rerun:
task.set_state(states.WAITING, 'Task is waiting.')
_schedule_refresh_task_state(task.task_ex.id)
task.run()
except exc.MistralException as e:
wf_ex = wf_cmd.wf_ex
@ -77,11 +82,13 @@ def run_task(wf_cmd):
_check_affected_tasks(task)
def rerun_task(task_ex, wf_spec):
def mark_task_running(task_ex, wf_spec):
task = _build_task_from_execution(wf_spec, task_ex)
old_task_state = task_ex.state
task.set_state(states.RUNNING, None, False)
task.notify(old_task_state, states.RUNNING)

View File

@ -519,16 +519,6 @@ class RegularTask(Task):
@profiler.trace('task-run-existing')
def _run_existing(self):
# NOTE(vgvoleg): join tasks in direct workflows can't be
# rerun as-is, because these tasks can't start without
# a correct logical state.
if self.rerun and hasattr(self.task_spec, "get_join") \
and self.task_spec.get_join():
from mistral.engine import task_handler as t_h
self.set_state(states.WAITING, 'Task is waiting.')
t_h._schedule_refresh_task_state(self.task_ex.id)
return
if self.waiting:
return

View File

@ -260,17 +260,17 @@ class Workflow(object):
def _recursive_rerun(self):
"""Rerun all parent workflow executions recursively.
If there is a parent execution that it reruns as well.
If there is a parent execution then it reruns as well.
"""
from mistral.engine import workflow_handler
self.set_state(states.RUNNING)
# TODO(rakhmerov): We call a internal method of a module here.
# TODO(rakhmerov): We call an internal method of a module here.
# The simplest way is to make it public, however, I believe
# it's another "bad smell" that tells that some refactoring
# of the architecture should be made.
# of the architecture is needed.
workflow_handler._schedule_check_and_fix_integrity(self.wf_ex)
if self.wf_ex.task_execution_id:
@ -285,7 +285,7 @@ class Workflow(object):
parent_wf._recursive_rerun()
from mistral.engine import task_handler
task_handler.rerun_task(parent_task_ex, parent_wf.wf_spec)
task_handler.mark_task_running(parent_task_ex, parent_wf.wf_spec)
def _get_backlog(self):
return self.wf_ex.runtime_context.get(dispatcher.BACKLOG_KEY)

View File

@ -144,8 +144,13 @@ class WorkflowController(object):
return []
cmds = [
commands.RunExistingTask(self.wf_ex, self.wf_spec, t_e, reset,
rerun=True)
commands.RunExistingTask(
self.wf_ex,
self.wf_spec,
t_e,
reset,
rerun=True
)
for t_e in task_execs
]

View File

@ -154,7 +154,7 @@ class DirectWorkflowController(base.WorkflowController):
return cmds
def _configure_if_join(self, cmd):
if not isinstance(cmd, commands.RunTask):
if not isinstance(cmd, (commands.RunTask, commands.RunExistingTask)):
return
if not cmd.task_spec.get_join():
@ -166,6 +166,17 @@ class DirectWorkflowController(base.WorkflowController):
def _get_join_unique_key(self, cmd):
return 'join-task-%s-%s' % (self.wf_ex.id, cmd.task_spec.get_name())
def rerun_tasks(self, task_execs, reset=True):
cmds = super(DirectWorkflowController, self).rerun_tasks(
task_execs,
reset
)
for cmd in cmds:
self._configure_if_join(cmd)
return cmds
# TODO(rakhmerov): Need to refactor this method to be able to pass tasks
# whose contexts need to be merged.
def evaluate_workflow_final_context(self):