Refacor resume algorithm
Change-Id: I98301f7292841ee574a01308a5af398574f2352a
This commit is contained in:
parent
75f642219d
commit
63e6ac1e86
|
@ -122,8 +122,8 @@ def build_pause_before_policy(policies_spec):
|
|||
|
||||
pause_before_policy = policies_spec.get_pause_before()
|
||||
|
||||
return PauseBeforePolicy(pause_before_policy) \
|
||||
if pause_before_policy else None
|
||||
return (PauseBeforePolicy(pause_before_policy)
|
||||
if pause_before_policy else None)
|
||||
|
||||
|
||||
def _ensure_context_has_key(runtime_context, key):
|
||||
|
|
|
@ -175,7 +175,7 @@ class WorkflowHandler(object):
|
|||
def filter_task_cmds(cmds):
|
||||
return [cmd for cmd in cmds if isinstance(cmd, commands.RunTask)]
|
||||
|
||||
def get_tasks_to_schedule(task_db, schedule_tasks, schedule_cmds):
|
||||
def get_tasks_to_schedule(task_db, schedule_tasks):
|
||||
"""Finds tasks that should run after given task and searches them
|
||||
in DB. If there are no tasks in the DB, it should be scheduled
|
||||
now. If there are tasks in the DB, continue search to next tasks
|
||||
|
@ -191,36 +191,40 @@ class WorkflowHandler(object):
|
|||
|
||||
if states.is_completed(task_db.state):
|
||||
for task_name in next_t_names:
|
||||
t_db = [t for t in tasks if t.name == task_name]
|
||||
t_db = t_db[0] if t_db else None
|
||||
task_spec = self.wf_spec.get_tasks()[task_name]
|
||||
t_db = wf_utils.find_db_task(self.exec_db, task_spec)
|
||||
|
||||
if not t_db:
|
||||
schedule_tasks += [task_name]
|
||||
task_spec = self.wf_spec.get_tasks()[task_name]
|
||||
schedule_cmds += [commands.RunTask(task_spec)]
|
||||
else:
|
||||
schedule_tasks += get_tasks_to_schedule(
|
||||
t_db,
|
||||
schedule_tasks,
|
||||
schedule_cmds
|
||||
schedule_tasks
|
||||
)
|
||||
elif states.is_idle(task_db.state):
|
||||
idle_task_spec = self.wf_spec.get_tasks()[task_db.name]
|
||||
schedule_cmds += [commands.RunTask(idle_task_spec, task_db)]
|
||||
schedule_tasks += [task_db.name]
|
||||
|
||||
return schedule_cmds
|
||||
return schedule_tasks
|
||||
|
||||
params = self.exec_db.start_params
|
||||
start_task_cmds = filter_task_cmds(
|
||||
self.start_workflow(**params if params else {})
|
||||
)
|
||||
|
||||
schedule_cmds = []
|
||||
task_names = []
|
||||
|
||||
for cmd in start_task_cmds:
|
||||
task_db = [t for t in tasks
|
||||
if t.name == cmd.task_spec.get_name()][0]
|
||||
schedule_cmds += get_tasks_to_schedule(task_db, [], [])
|
||||
task_names += get_tasks_to_schedule(task_db, [])
|
||||
|
||||
schedule_cmds = []
|
||||
|
||||
for t_name in task_names:
|
||||
t_spec = self.wf_spec.get_tasks()[t_name]
|
||||
t_db = wf_utils.find_db_task(self.exec_db, t_spec)
|
||||
|
||||
schedule_cmds += [commands.RunTask(t_spec, t_db)]
|
||||
|
||||
return schedule_cmds
|
||||
|
||||
|
|
Loading…
Reference in New Issue