From e2cfa869d808c9c52d8b75bf63e05078f16feeeb Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Mon, 21 Mar 2016 17:08:31 +0200
Subject: [PATCH] Implement traversal based on number of children

By using child weights for scheduling we can unlock concurrency and
decrease the total execution time.

As an example, consider the following scenario:
Tasks A and B can't run concurrently because of the node-limit.
Tasks A and C have a logical dependency, and thus are not concurrent.
Tasks B and C will be executed on different nodes and don't have any
logical dependency.

Since neither A nor B has parents, we may schedule either of them and
execution will still be logically correct. But if we choose B, the
total execution time will be B + A + C, whereas if we select A, the
total execution time may be reduced to A + max(B, C).

Change-Id: I52a6c20e8c3d729ed20da822f45cbad90e51f2df
Closes-Bug: 1554105
---
 solar/dblayer/solar_models.py                 |  2 +
 solar/orchestration/graph.py                  | 20 ++++++-
 solar/orchestration/workers/scheduler.py      |  4 +-
 solar/system_log/change.py                    |  5 +-
 solar/test/conftest.py                        |  5 ++
 .../test_complete_solar_workflow.py           |  3 +-
 solar/test/functional/test_weights.py         | 47 ++++++++++++++++
 .../test/orch_fixtures/concurrent_choice.yaml | 21 ++++++++
 solar/test/test_graph_api.py                  | 53 +++++++++++++++++--
 9 files changed, 149 insertions(+), 11 deletions(-)
 create mode 100644 solar/test/functional/test_weights.py
 create mode 100644 solar/test/orch_fixtures/concurrent_choice.yaml

diff --git a/solar/dblayer/solar_models.py b/solar/dblayer/solar_models.py
index 4f257db4..9179bc14 100644
--- a/solar/dblayer/solar_models.py
+++ b/solar/dblayer/solar_models.py
@@ -1072,6 +1072,8 @@ class Task(Model):
 
     type_limit = Field(int, default=int)
 
+    weight = Field(int, default=int)
+
     @classmethod
     def new(cls, data):
         key = '%s~%s' % (data['execution'], data['name'])
diff --git a/solar/orchestration/graph.py b/solar/orchestration/graph.py
index 5fa680ca..f78e38ad 100644
--- a/solar/orchestration/graph.py
+++ b/solar/orchestration/graph.py
@@ -57,8 +57,10 @@ def get_graph(uid):
     mdg = nx.MultiDiGraph()
     mdg.graph['uid'] = uid
     mdg.graph['name'] = uid.split(':')[0]
-    mdg.add_nodes_from(Task.multi_get(Task.execution.filter(uid)))
-    mdg.add_edges_from([(parent, task) for task in mdg.nodes()
+    tasks_by_uid = {t.key: t for t
+                    in Task.multi_get(Task.execution.filter(uid))}
+    mdg.add_nodes_from(tasks_by_uid.values())
+    mdg.add_edges_from([(tasks_by_uid[parent], task) for task in mdg.nodes()
                         for parent in task.parents.all()])
     return mdg
 
@@ -97,6 +99,20 @@ def total_delta(graph):
 
 get_plan = get_graph
 
 
+def assign_weights_nested(dg):
+    """Based on the number of children, assign weights that will be
+    used later for scheduling.
+    """
+    # NOTE reverse(copy=False) swaps successors and predecessors on the
+    # same copy of the graph, so reverse it back before returning
+    reversed_graph = dg.reverse(copy=False)
+    for task in nx.topological_sort(reversed_graph):
+        task.weight = sum([t.weight + 1 for t
+                           in reversed_graph.predecessors(task)])
+        task.save_lazy()
+    return reversed_graph.reverse(copy=False)
+
+
 def parse_plan(plan_path):
     """parses yaml definition and returns graph"""
     plan = utils.yaml_load(plan_path)
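
To make the weighting rule above concrete, here is a minimal standalone sketch of the same children-first pass applied to the A/B/C example from the commit message; it uses plain strings and a throwaway dict instead of the Task model, so the helper name and structure are illustrative only and not part of this change:

    import networkx as nx

    def toy_assign_weights(dg):
        # children-first pass: a task's weight is the number of tasks
        # hanging below it, counted once per path, as in the patch
        weights = {}
        for task in reversed(list(nx.topological_sort(dg))):
            weights[task] = sum(weights[c] + 1 for c in dg.successors(task))
        return weights

    dg = nx.DiGraph()
    dg.add_nodes_from(['A', 'B', 'C'])
    dg.add_edge('A', 'C')  # C logically depends on A; B is independent
    assert toy_assign_weights(dg) == {'A': 1, 'B': 0, 'C': 0}
    # A outweighs B, so scheduling A first yields A + max(B, C)
    # instead of B + A + C.
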
diff --git a/solar/orchestration/workers/scheduler.py b/solar/orchestration/workers/scheduler.py
index 56a8200d..ec9a5919 100644
--- a/solar/orchestration/workers/scheduler.py
+++ b/solar/orchestration/workers/scheduler.py
@@ -13,6 +13,7 @@
 # under the License.
 
 from functools import partial
+from operator import attrgetter
 import time
 
 from solar.core.log import log
@@ -38,7 +39,8 @@ class Scheduler(base.Worker):
         return list(limits.get_default_chain(
             plan,
             [t for t in plan if t.status == states.INPROGRESS.name],
-            find_visitable_tasks(plan)))
+            sorted(find_visitable_tasks(plan),
+                   key=attrgetter('weight'), reverse=True)))
 
     def next(self, ctxt, plan_uid):
         with Lock(
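
The scheduler change above only reorders the visitable tasks before they go through the limits chain, so heavier tasks are offered first while equal-weight tasks keep their previous order (Python's sort is stable). A small sketch with hypothetical stand-in tasks, not project code:

    from operator import attrgetter

    class StubTask(object):
        def __init__(self, name, weight):
            self.name = name
            self.weight = weight

    visitable = [StubTask('B', 0), StubTask('A', 1), StubTask('C', 0)]
    ordered = sorted(visitable, key=attrgetter('weight'), reverse=True)
    # the heavier task 'A' is handed to the limits chain first
    assert [t.name for t in ordered] == ['A', 'B', 'C']
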
diff --git a/solar/system_log/change.py b/solar/system_log/change.py
index db56d6e6..73a0a1ee 100644
--- a/solar/system_log/change.py
+++ b/solar/system_log/change.py
@@ -159,10 +159,11 @@ def send_to_orchestration(tags=None):
         state_change.insert(changed_nodes, dg)
 
     evapi.build_edges(dg, events)
 
-    # what `name` should be?
     dg.graph['name'] = 'system_log'
-    return graph.create_plan_from_graph(dg)
+    built_graph = graph.create_plan_from_graph(dg)
+    graph.assign_weights_nested(built_graph)
+    return built_graph
 
 
 def parameters(res, action, data):
diff --git a/solar/test/conftest.py b/solar/test/conftest.py
index cb8ea358..c16656f1 100644
--- a/solar/test/conftest.py
+++ b/solar/test/conftest.py
@@ -78,6 +78,11 @@ def timelimit_plan():
     return plan_from_fixture('timelimit')
 
 
+@pytest.fixture
+def concurrent_choice_plan():
+    return plan_from_fixture('concurrent_choice')
+
+
 @pytest.fixture
 def sequence_vr(tmpdir):
     base_path = os.path.join(
diff --git a/solar/test/functional/test_complete_solar_workflow.py b/solar/test/functional/test_complete_solar_workflow.py
index e514ee3f..f6542a93 100644
--- a/solar/test/functional/test_complete_solar_workflow.py
+++ b/solar/test/functional/test_complete_solar_workflow.py
@@ -59,8 +59,9 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
     scheduler_client = clients['scheduler']
 
     assert len(change.staged_log()) == total_resources
-    ModelMeta.session_end()
+    ModelMeta.save_all_lazy()
     plan = change.send_to_orchestration()
+    ModelMeta.save_all_lazy()
     scheduler_client.next({}, plan.graph['uid'])
 
     def wait_function(timeout):
diff --git a/solar/test/functional/test_weights.py b/solar/test/functional/test_weights.py
new file mode 100644
index 00000000..c9ffe745
--- /dev/null
+++ b/solar/test/functional/test_weights.py
@@ -0,0 +1,47 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+import gevent
+from mock import Mock
+
+from solar.orchestration import graph
+
+
+def test_concurrent_tasks_choice_based_on_weights(
+        scheduler, tasks, concurrent_choice_plan):
+    worker, client = scheduler
+    tracer = Mock()
+    plan = concurrent_choice_plan
+    worker.next.on_success(tracer.update)
+    worker.update_next.on_success(tracer.update)
+
+    def wait_function(timeout):
+        for summary in graph.wait_finish(plan.graph['uid'], timeout):
+            time.sleep(0.5)
+        return summary
+    client.next({}, concurrent_choice_plan.graph['uid'])
+    waiter = gevent.spawn(wait_function, 2)
+    waiter.join(timeout=3)
+    first_call = tracer.update.call_args_list[0]
+    args, _ = first_call
+    ctxt, rst, _ = args
+    assert len(rst) == 1
+    assert rst[0].name == 's2'
+    second_call = tracer.update.call_args_list[1]
+    args, _ = second_call
+    ctxt, rst, status, msg = args
+    assert len(rst) == 2
+    assert {t.name for t in rst} == {'s1', 's3'}
diff --git a/solar/test/orch_fixtures/concurrent_choice.yaml b/solar/test/orch_fixtures/concurrent_choice.yaml
new file mode 100644
index 00000000..bb88f158
--- /dev/null
+++ b/solar/test/orch_fixtures/concurrent_choice.yaml
@@ -0,0 +1,21 @@
+name: seq
+tasks:
+  - uid: s1
+    parameters:
+      type: echo
+      args: ['s1']
+      target: '1'
+      weight: 0
+  - uid: s2
+    parameters:
+      type: echo
+      args: ['s2']
+      target: '1'
+      weight: 1
+  - uid: s3
+    after: [s2]
+    parameters:
+      type: echo
+      args: ['s3']
+      target: '2'
+      weight: 0
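
The weights hard-coded in the fixture above encode the situation the functional test asserts: s1 and s2 compete for node '1', while s3 runs on node '2' but only after s2. A rough sketch of the expected choice (names mirror the fixture; the dict and selection below are illustrative, not the scheduler's actual code path):

    fixture_weights = {'s1': 0, 's2': 1, 's3': 0}
    runnable = ['s1', 's2']          # neither has unfinished parents
    # prefer the heavier task: s2 unlocks s3 on another node
    first = max(runnable, key=fixture_weights.get)
    assert first == 's2'
    # after s2 finishes, s1 (node '1') and s3 (node '2') can run
    # concurrently, which is the {'s1', 's3'} batch the test checks
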
diff --git a/solar/test/test_graph_api.py b/solar/test/test_graph_api.py
index 1fa4a835..c2af4931 100644
--- a/solar/test/test_graph_api.py
+++ b/solar/test/test_graph_api.py
@@ -102,15 +102,15 @@ def test_several_updates(simple_plan):
 def times():
     rst = nx.DiGraph()
     t1 = Mock(name='t1', start_time=1.0, end_time=12.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t2 = Mock(name='t2', start_time=1.0, end_time=3.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t3 = Mock(name='t3', start_time=3.0, end_time=7.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t4 = Mock(name='t4', start_time=7.0, end_time=13.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     t5 = Mock(name='t5', start_time=12.0, end_time=14.0,
-              status='', errmsg='')
+              status='', errmsg='', weight=0)
     rst.add_nodes_from([t1, t2, t3, t4, t5])
     rst.add_path([t1, t5])
     rst.add_path([t2, t3, t4])
@@ -122,3 +122,46 @@ def test_report_progress(times):
     assert report['total_time'] == 13.0
     assert report['total_delta'] == 25.0
     assert len(report['tasks']) == 5
+
+
+def test_assigned_weights_simple_sequence():
+    dg = nx.DiGraph()
+    t1 = Mock(name='t1', weight=0)
+    t2 = Mock(name='t2', weight=0)
+    t3 = Mock(name='t3', weight=0)
+    dg.add_nodes_from([t1, t2, t3])
+    dg.add_path([t1, t2, t3])
+    graph.assign_weights_nested(dg)
+    assert t1.weight == 2
+    assert t2.weight == 1
+    assert t3.weight == 0
+
+
+def test_weights_strictly_decreasing():
+    dg = nx.DiGraph()
+    tasks = [Mock(name='t%s' % i, weight=0) for i in range(10)]
+    dg.add_nodes_from(tasks)
+    for i in range(10):
+        first, rest = tasks[i], tasks[i + 1:]
+        dg.add_edges_from([(first, n) for n in rest])
+    graph.assign_weights_nested(dg)
+    weights = iter(t.weight for t in tasks)
+    previous = next(weights)
+    for item in weights:
+        assert previous > item
+        previous = item
+
+
+def test_weights_multi_path():
+    dg = nx.DiGraph()
+    tasks = [Mock(name='t%s' % i, weight=0) for i in range(11)]
+    first = tasks[0]
+    half = (len(tasks) / 2) + 1
+    dg.add_nodes_from(tasks)
+    dg.add_path([first] + tasks[1:half])
+    dg.add_path([first] + tasks[half:])
+    graph.assign_weights_nested(dg)
+    assert first.weight == len(tasks) - 1
+    # the two subtrees are equal
+    for s1, s2 in zip(tasks[1:half], tasks[half:]):
+        assert s1.weight == s2.weight
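
As a quick sanity check of test_weights_strictly_decreasing above: with every task pointing at all later tasks, the recurrence weight(i) = sum(weight(j) + 1 for j > i) works out to 2 ** (n - 1 - i) - 1, which is strictly decreasing. A standalone arithmetic sketch, not project code:

    n = 10
    weights = [0] * n
    # evaluate the recurrence from the last task backwards
    for i in reversed(range(n)):
        weights[i] = sum(weights[j] + 1 for j in range(i + 1, n))
    assert weights == [2 ** (n - 1 - i) - 1 for i in range(n)]
    assert all(a > b for a, b in zip(weights, weights[1:]))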