Refacor resume algorithm

Change-Id: I98301f7292841ee574a01308a5af398574f2352a
This commit is contained in:
Nikolay Mahotkin 2014-12-18 16:07:47 +03:00
parent 75f642219d
commit 63e6ac1e86
2 changed files with 18 additions and 14 deletions

View File

@ -122,8 +122,8 @@ def build_pause_before_policy(policies_spec):
pause_before_policy = policies_spec.get_pause_before()
return PauseBeforePolicy(pause_before_policy) \
if pause_before_policy else None
return (PauseBeforePolicy(pause_before_policy)
if pause_before_policy else None)
def _ensure_context_has_key(runtime_context, key):

View File

@ -175,7 +175,7 @@ class WorkflowHandler(object):
def filter_task_cmds(cmds):
return [cmd for cmd in cmds if isinstance(cmd, commands.RunTask)]
def get_tasks_to_schedule(task_db, schedule_tasks, schedule_cmds):
def get_tasks_to_schedule(task_db, schedule_tasks):
"""Finds tasks that should run after given task and searches them
in DB. If there are no tasks in the DB, it should be scheduled
now. If there are tasks in the DB, continue search to next tasks
@ -191,36 +191,40 @@ class WorkflowHandler(object):
if states.is_completed(task_db.state):
for task_name in next_t_names:
t_db = [t for t in tasks if t.name == task_name]
t_db = t_db[0] if t_db else None
task_spec = self.wf_spec.get_tasks()[task_name]
t_db = wf_utils.find_db_task(self.exec_db, task_spec)
if not t_db:
schedule_tasks += [task_name]
task_spec = self.wf_spec.get_tasks()[task_name]
schedule_cmds += [commands.RunTask(task_spec)]
else:
schedule_tasks += get_tasks_to_schedule(
t_db,
schedule_tasks,
schedule_cmds
schedule_tasks
)
elif states.is_idle(task_db.state):
idle_task_spec = self.wf_spec.get_tasks()[task_db.name]
schedule_cmds += [commands.RunTask(idle_task_spec, task_db)]
schedule_tasks += [task_db.name]
return schedule_cmds
return schedule_tasks
params = self.exec_db.start_params
start_task_cmds = filter_task_cmds(
self.start_workflow(**params if params else {})
)
schedule_cmds = []
task_names = []
for cmd in start_task_cmds:
task_db = [t for t in tasks
if t.name == cmd.task_spec.get_name()][0]
schedule_cmds += get_tasks_to_schedule(task_db, [], [])
task_names += get_tasks_to_schedule(task_db, [])
schedule_cmds = []
for t_name in task_names:
t_spec = self.wf_spec.get_tasks()[t_name]
t_db = wf_utils.find_db_task(self.exec_db, t_spec)
schedule_cmds += [commands.RunTask(t_spec, t_db)]
return schedule_cmds