Merge pull request #159 from dshulyak/cover_graph_api

Cover graph api with tests and small cleanup
This commit is contained in:
CGenie 2015-09-16 16:50:18 +02:00
commit b64592f156
3 changed files with 109 additions and 25 deletions

View File

@ -99,16 +99,10 @@ def run_once(uid):
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def restart(uid):
graph.reset(uid)
graph.reset_by_uid(uid)
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def reset(uid):
graph.reset(uid)
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def stop(uid):
@ -119,17 +113,23 @@ def stop(uid):
tasks.soft_stop.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def reset(uid):
graph.reset_by_uid(uid)
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def resume(uid):
graph.reset(uid, ['SKIPPED'])
graph.reset_by_uid(uid, ['SKIPPED'])
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def retry(uid):
graph.reset(uid, ['ERROR'])
graph.reset_by_uid(uid, ['ERROR'])
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')

View File

@ -25,7 +25,7 @@ from solar.interfaces.db import get_db
db = get_db()
def save_graph(name, graph):
def save_graph(graph):
# maybe it is possible to store part of information in AsyncResult backend
uid = graph.graph['uid']
db.create(uid, graph.graph, db.COLLECTIONS.plan_graph)
@ -78,7 +78,7 @@ def parse_plan(plan_path):
def create_plan_from_graph(dg, save=True):
dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
if save:
save_graph(dg.graph['uid'], dg)
save_graph(dg)
return dg
@ -110,27 +110,36 @@ def create_plan(plan_path, save=True):
def update_plan(uid, plan_path):
"""update preserves old status of tasks if they werent removed
"""
dg = parse_plan(plan_path)
old_dg = get_graph(uid)
dg.graph = old_dg.graph
for n in dg:
if n in old_dg:
dg.node[n]['status'] = old_dg.node[n]['status']
save_graph(uid, dg)
return uid
new = parse_plan(plan_path)
old = get_graph(uid)
return update_plan_from_graph(new, old).graph['uid']
def reset(uid, state_list=None):
def update_plan_from_graph(new, old):
new.graph = old.graph
for n in new:
if n in old:
new.node[n]['status'] = old.node[n]['status']
save_graph(new)
return new
def reset_by_uid(uid, state_list=None):
dg = get_graph(uid)
for n in dg:
if state_list is None or dg.node[n]['status'] in state_list:
dg.node[n]['status'] = states.PENDING.name
save_graph(uid, dg)
return reset(dg, state_list=state_list)
def reset(graph, state_list=None):
for n in graph:
if state_list is None or graph.node[n]['status'] in state_list:
graph.node[n]['status'] = states.PENDING.name
save_graph(graph)
def reset_filtered(uid):
reset(uid, state_list=[states.SKIPPED.name, states.NOOP.name])
reset_by_uid(uid, state_list=[states.SKIPPED.name, states.NOOP.name])
def report_topo(uid):

View File

@ -0,0 +1,75 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from copy import deepcopy
from pytest import fixture
from solar.orchestration import graph
from solar.orchestration.traversal import states
@fixture
def simple():
simple_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
'simple.yaml')
return graph.create_plan(simple_path)
def test_simple_plan_created_and_loaded(simple):
plan = graph.get_plan(simple.graph['uid'])
assert set(plan.nodes()) == {'just_fail', 'echo_stuff'}
def test_update_plan_with_new_node(simple):
new = deepcopy(simple)
new.add_node('one_more', {})
graph.update_plan_from_graph(new, simple)
updated = graph.get_plan(new.graph['uid'])
assert set(updated.nodes()) == {'one_more', 'just_fail', 'echo_stuff'}
def test_status_preserved_on_update(simple):
new = deepcopy(simple)
task_under_test = 'echo_stuff'
assert new.node[task_under_test]['status'] == states.PENDING.name
simple.node[task_under_test]['status'] = states.SUCCESS.name
graph.update_plan_from_graph(new, simple)
updated = graph.get_plan(new.graph['uid'])
assert new.node[task_under_test]['status'] == states.SUCCESS.name
def test_reset_all_states(simple):
for n in simple:
simple.node[n]['status'] == states.ERROR.name
graph.reset(simple)
for n in simple:
assert simple.node[n]['status'] == states.PENDING.name
def test_reset_only_provided(simple):
simple.node['just_fail']['status'] = states.ERROR.name
simple.node['echo_stuff']['status'] = states.SUCCESS.name
graph.reset(simple, [states.ERROR.name])
assert simple.node['just_fail']['status'] == states.PENDING.name
assert simple.node['echo_stuff']['status'] == states.SUCCESS.name