Run autopep8

This commit is contained in:
Łukasz Oleś 2015-11-26 14:12:10 +01:00
parent 2a9157452a
commit fc61c88228
62 changed files with 631 additions and 411 deletions
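The diffs below are mechanical pep8 fixes: operator and comma spacing, long-line wrapping, one-import-per-line ordering, comment formatting, and print-statement conversion. As a rough illustration of what the tool does — a minimal sketch using autopep8's documented Python API, not code from this commit:

    import autopep8

    messy = "x=1;  y = [1,2 ]\nprint( y )\n"
    fixed = autopep8.fix_code(messy, options={'aggressive': 1})
    print(fixed)
    # Roughly:
    # x = 1
    # y = [1, 2]
    # print(y)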

View File

@ -10,7 +10,7 @@ install:
- pip-accel install coveralls
- pip-accel install -r test-requirements.txt
script:
- py.test --cov=solar -s solar
- tox -e pep8 && py.test --cov=solar -s solar
services:
- riak
after_success:

View File

@ -16,8 +16,9 @@ import click
class AliasedGroup(click.Group):
"""This class introduces iproute2-like behaviour, command will be inferred
by matching patterns.
"""This class introduces iproute2-like behaviour,
command will be inferred by matching patterns.
If there is more than one match, an exception will be raised
Examples:
@ -25,6 +26,7 @@ class AliasedGroup(click.Group):
>> solar cha process
>> solar res action run rabbitmq_service1
"""
def get_command(self, ctx, cmd_name):
rv = click.Group.get_command(self, ctx, cmd_name)
if rv is not None:
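The hunk above is truncated after the exact-match lookup. For context, a minimal sketch of how such a get_command typically continues, using click's standard Group API (the actual body is not shown in this diff):

    import click

    class AliasedGroup(click.Group):
        def get_command(self, ctx, cmd_name):
            # exact command name wins
            rv = click.Group.get_command(self, ctx, cmd_name)
            if rv is not None:
                return rv
            # otherwise resolve cmd_name as a prefix, iproute2-style
            matches = [c for c in self.list_commands(ctx)
                       if c.startswith(cmd_name)]
            if len(matches) == 1:
                return click.Group.get_command(self, ctx, matches[0])
            if len(matches) > 1:
                ctx.fail('Too many matches: %s' % ', '.join(sorted(matches)))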

View File

@ -33,7 +33,7 @@ def show(resource):
click.echo('Resource: {}'.format(resource))
offset = ' ' * 4
for ev in all_:
click.echo(offset+repr(ev))
click.echo(offset + repr(ev))
else:
click.echo('No events for resource {}'.format(resource))

View File

@ -1,7 +1,22 @@
from hashlib import md5
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
class DryRunExecutor(object):
def __init__(self, mapping=None):
from fabric import api as fabric_api
from fabric.contrib import project as fabric_project
@ -26,10 +41,11 @@ class DryRunExecutor(object):
fabric_api.put = mock.Mock(side_effect=dry_run_executor('PUT'))
fabric_api.run = mock.Mock(side_effect=dry_run_executor('SSH RUN'))
fabric_api.sudo = mock.Mock(side_effect=dry_run_executor('SSH SUDO'))
fabric_project.rsync_project = mock.Mock(side_effect=dry_run_executor('RSYNC PROJECT'))
fabric_project.rsync_project = mock.Mock(
side_effect=dry_run_executor('RSYNC PROJECT'))
def compute_hash(self, key):
return md5(str(key)).hexdigest()
return hashlib.md5(str(key)).hexdigest()
def find_hash(self, hash):
stripped_hashes = {k.replace('>', ''): k for k in self.mapping}
@ -37,7 +53,7 @@ class DryRunExecutor(object):
hashes = [k for k in stripped_hashes if hash.startswith(k)]
if len(hashes) == 0:
#raise Exception('Hash {} not found'.format(hash))
# raise Exception('Hash {} not found'.format(hash))
return ''
elif len(hashes) > 1:
raise Exception('Hash {} not unique in {}'.format(
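For context, find_hash resolves a full hash against stored key prefixes (after stripping the '>' suffix). A tiny illustration with a hypothetical mapping, mirroring the logic in the hunk above:

    mapping = {'3858f>': 'touch /tmp/x'}  # hypothetical dry-run mapping
    stripped_hashes = {k.replace('>', ''): k for k in mapping}
    full_hash = '3858f62230ac3c915f300c664312c63f'
    hashes = [k for k in stripped_hashes if full_hash.startswith(k)]
    assert hashes == ['3858f']  # zero matches returns ''; more than one raises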

View File

@ -17,31 +17,20 @@
On create "golden" resource should be moved to special place
"""
import collections
import json
import click
from fabric import api as fabric_api
import json
import networkx as nx
import os
import sys
import tabulate
import yaml
from collections import defaultdict
from solar.core import actions
from solar.core import resource as sresource
from solar.core import signals
from solar.core.tags_set_parser import Expression
from solar.core.resource import virtual_resource as vr
from solar.core.log import log
from solar import errors
from solar import utils
from solar.cli import base
from solar.cli import executors
from solar.cli.orch import orchestration
from solar.cli.system_log import changes
from solar.cli.events import events
from solar.cli.orch import orchestration
from solar.cli.resource import resource as cli_resource
from solar.cli.system_log import changes
from solar.core import resource as sresource
from solar.core import signals
# HELPERS
@ -54,7 +43,7 @@ def format_resource_input(resource_name, resource_input):
def show_emitter_connections(res):
db_obj = res.db_obj
d = defaultdict(list)
d = collections.defaultdict(list)
for emitter, receiver, _meta in db_obj.inputs._edges():
d[emitter].append(receiver)
@ -79,7 +68,8 @@ def init_actions():
def run(dry_run_mapping, dry_run, action, tags):
raise NotImplementedError("Not yet implemented")
# if dry_run:
# dry_run_executor = executors.DryRunExecutor(mapping=json.loads(dry_run_mapping))
# dry_run_executor = executors.DryRunExecutor(
# mapping=json.loads(dry_run_mapping))
# resources = filter(
# lambda r: Expression(tags, r.tags).evaluate(),
@ -94,8 +84,9 @@ def init_actions():
# click.echo('EXECUTED:')
# for key in dry_run_executor.executed:
# click.echo('{}: {}'.format(
# click.style(dry_run_executor.compute_hash(key), fg='green'),
# str(key)
# click.style(dry_run_executor.compute_hash(key),
# fg='green'),
# str(key)
# ))

View File

@ -18,19 +18,20 @@ import time
import click
from solar.cli.uids_history import remember_uid
from solar.cli.uids_history import SOLARUID
from solar import errors
from solar.orchestration import filters
from solar.orchestration import graph
from solar.orchestration import tasks
from solar.orchestration import filters
from solar.orchestration import utils
from solar.orchestration.traversal import states
from solar.cli.uids_history import SOLARUID, remember_uid
from solar import errors
from solar.orchestration import utils
@click.group(name='orch')
def orchestration():
"""
\b
"""\b
create solar/orchestration/examples/multi.yaml
<id>
run-once <id>
@ -53,13 +54,15 @@ def wait_report(uid, timeout, interval=3):
if timeout:
for summary in graph.wait_finish(uid, timeout=timeout):
stringified_summary = '\r' + ' '.join(
['{}: {}'.format(state, count) for state, count in summary.items()])
length = len(stringified_summary)
['{}: {}'.format(state, count)
for state, count in summary.items()])
click.echo(stringified_summary, nl=False)
sys.stdout.flush()
if summary[states.PENDING.name] + summary[states.INPROGRESS.name] != 0:
pending = states.PENDING.name
in_progress = states.INPROGRESS.name
if summary[pending] + summary[in_progress] != 0:
time.sleep(interval)
except errors.SolarError as err:
except errors.SolarError:
click.echo('')
click_report(uid)
sys.exit(1)
@ -83,7 +86,7 @@ def click_report(uid):
if item[2]:
msg += ' :: {}'.format(item[2])
if item[4] and item[3]:
delta = float(item[4])-float(item[3])
delta = float(item[4]) - float(item[3])
total += delta
msg += ' D: {}'.format(delta)
click.echo(click.style(msg, fg=colors[item[1]]))

View File

@ -12,28 +12,28 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
import os
import json
import yaml
import tabulate
import os
import sys
import click
from solar.core import actions
from solar.core import resource as sresource
from solar.core.resource import virtual_resource as vr
from solar.core.log import log
from solar import errors
from solar import utils
import tabulate
import yaml
from solar.cli import executors
from solar.core import actions
from solar.core.log import log
from solar.core import resource as sresource
from solar.core.resource import virtual_resource as vr
from solar import errors
from solar import utils
@click.group()
def resource():
pass
@resource.command()
@click.argument('action')
@click.argument('resource')
@ -41,7 +41,8 @@ def resource():
@click.option('-m', '--dry-run-mapping', default='{}')
def action(dry_run_mapping, dry_run, action, resource):
if dry_run:
dry_run_executor = executors.DryRunExecutor(mapping=json.loads(dry_run_mapping))
dry_run_executor = executors.DryRunExecutor(
mapping=json.loads(dry_run_mapping))
click.echo(
'action {} for resource {}'.format(action, resource)
@ -72,13 +73,16 @@ def backtrack_inputs(resource, input, values, real_values):
r = sresource.load(resource)
db_obj = r.db_obj
def single(resource, name, get_val=False):
db_obj = sresource.load(resource).db_obj
se = db_obj.inputs._single_edge(name)
se = tuple(se)
if not se:
if get_val:
return dict(resource=resource, name=name, value=db_obj.inputs[name])
return dict(resource=resource,
name=name,
value=db_obj.inputs[name])
else:
return dict(resource=resource, name=name)
l = []
@ -100,7 +104,9 @@ def backtrack_inputs(resource, input, values, real_values):
for name, values in inps.iteritems():
click.echo(yaml.safe_dump({name: values}, default_flow_style=False))
if real_values:
click.echo('! Real value: %r' % sresource.load(resource).db_obj.inputs[name] , nl=True)
click.echo('! Real value: %r' % sresource.load(
resource).db_obj.inputs[name], nl=True)
@resource.command()
def compile_all():
@ -111,12 +117,14 @@ def compile_all():
if os.path.exists(destination_path):
os.remove(destination_path)
for path in utils.find_by_mask(utils.read_config()['resources-files-mask']):
resources_files_mask = utils.read_config()['resources-files-mask']
for path in utils.find_by_mask(resources_files_mask):
meta = utils.yaml_load(path)
meta['base_path'] = os.path.dirname(path)
compiler.compile(meta)
@resource.command()
def clear_all():
from solar.dblayer.model import ModelMeta
@ -143,6 +151,7 @@ def create(args, base_path, name):
for res in resources:
click.echo(res.color_repr())
@resource.command()
@click.option('--name', '-n', default=None)
@click.option('--tag', '-t', multiple=True)
@ -159,7 +168,8 @@ def show(name, tag, as_json, color):
resources = sresource.load_all()
if as_json:
output = json.dumps([r.to_dict(inputs=True) for r in resources], indent=2)
output = json.dumps([r.to_dict(inputs=True)
for r in resources], indent=2)
echo = click.echo
else:
if color:
@ -171,6 +181,7 @@ def show(name, tag, as_json, color):
if output:
echo(output)
@resource.command()
@click.argument('resource_name')
@click.argument('tags', nargs=-1)
@ -184,6 +195,7 @@ def tag(add, tags, resource_name):
r.remove_tags(*tags)
click.echo('Tag(s) {} removed from {}'.format(tags, resource_name))
@resource.command()
@click.argument('name')
@click.argument('args', nargs=-1)
@ -199,6 +211,7 @@ def update(name, args):
res = sresource.load(name)
res.update(args_parsed)
@resource.command()
@click.option('--check-missing-connections', default=False, is_flag=True)
def validate(check_missing_connections):
@ -220,6 +233,7 @@ def validate(check_missing_connections):
])
)
@resource.command()
@click.argument('path', type=click.Path(exists=True, dir_okay=False))
def get_inputs(path):
@ -231,7 +245,8 @@ def get_inputs(path):
@resource.command()
@click.option('--name', '-n', default=None)
@click.option('--tag', '-t', multiple=True)
@click.option('-f', default=False, is_flag=True, help='force removal from database')
@click.option('-f', default=False, is_flag=True,
help='force removal from database')
def remove(name, tag, f):
if name:
resources = [sresource.load(name)]
@ -242,6 +257,8 @@ def remove(name, tag, f):
for res in resources:
res.remove(force=f)
if f:
click.echo('Resource %s removed from database' % res.name)
msg = 'Resource %s removed from database' % res.name
else:
click.echo('Resource %s will be removed after committing changes.' % res.name)
msg = 'Resource %s will be removed after committing changes.'
msg = msg % res.name
click.echo(msg)

View File

@ -16,13 +16,14 @@ import sys
import click
from solar import errors
from solar.core import testing
from solar.cli.uids_history import remember_uid
from solar.cli.uids_history import SOLARUID
from solar.core import resource
from solar.core import testing
from solar import errors
from solar.system_log import change
from solar.system_log import operations
from solar.system_log import data
from solar.cli.uids_history import get_uid, remember_uid, SOLARUID
from solar.system_log import operations
@click.group()
@ -35,7 +36,7 @@ def validate():
errors = resource.validate_resources()
if errors:
for r, error in errors:
print 'ERROR: %s: %s' % (r.name, error)
print('ERROR: %s: %s' % (r.name, error))
sys.exit(1)
@ -48,20 +49,22 @@ def stage(d):
click.echo(data.compact(item))
if d:
for line in data.details(item.diff):
click.echo(' '*4+line)
click.echo(' ' * 4 + line)
if not log:
click.echo('No changes')
@changes.command(name='staged-item')
@click.argument('uid')
def staged_item(uid):
item = data.LogItem.get(uid)
if not item:
click.echo('No staged changes for {}'.format(log_action))
click.echo('No staged changes for {}'.format(uid))
else:
click.echo(data.compact(item))
for line in data.details(item.diff):
click.echo(' '*4+line)
click.echo(' ' * 4 + line)
@changes.command()
def process():
@ -90,7 +93,7 @@ def history(n, d, s):
click.echo(data.compact(item))
if d:
for line in data.details(item.diff):
click.echo(' '*4+line)
click.echo(' ' * 4 + line)
if not log:
click.echo('No history')
@ -103,18 +106,18 @@ def revert(uid):
except errors.SolarError as er:
raise click.BadParameter(str(er))
@changes.command()
@click.argument('uids', nargs=-1)
@click.option('--all', is_flag=True, default=True)
def discard(uids, all):
"""
uids argument should be of a higher priority than all flag
"""
"""uids argument should be of a higher priority than all flag."""
if uids:
change.discard_uids(uids)
elif all:
change.discard_all()
@changes.command()
@click.option('--name', default=None)
def test(name):
@ -144,6 +147,7 @@ def test(name):
def clean_history():
change.clear_history()
@changes.command(help='USE ONLY FOR TESTING')
def commit():
change.commit_all()

View File

@ -12,10 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import click
import os
import re
import click
UIDS_HISTORY = os.path.join(os.getcwd(), '.solar_cli_uids')
@ -68,8 +70,9 @@ class SolarUIDParameterType(click.types.StringParamType):
try:
value = get_uid(value)
except IOError:
raise click.BadParameter("Unable to locate file %r so"
"you can't use 'last' shortcuts" % UIDS_HISTORY)
msg = ("Unable to locate file %r so"
"you can't use 'last' shortcuts" % UIDS_HISTORY)
raise click.BadParameter(msg)
return value

View File

@ -1,6 +1,24 @@
#
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
import yaml
from bunch import Bunch
import yaml
CWD = os.getcwd()
@ -26,7 +44,7 @@ def from_configs():
paths = [
os.getenv('SOLAR_CONFIG', os.path.join(CWD, '.config')),
os.path.join(CWD, '.config.override')
]
]
data = {}
def _load_from_path(data, path):

View File

@ -15,11 +15,8 @@
import handlers
# from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
# from solar.core.transports.rsync import RsyncSyncTransport
# from solar.core.transports.solar_agent_transport import SolarAgentRunTransport, SolarAgentSyncTransport
from solar.core.transports.bat import BatRunTransport, BatSyncTransport
from solar.core.transports.bat import BatRunTransport
from solar.core.transports.bat import BatSyncTransport
_default_transports = {
# 'sync': RsyncSyncTransport,
@ -39,5 +36,5 @@ def resource_action(resource, action):
def tag_action(tag, action):
#TODO
# TODO
pass

View File

@ -27,6 +27,7 @@ HANDLERS = {'ansible': AnsibleTemplate,
'none': Empty,
'puppetv2': PuppetV2}
def get(handler_name):
handler = HANDLERS.get(handler_name, None)
if handler:

View File

@ -15,16 +15,17 @@
import os
from ansible.playbook import PlayBook
from ansible import utils
from ansible import callbacks
import ansible.constants as C
from ansible.playbook import PlayBook
from ansible import utils
from fabric import api as fabric_api
from solar.core.log import log
from solar.core.handlers import base
from solar import errors
from solar.core.log import log
from solar.core.provider import SVNProvider
from solar import errors
ROLES_PATH = '/etc/ansible/roles'
@ -48,14 +49,16 @@ class AnsiblePlaybook(base.BaseHandler):
resource.metadata['actions'][action])
stats = callbacks.AggregateStats()
playbook_cb = callbacks.PlaybookCallbacks(verbose=utils.VERBOSITY)
runner_cb = callbacks.PlaybookRunnerCallbacks(stats, verbose=utils.VERBOSITY)
runner_cb = callbacks.PlaybookRunnerCallbacks(
stats, verbose=utils.VERBOSITY)
variables = resource.args_dict()
if 'roles' in variables:
self.download_roles(variables['roles'])
remote_user = variables.get('ssh_user') or C.DEFAULT_REMOTE_USER
private_key_file = variables.get('ssh_key') or C.DEFAULT_PRIVATE_KEY_FILE
private_key_file = variables.get(
'ssh_key') or C.DEFAULT_PRIVATE_KEY_FILE
if variables.get('ip'):
host = variables['ip']
transport = C.DEFAULT_TRANSPORT

View File

@ -16,8 +16,9 @@
from fabric.state import env
import os
from solar.core.handlers.base import SOLAR_TEMP_LOCAL_LOCATION
from solar.core.handlers.base import TempFileHandler
from solar.core.log import log
from solar.core.handlers.base import TempFileHandler, SOLAR_TEMP_LOCAL_LOCATION
from solar import errors
# otherwise fabric will sys.exit(1) in case of errors
@ -26,7 +27,8 @@ env.warn_only = True
# if we would have something like solar_agent that would render this then
# we would not need to render it there
# for now we render it locally, sync to remote, run ansible on remote host as local
# for now we render it locally, sync to remote, run ansible on remote
# host as local
class AnsibleTemplate(TempFileHandler):
def action(self, resource, action_name):
@ -35,17 +37,19 @@ class AnsibleTemplate(TempFileHandler):
log.debug('inventory_file: %s', inventory_file)
log.debug('playbook_file: %s', playbook_file)
# self.transport_sync.copy(resource, self.dirs[resource.name], self.dirs[resource.name])
self._copy_templates_and_scripts(resource, action_name)
self.transport_sync.copy(resource, self.dst, '/tmp')
self.transport_sync.copy(resource, '/vagrant/library', '/tmp')
self.transport_sync.sync_all()
# remote paths are not nested inside solar_local
remote_playbook_file = playbook_file.replace(SOLAR_TEMP_LOCAL_LOCATION, '/tmp/')
remote_inventory_file = inventory_file.replace(SOLAR_TEMP_LOCAL_LOCATION, '/tmp/')
remote_playbook_file = playbook_file.replace(
SOLAR_TEMP_LOCAL_LOCATION, '/tmp/')
remote_inventory_file = inventory_file.replace(
SOLAR_TEMP_LOCAL_LOCATION, '/tmp/')
call_args = ['ansible-playbook', '--module-path', '/tmp/library', '-i', remote_inventory_file, remote_playbook_file]
call_args = ['ansible-playbook', '--module-path', '/tmp/library',
'-i', remote_inventory_file, remote_playbook_file]
log.debug('EXECUTING: %s', ' '.join(call_args))
out = self.transport_run.run(resource, *call_args)

View File

@ -13,14 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import errno
import os
import shutil
import tempfile
import errno
from solar import utils
from solar.core.log import log
from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
from solar.core.transports.ssh import SSHRunTransport
from solar.core.transports.ssh import SSHSyncTransport
from solar import utils
tempfile.gettempdir()
@ -49,6 +50,7 @@ class BaseHandler(object):
class TempFileHandler(BaseHandler):
def __init__(self, resources, handlers=None):
super(TempFileHandler, self).__init__(resources, handlers)
self.dst = None
@ -122,7 +124,8 @@ class TempFileHandler(BaseHandler):
base_path = resource.db_obj.base_path
src_templates_dir = os.path.join(base_path, 'templates')
if os.path.exists(src_templates_dir):
trg_templates_dir = os.path.join(self.dirs[resource.name], 'templates')
trg_templates_dir = os.path.join(
self.dirs[resource.name], 'templates')
shutil.copytree(src_templates_dir, trg_templates_dir)
src_scripts_dir = os.path.join(base_path, 'scripts')
@ -153,5 +156,6 @@ class TempFileHandler(BaseHandler):
class Empty(BaseHandler):
def action(self, resource, action):
pass

View File

@ -13,19 +13,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import yaml
from solar.core.log import log
from solar.core.handlers.base import TempFileHandler
from solar.core.log import log
from solar import errors
# NOTE: We assume that:
# - puppet is installed
class Puppet(TempFileHandler):
def action(self, resource, action_name):
log.debug('Executing Puppet manifest %s %s', action_name, resource.name)
log.debug('Executing Puppet manifest %s %s',
action_name, resource.name)
action_file = self._compile_action_file(resource, action_name)
log.debug('action_file: %s', action_file)

View File

@ -19,6 +19,7 @@ from solar.core.handlers.base import TempFileHandler
class Python(TempFileHandler):
def action(self, resource, action_name):
action_file = self._compile_action_file(resource, action_name)
fabric_api.local('python {}'.format(action_file))

View File

@ -13,20 +13,23 @@
# License for the specific language governing permissions and limitations
# under the License.
from solar.core.log import log
from solar import errors
import os
from solar.core.handlers.base import TempFileHandler, SOLAR_TEMP_LOCAL_LOCATION
from solar.core.handlers.base import SOLAR_TEMP_LOCAL_LOCATION
from solar.core.handlers.base import TempFileHandler
from solar.core.log import log
from solar import errors
class Shell(TempFileHandler):
def action(self, resource, action_name):
action_file = self._compile_action_file(resource, action_name)
log.debug('action_file: %s', action_file)
action_file_name = os.path.join(self.dirs[resource.name], action_file)
action_file_name = action_file_name.replace(SOLAR_TEMP_LOCAL_LOCATION, '/tmp/')
action_file_name = action_file_name.replace(
SOLAR_TEMP_LOCAL_LOCATION, '/tmp/')
self._copy_templates_and_scripts(resource, action_name)

View File

@ -22,11 +22,13 @@ log = logging.getLogger('solar')
def setup_logger():
handler = logging.FileHandler('solar.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s')
formatter = logging.Formatter(
'%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s') # NOQA
handler.setFormatter(formatter)
log.addHandler(handler)
print_formatter = logging.Formatter('%(levelname)s (%(filename)s::%(lineno)s)::%(message)s')
print_formatter = logging.Formatter(
'%(levelname)s (%(filename)s::%(lineno)s)::%(message)s')
print_handler = logging.StreamHandler(stream=sys.stdout)
print_handler.setFormatter(print_formatter)
log.addHandler(print_handler)

View File

@ -22,6 +22,7 @@ from solar import utils
class BaseProvider(object):
def __init__(self, base_path=None):
if base_path is None:
self.base_path = utils.read_config()['resources-directory']
@ -33,6 +34,7 @@ class BaseProvider(object):
class DirectoryProvider(BaseProvider):
def __init__(self, directory, *args, **kwargs):
self.directory = directory
@ -40,6 +42,7 @@ class DirectoryProvider(BaseProvider):
class GitProvider(BaseProvider):
def __init__(self, repository, branch='master', path='.', *args, **kwargs):
super(GitProvider, self).__init__(*args, **kwargs)
@ -121,18 +124,21 @@ class RemoteZipProvider(BaseProvider):
class SVNProvider(BaseProvider):
"""With git you cant checkout only directory from repo,
but with svn you can
"""
def __init__(self, url, path='.', base_path=None):
self.url = url
self.path = path
self.base_path = base_path or utils.read_config()['resources-directory']
self.base_path = base_path or utils.read_config()[
'resources-directory']
if path != '.':
self.repo_directory = os.path.join(self.base_path, path)
else:
self.repo_directory = self.base_path
self.directory = os.path.join(self.repo_directory, self.url.rsplit('/', 1)[-1])
self.directory = os.path.join(
self.repo_directory, self.url.rsplit('/', 1)[-1])
def run(self):
if not os.path.exists(self.repo_directory):

View File

@ -13,29 +13,22 @@
# License for the specific language governing permissions and limitations
# under the License.
from enum import Enum
from copy import deepcopy
from multipledispatch import dispatch
import os
from solar import utils
from solar.core import validation
from solar.core import signals
from solar.events import api
from uuid import uuid4
from hashlib import md5
import os
from uuid import uuid4
from enum import Enum
from multipledispatch import dispatch
import networkx
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import Resource as DBResource
from solar.dblayer.model import StrInt
from solar.core.signals import get_mapping
from solar.core import validation
from solar.dblayer.model import StrInt
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import Resource as DBResource
from solar.events import api
from solar import utils
def read_meta(base_path):
@ -51,7 +44,8 @@ def read_meta(base_path):
return metadata
RESOURCE_STATE = Enum('ResourceState', 'created operational removed error updated')
RESOURCE_STATE = Enum(
'ResourceState', 'created operational removed error updated')
class Resource(object):
@ -59,7 +53,8 @@ class Resource(object):
# Create
@dispatch(basestring, basestring)
def __init__(self, name, base_path, args=None, tags=None, virtual_resource=None):
def __init__(self, name, base_path, args=None, tags=None,
virtual_resource=None):
args = args or {}
self.name = name
if base_path:
@ -100,9 +95,8 @@ class Resource(object):
self.db_obj.save()
# Load
@dispatch(DBResource)
@dispatch(DBResource) # NOQA
def __init__(self, resource_db):
self.db_obj = resource_db
self.name = resource_db.name
@ -117,14 +111,15 @@ class Resource(object):
inputs.setdefault('location_id', {'value': "",
'schema': 'str!'})
inputs.setdefault('transports_id', {'value': "",
'schema': 'str'})
'schema': 'str'})
for inp in ('transports_id', 'location_id'):
if inputs[inp].get('value') == '$uuid':
inputs[inp]['value'] = md5(self.name + uuid4().hex).hexdigest()
def transports(self):
db_obj = self.db_obj
return db_obj.inputs._get_field_val('transports_id', other='transports')
return db_obj.inputs._get_field_val('transports_id',
other='transports')
def ip(self):
db_obj = self.db_obj
@ -166,7 +161,6 @@ class Resource(object):
# TODO: disconnect input when it is updated and end_node
# for some input_to_input relation
self.db_obj.state = RESOURCE_STATE.updated.name
resource_inputs = self.resource_inputs()
for k, v in args.items():
self.db_obj.inputs[k] = v
@ -213,16 +207,17 @@ class Resource(object):
@property
def connections(self):
"""
Gives you all incoming/outgoing connections for current resource,
stored as:
"""Gives you all incoming/outgoing connections for current resource.
Stored as:
[(emitter, emitter_input, receiver, receiver_input), ...]
"""
rst = set()
for (emitter_resource, emitter_input), (receiver_resource, receiver_input), meta in self.graph().edges(data=True):
for (emitter_resource, emitter_input), (receiver_resource, receiver_input), meta in self.graph().edges(data=True): # NOQA
if meta:
receiver_input = '{}:{}|{}'.format(receiver_input,
meta['destination_key'], meta['tag'])
meta['destination_key'],
meta['tag'])
rst.add(
(emitter_resource, emitter_input,
@ -269,9 +264,8 @@ class Resource(object):
self.db_obj.save_lazy()
receiver.db_obj.save_lazy()
def connect_with_events(self, receiver, mapping=None, events=None,
use_defaults=False):
use_defaults=False):
mapping = get_mapping(self, receiver, mapping)
self._connect_inputs(receiver, mapping)
# signals.connect(self, receiver, mapping=mapping)
@ -292,7 +286,6 @@ class Resource(object):
self.db_obj.save_lazy()
def load(name):
r = DBResource.get(name)
@ -313,6 +306,8 @@ def load_updated(since=None, with_childs=True):
return [Resource(r) for r in DBResource.multi_get(candids)]
# TODO
def load_all():
candids = DBResource.updated.filter(StrInt.p_min(), StrInt.p_max())
return [Resource(r) for r in DBResource.multi_get(candids)]

View File

@ -16,17 +16,20 @@
from collections import defaultdict
import os
from StringIO import StringIO
from jinja2 import Environment
from jinja2 import meta
from jinja2 import Template
import yaml
from jinja2 import Template, Environment, meta
from solar.core import provider
from solar.core import signals
from solar.core.log import log
from solar.core import provider
from solar.core.resource import load as load_resource
from solar.core.resource import Resource, load_by_tags
from solar.core.resource import load_by_tags
from solar.core.resource import Resource
from solar.events.api import add_event
from solar.events.controls import React, Dep
from solar.events.controls import Dep
from solar.events.controls import React
def create(name, base_path, args=None, tags=None, virtual_resource=None):
@ -54,7 +57,8 @@ def create(name, base_path, args=None, tags=None, virtual_resource=None):
return rs
def create_resource(name, base_path, args=None, tags=None, virtual_resource=None):
def create_resource(name, base_path, args=None, tags=None,
virtual_resource=None):
args = args or {}
if isinstance(base_path, provider.BaseProvider):
base_path = base_path.directory
@ -66,9 +70,8 @@ def create_resource(name, base_path, args=None, tags=None, virtual_resource=None
return filter(lambda res: not is_connection(res), value)
args = {key: _filter(value) for key, value in args.items()}
r = Resource(
name, base_path, args=args, tags=tags, virtual_resource=virtual_resource
)
r = Resource(name, base_path, args=args,
tags=tags, virtual_resource=virtual_resource)
return r
@ -109,7 +112,9 @@ def _get_template(name, content, kwargs, inputs):
if input not in kwargs:
missing.append(input)
if missing:
raise Exception('[{0}] Validation error. Missing data in input: {1}'.format(name, missing))
err = '[{0}] Validation error. Missing data in input: {1}'
err = err.format(name, missing)
raise Exception(err)
template = Template(content, trim_blocks=True, lstrip_blocks=True)
template = template.render(str=str, zip=zip, **kwargs)
return template
@ -158,6 +163,7 @@ def extend_resources(template_resources):
log.debug('Warning: no resources with tags: {}'.format(tags))
return resources
def update_resources(template_resources):
resources = extend_resources(template_resources)
for r in resources:
@ -197,7 +203,7 @@ def extend_events(template_events):
resources = load_by_tags(tags)
for r in resources:
parent_action = '{}.{}'.format(r.name, parent['action'])
event = {'type' : e['type'],
event = {'type': e['type'],
'state': e['state'],
'depend_action': e['depend_action'],
'parent_action': parent_action
@ -205,6 +211,7 @@ def extend_events(template_events):
events.append(event)
return events
def parse_events(template_events):
parsed_events = []
events = extend_events(template_events)
@ -223,8 +230,6 @@ def parse_events(template_events):
return parsed_events
def parse_inputs(args):
connections = []
assignments = {}
@ -272,7 +277,7 @@ def parse_connection(child_input, element):
except ValueError:
events = None
return {'child_input': child_input,
'parent' : parent,
'parent': parent,
'parent_input': parent_input,
'events' : events
'events': events
}

View File

@ -16,7 +16,6 @@
import networkx
from solar.core.log import log
from solar.dblayer.solar_models import Resource as DBResource
def guess_mapping(emitter, receiver):
@ -59,29 +58,35 @@ def location_and_transports(emitter, receiver, orig_mapping):
orig_mapping.remove(single)
def _single(single, emitter, receiver, inps_emitter, inps_receiver):
# this function is responsible for doing magic with transports_id and location_id
# this function is responsible for doing magic with
# transports_id and location_id
# it tries to be as safe and smart as possible
# it connects only when it is 100% certain that it can and should
# user can always use direct mappings,
# we also use direct mappings in VR
# when we will remove location_id and transports_id from inputs then this function,
# when we will remove location_id and transports_id from
# inputs then this function,
# will be deleted too
if inps_emitter and inps_receiver:
if not inps_emitter == inps_receiver:
if not '::' in inps_receiver:
if '::' not in inps_receiver:
pass
# log.warning("Different %r defined %r => %r", single, emitter.name, receiver.name)
# log.warning("Different %r defined %r => %r",
# single, emitter.name, receiver.name)
return
else:
# log.debug("The same %r defined for %r => %r, skipping", single, emitter.name, receiver.name)
# log.debug("The same %r defined for %r => %r, skipping",
# single, emitter.name, receiver.name)
return
emitter_single = emitter.db_obj.meta_inputs[single]
receiver_single = receiver.db_obj.meta_inputs[single]
emitter_single_reverse = emitter_single.get('reverse')
receiver_single_reverse = receiver_single.get('reverse')
if inps_receiver is None and inps_emitter is not None:
# we don't connect automatically when receiver is None and emitter is not None
# for cases when we connect existing transports to other data containers
# we don't connect automatically when
# receiver is None and emitter is not None
# for cases when we connect existing transports to other data
# containers
if receiver_single_reverse:
log.info("Didn't connect automaticaly %s::%s -> %s::%s",
receiver.name,
@ -91,13 +96,15 @@ def location_and_transports(emitter, receiver, orig_mapping):
return
if emitter_single.get('is_emit') is False:
# this case is when we connect resource to transport itself
# like adding ssh_transport for solar_agent_transport and we don't want then
# like adding ssh_transport for solar_agent_transport
# and we don't want then
# transports_id to be messed
# it forbids passing this value around
# log.debug("Disabled %r mapping for %r", single, emitter.name)
return
if receiver_single.get('is_own') is False:
# this case is when we connect resource which has location_id but that is
# this case is when we connect resource which has
# location_id but that is
# from another resource
log.debug("Not is_own %r for %r ", single, emitter.name)
return
@ -124,7 +131,8 @@ def location_and_transports(emitter, receiver, orig_mapping):
# with dirty_state_ok(DBResource, ('index', )):
for single in ('transports_id', 'location_id'):
if single in inps_emitter and single in inps_receiver:
_single(single, emitter, receiver, inps_emitter[single], inps_receiver[single])
_single(single, emitter, receiver, inps_emitter[
single], inps_receiver[single])
else:
log.warning('Unable to create connection for %s with'
' emitter %s, receiver %s',
@ -151,7 +159,7 @@ def disconnect_receiver_by_input(receiver, input_name):
def detailed_connection_graph(start_with=None, end_with=None, details=False):
from solar.core.resource import Resource, load_all
from solar.core.resource import load_all
if details:
def format_for_edge(resource, input):
@ -177,7 +185,7 @@ def detailed_connection_graph(start_with=None, end_with=None, details=False):
graph.add_edge(resource.name, resource_input)
graph.node[resource_input] = inp_props
conns = resource.connections
for (emitter_resource, emitter_input, receiver_resource, receiver_input) in conns:
for (emitter_resource, emitter_input, receiver_resource, receiver_input) in conns: # NOQA
e = format_for_edge(emitter_resource, emitter_input)
r = format_for_edge(receiver_resource, receiver_input)
graph.add_edge(emitter_resource, e)

View File

@ -26,10 +26,10 @@ tokens = (
"RPAREN")
t_STRING = r'[A-Za-z0-9-_/\\]+'
t_AND = '&|,'
t_OR = r'\|'
t_LPAREN = r'\('
t_RPAREN = r'\)'
t_AND = '&|,'
t_OR = r'\|'
t_LPAREN = r'\('
t_RPAREN = r'\)'
t_ignore = ' \t\r\n'
@ -48,6 +48,7 @@ class SubexpressionWrapper(object):
class ScalarWrapper(object):
def __init__(self, value):
global expression
self.value = (set([value]) <= set(expression.tags))
@ -60,7 +61,9 @@ class ScalarWrapper(object):
def p_expression_logical_op(p):
"""expression : expression AND expression
"""Parser
expression : expression AND expression
| expression OR expression
"""
result, arg1, op, arg2 = p
@ -73,13 +76,17 @@ def p_expression_logical_op(p):
def p_expression_string(p):
"""expression : STRING
"""Parser
expression : STRING
"""
p[0] = ScalarWrapper(p[1])
def p_expression_group(p):
"""expression : LPAREN expression RPAREN
"""Parser
expression : LPAREN expression RPAREN
"""
p[0] = p[2]
@ -90,10 +97,12 @@ def t_error(t):
def p_error(p):
raise errors.ParseError("Syntax error at '{0}'".format(getattr(p, 'value', '')))
raise errors.ParseError(
"Syntax error at '{0}'".format(getattr(p, 'value', '')))
class Expression(object):
def __init__(self, expression_text, tags):
self.expression_text = expression_text
self.tags = tags
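One thing to keep in mind with this file: PLY builds the grammar from the docstrings of the p_* functions, so these docstrings are load-bearing, and prefixing them with a "Parser" line (as this hunk does to satisfy docstring style checks) is only safe if PLY's docstring parsing tolerates the extra line. A compact, self-contained sketch of the docstring-as-grammar pattern, with a hypothetical tag set rather than this module's grammar:

    import ply.lex as lex
    import ply.yacc as yacc

    tokens = ('STRING', 'AND')
    t_STRING = r'[A-Za-z0-9_]+'
    t_AND = r'&'
    t_ignore = ' \t'

    def t_error(t):
        t.lexer.skip(1)

    precedence = (('left', 'AND'),)

    def p_expr_and(p):
        """expr : expr AND expr"""
        p[0] = p[1] and p[3]

    def p_expr_string(p):
        """expr : STRING"""
        p[0] = p[1] in {'web', 'db'}  # hypothetical tag set

    def p_error(p):
        raise SyntaxError(repr(p))

    lexer = lex.lex()
    parser = yacc.yacc()
    print(parser.parse('web&db', lexer=lexer))  # True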

View File

@ -16,7 +16,8 @@
class Executor(object):
def __init__(self, resource, executor, params=None):
"""
"""Executor
:param resource: solar resource
:param executor: callable executor, that will perform action
:param params: optional argument
@ -90,7 +91,8 @@ class SolarTransport(object):
except AttributeError:
if name is None:
name = self.preffered_transport_name
transport = next(x for x in resource.transports() if x['name'] == name)
transport = next(x for x in resource.transports()
if x['name'] == name)
setattr(resource, key, transport)
return transport
@ -102,9 +104,7 @@ class SolarTransport(object):
class SyncTransport(SolarTransport):
"""
Transport that is responsible for file / directory syncing.
"""
"""Transport that is responsible for file / directory syncing."""
preffered_transport_name = None
_mode = 'sync'
@ -135,7 +135,8 @@ class SyncTransport(SolarTransport):
executor.run(self)
def sync_all(self):
"""
"""Syncs all
It checks if action is required first,
then runs all sequentially.
Could be someday changed to parallel thing.
@ -146,8 +147,9 @@ class SyncTransport(SolarTransport):
class RunTransport(SolarTransport):
"""
Transport that is responsible for executing remote commands, rpc like thing.
"""Transport that is responsible for executing remote commands,
rpc like thing
"""
preffered_transport_name = None

View File

@ -1,8 +1,27 @@
from solar.core.transports.base import SyncTransport, RunTransport, SolarTransport
from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from solar.core.transports.base import RunTransport
from solar.core.transports.base import SolarTransport
from solar.core.transports.base import SyncTransport
from solar.core.transports.rsync import RsyncSyncTransport
from solar.core.transports.ssh import SSHRunTransport
from solar.core.transports.ssh import SSHSyncTransport
try:
from solar.core.transports.solar_agent_transport import SolarAgentRunTransport, SolarAgentSyncTransport
from solar.core.transports.solar_agent_transport import SolarAgentRunTransport # NOQA
from solar.core.transports.solar_agent_transport import SolarAgentSyncTransport # NOQA
except ImportError:
_solar_agent_available = False
else:
@ -63,7 +82,8 @@ class BatTransport(SolarTransport):
except AttributeError:
transports = resource.transports()
for pref in self._order:
selected = next((x for x in transports if x['name'] == pref), None)
selected = next(
(x for x in transports if x['name'] == pref), None)
if selected:
break
if not selected:
@ -78,7 +98,8 @@ class BatTransport(SolarTransport):
def get_transport_data(self, resource, *args, **kwargs):
self.select_valid_transport(resource)
return super(BatTransport, self).get_transport_data(resource, *args, **kwargs)
return super(BatTransport, self).get_transport_data(resource,
*args, **kwargs)
def bind_with(self, other):
self._other_remember = other

View File

@ -1,14 +1,30 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO: change to something less naive
#
import libtorrent as lt
from operator import attrgetter
import time
import sys
import os
import sys
import time
state_str = ['queued', 'checking', 'downloading metadata', \
'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
import libtorrent as lt
state_str = ['queued', 'checking', 'downloading metadata',
'downloading', 'finished', 'seeding', 'allocating',
'checking fastresume']
class MultiTorrent(object):
@ -31,7 +47,8 @@ class MultiTorrent(object):
@property
def progress(self):
total_progress = map(attrgetter('progress'), map(lambda x: x.status(), self.torrents))
total_progress = map(attrgetter('progress'), map(
lambda x: x.status(), self.torrents))
return sum(total_progress) / len(total_progress)
def numbers(self):
@ -52,10 +69,10 @@ def init_session(args, seed=False):
if os.path.exists(magnet_or_path):
e = lt.bdecode(open(magnet_or_path, 'rb').read())
info = lt.torrent_info(e)
params = { 'save_path': save_path,
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
'ti': info,
'seed_mode': seed}
params = {'save_path': save_path,
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
'ti': info,
'seed_mode': seed}
h = ses.add_torrent(params)
else:
h = ses.add_torrent({
@ -105,21 +122,18 @@ def _seeder(torrents, save_path='.', max_seed_ratio=5):
if peers_0 < now - no_peers:
sys.exit("No peers for %d seconds exiting" % no_peers)
if i % 5 == 0:
print "%.2f%% up=%.1f kB/s peers=%s total_upload_B=%.1f" \
% (mt.progress * 100,
s.upload_rate / 1000,
s.num_peers,
(s.total_upload))
print("%.2f%% up=%.1f kB/s peers=%s total_upload_B=%.1f"
% (mt.progress * 100, s.upload_rate / 1000, s.num_peers,
s.total_upload))
if s.num_peers != 0:
peers_0 = now
sys.stdout.flush()
time.sleep(1)
else:
print 'Seed timeout exiting'
print('Seed timeout exiting')
sys.exit(0)
def _getter(torrents, max_seed_ratio=3):
ses = lt.session()
ses.listen_on(6881, 6981)
@ -136,9 +150,9 @@ def _getter(torrents, max_seed_ratio=3):
# mt.force_reannounce()
s = ses.status()
if i % 5 == 0:
print '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' % \
(mt.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, \
s.num_peers, mt.numbers())
print('%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' %
(mt.progress * 100, s.download_rate / 1000, s.upload_rate / 1000,
s.num_peers, mt.numbers()))
now = time.time()
current_state = (now, mt.progress)
if current_state[-1] != last_state[-1]:
@ -154,7 +168,7 @@ def _getter(torrents, max_seed_ratio=3):
args = sys.argv[:]
args[-2] = 's'
args.insert(0, sys.executable)
print "Entering seeder mode"
print("Entering seeder mode")
check_output(args, shell=False)
else:
# err
@ -164,7 +178,7 @@ if __name__ == '__main__':
mode = sys.argv[1]
torrents = sys.argv[2]
torrents = [x.split('|') for x in torrents.split(';')]
print repr(torrents)
print(repr(torrents))
if mode == 'g':
_getter(torrents, *sys.argv[3:])
elif mode == 's':

View File

@ -15,7 +15,8 @@
from fabric import api as fabric_api
from solar.core.log import log
from solar.core.transports.base import SyncTransport, Executor
from solar.core.transports.base import Executor
from solar.core.transports.base import SyncTransport
# XXX:
# currently we don't support key verification or acceptation
@ -29,7 +30,7 @@ class RsyncSyncTransport(SyncTransport):
transport = self.get_transport_data(resource)
host = resource.ip()
user = transport['user']
port = transport['port']
# port = transport['port']
# TODO: use port somehow
key = transport['key']
return {

View File

@ -15,8 +15,11 @@
from solar_agent.client import SolarAgentClient
from solar.core.transports.base import RunTransport, SyncTransport, Executor, SolarRunResult
from solar.core.log import log
from solar.core.transports.base import Executor
from solar.core.transports.base import RunTransport
from solar.core.transports.base import SolarRunResult
from solar.core.transports.base import SyncTransport
class SolarAgentTransport(object):
@ -29,8 +32,8 @@ class SolarAgentTransport(object):
auth = transport['password']
transport_class = transport.get('transport_class')
client = SolarAgentClient(auth={'user': user, 'auth': auth},
transport_args=(host, port),
transport_class=transport_class)
transport_args=(host, port),
transport_class=transport_class)
return client
@ -61,4 +64,3 @@ class SolarAgentRunTransport(RunTransport, SolarAgentTransport):
client = self.get_client(resource)
res = client.run(' '.join(args), **kwargs)
return self.get_result(res)

View File

@ -12,15 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from contextlib import nested
import os
from fabric import api as fabric_api
from fabric.contrib import project as fabric_project
from solar.core.log import log
from solar.core.transports.base import RunTransport, SyncTransport, Executor
from solar.core.transports.base import Executor
from solar.core.transports.base import RunTransport
from solar.core.transports.base import SolarRunResult
from solar.core.transports.base import SyncTransport
class _SSHTransport(object):
@ -88,9 +90,7 @@ class SSHRunTransport(RunTransport, _SSHTransport):
preffered_transport_name = 'ssh'
def get_result(self, output):
"""
Needed for compatibility with other handlers / transports
"""
"""Needed for compatibility with other handlers / transports"""
return SolarRunResult(output)
def run(self, resource, *args, **kwargs):

View File

@ -2,16 +2,18 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fabric import api as fabric_api
from solar.core.log import log
from solar.core.transports.base import RunTransport
@ -33,7 +35,6 @@ class _RawSSHTransport(object):
return ('ssh', '-i', props['ssh_key'])
class RawSSHRunTransport(RunTransport, _RawSSHTransport):
def run(self, resource, *args, **kwargs):
@ -59,4 +60,3 @@ class RawSSHRunTransport(RunTransport, _RawSSHTransport):
log.debug("SSH CMD: %r", ssh_cmd)
return fabric_api.local(' '.join(ssh_cmd))

View File

@ -1,16 +1,29 @@
from solar.core.log import log
from solar.core.transports.ssh import (SSHSyncTransport,
SSHRunTransport)
from solar.core.transports.base import SyncTransport, Executor
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import errno
from collections import defaultdict
from operator import attrgetter, itemgetter
import libtorrent as lt
import os
from uuid import uuid4
import libtorrent as lt
from solar.core.log import log
from solar.core.transports.base import Executor
from solar.core.transports.base import SyncTransport
from solar.core.transports.ssh import SSHSyncTransport
class TorrentSyncTransport(SyncTransport):
@ -45,7 +58,8 @@ class TorrentSyncTransport(SyncTransport):
def _create_torrent(self, resource, fs, root='.', use_sudo=False):
t = lt.create_torrent(fs)
transports = resource.transports()
torrent_transport = next((x for x in transports if x['name'] == 'torrent'))
torrent_transport = next(
(x for x in transports if x['name'] == 'torrent'))
trackers = torrent_transport['trackers']
for tracker in trackers:
t.add_tracker(tracker)
@ -77,7 +91,8 @@ class TorrentSyncTransport(SyncTransport):
# we don't need use sudo there for now
from fabric import api as fabric_api
torrents = self._torrents + self._sudo_torrents
to_seed = ["%s|%s" % (os.path.abspath(os.path.join(x[2], '..')), x[0]) for x in torrents]
to_seed = ["%s|%s" % (os.path.abspath(
os.path.join(x[2], '..')), x[0]) for x in torrents]
seed_args = ';'.join(to_seed)
# TODO: 'g' is just for debug, it should be 's', remove when sure
cmd = ['/usr/bin/python',
@ -95,7 +110,8 @@ class TorrentSyncTransport(SyncTransport):
torrents = self._torrents
else:
torrents = self._sudo_torrents
to_get = ["%s|%s" % (os.path.abspath(os.path.join(x[2], '..')), x[1]) for x in torrents]
to_get = ["%s|%s" % (os.path.abspath(
os.path.join(x[2], '..')), x[1]) for x in torrents]
get_args = ';'.join(to_get)
cmd = ['/usr/bin/python',
'/var/tmp/solar_torrent.py',
@ -115,4 +131,3 @@ class TorrentSyncTransport(SyncTransport):
self._start_remote_fetch(resource, use_sudo=False)
if self._sudo_torrents:
self._start_remote_fetch(resource, use_sudo=True)

View File

@ -56,14 +56,13 @@ mount_points:
"""
import json
from jsonschema import validate, ValidationError
from jsonschema import validate
from jsonschema import ValidationError
import requests
from solar.core.log import log
def schema_input_type(schema):
"""Input type from schema
@ -99,7 +98,8 @@ def _construct_jsonschema(schema, definition_base=''):
return {'type': 'boolean'}, {}
if isinstance(schema, list):
items, definitions = _construct_jsonschema(schema[0], definition_base=definition_base)
items, definitions = _construct_jsonschema(
schema[0], definition_base=definition_base)
return {
'type': 'array',
@ -114,9 +114,11 @@ def _construct_jsonschema(schema, definition_base=''):
if isinstance(v, dict) or isinstance(v, list):
key = '{}_{}'.format(definition_base, k)
properties[k] = {'$ref': '#/definitions/{}'.format(key)}
definitions[key], new_definitions = _construct_jsonschema(v, definition_base=key)
definitions[key], new_definitions = _construct_jsonschema(
v, definition_base=key)
else:
properties[k], new_definitions = _construct_jsonschema(v, definition_base=definition_base)
properties[k], new_definitions = _construct_jsonschema(
v, definition_base=definition_base)
definitions.update(new_definitions)
@ -177,7 +179,7 @@ def validate_resource(r):
for input_name, _ in inputs.items():
errors = validate_input(
args.get(input_name),
#jsonschema=input_definition.get('jsonschema'),
# jsonschema=input_definition.get('jsonschema'),
schema=r.db_obj.meta_inputs[input_name]['schema']
)
if errors:

View File

@ -22,6 +22,7 @@ from solar.events.controls import Dep, React, StateChange
from solar.dblayer.solar_models import Resource
def create_event(event_dict):
etype = event_dict['etype']
kwargs = {'child': event_dict['child'],
@ -125,7 +126,6 @@ def bft_events_graph(start):
return dg
def build_edges(changes_graph, events):
"""
:param changes_graph: nx.DiGraph object with actions to be executed
@ -143,7 +143,8 @@ def build_edges(changes_graph, events):
event_name = stack.pop(0)
if event_name in events_graph:
log.debug('Next events after %s are %s', event_name, events_graph.successors(event_name))
log.debug('Next events after %s are %s', event_name,
events_graph.successors(event_name))
else:
log.debug('No outgoing events based on %s', event_name)

View File

@ -34,6 +34,7 @@ trigger action even if no changes noticed on dependent resource.
from solar.dblayer.solar_models import Resource
from solar.dblayer.model import DBLayerNotFound
class Event(object):
etype = None
@ -84,12 +85,13 @@ class Dependency(Event):
def insert(self, changed_resources, changes_graph):
if (self.parent_node in changes_graph and
self.child_node in changes_graph):
self.child_node in changes_graph):
changes_graph.add_edge(
self.parent_node, self.child_node, state=self.state)
Dep = Dependency
class React(Event):
etype = 'react_on'
@ -99,7 +101,8 @@ class React(Event):
if self.parent_node in changes_graph:
if self.child_node not in changes_graph:
try:
location_id = Resource.get(self.child).inputs['location_id']
location_id = Resource.get(self.child).inputs[
'location_id']
except DBLayerNotFound:
location_id = None
changes_graph.add_node(

View File

@ -1,3 +0,0 @@

View File

@ -44,7 +44,6 @@ def start_from(dg, start_nodes):
if not preds and node in start_nodes:
visited.add(node)
if preds:
for pred in preds:
if pred not in visited:
@ -63,9 +62,11 @@ def validate(dg, start_nodes, end_nodes, err_msgs):
for n in end_nodes:
if n not in dg:
if start_nodes:
error_msgs.append('No path from {} to {}'.format(start_nodes, n))
error_msgs.append(
'No path from {} to {}'.format(start_nodes, n))
else:
error_msgs.append(not_in_the_graph_msg.format(n, dg.graph['uid']))
error_msgs.append(
not_in_the_graph_msg.format(n, dg.graph['uid']))
return error_msgs

View File

@ -86,7 +86,7 @@ def parse_plan(plan_path):
defaults = {
'status': 'PENDING',
'errmsg': '',
}
}
defaults.update(task['parameters'])
dg.add_node(
task['uid'], **defaults)
@ -129,7 +129,6 @@ def create_plan(plan_path, save=True):
return create_plan_from_graph(dg, save=save)
def reset_by_uid(uid, state_list=None):
dg = get_graph(uid)
return reset(dg, state_list=state_list)

View File

@ -51,8 +51,10 @@ def type_based_rule(dg, inprogress, item):
type_limit: 2
"""
_type = dg.node[item].get('resource_type')
if 'type_limit' not in dg.node[item]: return True
if not _type: return True
if 'type_limit' not in dg.node[item]:
return True
if not _type:
return True
type_count = 0
for n in inprogress:
@ -63,7 +65,8 @@ def type_based_rule(dg, inprogress, item):
def target_based_rule(dg, inprogress, item, limit=1):
target = dg.node[item].get('target')
if not target: return True
if not target:
return True
target_count = 0
for n in inprogress:

View File

@ -22,5 +22,5 @@ app = Celery(
include=['solar.system_log.tasks', 'solar.orchestration.tasks'],
backend=_url,
broker=_url)
app.conf.update(CELERY_ACCEPT_CONTENT = ['json'])
app.conf.update(CELERY_TASK_SERIALIZER = 'json')
app.conf.update(CELERY_ACCEPT_CONTENT=['json'])
app.conf.update(CELERY_TASK_SERIALIZER='json')

View File

@ -54,10 +54,12 @@ class ReportTask(task.Task):
report_task = partial(app.task, base=ReportTask, bind=True)
@task_prerun.connect
def start_solar_session(task_id, task, *args, **kwargs):
ModelMeta.session_start()
@task_postrun.connect
def end_solar_session(task_id, task, *args, **kwargs):
ModelMeta.session_end()
@ -104,7 +106,7 @@ def fault_tolerance(ctxt, percent):
if dg.node[s]['status'] == 'SUCCESS':
success += 1
succes_percent = (success/lth) * 100
succes_percent = (success / lth) * 100
if succes_percent < percent:
raise Exception('Cant proceed with, {0} < {1}'.format(
succes_percent, percent))
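A side note on the line above, unrelated to the formatting change: this codebase targets Python 2 (see the print statements elsewhere in this diff), where success / lth is integer division when both operands are ints, so the percentage truncates to 0 for any partial success. A quick illustration of the intended calculation (variable names mirror the source's own spelling):

    success, lth = 2, 3
    # Under Python 2, (success / lth) * 100 == 0 regardless of progress.
    # The intended ratio needs a float cast (works on Python 2 and 3):
    succes_percent = float(success) / lth * 100
    assert succes_percent > 66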
@ -117,7 +119,8 @@ def echo(ctxt, message):
@report_task(name='anchor')
def anchor(ctxt, *args):
# such tasks should be walked when at least 1/3/exact number of resources visited
# such tasks should be walked when at least 1/3/exact number of resources
# visited
dg = graph.get_graph('current')
for s in dg.predecessors(ctxt.request.id):
if dg.node[s]['status'] != 'SUCCESS':
@ -155,6 +158,7 @@ def soft_stop(plan_uid):
dg.node[n]['status'] = 'SKIPPED'
graph.update_graph(dg)
@app.task(name='schedule_next')
def schedule_next(task_id, status, errmsg=None):
plan_uid, task_name = task_id.rsplit(':', 1)

View File

@ -34,5 +34,6 @@ def write_graph(plan):
nx.write_dot(plan, '{name}.dot'.format(name=plan.graph['name']))
subprocess.call(
'tred {name}.dot | dot -Tsvg -o {name}.svg'.format(name=plan.graph['name']),
'tred {name}.dot | dot -Tsvg -o {name}.svg'.format(
name=plan.graph['name']),
shell=True)

View File

@ -29,6 +29,7 @@ from solar.errors import CannotFindID
from solar.dblayer.solar_models import Resource, LogItem, CommitedResource, StrInt
def guess_action(from_, to):
# NOTE(dshulyak) imo the way to solve this - is dsl for orchestration,
# something where this action will be explicitly specified
@ -47,12 +48,12 @@ def create_diff(staged, commited):
def create_logitem(resource, action, diffed, connections_diffed,
base_path=''):
return LogItem.new(
{'resource': resource,
'action': action,
'diff': diffed,
'connections_diff': connections_diffed,
'base_path': base_path,
'log': 'staged'})
{'resource': resource,
'action': action,
'diff': diffed,
'connections_diff': connections_diffed,
'base_path': base_path,
'log': 'staged'})
def create_sorted_diff(staged, commited):
@@ -104,7 +105,7 @@ def stage_changes():
last = LogItem.history_last()
since = StrInt.greater(last.updated) if last else None
staged_log = utils.solar_map(make_single_stage_item,
resource.load_updated(since), concurrency=10)
resource.load_updated(since), concurrency=10)
staged_log = filter(None, staged_log)
return staged_log
@@ -139,9 +140,10 @@ def _get_args_to_update(args, connections):
"""
inherited = [i[3].split(':')[0] for i in connections]
return {
key:args[key] for key in args
key: args[key] for key in args
if key not in inherited
}
}
def revert_uids(uids):
"""
@@ -167,14 +169,16 @@ def _revert_remove(logitem):
"""
commited = CommitedResource.get(logitem.resource)
args = dictdiffer.revert(logitem.diff, commited.inputs)
connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections))
connections = dictdiffer.revert(
logitem.connections_diff, sorted(commited.connections))
resource.Resource(logitem.resource, logitem.base_path,
args=_get_args_to_update(args, connections), tags=commited.tags)
args=_get_args_to_update(args, connections), tags=commited.tags)
for emitter, emitter_input, receiver, receiver_input in connections:
emmiter_obj = resource.load(emitter)
receiver_obj = resource.load(receiver)
signals.connect(emmiter_obj, receiver_obj, {emitter_input: receiver_input})
signals.connect(emmiter_obj, receiver_obj, {
emitter_input: receiver_input})
def _update_inputs_connections(res_obj, args, old_connections, new_connections):
@@ -213,7 +217,8 @@ def _revert_update(logitem):
res_obj = resource.load(logitem.resource)
commited = res_obj.load_commited()
connections = dictdiffer.revert(logitem.connections_diff, sorted(commited.connections))
connections = dictdiffer.revert(
logitem.connections_diff, sorted(commited.connections))
args = dictdiffer.revert(logitem.diff, commited.inputs)
_update_inputs_connections(
@@ -237,12 +242,14 @@ def _discard_remove(item):
def _discard_update(item):
resource_obj = resource.load(item.resource)
old_connections = resource_obj.connections
new_connections = dictdiffer.revert(item.connections_diff, sorted(old_connections))
new_connections = dictdiffer.revert(
item.connections_diff, sorted(old_connections))
args = dictdiffer.revert(item.diff, resource_obj.args)
_update_inputs_connections(
resource_obj, _get_args_to_update(args, new_connections), old_connections, new_connections)
def _discard_run(item):
resource.load(item.resource).remove(force=True)
@@ -265,6 +272,7 @@ def discard_uids(uids):
def discard_uid(uid):
return discard_uids([uid])
def discard_all():
staged_log = data.SL()
return discard_uids([l.uid for l in staged_log])
@@ -277,6 +285,7 @@ def commit_all():
for item in data.SL():
move_to_commited(item.log_action)
def clear_history():
LogItem.delete_all()
CommitedResource.delete_all()
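
The revert and discard paths above all lean on the dictdiffer library: a diff recorded when changes were staged can later be re-applied with patch or undone with revert. A minimal, self-contained example:

    import dictdiffer

    commited = {'ip': '10.0.0.3', 'ports': [80, 443]}
    staged = {'ip': '10.0.0.2', 'ports': [80]}

    diff = list(dictdiffer.diff(commited, staged))
    assert dictdiffer.patch(diff, commited) == staged
    assert dictdiffer.revert(diff, staged) == commited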

View File

@@ -15,9 +15,9 @@
from enum import Enum
CHANGES = Enum(
'Changes',
'run remove update'
)
'Changes',
'run remove update'
)
STATES = Enum('States', 'error inprogress pending success')
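
Pure reformatting; for reference, this is enum's functional API, which builds one member per space-separated name:

    from enum import Enum

    CHANGES = Enum('Changes', 'run remove update')
    assert CHANGES.run.name == 'run'
    assert CHANGES.run.value == 1  # members are auto-numbered from 1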

View File

@@ -16,17 +16,16 @@
from solar.dblayer.solar_models import LogItem
def SL():
rst = LogItem.composite.filter({'log': 'staged'})
return LogItem.multi_get(rst)
def CL():
rst = LogItem.composite.filter({'log': 'history'})
return LogItem.multi_get(rst)
def compact(logitem):
return 'log task={} uid={}'.format(logitem.log_action, logitem.uid)
@@ -36,13 +35,13 @@ def details(diff):
for type_, val, change in diff:
if type_ == 'add':
for key, val in change:
rst.append('++ {}: {}'.format(key ,val))
rst.append('++ {}: {}'.format(key, val))
elif type_ == 'change':
rst.append('-+ {}: {} >> {}'.format(
unwrap_change_val(val), change[0], change[1]))
elif type_ == 'remove':
for key, val in change:
rst.append('-- {}: {}'.format(key ,val))
rst.append('-- {}: {}'.format(key, val))
return rst
@@ -62,4 +61,3 @@ def unwrap_change_val(val):
return '{}:[{}] '.format(val[0], val[1])
else:
return val

View File

@@ -260,7 +260,9 @@ class ResourceListTemplate(BaseTemplate):
)
def on_each(self, resource_path, args=None):
"""Create resource form resource_path on each resource in self.resources.
"""Create resource form resource_path
on each resource in self.resources
"""
args = args or {}

View File

@@ -23,6 +23,7 @@ import pytest
def patched_get_bucket_name(cls):
return cls.__name__ + str(time.time())
@pytest.fixture
def resources():
base_path = os.path.join(
@@ -30,15 +31,16 @@ def resources():
'resource_fixtures')
node_path = os.path.join(base_path, 'node')
node1 = Resource('node1', node_path, args={'ip':'10.0.0.1'})
node2 = Resource('node2', node_path, args={'ip':'10.0.0.2'})
node1 = Resource('node1', node_path, args={'ip': '10.0.0.1'})
node2 = Resource('node2', node_path, args={'ip': '10.0.0.2'})
base_service_path = os.path.join(base_path, 'base_service')
service1 = Resource('service1', base_service_path)
return {'node1' : node1,
'node2' : node2,
return {'node1': node1,
'node2': node2,
'service1': service1
}
}
@pytest.fixture(autouse=True)
def setup(request):
@@ -53,15 +55,20 @@ def setup(request):
for model in ModelMeta._defined_models:
model.bucket = get_bucket(None, model, ModelMeta)
def pytest_runtest_teardown(item, nextitem):
ModelMeta.session_end(result=True)
return nextitem
# It will run before all fixtures
def pytest_runtest_setup(item):
ModelMeta.session_start()
# it will run after fixtures but before test
def pytest_runtest_call(item):
ModelMeta.session_end()
ModelMeta.session_start()
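
Taken together these hooks give every test a fresh DB session: one is opened before fixtures run, recycled between the fixtures and the test body, and committed in teardown. The bare skeleton of that pattern, with a fake session standing in for ModelMeta:

    # conftest.py sketch; FakeSession is a stand-in, not solar's API
    class FakeSession(object):
        @staticmethod
        def start():
            print('session start')

        @staticmethod
        def end(result=False):
            print('session end (commit=%s)' % result)

    def pytest_runtest_setup(item):
        FakeSession.start()          # before fixtures

    def pytest_runtest_call(item):
        FakeSession.end()            # flush fixture work
        FakeSession.start()          # clean session for the test body

    def pytest_runtest_teardown(item, nextitem):
        FakeSession.end(result=True)
        return nextitem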

View File

@@ -24,24 +24,26 @@ def staged():
return {'id': 'res.1',
'tags': ['res', 'node.1'],
'input': {'ip': {'value': '10.0.0.2'},
'list_val': {'value': [1, 2]}},
'list_val': {'value': [1, 2]}},
'metadata': {},
'connections': [
['node.1', 'res.1', ['ip', 'ip']],
['node.1', 'res.1', ['key', 'key']]]
}
@fixture
def commited():
return {'id': 'res.1',
'tags': ['res', 'node.1'],
'input': {'ip': '10.0.0.2',
'list_val': [1]},
'list_val': [1]},
'metadata': {},
'connections': [
['node.1', 'res.1', ['ip', 'ip']]]
}
@fixture
def full_diff(staged):
return change.create_diff(staged, {})
@@ -54,7 +56,8 @@ def diff_for_update(staged, commited):
def test_create_diff_with_empty_commited(full_diff):
# add will be executed
expected = [('add', '', [('connections', [['node.1', 'res.1', ['ip', 'ip']], ['node.1', 'res.1', ['key', 'key']]]), ('input', {'ip': {'value': '10.0.0.2'}, 'list_val': {'value': [1, 2]}}), ('metadata', {}), ('id', 'res.1'), ('tags', ['res', 'node.1'])])]
expected = [('add', '', [('connections', [['node.1', 'res.1', ['ip', 'ip']], ['node.1', 'res.1', ['key', 'key']]]), ('input', {
'ip': {'value': '10.0.0.2'}, 'list_val': {'value': [1, 2]}}), ('metadata', {}), ('id', 'res.1'), ('tags', ['res', 'node.1'])])]
assert full_diff == expected
@@ -62,8 +65,8 @@ def test_create_diff_modified(diff_for_update):
assert diff_for_update == [
('add', 'connections',
[(1, ['node.1', 'res.1', ['key', 'key']])]),
('change', 'input.ip', ('10.0.0.2', {'value': '10.0.0.2'})),
('change', 'input.list_val', ([1], {'value': [1, 2]}))]
('change', 'input.ip', ('10.0.0.2', {'value': '10.0.0.2'})),
('change', 'input.list_val', ([1], {'value': [1, 2]}))]
def test_verify_patch_creates_expected(staged, diff_for_update, commited):
@@ -79,7 +82,7 @@ def test_revert_update(staged, diff_for_update, commited):
@fixture
def resources():
r = {'n.1':
{'uid': 'n.1',
{'uid': 'n.1',
'args': {'ip': '10.20.0.2'},
'connections': [],
'tags': []},
@@ -88,7 +91,7 @@ def resources():
'args': {'ip': '10.20.0.2'},
'connections': [['n.1', 'r.1', ['ip', 'ip']]],
'tags': []},
'h.1':
'h.1':
{'uid': 'h.1',
'args': {'ip': '10.20.0.2',
'ips': ['10.20.0.2']},

View File

@@ -99,7 +99,8 @@ def rmq_deps():
'rmq.2': [evapi.Dep('rmq.2', 'run', 'success', 'rmq_cluster.2', 'join')],
'rmq.3': [evapi.Dep('rmq.3', 'run', 'success', 'rmq_cluster.3', 'join')],
'rmq_cluster.1': [
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.2', 'join'),
evapi.Dep('rmq_cluster.1', 'create',
'success', 'rmq_cluster.2', 'join'),
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.3', 'join')]}
@@ -121,13 +122,16 @@ def test_riak():
events = {
'riak_service1': [
evapi.React('riak_service1', 'run', 'success', 'riak_service2', 'run'),
evapi.React('riak_service1', 'run', 'success',
'riak_service2', 'run'),
evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'run')],
'riak_service3': [
evapi.React('riak_service3', 'join', 'success', 'riak_service1', 'commit'),
evapi.React('riak_service3', 'join', 'success',
'riak_service1', 'commit'),
evapi.React('riak_service3', 'run', 'success', 'riak_service3', 'join')],
'riak_service2': [
evapi.React('riak_service2', 'run', 'success', 'riak_service2', 'join'),
evapi.React('riak_service2', 'run', 'success',
'riak_service2', 'join'),
evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')],
}
@@ -135,4 +139,5 @@ def test_riak():
changes_graph = nx.MultiDiGraph()
changes_graph.add_node('riak_service1.run')
evapi.build_edges(changes_graph, events)
assert set(changes_graph.predecessors('riak_service1.commit')) == {'riak_service2.join', 'riak_service3.join'}
assert set(changes_graph.predecessors('riak_service1.commit')) == {
'riak_service2.join', 'riak_service3.join'}

View File

@@ -34,6 +34,7 @@ def test_simple_plan_created_and_loaded(simple):
plan = graph.get_plan(simple.graph['uid'])
assert set(plan.nodes()) == {'just_fail', 'echo_stuff'}
def test_reset_all_states(simple):
for n in simple:
simple.node[n]['status'] == states.ERROR.name
@@ -57,16 +58,19 @@ def test_wait_finish(simple):
for n in simple:
simple.node[n]['status'] = states.SUCCESS.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {'SKIPPED': 0, 'SUCCESS': 2, 'NOOP': 0, 'ERROR': 0, 'INPROGRESS': 0, 'PENDING': 0}
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 2, 'NOOP': 0, 'ERROR': 0, 'INPROGRESS': 0, 'PENDING': 0}
def test_several_updates(simple):
simple.node['just_fail']['status'] = states.ERROR.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 1, 'INPROGRESS': 0, 'PENDING': 1}
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 1, 'INPROGRESS': 0, 'PENDING': 1}
simple.node['echo_stuff']['status'] = states.ERROR.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 2, 'INPROGRESS': 0, 'PENDING': 0}
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 2, 'INPROGRESS': 0, 'PENDING': 0}
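
In these assertions wait_finish yields a per-status tally over all nodes of the plan. The same summary can be computed with a Counter over the node statuses, e.g.:

    import collections

    STATES = ('SKIPPED', 'SUCCESS', 'NOOP', 'ERROR', 'INPROGRESS', 'PENDING')
    statuses = ['ERROR', 'PENDING']  # e.g. just_fail failed, echo_stuff pending

    summary = dict.fromkeys(STATES, 0)
    summary.update(collections.Counter(statuses))
    assert summary['ERROR'] == 1 and summary['PENDING'] == 1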

View File

@@ -42,6 +42,7 @@ def dg_ex1():
def test_end_at(dg_ex1, end_nodes, visited):
assert set(filters.end_at(dg_ex1, end_nodes)) == visited
@mark.parametrize("start_nodes,visited", [
(['n3'], {'n3'}),
(['n1'], {'n1', 'n2', 'n4'}),
@@ -50,6 +51,7 @@ def test_end_at(dg_ex1, end_nodes, visited):
def test_start_from(dg_ex1, start_nodes, visited):
assert set(filters.start_from(dg_ex1, start_nodes)) == visited
@fixture
def dg_ex2():
dg = nx.DiGraph()
@@ -68,11 +70,13 @@ def riak_plan():
def test_riak_start_node1(riak_plan):
assert filters.start_from(riak_plan, ['node1.run']) == {'node1.run', 'hosts_file1.run', 'riak_service1.run'}
assert filters.start_from(riak_plan, ['node1.run']) == {
'node1.run', 'hosts_file1.run', 'riak_service1.run'}
def test_riak_end_hosts_file1(riak_plan):
assert filters.end_at(riak_plan, ['hosts_file1.run']) == {'node1.run', 'hosts_file1.run'}
assert filters.end_at(riak_plan, ['hosts_file1.run']) == {
'node1.run', 'hosts_file1.run'}
def test_start_at_two_nodes(riak_plan):
@@ -83,7 +87,8 @@ def test_start_at_two_nodes(riak_plan):
def test_initial_from_node1_traverse(riak_plan):
filters.filter(riak_plan, start=['node1.run'])
pending = {n for n in riak_plan if riak_plan.node[n]['status'] == states.PENDING.name}
pending = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file1.run', 'riak_service1.run', 'node1.run'}
@@ -92,7 +97,8 @@ def test_second_from_node2_with_node1_walked(riak_plan):
for n in success:
riak_plan.node[n]['status'] = states.SUCCESS.name
filters.filter(riak_plan, start=['node2.run'])
pending = {n for n in riak_plan if riak_plan.node[n]['status'] == states.PENDING.name}
pending = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file2.run', 'riak_service2.run',
'node2.run', 'riak_service2.join'}
@@ -102,6 +108,7 @@ def test_end_joins(riak_plan):
riak_plan,
start=['node1.run', 'node2.run', 'node3.run'],
end=['riak_service2.join', 'riak_service3.join'])
skipped = {n for n in riak_plan if riak_plan.node[n]['status'] == states.SKIPPED.name}
skipped = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.SKIPPED.name}
assert skipped == {'riak_service1.commit'}

View File

@@ -46,8 +46,8 @@ def test_type_limit_rule(dg):
def test_items_rule(dg):
assert limits.items_rule(dg, ['1']*99, '2')
assert limits.items_rule(dg, ['1']*99, '2', limit=10) == False
assert limits.items_rule(dg, ['1'] * 99, '2')
assert limits.items_rule(dg, ['1'] * 99, '2', limit=10) == False
@fixture

View File

@@ -23,13 +23,13 @@ from solar.dblayer.model import ModelMeta
def tagged_resources():
tags = ['n1', 'n2', 'n3']
t1 = Resource.from_dict('t1',
{'name': 't1', 'tags': tags, 'base_path': 'x'})
{'name': 't1', 'tags': tags, 'base_path': 'x'})
t1.save_lazy()
t2 = Resource.from_dict('t2',
{'name': 't2', 'tags': tags, 'base_path': 'x'})
{'name': 't2', 'tags': tags, 'base_path': 'x'})
t2.save_lazy()
t3 = Resource.from_dict('t3',
{'name': 't3', 'tags': tags, 'base_path': 'x'})
{'name': 't3', 'tags': tags, 'base_path': 'x'})
t3.save_lazy()
ModelMeta.save_all_lazy()
return [t1, t2, t3]

View File

@@ -19,6 +19,7 @@ from solar.core import signals
class TestResource(base.BaseResourceTest):
def test_resource_args(self):
sample_meta_dir = self.make_resource_meta("""
id: sample

View File

@@ -20,6 +20,7 @@ import pytest
class TestBaseInput(base.BaseResourceTest):
def test_no_self_connection(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@@ -40,7 +41,6 @@ input:
'Trying to connect value-.* to itself'):
xs.connect(sample, sample, {'value'})
def test_input_dict_type(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@@ -83,7 +83,7 @@ input:
sample1.args['values'],
{'a': 3}
)
#self.assertEqual(
# self.assertEqual(
# sample2.args['values'],
# {'a': 2}
#)
@@ -138,11 +138,11 @@ input:
xs.connect(sample_port, sample)
self.assertEqual(sample.args['ip'], sample_ip.args['ip'])
self.assertEqual(sample.args['port'], sample_port.args['port'])
#self.assertEqual(
# self.assertEqual(
# sample.args['ip'].emitter,
# sample_ip.args['ip']
#)
#self.assertEqual(
# self.assertEqual(
# sample.args['port'].emitter,
# sample_port.args['port']
#)
@@ -171,7 +171,7 @@ input:
xs.connect(sample1, sample)
self.assertEqual(sample1.args['ip'], sample.args['ip'])
#self.assertEqual(len(list(sample1.args['ip'].receivers)), 1)
#self.assertEqual(
# self.assertEqual(
# sample.args['ip'].emitter,
# sample1.args['ip']
#)
@@ -211,6 +211,7 @@ input:
class TestListInput(base.BaseResourceTest):
def test_list_input_single(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@@ -249,7 +250,7 @@ input:
sample1.args['ip'],
]
)
#self.assertListEqual(
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip')]
#)
@@ -263,7 +264,7 @@ input:
sample2.args['ip'],
]
)
#self.assertListEqual(
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip'),
# (sample2.args['ip'].attached_to.name, 'ip')]
@@ -289,7 +290,7 @@ input:
sample1.args['ip'],
]
)
#self.assertListEqual(
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip')]
#)
@@ -330,7 +331,8 @@ input:
'list-input-multi', list_input_multi_meta_dir, args={'ips': [], 'ports': []}
)
xs.connect(sample1, list_input_multi, mapping={'ip': 'ips', 'port': 'ports'})
xs.connect(sample1, list_input_multi, mapping={
'ip': 'ips', 'port': 'ports'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
@@ -342,7 +344,8 @@ input:
[sample1.args['port']]
)
xs.connect(sample2, list_input_multi, mapping={'ip': 'ips', 'port': 'ports'})
xs.connect(sample2, list_input_multi, mapping={
'ip': 'ips', 'port': 'ports'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
@@ -351,7 +354,7 @@ input:
sample2.args['ip'],
]
)
#self.assertListEqual(
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip'),
# (sample2.args['ip'].attached_to.name, 'ip')]
@@ -364,7 +367,7 @@ input:
sample2.args['port'],
]
)
#self.assertListEqual(
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ports']],
# [(sample1.args['port'].attached_to.name, 'port'),
# (sample2.args['port'].attached_to.name, 'port')]
@@ -469,6 +472,7 @@ input:
class TestHashInput(base.BaseResourceTest):
@pytest.mark.xfail(reason="Connect should raise an error if already connected")
def test_hash_input_basic(self):
sample_meta_dir = self.make_resource_meta("""
@@ -504,7 +508,8 @@ input:
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={'ip': 'server:ip', 'port': 'server:port'})
xs.connect(sample1, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
self.assertDictEqual(
{'ip': sample1.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
@@ -521,7 +526,8 @@ input:
{'ip': sample2.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
xs.connect(sample3, receiver, mapping={'ip': 'server:ip', 'port': 'server:port'})
xs.connect(sample3, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
self.assertDictEqual(
{'ip': sample3.args['ip'], 'port': sample3.args['port']},
receiver.args['server'],
@@ -594,7 +600,8 @@ input:
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={'ip': 'server:ip', 'port': 'server:port'})
xs.connect(sample1, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample1.args['port']}],
receiver.args['server'],
@@ -602,7 +609,8 @@ input:
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
xs.connect(sample2, receiver, mapping={'ip': 'server:ip', 'port': 'server:port'})
xs.connect(sample2, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample1.args['port']},
{'ip': sample2.args['ip'], 'port': sample2.args['port']}],
@@ -692,7 +700,8 @@ input:
sample3 = self.create_resource(
'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002}
)
sample3.connect(receiver, mapping={'ip': 'server:ip', 'port': 'server:port'})
sample3.connect(receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample3.args['ip'], 'port': sample3.args['port']}],

View File

@@ -30,8 +30,8 @@ def test_revert_update():
commit = {'a': '10'}
previous = {'a': '9'}
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res.save()
action = 'update'
res.inputs['a'] = '9'
@@ -40,7 +40,7 @@ def test_revert_update():
assert resource_obj.args == previous
log = data.SL()
logitem =change.create_logitem(
logitem = change.create_logitem(
res.name, action, change.create_diff(commit, previous), [],
base_path=res.base_path)
log.append(logitem)
@@ -56,23 +56,23 @@ def test_revert_update():
def test_revert_update_connected():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test2', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res2.inputs['a'] = ''
res2.save_lazy()
res3 = DBResource.from_dict('test3',
{'name': 'test3', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test3', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res3.inputs['a'] = ''
res3.save_lazy()
@@ -113,15 +113,15 @@ def test_revert_update_connected():
def test_revert_removal():
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res.inputs['a'] = '9'
res.save_lazy()
commited = CommitedResource.from_dict('test1',
{'inputs': {'a': '9'},
'state': 'operational'})
{'inputs': {'a': '9'},
'state': 'operational'})
commited.save_lazy()
resource_obj = resource.load(res.name)
@@ -138,7 +138,8 @@ def test_revert_removal():
assert DBResource.bucket.get('test1').siblings == []
with mock.patch.object(resource, 'read_meta') as mread:
mread.return_value = {'input': {'a': {'schema': 'str!'}}, 'id': 'mocked'}
mread.return_value = {
'input': {'a': {'schema': 'str!'}}, 'id': 'mocked'}
change.revert(changes[0].uid)
ModelMeta.save_all_lazy()
assert len(DBResource.bucket.get('test1').siblings) == 1
@@ -183,9 +184,9 @@ def test_revert_removed_child():
def test_revert_create():
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res.inputs['a'] = '9'
res.save_lazy()
ModelMeta.save_all_lazy()
@@ -210,16 +211,16 @@ def test_revert_create():
def test_discard_all_pending_changes_resources_created():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test2', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
@@ -235,16 +236,16 @@ def test_discard_all_pending_changes_resources_created():
def test_discard_connection():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test2', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
@@ -266,9 +267,9 @@ def test_discard_connection():
def test_discard_removed():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()
@@ -288,9 +289,9 @@ def test_discard_update():
def test_discard_update():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1', 'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()

View File

@@ -22,10 +22,11 @@ def host_diff():
return [
[u'add', u'', [
[u'ip', u'10.0.0.3'],
[u'hosts_names', ['riak_server1.solar', 'riak_server2.solar', 'riak_server3.solar']],
[u'hosts_names', ['riak_server1.solar',
'riak_server2.solar', 'riak_server3.solar']],
[u'ssh_user', u'vagrant'],
[u'ssh_key', u'/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key'],
]]]
]]]
def test_details_for_add(host_diff):
@@ -39,8 +40,10 @@ def test_details_for_add(host_diff):
def list_change():
return [[u'change', [u'configs_ports', 0, u'value', 0, u'value'], [18098, 88888]]]
def test_list_details_for_change(list_change):
assert data.details(list_change) == ['-+ configs_ports:[0] : 18098 >> 88888']
assert data.details(list_change) == [
'-+ configs_ports:[0] : 18098 >> 88888']
@fixture

View File

@@ -17,6 +17,7 @@ from pytest import fixture
from solar.orchestration.traversal import traverse
@fixture
def tasks():
return [
@@ -26,6 +27,7 @@ def tasks():
{'id': 't4', 'status': 'PENDING'},
{'id': 't5', 'status': 'PENDING'}]
@fixture
def dg(tasks):
ex = nx.DiGraph()
@@ -61,6 +63,7 @@ def test_nothing_will_be_walked_if_parent_is_skipped(dg):
assert set(traverse(dg)) == set()
def test_node_will_be_walked_if_parent_is_noop(dg):
dg.add_path(['t1', 't2', 't3', 't4', 't5'])
dg.node['t1']['status'] = 'NOOP'

View File

@@ -19,6 +19,7 @@ from solar.core import validation as sv
class TestInputValidation(base.BaseResourceTest):
def test_input_str_type(self):
sample_meta_dir = self.make_resource_meta("""
id: sample

View File

@@ -36,6 +36,7 @@ def good_events():
'''
return yaml.load(StringIO(events))
@pytest.fixture
def bad_event_type():
events = '''
@@ -46,12 +47,14 @@ def bad_event_type():
'''
return yaml.load(StringIO(events))
def test_create_path_does_not_exists():
with pytest.raises(Exception) as excinfo:
vr.create('node1', '/path/does/not/exists')
err = 'Base resource does not exist: /path/does/not/exists'
assert str(excinfo.value) == err
def test_create_resource():
node_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
@@ -60,6 +63,7 @@ def test_create_resource():
assert len(resources) == 1
assert resources[0].name == 'node1'
def test_create_virtual_resource(tmpdir):
base_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
@@ -73,6 +77,7 @@ def test_create_virtual_resource(tmpdir):
resources = vr.create('nodes', str(vr_file))
assert len(resources) == 2
def test_create_virtual_resource_with_list(tmpdir):
base_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
@@ -110,16 +115,18 @@ def test_update(tmpdir):
vr.create('updates', str(update_file))
assert resources[0].args['ip'] == '10.0.0.4'
def test_parse_events(good_events):
events =[Dep(parent='service1', parent_action='run',
child='config1', child_action='run',
state='success'),
React(parent='config1', parent_action='run',
child='service1', child_action='apply_config',
state='success')]
events = [Dep(parent='service1', parent_action='run',
child='config1', child_action='run',
state='success'),
React(parent='config1', parent_action='run',
child='service1', child_action='apply_config',
state='success')]
parsed = vr.parse_events(good_events)
assert events == parsed
def test_parse_bad_event(bad_event_type):
with pytest.raises(Exception) as execinfo:
vr.parse_events(bad_event_type)
@@ -128,43 +135,47 @@ def test_parse_bad_event(bad_event_type):
def test_add_connections(mocker, resources):
mocked_signals = mocker.patch('solar.core.resource.resource.Resource.connect_with_events')
mocked_signals = mocker.patch(
'solar.core.resource.resource.Resource.connect_with_events')
args = {'ip': 'node1::ip',
'servers': ['node1::ip', 'node2::ip'],
'alias': 'ser1'
}
}
vr.update_inputs('service1', args)
assert mocked_signals.call_count == 2
def test_add_list_values(mocker, resources):
mocked_signals = mocker.patch('solar.core.resource.resource.Resource.connect_with_events')
mocked_signals = mocker.patch(
'solar.core.resource.resource.Resource.connect_with_events')
args = {'ip': 'node1::ip',
'servers': ['server1', 'server2'],
'alias': 'ser1'
}
}
vr.update_inputs('service1', args)
assert mocked_signals.call_count == 1
def test_parse_connection():
correct_connection = {'child_input': 'ip',
'parent' : 'node1',
'parent_input': 'ip',
'events' : None
}
'parent': 'node1',
'parent_input': 'ip',
'events': None
}
connection = vr.parse_connection('ip', 'node1::ip')
assert correct_connection == connection
def test_parse_connection_disable_events():
correct_connection = {'child_input': 'ip',
'parent' : 'node1',
'parent_input': 'ip',
'events' : False
}
'parent': 'node1',
'parent_input': 'ip',
'events': False
}
connection = vr.parse_connection('ip', 'node1::ip::NO_EVENTS')
assert correct_connection == connection
def test_setting_location(tmpdir):
# XXX: make helper for it
base_path = os.path.join(

View File

@@ -12,16 +12,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import glob
import io
import json
import glob
import yaml
import logging
import os
from uuid import uuid4
import uuid
from jinja2 import Environment
import yaml
logger = logging.getLogger(__name__)
@@ -80,7 +80,7 @@ def load_by_mask(mask):
def generate_uuid():
return str(uuid4())
return str(uuid.uuid4())
def render_template(template_path, **params):
@@ -134,5 +134,5 @@ def solar_map(funct, args, **kwargs):
def get_local():
from threading import local
return local
import threading
return threading.local
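
These two rewrites follow the OpenStack hacking rule H302 ("import only modules"): bind the module and qualify the attribute at the call site, rather than importing names out of it:

    # discouraged under H302: from uuid import uuid4
    import uuid

    def generate_uuid():
        return str(uuid.uuid4())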

View File

@@ -12,10 +12,10 @@ deps = -r{toxinidir}/test-requirements.txt
commands = ostestr --serial
[testenv:pep8]
deps = hacking==0.7
deps = hacking==0.10.2
usedevelop = False
commands =
flake8 {posargs:solar}
flake8 {posargs:solar/core}
[testenv:venv]
@@ -27,10 +27,7 @@ envdir = devenv
usedevelop = True
[flake8]
# NOTE(eli): H304 is "No relative imports" error, relative
# imports are required for extensions which can be moved
# from nailgun directory to different place
ignore = H234,H302,H802,H304
ignore = H101,H236,E731
exclude = .venv,.git,.tox,dist,doc,*lib/python*,*egg,build,tools,__init__.py,docs
show-pep8 = True
show-source = True