Implement traversal based on number of children

Using child-count weights for scheduling lets us unlock more
concurrency and decrease the total execution time.

As an example, consider the following scenario:
Tasks A and B can't run concurrently because of a node-limit.
Tasks A and C have a logical dependency, and thus are not concurrent.
Tasks B and C will be executed on different nodes and don't
have any logical dependency.
Since neither A nor B has parents, we may schedule either task
and execution will be logically correct. But if we choose B,
the total execution time will be B + A + C, whereas if we
select A, the total execution time may be reduced to
A + max(B, C), because B and C can then run concurrently.
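
A minimal sketch of this choice (hypothetical task names, plain
networkx; the real logic is assign_weights_nested in the diff below):

    import networkx as nx

    dg = nx.DiGraph()
    dg.add_edge('A', 'C')  # C logically depends on A
    dg.add_node('B')       # B is independent but shares a node-limit with A

    # weight(task) = sum of (weight(child) + 1) over its children,
    # computed leaves-first
    weights = {}
    for task in reversed(list(nx.topological_sort(dg))):
        weights[task] = sum(weights[c] + 1 for c in dg.successors(task))

    ready = ['A', 'B']  # both are parentless; the node-limit admits only one
    print(max(ready, key=weights.get))  # -> 'A' (weight 1 vs 0)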

Change-Id: I52a6c20e8c3d729ed20da822f45cbad90e51f2df
Closes-Bug: 1554105
Dmitry Shulyak 2016-03-21 17:08:31 +02:00
parent 16072bce2d
commit e2cfa869d8
9 changed files with 149 additions and 11 deletions

View File

@@ -1072,6 +1072,8 @@ class Task(Model):
     type_limit = Field(int, default=int)
+    weight = Field(int, default=int)
+
     @classmethod
     def new(cls, data):
         key = '%s~%s' % (data['execution'], data['name'])

View File

@@ -57,8 +57,10 @@ def get_graph(uid):
     mdg = nx.MultiDiGraph()
     mdg.graph['uid'] = uid
     mdg.graph['name'] = uid.split(':')[0]
-    mdg.add_nodes_from(Task.multi_get(Task.execution.filter(uid)))
-    mdg.add_edges_from([(parent, task) for task in mdg.nodes()
+    tasks_by_uid = {t.key: t for t
+                    in Task.multi_get(Task.execution.filter(uid))}
+    mdg.add_nodes_from(tasks_by_uid.values())
+    mdg.add_edges_from([(tasks_by_uid[parent], task) for task in mdg.nodes()
                         for parent in task.parents.all()])
     return mdg
@@ -97,6 +99,20 @@ def total_delta(graph):
 get_plan = get_graph
+
+
+def assign_weights_nested(dg):
+    """Based on the number of children, assign weights that
+    will be used later for scheduling.
+    """
+    # NOTE: reverse(copy=False) swaps successors and predecessors
+    # on the same copy of the graph, so reverse it back before returning
+    reversed_graph = dg.reverse(copy=False)
+    for task in nx.topological_sort(reversed_graph):
+        task.weight = sum([t.weight + 1 for t
+                           in reversed_graph.predecessors(task)])
+        task.save_lazy()
+    return reversed_graph.reverse(copy=False)


 def parse_plan(plan_path):
     """parses yaml definition and returns graph"""
     plan = utils.yaml_load(plan_path)
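
For reference, a standalone sketch of what the recurrence above computes,
using a reversed copy instead of the in-place reverse(copy=False) trick so
the input graph is left untouched: each task's weight comes out as the size
of its subtree of nested children, which is what the new tests at the bottom
of this commit assert.

    import networkx as nx

    dg = nx.DiGraph([('t1', 't2'), ('t2', 't3')])  # t1 -> t2 -> t3
    rdg = dg.reverse()  # reversed copy: edges now point child -> parent
    w = {}
    for task in nx.topological_sort(rdg):  # children in dg come first
        # predecessors in the reversed graph are the task's children in dg
        w[task] = sum(w[child] + 1 for child in rdg.predecessors(task))
    print(w)  # {'t3': 0, 't2': 1, 't1': 2}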

View File

@@ -13,6 +13,7 @@
 # under the License.

 from functools import partial
+from operator import attrgetter
 import time

 from solar.core.log import log
@@ -38,7 +39,8 @@ class Scheduler(base.Worker):
         return list(limits.get_default_chain(
             plan,
             [t for t in plan if t.status == states.INPROGRESS.name],
-            find_visitable_tasks(plan)))
+            sorted(find_visitable_tasks(plan),
+                   key=attrgetter('weight'), reverse=True)))

     def next(self, ctxt, plan_uid):
         with Lock(
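
Sorting the visitable tasks by weight in descending order before handing
them to the limits chain means that whenever a limit admits only some of the
ready tasks, the ones with the largest subtrees of pending children win;
this is exactly the A-over-B choice described in the commit message.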

View File

@@ -159,10 +159,11 @@ def send_to_orchestration(tags=None):
     state_change.insert(changed_nodes, dg)
     evapi.build_edges(dg, events)

     # what `name` should be?
     dg.graph['name'] = 'system_log'
-    return graph.create_plan_from_graph(dg)
+    built_graph = graph.create_plan_from_graph(dg)
+    graph.assign_weights_nested(built_graph)
+    return built_graph


 def parameters(res, action, data):

View File

@@ -78,6 +78,11 @@ def timelimit_plan():
     return plan_from_fixture('timelimit')


+@pytest.fixture
+def concurrent_choice_plan():
+    return plan_from_fixture('concurrent_choice')
+
+
 @pytest.fixture
 def sequence_vr(tmpdir):
     base_path = os.path.join(

View File

@@ -59,8 +59,9 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
     scheduler_client = clients['scheduler']
     assert len(change.staged_log()) == total_resources
     ModelMeta.session_end()
     ModelMeta.save_all_lazy()
     plan = change.send_to_orchestration()
+    ModelMeta.save_all_lazy()
     scheduler_client.next({}, plan.graph['uid'])


 def wait_function(timeout):

View File

@@ -0,0 +1,47 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import time

import gevent
from mock import Mock

from solar.orchestration import graph


def test_concurrent_tasks_choice_based_on_weights(
        scheduler, tasks, concurrent_choice_plan):
    worker, client = scheduler
    tracer = Mock()
    plan = concurrent_choice_plan
    worker.next.on_success(tracer.update)
    worker.update_next.on_success(tracer.update)

    def wait_function(timeout):
        for summary in graph.wait_finish(plan.graph['uid'], timeout):
            time.sleep(0.5)
        return summary

    client.next({}, concurrent_choice_plan.graph['uid'])
    waiter = gevent.spawn(wait_function, 2)
    waiter.join(timeout=3)

    first_call = tracer.update.call_args_list[0]
    args, _ = first_call
    ctxt, rst, _ = args
    assert len(rst) == 1
    assert rst[0].name == 's2'

    second_call = tracer.update.call_args_list[1]
    args, _ = second_call
    ctxt, rst, status, msg = args
    assert len(rst) == 2
    assert {t.name for t in rst} == {'s1', 's3'}

View File

@@ -0,0 +1,21 @@
name: seq
tasks:
  - uid: s1
    parameters:
      type: echo
      args: ['s1']
      target: '1'
      weight: 0
  - uid: s2
    parameters:
      type: echo
      args: ['s2']
      target: '1'
      weight: 1
  - uid: s3
    after: [s2]
    parameters:
      type: echo
      args: ['s3']
      target: '2'
      weight: 0
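
This fixture mirrors the commit-message scenario: s1 and s2 compete for
node '1', while s3 depends on s2 but targets node '2'. Because s2 carries
weight 1 (it has a child), the scheduler picks it over s1, after which s1
and s3 can run concurrently; that is the ordering the test above asserts.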

View File

@@ -102,15 +102,15 @@ def test_several_updates(simple_plan):
 def times():
     rst = nx.DiGraph()
     t1 = Mock(name='t1', start_time=1.0, end_time=12.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t2 = Mock(name='t2', start_time=1.0, end_time=3.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t3 = Mock(name='t3', start_time=3.0, end_time=7.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t4 = Mock(name='t4', start_time=7.0, end_time=13.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t5 = Mock(name='t5', start_time=12.0, end_time=14.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     rst.add_nodes_from([t1, t2, t3, t4, t5])
     rst.add_path([t1, t5])
     rst.add_path([t2, t3, t4])
@@ -122,3 +122,46 @@ def test_report_progress(times):
     assert report['total_time'] == 13.0
     assert report['total_delta'] == 25.0
     assert len(report['tasks']) == 5
+
+
+def test_assigned_weights_simple_sequence():
+    dg = nx.DiGraph()
+    t1 = Mock(name='t1', weight=0)
+    t2 = Mock(name='t2', weight=0)
+    t3 = Mock(name='t3', weight=0)
+    dg.add_nodes_from([t1, t2, t3])
+    dg.add_path([t1, t2, t3])
+    graph.assign_weights_nested(dg)
+    assert t1.weight == 2
+    assert t2.weight == 1
+    assert t3.weight == 0
+
+
+def test_weights_strictly_decreasing():
+    dg = nx.DiGraph()
+    tasks = [Mock(name='t%s' % i, weight=0) for i in range(10)]
+    dg.add_nodes_from(tasks)
+    for i in range(10):
+        first, rest = tasks[i], tasks[i + 1:]
+        dg.add_edges_from([(first, n) for n in rest])
+    graph.assign_weights_nested(dg)
+    weights = iter(t.weight for t in tasks)
+    previous = next(weights)
+    for item in weights:
+        assert previous > item
+        previous = item
+
+
+def test_weights_multi_path():
+    dg = nx.DiGraph()
+    tasks = [Mock(name='t%s' % i, weight=0) for i in range(11)]
+    first = tasks[0]
+    half = (len(tasks) // 2) + 1
+    dg.add_nodes_from(tasks)
+    dg.add_path([first] + tasks[1:half])
+    dg.add_path([first] + tasks[half:])
+    graph.assign_weights_nested(dg)
+    assert first.weight == len(tasks) - 1
+    # the two subtrees are of equal size
+    for s1, s2 in zip(tasks[1:half], tasks[half:]):
+        assert s1.weight == s2.weight