Merge branch 'master' into speed_libvirt

Conflicts:
	vagrant-settings.yaml_defaults
Łukasz Oleś 2015-09-17 11:35:26 +02:00
commit 47a6b8e163
15 changed files with 284 additions and 82 deletions

Vagrantfile vendored
View File

@@ -30,7 +30,9 @@ end
SLAVES_COUNT = cfg["slaves_count"]
SLAVES_RAM = cfg["slaves_ram"]
SLAVES_IMAGE = cfg["slaves_image"]
MASTER_RAM = cfg["master_ram"]
MASTER_IMAGE = cfg["master_image"]
SYNC_TYPE = cfg["sync_type"]
MASTER_CPUS = cfg["master_cpus"]
SLAVES_CPUS = cfg["slaves_cpus"]
@@ -50,7 +52,7 @@ slave_celery = ansible_playbook_command("celery.yaml", ["--skip-tags", "master"]
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.define "solar-dev", primary: true do |config|
config.vm.box = "cgenie/solar-master"
config.vm.box = MASTER_IMAGE
config.vm.provision "shell", inline: solar_script, privileged: true
config.vm.provision "shell", inline: master_celery, privileged: true
@@ -95,7 +97,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
ip_index = i + 3
config.vm.define "solar-dev#{index}" do |config|
# standard box with all stuff preinstalled
config.vm.box = "cgenie/solar-master"
config.vm.box = SLAVES_IMAGE
config.vm.provision "file", source: "bootstrap/ansible.cfg", destination: "/home/vagrant/.ansible.cfg"
config.vm.provision "shell", inline: slave_script, privileged: true

View File

@@ -0,0 +1,13 @@
# Demo of the `solar_bootstrap` Resource
You need a Vagrant setup with a slave node that is unprovisioned
(i.e. started from the `trusty64` Vagrant box).
You can start the boxes using the `Vagrantfile` from the master directory together with the
`vagrant-settings.yml` from this directory.
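For reference, a minimal `vagrant-settings.yml` for this demo might look like the defaults file shipped alongside this example (a sketch; copy `vagrant-settings.yaml_defaults` and adjust `slaves_count` as needed):
```yaml
# sketch of vagrant-settings.yml, based on vagrant-settings.yaml_defaults from this directory
slaves_count: 3
slaves_image: ubuntu/trusty64
```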
Running
```bash
python example-bootstrap.py deploy
```
will deploy a full Solar environment to the `solar-dev2` node.

View File

@@ -1,3 +1,5 @@
#!/usr/bin/env python
import click
import sys
import time
@@ -12,19 +14,6 @@ from solar import errors
from solar.interfaces.db import get_db
GIT_PUPPET_LIBS_URL = 'https://github.com/CGenie/puppet-libs-resource'
# TODO
# Resource for repository OR puppet apt-module in run.pp
# add-apt-repository cloud-archive:juno
# To discuss: install stuff in Docker container
# NOTE
# No copy of manifests, pull from upstream (implemented in the puppet handler)
# Official puppet manifests, not fuel-library
db = get_db()
@@ -38,15 +27,15 @@ def setup_resources():
signals.Connections.clear()
node3 = vr.create('node3', 'resources/ro_node/', {
'ip': '10.0.0.5',
'ssh_key': '/vagrant/.vagrant/machines/solar-dev3/virtualbox/private_key',
node2 = vr.create('node2', 'resources/ro_node/', {
'ip': '10.0.0.4',
'ssh_key': '/vagrant/.vagrant/machines/solar-dev2/virtualbox/private_key',
'ssh_user': 'vagrant'
})[0]
solar_bootstrap3 = vr.create('solar_bootstrap3', 'resources/solar_bootstrap', {'master_ip': '10.0.0.2'})[0]
solar_bootstrap2 = vr.create('solar_bootstrap2', 'resources/solar_bootstrap', {'master_ip': '10.0.0.2'})[0]
signals.connect(node3, solar_bootstrap3)
signals.connect(node2, solar_bootstrap2)
has_errors = False
for r in locals().values():
@@ -63,7 +52,7 @@ def setup_resources():
sys.exit(1)
resources_to_run = [
'solar_bootstrap3',
'solar_bootstrap2',
]

View File

@@ -0,0 +1,5 @@
# rename it to vagrant-settings.yml, then the Vagrantfile
# will use the values from this file
slaves_count: 3
slaves_image: ubuntu/trusty64

View File

@@ -165,22 +165,13 @@ def init_cli_connections():
@connections.command()
@click.option('--start-with', default=None)
@click.option('--end-with', default=None)
def graph(end_with, start_with):
#g = xs.connection_graph()
def graph(start_with, end_with):
g = signals.detailed_connection_graph(start_with=start_with,
end_with=end_with)
nx.write_dot(g, 'graph.dot')
fabric_api.local('dot -Tpng graph.dot -o graph.png')
# Matplotlib
#pos = nx.spring_layout(g)
#nx.draw_networkx_nodes(g, pos)
#nx.draw_networkx_edges(g, pos, arrows=True)
#nx.draw_networkx_labels(g, pos)
#plt.axis('off')
#plt.savefig('graph.png')
def init_cli_resource():
@main.group()

View File

@@ -99,16 +99,10 @@ def run_once(uid):
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def restart(uid):
graph.reset(uid)
graph.reset_by_uid(uid)
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def reset(uid):
graph.reset(uid)
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def stop(uid):
@@ -119,17 +113,23 @@ def stop(uid):
tasks.soft_stop.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def reset(uid):
graph.reset_by_uid(uid)
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def resume(uid):
graph.reset(uid, ['SKIPPED'])
graph.reset_by_uid(uid, ['SKIPPED'])
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
@orchestration.command()
@click.argument('uid', type=SOLARUID)
def retry(uid):
graph.reset(uid, ['ERROR'])
graph.reset_by_uid(uid, ['ERROR'])
tasks.schedule_start.apply_async(args=[uid], queue='scheduler')

View File

@@ -113,7 +113,7 @@ class Resource(object):
def resource_inputs(self):
return {
i.name: i for i in self.db_obj.inputs.value
i.name: i for i in self.db_obj.inputs.as_set()
}
def to_dict(self):

View File

@@ -13,9 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import networkx
from solar.core.log import log
from solar.events.api import add_events
from solar.events.controls import Dependency
from solar.interfaces import orm
def guess_mapping(emitter, receiver):
@@ -92,7 +95,7 @@ def connect_single(emitter, src, receiver, dst):
# Check for cycles
# TODO: change to get_paths after it is implemented in drivers
if emitter_input in receiver_input.receivers.value:
if emitter_input in receiver_input.receivers.as_set():
raise Exception('Prevented creating a cycle')
log.debug('Connecting {}::{} -> {}::{}'.format(
@@ -103,6 +106,10 @@ def connect_single(emitter, src, receiver, dst):
def connect_multi(emitter, src, receiver, dst):
receiver_input_name, receiver_input_key = dst.split(':')
if '|' in receiver_input_key:
receiver_input_key, receiver_input_tag = receiver_input_key.split('|')
else:
receiver_input_tag = None
emitter_input = emitter.resource_inputs()[src]
receiver_input = receiver.resource_inputs()[receiver_input_name]
@@ -113,11 +120,16 @@ def connect_multi(emitter, src, receiver, dst):
'Receiver input {} must be a hash or a list of hashes'.format(receiver_input_name)
)
log.debug('Connecting {}::{} -> {}::{}[{}]'.format(
log.debug('Connecting {}::{} -> {}::{}[{}], tag={}'.format(
emitter.name, emitter_input.name, receiver.name, receiver_input.name,
receiver_input_key
receiver_input_key,
receiver_input_tag
))
emitter_input.receivers.add_hash(receiver_input, receiver_input_key)
emitter_input.receivers.add_hash(
receiver_input,
receiver_input_key,
tag=receiver_input_tag
)
def disconnect_receiver_by_input(receiver, input_name):
@@ -130,3 +142,40 @@ def disconnect(emitter, receiver):
for emitter_input in emitter.resource_inputs().values():
for receiver_input in receiver.resource_inputs().values():
emitter_input.receivers.remove(receiver_input)
def detailed_connection_graph(start_with=None, end_with=None):
resource_inputs_graph = orm.DBResource.inputs.graph()
inputs_graph = orm.DBResourceInput.receivers.graph()
def node_attrs(n):
if isinstance(n, orm.DBResource):
return {
'color': 'yellowgreen',
'style': 'filled',
}
elif isinstance(n, orm.DBResourceInput):
return {
'color': 'lightskyblue',
'style': 'filled, rounded',
}
def format_name(i):
if isinstance(i, orm.DBResource):
return i.name
elif isinstance(i, orm.DBResourceInput):
return '{}/{}'.format(i.resource.name, i.name)
for r, i in resource_inputs_graph.edges():
inputs_graph.add_edge(r, i)
ret = networkx.MultiDiGraph()
for u, v in inputs_graph.edges():
u_n = format_name(u)
v_n = format_name(v)
ret.add_edge(u_n, v_n)
ret.node[u_n] = node_attrs(u)
ret.node[v_n] = node_attrs(v)
return ret
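A note on the `connect_multi` change above: a destination written as `name:key|tag` groups values from different emitter resources under the given tag instead of under each emitter's own resource name, so several emitters can fill a single hash entry of the receiver. A minimal sketch of the mapping syntax, mirroring the new signals test added later in this commit (the `xs.connect` call and the `sample1`/`sample2`/`receiver` resources are the ones used there):
```python
# sample1 provides 'ip'; sample2's 'port' is routed into the same hash entry
# by tagging it with 'sample1' instead of letting it default to 'sample2'.
xs.connect(sample1, receiver, mapping={'ip': 'server:ip'})
xs.connect(sample2, receiver, mapping={'port': 'server:port|sample1'})
# receiver.args['server'] then holds one combined hash:
# [{'ip': sample1.args['ip'], 'port': sample2.args['port']}]
```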

View File

@@ -13,6 +13,7 @@
# under the License.
import inspect
import networkx
import uuid
from solar import errors
@@ -89,6 +90,24 @@ class DBRelatedField(object):
self.name = name
self.source_db_object = source_db_object
@classmethod
def graph(self):
relations = db.get_relations(type_=self.relation_type)
g = networkx.MultiDiGraph()
for r in relations:
source = self.source_db_class(**r.start_node.properties)
dest = self.destination_db_class(**r.end_node.properties)
properties = r.properties.copy()
g.add_edge(
source,
dest,
attr_dict=properties
)
return g
def all(self):
source_db_node = self.source_db_object._db_node
@@ -123,7 +142,7 @@ class DBRelatedField(object):
type_=self.relation_type
)
def add_hash(self, destination_db_object, destination_key):
def add_hash(self, destination_db_object, destination_key, tag=None):
if not isinstance(destination_db_object, self.destination_db_class):
raise errors.SolarError(
'Object {} is of incompatible type {}.'.format(
@@ -134,7 +153,7 @@ class DBRelatedField(object):
db.get_or_create_relation(
self.source_db_object._db_node,
destination_db_object._db_node,
properties={'destination_key': destination_key},
properties={'destination_key': destination_key, 'tag': tag},
type_=self.relation_type
)
@@ -146,8 +165,7 @@ class DBRelatedField(object):
type_=self.relation_type
)
@property
def value(self):
def as_set(self):
"""
Return DB objects that are destinations for self.source_db_object.
"""
@@ -165,7 +183,7 @@ class DBRelatedField(object):
def sources(self, destination_db_object):
"""
Reverse of self.value, i.e. for given destination_db_object,
Reverse of self.as_set, i.e. for given destination_db_object,
return source DB objects.
"""
@@ -392,16 +410,19 @@ class DBResourceInput(DBObject):
return [i.backtrack_value() for i in inputs]
# NOTE: we return a list of values, but we need to group them
# by resource name, hence this dict here
# hence this dict here
# NOTE: grouping is done by resource.name by default, but this
# can be overwritten by the 'tag' property in relation
ret = {}
for r in relations:
source = source_class(**r.start_node.properties)
ret.setdefault(source.resource.name, {})
tag = r.properties['tag'] or source.resource.name
ret.setdefault(tag, {})
key = r.properties['destination_key']
value = source.backtrack_value()
ret[source.resource.name].update({key: value})
ret[tag].update({key: value})
return ret.values()
elif self.is_hash:

View File

@@ -25,7 +25,7 @@ from solar.interfaces.db import get_db
db = get_db()
def save_graph(name, graph):
def save_graph(graph):
# maybe it is possible to store part of the information in the AsyncResult backend
uid = graph.graph['uid']
db.create(uid, graph.graph, db.COLLECTIONS.plan_graph)
@@ -78,7 +78,7 @@ def parse_plan(plan_path):
def create_plan_from_graph(dg, save=True):
dg.graph['uid'] = "{0}:{1}".format(dg.graph['name'], str(uuid.uuid4()))
if save:
save_graph(dg.graph['uid'], dg)
save_graph(dg)
return dg
@@ -110,27 +110,36 @@ def create_plan(plan_path, save=True):
def update_plan(uid, plan_path):
"""update preserves old status of tasks if they werent removed
"""
dg = parse_plan(plan_path)
old_dg = get_graph(uid)
dg.graph = old_dg.graph
for n in dg:
if n in old_dg:
dg.node[n]['status'] = old_dg.node[n]['status']
save_graph(uid, dg)
return uid
new = parse_plan(plan_path)
old = get_graph(uid)
return update_plan_from_graph(new, old).graph['uid']
def reset(uid, state_list=None):
def update_plan_from_graph(new, old):
new.graph = old.graph
for n in new:
if n in old:
new.node[n]['status'] = old.node[n]['status']
save_graph(new)
return new
def reset_by_uid(uid, state_list=None):
dg = get_graph(uid)
for n in dg:
if state_list is None or dg.node[n]['status'] in state_list:
dg.node[n]['status'] = states.PENDING.name
save_graph(uid, dg)
return reset(dg, state_list=state_list)
def reset(graph, state_list=None):
for n in graph:
if state_list is None or graph.node[n]['status'] in state_list:
graph.node[n]['status'] = states.PENDING.name
save_graph(graph)
def reset_filtered(uid):
reset(uid, state_list=[states.SKIPPED.name, states.NOOP.name])
reset_by_uid(uid, state_list=[states.SKIPPED.name, states.NOOP.name])
def report_topo(uid):

View File

@@ -126,7 +126,7 @@ def schedule(plan_uid, dg):
tasks)
execution = executor.celery_executor(
dg, limit_chain, control_tasks=('fault_tolerance',))
graph.save_graph(plan_uid, dg)
graph.save_graph(dg)
execution()
@@ -147,7 +147,7 @@ def soft_stop(plan_uid):
for n in dg:
if dg.node[n]['status'] == 'PENDING':
dg.node[n]['status'] = 'SKIPPED'
graph.save_graph(plan_uid, dg)
graph.save_graph(dg)
@app.task(name='schedule_next')

View File

@@ -0,0 +1,75 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from copy import deepcopy
from pytest import fixture
from solar.orchestration import graph
from solar.orchestration.traversal import states
@fixture
def simple():
simple_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
'simple.yaml')
return graph.create_plan(simple_path)
def test_simple_plan_created_and_loaded(simple):
plan = graph.get_plan(simple.graph['uid'])
assert set(plan.nodes()) == {'just_fail', 'echo_stuff'}
def test_update_plan_with_new_node(simple):
new = deepcopy(simple)
new.add_node('one_more', {})
graph.update_plan_from_graph(new, simple)
updated = graph.get_plan(new.graph['uid'])
assert set(updated.nodes()) == {'one_more', 'just_fail', 'echo_stuff'}
def test_status_preserved_on_update(simple):
new = deepcopy(simple)
task_under_test = 'echo_stuff'
assert new.node[task_under_test]['status'] == states.PENDING.name
simple.node[task_under_test]['status'] = states.SUCCESS.name
graph.update_plan_from_graph(new, simple)
updated = graph.get_plan(new.graph['uid'])
assert new.node[task_under_test]['status'] == states.SUCCESS.name
def test_reset_all_states(simple):
for n in simple:
simple.node[n]['status'] = states.ERROR.name
graph.reset(simple)
for n in simple:
assert simple.node[n]['status'] == states.PENDING.name
def test_reset_only_provided(simple):
simple.node['just_fail']['status'] = states.ERROR.name
simple.node['echo_stuff']['status'] = states.SUCCESS.name
graph.reset(simple, [states.ERROR.name])
assert simple.node['just_fail']['status'] == states.PENDING.name
assert simple.node['echo_stuff']['status'] == states.SUCCESS.name

View File

@@ -137,7 +137,6 @@ class TestORM(BaseResourceTest):
self.assertEqual(t1, t2)
class TestORMRelation(BaseResourceTest):
def test_children_value(self):
class TestDBRelatedObject(orm.DBObject):
@@ -164,25 +163,25 @@ class TestORMRelation(BaseResourceTest):
o = TestDBObject(id='a')
o.save()
self.assertSetEqual(o.related.value, set())
self.assertSetEqual(o.related.as_set(), set())
o.related.add(r1)
self.assertSetEqual(o.related.value, {r1})
self.assertSetEqual(o.related.as_set(), {r1})
o.related.add(r2)
self.assertSetEqual(o.related.value, {r1, r2})
self.assertSetEqual(o.related.as_set(), {r1, r2})
o.related.remove(r2)
self.assertSetEqual(o.related.value, {r1})
self.assertSetEqual(o.related.as_set(), {r1})
o.related.add(r2)
self.assertSetEqual(o.related.value, {r1, r2})
self.assertSetEqual(o.related.as_set(), {r1, r2})
o.related.remove(r1, r2)
self.assertSetEqual(o.related.value, set())
self.assertSetEqual(o.related.as_set(), set())
o.related.add(r1, r2)
self.assertSetEqual(o.related.value, {r1, r2})
self.assertSetEqual(o.related.as_set(), {r1, r2})
with self.assertRaisesRegexp(errors.SolarError, '.*incompatible type.*'):
o.related.add(o)
@@ -208,8 +207,8 @@ class TestORMRelation(BaseResourceTest):
o1.related.add(o2)
o2.related.add(o3)
self.assertEqual(o1.related.value, {o2})
self.assertEqual(o2.related.value, {o3})
self.assertEqual(o1.related.as_set(), {o2})
self.assertEqual(o2.related.as_set(), {o3})
class TestResourceORM(BaseResourceTest):
@@ -227,4 +226,4 @@ class TestResourceORM(BaseResourceTest):
r.add_input('ip', 'str!', '10.0.0.2')
self.assertEqual(len(r.inputs.value), 1)
self.assertEqual(len(r.inputs.as_set()), 1)

View File

@@ -654,3 +654,50 @@ input:
{'ip': sample.args['ip']},
receiver.args['server'],
)
def test_hash_input_multiple_resources_with_tag_connect(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
handler: ansible
version: 1.0.0
input:
ip:
schema: str!
value:
port:
schema: int!
value:
""")
receiver_meta_dir = self.make_resource_meta("""
id: receiver
handler: ansible
version: 1.0.0
input:
server:
schema: [{ip: str!, port: int!}]
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={'ip': 'server:ip'})
xs.connect(sample2, receiver, mapping={'port': 'server:port|sample1'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
sample3 = self.create_resource(
'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002}
)
xs.connect(sample3, receiver, mapping={'ip': 'server:ip', 'port': 'server:port'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample3.args['ip'], 'port': sample3.args['port']}],
receiver.args['server'],
)

View File

@@ -3,6 +3,8 @@
slaves_count: 2
slaves_ram: 1024
master_image: cgenie/solar-master
slaves_image: cgenie/solar-master
master_ram: 1024
master_cpus: 1
slaves_cpus: 1