Merge "Rework staging procedure to support both implicit and explicit stages"

This commit is contained in:
Jenkins 2016-03-21 11:20:21 +00:00 committed by Gerrit Code Review
commit ee01ddce3b
12 changed files with 407 additions and 397 deletions

View File

@ -39,10 +39,14 @@ def validate():
@changes.command() @changes.command()
@click.option('--action', '-a', default=None, help='resource action')
@click.option('--name', '-n', default=None, help='resource name')
@click.option('--tag', '-t', multiple=True, help='resource tags')
@click.option('-d', default=False, is_flag=True, help='detailed view') @click.option('-d', default=False, is_flag=True, help='detailed view')
def stage(d): def stage(action, name, tag, d):
log = change.stage_changes() if action and (name or tag):
log.reverse() resource.stage_resources(name or tag, action)
log = change.staged_log(populate_with_changes=True)
for item in log: for item in log:
click.echo(data.compact(item)) click.echo(data.compact(item))
if d: if d:
@ -65,8 +69,9 @@ def staged_item(uid):
@changes.command() @changes.command()
def process(): @click.option('--tag', '-t', multiple=True, help='resource tags')
uid = change.send_to_orchestration().graph['uid'] def process(tag):
uid = change.send_to_orchestration(tag).graph['uid']
remember_uid(uid) remember_uid(uid)
click.echo(uid) click.echo(uid)

View File

@ -15,9 +15,10 @@
from solar.core.resource.resource import load from solar.core.resource.resource import load
from solar.core.resource.resource import load_all from solar.core.resource.resource import load_all
from solar.core.resource.resource import load_by_tags from solar.core.resource.resource import load_by_tags
from solar.core.resource.resource import load_updated from solar.core.resource.resource import load_childs
from solar.core.resource.resource import Resource from solar.core.resource.resource import Resource
from solar.core.resource.resource import RESOURCE_STATE from solar.core.resource.resource import RESOURCE_STATE
from solar.core.resource.resource import stage_resources
from solar.core.resource.resource import validate_resources from solar.core.resource.resource import validate_resources
__all__ = [ __all__ = [
@ -26,6 +27,7 @@ __all__ = [
'load', 'load',
'load_all', 'load_all',
'load_by_tags', 'load_by_tags',
'load_updated', 'load_childs',
'validate_resources' 'validate_resources',
'stage_resources',
] ]

View File

@ -182,7 +182,7 @@ def _get_template(name, content, kwargs, inputs):
def create_resources(base_path, resources, tags=None): def create_resources(base_path, resources, tags=None):
add_tags = tags
created_resources = [] created_resources = []
for r in resources: for r in resources:
resource_name = r['id'] resource_name = r['id']
@ -191,6 +191,8 @@ def create_resources(base_path, resources, tags=None):
values_from = r.get('values_from') values_from = r.get('values_from')
spec = r.get('from', None) spec = r.get('from', None)
tags = r.get('tags', []) tags = r.get('tags', [])
if add_tags:
tags.extend(add_tags)
is_composer_file = False is_composer_file = False
if spec.startswith('./') or spec.endswith('.yaml'): if spec.startswith('./') or spec.endswith('.yaml'):
spec = os.path.join(base_path, '..', spec) spec = os.path.join(base_path, '..', spec)

View File

@ -33,6 +33,7 @@ from solar.core import validation
from solar.dblayer.model import NONE from solar.dblayer.model import NONE
from solar.dblayer.model import StrInt from solar.dblayer.model import StrInt
from solar.dblayer.solar_models import CommitedResource from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import LogItem
from solar.dblayer.solar_models import Resource as DBResource from solar.dblayer.solar_models import Resource as DBResource
from solar.events import api from solar.events import api
from solar import utils from solar import utils
@ -90,6 +91,11 @@ class Resource(object):
self.create_inputs(args) self.create_inputs(args)
self.db_obj.save() self.db_obj.save()
LogItem.new({
'resource': self.name,
'action': 'run',
'log': 'staged',
'tags': self.tags}).save_lazy()
# Load # Load
def create_from_db(self, resource_db): def create_from_db(self, resource_db):
@ -209,6 +215,12 @@ class Resource(object):
for k, v in args.items(): for k, v in args.items():
self.db_obj.inputs[k] = v self.db_obj.inputs[k] = v
self.db_obj.save_lazy() self.db_obj.save_lazy()
# run and update are same things from solar pov
# so lets remove this redundancy
LogItem.new(
{'resource': self.name,
'action': 'run',
'tags': self.tags}).save_lazy()
def delete(self): def delete(self):
return self.db_obj.delete() return self.db_obj.delete()
@ -219,6 +231,11 @@ class Resource(object):
else: else:
self.db_obj.state = RESOURCE_STATE.removed.name self.db_obj.state = RESOURCE_STATE.removed.name
self.db_obj.save_lazy() self.db_obj.save_lazy()
LogItem.new(
{'resource': self.name,
'action': 'remove',
'log': 'staged',
'tags': self.tags}).save_lazy()
def set_operational(self): def set_operational(self):
self.db_obj.state = RESOURCE_STATE.operational.name self.db_obj.state = RESOURCE_STATE.operational.name
@ -312,6 +329,9 @@ class Resource(object):
use_defaults=False): use_defaults=False):
mapping = get_mapping(self, receiver, mapping) mapping = get_mapping(self, receiver, mapping)
self._connect_inputs(receiver, mapping) self._connect_inputs(receiver, mapping)
LogItem.new({'resource': receiver.name,
'action': 'run',
'tags': receiver.tags}).save_lazy()
# signals.connect(self, receiver, mapping=mapping) # signals.connect(self, receiver, mapping=mapping)
# TODO: implement events # TODO: implement events
if use_defaults: if use_defaults:
@ -350,17 +370,9 @@ def load(name):
return Resource(r) return Resource(r)
def load_updated(since=None, with_childs=True): def load_childs(parents):
if since is None: return [Resource(r) for r in
startkey = StrInt.p_min() DBResource.multi_get(DBResource.childs(parents))]
else:
startkey = since
candids = DBResource.updated.filter(startkey, StrInt.p_max())
if with_childs:
candids = DBResource.childs(candids)
return [Resource(r) for r in DBResource.multi_get(candids)]
# TODO
def load_all(startswith=None): def load_all(startswith=None):
@ -380,11 +392,33 @@ def load_by_tags(query):
parsed_tags = get_string_tokens(query) parsed_tags = get_string_tokens(query)
r_with_tags = [DBResource.tags.filter(tag) for tag in parsed_tags] r_with_tags = [DBResource.tags.filter(tag) for tag in parsed_tags]
r_with_tags = set(itertools.chain(*r_with_tags)) r_with_tags = set(itertools.chain(*r_with_tags))
candids = [Resource(r) for r in DBResource.multi_get(r_with_tags)] resources = [Resource(r) for r in DBResource.multi_get(r_with_tags)]
nodes = filter( return filter(lambda n: Expression(query, n.tags).evaluate(), resources)
lambda n: Expression(query, n.tags).evaluate(), candids)
return nodes
def stage_resources(resources_query, action):
"""Create log items for resources selected by query
:param resources_query: iterable with tags or basestring
:param action: basestring
"""
if isinstance(resources_query, basestring):
resources = [load(resources_query)]
else:
resources = load_by_tags(resources_query)
created = []
for resource in resources:
# save - cache doesnt cover all query in the same sesssion
# and this query will be triggered right after staging resources
log_item = LogItem.new(
{'resource': resource.name,
'action': action,
'log': 'staged',
'tags': resource.tags})
log_item.save()
created.append(log_item)
return created
def load_by_names(names): def load_by_names(names):

View File

@ -25,6 +25,9 @@ from enum import Enum
from solar.computable_inputs import ComputablePassedTypes from solar.computable_inputs import ComputablePassedTypes
from solar.computable_inputs.processor import get_processor from solar.computable_inputs.processor import get_processor
from solar.config import C from solar.config import C
from solar.core.tags_set_parser import Expression
from solar.core.tags_set_parser import get_string_tokens
from solar.dblayer.conflict_resolution import naive_resolver
from solar.dblayer.model import check_state_for from solar.dblayer.model import check_state_for
from solar.dblayer.model import CompositeIndexField from solar.dblayer.model import CompositeIndexField
from solar.dblayer.model import DBLayerException from solar.dblayer.model import DBLayerException
@ -1129,44 +1132,96 @@ class LogItem(Model):
action = Field(basestring) action = Field(basestring)
diff = Field(list) diff = Field(list)
connections_diff = Field(list) connections_diff = Field(list)
state = Field(basestring)
base_path = Field(basestring) # remove me base_path = Field(basestring) # remove me
updated = Field(StrInt)
history = IndexedField(StrInt) state = Field(basestring)
log = Field(basestring) # staged/history # based on tags we will filter staged log items during process part
# of staging changes procedure, it will allow to isolate graphs for
composite = CompositeIndexField(fields=('log', 'resource', 'action')) # different parts of infrastructure managed by solar (e.g. cluster)
tags = TagsField(default=list)
@property @property
def log_action(self): def log_action(self):
return '.'.join((self.resource, self.action)) return '.'.join((self.resource, self.action))
@classmethod
def history_last(cls):
items = cls.history.filter(StrInt.n_max(),
StrInt.n_min(),
max_results=1)
if not items:
return None
return cls.get(items[0])
def save(self):
if any(f in self._modified_fields for f in LogItem.composite.fields):
self.composite.reset()
if 'log' in self._modified_fields and self.log == 'history':
self.history = StrInt(next(NegativeCounter.get_or_create(
'history')))
return super(LogItem, self).save()
@classmethod @classmethod
def new(cls, data): def new(cls, data):
vals = {} vals = {}
if 'uid' not in vals: if 'uid' not in vals:
vals['uid'] = cls.uid.default vals['uid'] = cls.uid.default
vals.update(data) vals.update(data)
return LogItem.from_dict(vals['uid'], vals) return LogItem.from_dict(
'{}.{}'.format(vals['resource'], vals['action']), vals)
@classmethod
def from_dict(cls, key, *args, **kwargs):
if key in cls._c.obj_cache:
return cls._c.obj_cache[key]
return super(LogItem, cls).from_dict(key, *args, **kwargs)
@classmethod
def get(cls, key):
try:
return super(LogItem, cls).get(key)
except DBLayerException:
return None
def to_history(self):
return HistoryItem.new(
self.uid,
{'uid': self.uid,
'resource': self.resource,
'action': self.action,
'base_path': self.base_path,
'diff': self.diff,
'connections_diff': self.connections_diff})
@classmethod
def log_items_by_tags(cls, tags):
query = '|'.join(tags)
parsed_tags = get_string_tokens(query)
log_items = set(map(
cls.get,
chain.from_iterable(
[cls.tags.filter(tag) for tag in parsed_tags])))
return filter(lambda li: Expression(query, li.tags).evaluate(),
log_items)
@staticmethod
def conflict_resolver(riak_object):
#: it is safe to pick any log item with data, because the key
# if particular log_action
for sibling in riak_object.siblings:
if sibling.encoded_data:
riak_object.siblings = [sibling]
return
naive_resolver(riak_object)
class HistoryItem(Model):
uid = IndexedField(basestring)
resource = Field(basestring)
action = Field(basestring)
diff = Field(list)
connections_diff = Field(list)
base_path = Field(basestring) # remove me
history = IndexedField(StrInt)
composite = CompositeIndexField(fields=('resource', 'action'))
@property
def log_action(self):
return '.'.join((self.resource, self.action))
def save(self):
if any(f in self._modified_fields for
f in HistoryItem.composite.fields):
self.composite.reset()
self.history = StrInt(next(NegativeCounter.get_or_create(
'history')))
return super(HistoryItem, self).save()
class Lock(Model): class Lock(Model):

View File

@ -13,54 +13,21 @@
# under the License. # under the License.
from solar.dblayer.model import StrInt from solar.dblayer.model import StrInt
from solar.dblayer.solar_models import LogItem from solar.dblayer.solar_models import HistoryItem
from solar.dblayer.solar_models import NegativeCounter from solar.dblayer.solar_models import NegativeCounter
def test_separate_logs(): def test_composite_filter():
history = 'history' l1 = HistoryItem.new('a', {'log': 'history', 'resource': 'a'})
staged = 'staged' l2 = HistoryItem.new('b', {'log': 'history', 'resource': 'b'})
history_uids = set()
staged_uids = set()
for i in range(2):
l = LogItem.new({'log': history})
l.save()
history_uids.add(l.key)
for i in range(3):
l = LogItem.new({'log': staged})
l.save()
staged_uids.add(l.key)
assert set(LogItem.composite.filter({'log': history})) == history_uids
assert set(LogItem.composite.filter({'log': staged})) == staged_uids
def test_multiple_filter():
l1 = LogItem.new({'log': 'history', 'resource': 'a'})
l2 = LogItem.new({'log': 'history', 'resource': 'b'})
l1.save() l1.save()
l2.save() l2.save()
assert LogItem.composite.filter({'log': 'history', assert HistoryItem.composite.filter({'log': 'history',
'resource': 'a'}) == [l1.key] 'resource': 'a'}) == [l1.key]
assert LogItem.composite.filter({'log': 'history', assert HistoryItem.composite.filter({'log': 'history',
'resource': 'b'}) == [l2.key] 'resource': 'b'}) == [l2.key]
def test_changed_index():
l = LogItem.new({'log': 'staged', 'resource': 'a', 'action': 'run'})
l.save()
assert LogItem.composite.filter({'log': 'staged'}) == [l.key]
l.log = 'history'
l.save()
assert LogItem.composite.filter({'log': 'staged'}) == []
assert LogItem.composite.filter({'log': 'history'}) == [l.key]
def test_negative_counter(): def test_negative_counter():
@ -71,40 +38,10 @@ def test_negative_counter():
def test_reversed_order_is_preserved(): def test_reversed_order_is_preserved():
added = [] added = []
for i in range(4): for i in range(4):
li = LogItem.new({'log': 'history'}) li = HistoryItem.new(str(i), {})
li.save() li.save()
added.append(li.key) added.append(li.key)
added.reverse() added.reverse()
assert list(LogItem.history.filter(StrInt.n_max(), assert list(HistoryItem.history.filter(StrInt.n_max(),
StrInt.n_min(), StrInt.n_min(),
max_results=2)) == added[:2] max_results=2)) == added[:2]
def test_staged_not_indexed():
added = []
for i in range(3):
li = LogItem.new({'log': 'staged'})
li.save()
added.append(li)
for li in added[:2]:
li.log = 'history'
li.save()
assert set(LogItem.history.filter(StrInt.n_max(), StrInt.n_min())) == {
li.key
for li in added[:2]
}
def test_history_last_filter():
for i in range(4):
li = LogItem.new({'log': 'history'})
li.save()
last = li
assert LogItem.history_last() == last
def test_history_last_returns_none():
assert LogItem.history_last() is None

View File

@ -33,6 +33,7 @@ if no changes noticed on dependent resource.
""" """
from solar.dblayer.model import DBLayerNotFound from solar.dblayer.model import DBLayerNotFound
from solar.dblayer.solar_models import DBLayerSolarException
from solar.dblayer.solar_models import Resource from solar.dblayer.solar_models import Resource
@ -104,7 +105,7 @@ class React(Event):
try: try:
location_id = Resource.get(self.child).inputs[ location_id = Resource.get(self.child).inputs[
'location_id'] 'location_id']
except DBLayerNotFound: except (DBLayerNotFound, DBLayerSolarException):
location_id = None location_id = None
changes_graph.add_node( changes_graph.add_node(
self.child_node, status='PENDING', self.child_node, status='PENDING',
@ -126,7 +127,7 @@ class StateChange(Event):
changed_resources.append(self.parent_node) changed_resources.append(self.parent_node)
try: try:
location_id = Resource.get(self.parent).inputs['location_id'] location_id = Resource.get(self.parent).inputs['location_id']
except DBLayerNotFound: except (DBLayerNotFound, DBLayerSolarException):
location_id = None location_id = None
changes_graph.add_node( changes_graph.add_node(
self.parent_node, status='PENDING', self.parent_node, status='PENDING',

View File

@ -20,8 +20,8 @@ from solar.core import resource
from solar.core.resource.resource import RESOURCE_STATE from solar.core.resource.resource import RESOURCE_STATE
from solar.core import signals from solar.core import signals
from solar.dblayer.solar_models import CommitedResource from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import HistoryItem
from solar.dblayer.solar_models import LogItem from solar.dblayer.solar_models import LogItem
from solar.dblayer.solar_models import StrInt
from solar.events import api as evapi from solar.events import api as evapi
from solar.events.controls import StateChange from solar.events.controls import StateChange
from solar.orchestration import graph from solar.orchestration import graph
@ -53,27 +53,10 @@ def create_diff(staged, commited):
return listify(res) return listify(res)
def create_logitem(resource, action, diffed, connections_diffed, def populate_log_item(log_item):
base_path=''): resource_obj = resource.load(log_item.resource)
return LogItem.new(
{'resource': resource,
'action': action,
'diff': diffed,
'connections_diff': connections_diffed,
'base_path': base_path,
'log': 'staged'})
def create_sorted_diff(staged, commited):
staged.sort()
commited.sort()
return create_diff(staged, commited)
def make_single_stage_item(resource_obj):
commited = resource_obj.load_commited() commited = resource_obj.load_commited()
base_path = resource_obj.base_path log_item.base_path = resource_obj.base_path
if resource_obj.to_be_removed(): if resource_obj.to_be_removed():
resource_args = {} resource_args = {}
resource_connections = [] resource_connections = []
@ -88,42 +71,87 @@ def make_single_stage_item(resource_obj):
commited_args = commited.inputs commited_args = commited.inputs
commited_connections = commited.connections commited_connections = commited.connections
inputs_diff = create_diff(resource_args, commited_args) log_item.diff = create_diff(resource_args, commited_args)
connections_diff = create_sorted_diff( log_item.connections_diff = create_sorted_diff(
resource_connections, commited_connections) resource_connections, commited_connections)
return log_item
# if new connection created it will be reflected in inputs
# but using inputs to reverse connections is not possible
if inputs_diff:
li = create_logitem(
resource_obj.name,
guess_action(commited_args, resource_args),
inputs_diff,
connections_diff,
base_path=base_path)
li.save()
return li
return None
def stage_changes(): def create_logitem(resource, action, populate=True):
for li in data.SL(): """Create log item in staged log
li.delete() :param resource: basestring
:param action: basestring
last = LogItem.history_last() """
since = StrInt.greater(last.updated) if last else None log_item = LogItem.new(
staged_log = utils.solar_map(make_single_stage_item, {'resource': resource,
resource.load_updated(since), concurrency=10) 'action': action,
staged_log = filter(None, staged_log) 'log': 'staged'})
return staged_log if populate:
populate_log_item(log_item)
return log_item
def send_to_orchestration(): def create_run(resource):
return create_logitem(resource, 'run')
def create_remove(resource):
return create_logitem(resource, 'remove')
def create_sorted_diff(staged, commited):
staged.sort()
commited.sort()
return create_diff(staged, commited)
def staged_log(populate_with_changes=True):
"""Staging procedure takes manually created log items, populate them
with diff and connections diff
Current implementation prevents from several things to occur:
- same log_action (resource.action pair) cannot not be staged multiple
times
- child will be staged only if diff or connections_diff is changed,
and we can execute *run* action to apply that diff - in all other cases
child should be staged explicitly
"""
log_actions = set()
resources_names = set()
staged_log = data.SL()
without_duplicates = []
for log_item in staged_log:
if log_item.log_action in log_actions:
log_item.delete()
continue
resources_names.add(log_item.resource)
log_actions.add(log_item.log_action)
without_duplicates.append(log_item)
utils.solar_map(lambda li: populate_log_item(li),
without_duplicates, concurrency=10)
# this is backward compatible change, there might better way
# to "guess" child actions
childs = filter(lambda child: child.name not in resources_names,
resource.load_childs(list(resources_names)))
child_log_items = filter(
lambda li: li.diff or li.connections_diff,
utils.solar_map(create_run, [c.name for c in childs], concurrency=10))
for log_item in child_log_items + without_duplicates:
log_item.save_lazy()
return without_duplicates + child_log_items
def send_to_orchestration(tags=None):
dg = nx.MultiDiGraph() dg = nx.MultiDiGraph()
events = {} events = {}
changed_nodes = [] changed_nodes = []
for logitem in data.SL(): if tags:
staged_log = LogItem.log_items_by_tags(tags)
else:
staged_log = data.SL()
for logitem in staged_log:
events[logitem.resource] = evapi.all_events(logitem.resource) events[logitem.resource] = evapi.all_events(logitem.resource)
changed_nodes.append(logitem.resource) changed_nodes.append(logitem.resource)
@ -155,20 +183,27 @@ def _get_args_to_update(args, connections):
} }
def is_create(logitem):
return all((item[0] == 'add' for item in logitem.diff))
def is_update(logitem):
return any((item[0] == 'change' for item in logitem.diff))
def revert_uids(uids): def revert_uids(uids):
"""Reverts uids """Reverts uids
:param uids: iterable not generator :param uids: iterable not generator
""" """
items = LogItem.multi_get(uids) items = HistoryItem.multi_get(uids)
for item in items: for item in items:
if is_update(item):
if item.action == CHANGES.update.name:
_revert_update(item) _revert_update(item)
elif item.action == CHANGES.remove.name: elif item.action == CHANGES.remove.name:
_revert_remove(item) _revert_remove(item)
elif item.action == CHANGES.run.name: elif is_create(item):
_revert_run(item) _revert_run(item)
else: else:
log.debug('Action %s for resource %s is a side' log.debug('Action %s for resource %s is a side'
@ -219,8 +254,8 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections):
# that some values can not be updated # that some values can not be updated
# even if connection was removed # even if connection was removed
receiver_obj.db_obj.save() receiver_obj.db_obj.save()
if args:
res_obj.update(args) res_obj.update(args)
def _revert_update(logitem): def _revert_update(logitem):
@ -256,10 +291,9 @@ def _discard_update(item):
old_connections = resource_obj.connections old_connections = resource_obj.connections
new_connections = dictdiffer.revert( new_connections = dictdiffer.revert(
item.connections_diff, sorted(old_connections)) item.connections_diff, sorted(old_connections))
args = dictdiffer.revert(item.diff, resource_obj.args) inputs = dictdiffer.revert(item.diff, resource_obj.args)
_update_inputs_connections( _update_inputs_connections(
resource_obj, _get_args_to_update(args, new_connections), resource_obj, _get_args_to_update(inputs, old_connections),
old_connections, new_connections) old_connections, new_connections)
@ -268,13 +302,13 @@ def _discard_run(item):
def discard_uids(uids): def discard_uids(uids):
items = LogItem.multi_get(uids) items = filter(bool, LogItem.multi_get(uids))
for item in items: for item in items:
if item.action == CHANGES.update.name: if is_update(item):
_discard_update(item) _discard_update(item)
elif item.action == CHANGES.remove.name: elif item.action == CHANGES.remove.name:
_discard_remove(item) _discard_remove(item)
elif item.action == CHANGES.run.name: elif is_create(item):
_discard_run(item) _discard_run(item)
else: else:
log.debug('Action %s for resource %s is a side' log.debug('Action %s for resource %s is a side'
@ -288,7 +322,7 @@ def discard_uid(uid):
def discard_all(): def discard_all():
staged_log = data.SL() staged_log = data.SL()
return discard_uids([l.uid for l in staged_log]) return discard_uids([l.key for l in staged_log])
def commit_all(): def commit_all():

View File

@ -12,18 +12,22 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from solar.dblayer.solar_models import HistoryItem
from solar.dblayer.solar_models import LogItem from solar.dblayer.solar_models import LogItem
def SL(): def SL():
rst = LogItem.composite.filter({'log': 'staged'}) rst = LogItem.bucket.get_index('$bucket',
return LogItem.multi_get(rst) startkey='_',
max_results=100000).results
return filter(bool, LogItem.multi_get(rst))
def CL(): def CL():
rst = LogItem.composite.filter({'log': 'history'}) rst = HistoryItem.bucket.get_index('$bucket',
return LogItem.multi_get(rst) startkey='_',
max_results=100000).results
return HistoryItem.multi_get(rst)
def compact(logitem): def compact(logitem):

View File

@ -28,34 +28,32 @@ def set_error(log_action, *args, **kwargs):
resource_obj = resource.load(item.resource) resource_obj = resource.load(item.resource)
resource_obj.set_error() resource_obj.set_error()
item.state = 'error' item.state = 'error'
item.save() item.delete()
def commit_log_item(item):
resource_obj = resource.load(item.resource)
commited = CommitedResource.get_or_create(item.resource)
if item.action == CHANGES.remove.name:
resource_obj.delete()
commited.state = resource.RESOURCE_STATE.removed.name
else:
resource_obj.set_operational()
commited.state = resource.RESOURCE_STATE.operational.name
commited.base_path = item.base_path
resource_obj.db_obj.save_lazy()
commited.inputs = patch(item.diff, commited.inputs)
# TODO fix TagsWrp to return list
# commited.tags = resource_obj.tags
sorted_connections = sorted(commited.connections)
commited.connections = patch(item.connections_diff, sorted_connections)
commited.save_lazy()
item.to_history().save_lazy()
item.delete()
def move_to_commited(log_action, *args, **kwargs): def move_to_commited(log_action, *args, **kwargs):
sl = data.SL() sl = data.SL()
item = next((i for i in sl if i.log_action == log_action), None) item = next((i for i in sl if i.log_action == log_action), None)
if item: if item:
resource_obj = resource.load(item.resource) commit_log_item(item)
commited = CommitedResource.get_or_create(item.resource)
updated = resource_obj.db_obj.updated
if item.action == CHANGES.remove.name:
resource_obj.delete()
commited.state = resource.RESOURCE_STATE.removed.name
else:
resource_obj.set_operational()
commited.state = resource.RESOURCE_STATE.operational.name
commited.base_path = item.base_path
updated = resource_obj.db_obj.updated
# required to update `updated` field
resource_obj.db_obj.save()
commited.inputs = patch(item.diff, commited.inputs)
# TODO fix TagsWrp to return list
# commited.tags = resource_obj.tags
sorted_connections = sorted(commited.connections)
commited.connections = patch(item.connections_diff, sorted_connections)
commited.save()
item.log = 'history'
item.state = 'success'
item.updated = updated
item.save()

View File

@ -20,6 +20,7 @@ import pytest
from solar.config import C # NOQA from solar.config import C # NOQA
from solar.core.resource import composer from solar.core.resource import composer
from solar.dblayer.model import clear_cache from solar.dblayer.model import clear_cache
from solar.dblayer.model import ModelMeta
from solar.errors import ExecutionTimeout from solar.errors import ExecutionTimeout
from solar import orchestration from solar import orchestration
from solar.orchestration.graph import wait_finish from solar.orchestration.graph import wait_finish
@ -57,7 +58,8 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
timeout = scale * 2 timeout = scale * 2
scheduler_client = clients['scheduler'] scheduler_client = clients['scheduler']
assert len(change.stage_changes()) == total_resources assert len(change.staged_log()) == total_resources
ModelMeta.session_end()
plan = change.send_to_orchestration() plan = change.send_to_orchestration()
scheduler_client.next({}, plan.graph['uid']) scheduler_client.next({}, plan.graph['uid'])
@ -75,4 +77,4 @@ def test_concurrent_sequences_with_no_handler(scale, clients):
assert res[states.SUCCESS.name] == total_resources assert res[states.SUCCESS.name] == total_resources
assert len(data.CL()) == total_resources assert len(data.CL()) == total_resources
clear_cache() clear_cache()
assert len(change.stage_changes()) == 0 assert len(change.staged_log()) == 0

View File

@ -13,78 +13,63 @@
# under the License. # under the License.
import mock import mock
from pytest import mark
from solar.core.resource import repository from solar.core.resource import repository
from solar.core.resource import resource from solar.core.resource import resource
from solar.core.resource import RESOURCE_STATE from solar.core.resource import stage_resources
from solar.core import signals
from solar.dblayer.model import clear_cache
from solar.dblayer.model import ModelMeta from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import CommitedResource from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import Resource as DBResource from solar.dblayer.solar_models import Resource as DBResource
from solar.system_log import change from solar.system_log import change
from solar.system_log import data
from solar.system_log import operations from solar.system_log import operations
def create_resource(name, tags=None):
resource = DBResource.from_dict(
name,
{'name': name,
'base_path': 'x',
'state': '',
'tags': tags or [],
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
resource.save_lazy()
return resource
def test_revert_update(): def test_revert_update():
commit = {'a': '10'} prev = {'a': '9'}
previous = {'a': '9'} new = {'a': '10'}
res = DBResource.from_dict('test1', res = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.save() res.save()
action = 'update' action = 'run'
res.inputs['a'] = '9'
resource_obj = resource.load(res.name) resource_obj = resource.load(res.name)
assert resource_obj.args == previous resource_obj.update(prev)
logitem = change.create_logitem(res.name, action)
log = data.SL() operations.commit_log_item(logitem)
logitem = change.create_logitem(res.name, resource_obj.update(new)
action,
change.create_diff(commit, previous),
[],
base_path=res.base_path)
log.append(logitem)
resource_obj.update(commit)
operations.move_to_commited(logitem.log_action)
logitem = change.create_logitem(res.name, action)
uid = logitem.uid
assert logitem.diff == [['change', 'a', ['9', '10']]] assert logitem.diff == [['change', 'a', ['9', '10']]]
assert resource_obj.args == commit operations.commit_log_item(logitem)
assert resource_obj.args == new
change.revert(logitem.uid) change.revert(uid)
assert resource_obj.args == previous assert resource_obj.args == {'a': '9'}
def test_revert_update_connected(): def test_revert_update_connected():
res1 = DBResource.from_dict('test1', res1 = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9' res1.inputs['a'] = '9'
res1.save_lazy() res1.save_lazy()
res2 = DBResource.from_dict('test2', res2 = create_resource('test2')
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '' res2.inputs['a'] = ''
res2.save_lazy() res2.save_lazy()
res3 = DBResource.from_dict('test3', res3 = create_resource('test3')
{'name': 'test3',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res3.inputs['a'] = '' res3.inputs['a'] = ''
res3.save_lazy() res3.save_lazy()
@ -95,41 +80,36 @@ def test_revert_update_connected():
res2.connect(res3) res2.connect(res3)
ModelMeta.save_all_lazy() ModelMeta.save_all_lazy()
staged_log = change.stage_changes() staged_log = map(lambda res: change.create_run(res.name),
(res1, res2, res3))
assert len(staged_log) == 3 assert len(staged_log) == 3
for item in staged_log: for item in staged_log:
assert item.action == 'run' assert item.action == 'run'
operations.move_to_commited(item.log_action) operations.commit_log_item(item)
assert len(change.stage_changes()) == 0
res1.disconnect(res2) res1.disconnect(res2)
staged_log = change.stage_changes() staged_log = map(lambda res: change.create_run(res.name),
assert len(staged_log) == 2 (res2, res3))
to_revert = [] to_revert = []
for item in staged_log: for item in staged_log:
assert item.action == 'update' assert item.action == 'run'
operations.move_to_commited(item.log_action)
to_revert.append(item.uid) to_revert.append(item.uid)
operations.commit_log_item(item)
change.revert_uids(sorted(to_revert, reverse=True)) change.revert_uids(sorted(to_revert, reverse=True))
ModelMeta.save_all_lazy() ModelMeta.save_all_lazy()
staged_log = change.stage_changes() staged_log = map(lambda res: change.create_run(res.name),
(res2, res3))
assert len(staged_log) == 2
for item in staged_log: for item in staged_log:
assert item.diff == [['change', 'a', ['', '9']]] assert item.diff == [['change', 'a', ['', '9']]]
def test_revert_removal(): def test_revert_removal():
res = DBResource.from_dict('test1', res = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.inputs['a'] = '9' res.inputs['a'] = '9'
res.save_lazy() res.save_lazy()
@ -141,14 +121,13 @@ def test_revert_removal():
resource_obj.remove() resource_obj.remove()
ModelMeta.save_all_lazy() ModelMeta.save_all_lazy()
changes = change.stage_changes() log_item = change.create_remove(resource_obj.name)
assert len(changes) == 1 log_item.save()
assert changes[0].diff == [['remove', '', [['a', '9']]]] uid = log_item.uid
operations.move_to_commited(changes[0].log_action) assert log_item.diff == [['remove', '', [['a', '9']]]]
operations.commit_log_item(log_item)
clear_cache() ModelMeta.save_all_lazy()
assert DBResource._c.obj_cache == {}
# assert DBResource.bucket.get('test1').siblings == []
with mock.patch.object(repository.Repository, 'read_meta') as mread: with mock.patch.object(repository.Repository, 'read_meta') as mread:
mread.return_value = { mread.return_value = {
@ -157,10 +136,9 @@ def test_revert_removal():
} }
with mock.patch.object(repository.Repository, 'get_path') as mpath: with mock.patch.object(repository.Repository, 'get_path') as mpath:
mpath.return_value = 'x' mpath.return_value = 'x'
change.revert(uid)
change.revert(changes[0].uid)
ModelMeta.save_all_lazy() ModelMeta.save_all_lazy()
# assert len(DBResource.bucket.get('test1').siblings) == 1
resource_obj = resource.load('test1') resource_obj = resource.load('test1')
assert resource_obj.args == { assert resource_obj.args == {
@ -170,177 +148,135 @@ def test_revert_removal():
} }
@mark.xfail(
reason="""With current approach child will
notice changes after parent is removed"""
)
def test_revert_removed_child():
res1 = orm.DBResource(id='test1', name='test1', base_path='x') # NOQA
res1.save()
res1.add_input('a', 'str', '9')
res2 = orm.DBResource(id='test2', name='test2', base_path='x') # NOQA
res2.save()
res2.add_input('a', 'str', 0)
res1 = resource.load('test1')
res2 = resource.load('test2')
signals.connect(res1, res2)
staged_log = change.stage_changes()
assert len(staged_log) == 2
for item in staged_log:
operations.move_to_commited(item.log_action)
res2.remove()
staged_log = change.stage_changes()
assert len(staged_log) == 1
logitem = next(staged_log.collection())
operations.move_to_commited(logitem.log_action)
with mock.patch.object(repository, 'read_meta') as mread:
mread.return_value = {'input': {'a': {'schema': 'str!'}}}
change.revert(logitem.uid)
res2 = resource.load('test2')
assert res2.args == {'a': '9'}
def test_revert_create(): def test_revert_create():
res = DBResource.from_dict('test1', res = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.inputs['a'] = '9' res.inputs['a'] = '9'
res.save_lazy() res.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes() logitem = change.create_run(res.name)
assert len(staged_log) == 1
logitem = staged_log[0]
operations.move_to_commited(logitem.log_action)
assert logitem.diff == [['add', '', [['a', '9']]]] assert logitem.diff == [['add', '', [['a', '9']]]]
uid = logitem.uid
operations.commit_log_item(logitem)
commited = CommitedResource.get('test1') commited = CommitedResource.get('test1')
assert commited.inputs == {'a': '9'} assert commited.inputs == {'a': '9'}
change.revert(logitem.uid) change.revert(uid)
ModelMeta.save_all_lazy()
staged_log = change.stage_changes() staged_log = change.staged_log()
assert len(staged_log) == 1 assert len(staged_log) == 1
for item in staged_log: for item in staged_log:
operations.move_to_commited(item.log_action) operations.commit_log_item(item)
assert resource.load_all() == [] assert resource.load_all() == []
def test_discard_all_pending_changes_resources_created(): def test_discard_all_pending_changes_resources_created():
res1 = DBResource.from_dict('test1', res1 = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9' res1.inputs['a'] = '9'
res1.save_lazy() res1.save_lazy()
res2 = DBResource.from_dict('test2', res2 = create_resource('test2')
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '0' res2.inputs['a'] = '0'
res2.save_lazy() res2.save_lazy()
ModelMeta.save_all_lazy() staged_log = map(change.create_run, (res1.name, res2.name))
staged_log = change.stage_changes()
assert len(staged_log) == 2
change.discard_all() change.discard_all()
staged_log = change.stage_changes() staged_log = change.staged_log()
assert len(staged_log) == 0 assert len(staged_log) == 0
assert resource.load_all() == [] assert resource.load_all() == []
def test_discard_connection(): def test_discard_connection():
res1 = DBResource.from_dict('test1', res1 = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9' res1.inputs['a'] = '9'
res1.save_lazy() res1.save_lazy()
res2 = DBResource.from_dict('test2', res2 = create_resource('test2')
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '0' res2.inputs['a'] = '0'
res2.save_lazy() res2.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes() staged_log = map(change.create_run, (res1.name, res2.name))
for item in staged_log: for item in staged_log:
operations.move_to_commited(item.log_action) operations.commit_log_item(item)
res1 = resource.load('test1') res1 = resource.load('test1')
res2 = resource.load('test2') res2 = resource.load('test2')
res1.connect(res2, {'a': 'a'}) res1.connect(res2, {'a': 'a'})
staged_log = change.stage_changes() ModelMeta.save_all_lazy()
staged_log = change.staged_log()
assert len(staged_log) == 1 assert len(staged_log) == 1
assert res2.args == {'a': '9'} assert res2.args == {'a': '9'}
change.discard_all() change.discard_all()
assert res2.args == {'a': '0'} assert res2.args == {'a': '0'}
assert len(change.stage_changes()) == 0 assert len(change.staged_log()) == 0
def test_discard_removed(): def test_discard_removed():
res1 = DBResource.from_dict('test1', res1 = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9' res1.inputs['a'] = '9'
res1.save_lazy() res1.save_lazy()
ModelMeta.save_all_lazy()
staged_log = change.stage_changes()
for item in staged_log:
operations.move_to_commited(item.log_action)
res1 = resource.load('test1') res1 = resource.load('test1')
res1.remove() res1.remove()
assert len(change.stage_changes()) == 1 ModelMeta.save_all_lazy()
assert len(change.staged_log()) == 1
assert res1.to_be_removed() assert res1.to_be_removed()
change.discard_all() change.discard_all()
assert len(change.stage_changes()) == 0 assert len(change.staged_log()) == 0
assert not resource.load('test1').to_be_removed() assert not resource.load('test1').to_be_removed()
def test_discard_update(): def test_discard_update():
res1 = DBResource.from_dict('test1', res1 = create_resource('test1')
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9' res1.inputs['a'] = '9'
res1.save_lazy() res1.save_lazy()
ModelMeta.save_all_lazy() operations.commit_log_item(change.create_run(res1.name))
staged_log = change.stage_changes()
for item in staged_log:
operations.move_to_commited(item.log_action)
res1 = resource.load('test1') res1 = resource.load('test1')
res1.update({'a': '11'}) res1.update({'a': '11'})
assert len(change.stage_changes()) == 1 ModelMeta.save_all_lazy()
assert len(change.staged_log()) == 1
assert res1.args == {'a': '11'} assert res1.args == {'a': '11'}
change.discard_all() change.discard_all()
assert res1.args == {'a': '9'} assert res1.args == {'a': '9'}
def test_stage_and_process_partially():
a = ['a']
b = ['b']
both = a + b
range_a = range(1, 4)
range_b = range(4, 6)
with_tag_a = [create_resource(str(n), tags=a) for n in range_a]
with_tag_b = [create_resource(str(n), tags=b) for n in range_b]
ModelMeta.save_all_lazy()
created_log_items_with_a = stage_resources(a, 'restart')
assert len(created_log_items_with_a) == len(with_tag_a)
created_log_items_with_b = stage_resources(b, 'restart')
assert len(created_log_items_with_b) == len(with_tag_b)
a_graph = change.send_to_orchestration(a)
a_expected = set(['%s.restart' % n for n in range_a])
assert set(a_graph.nodes()) == a_expected
b_graph = change.send_to_orchestration(b)
b_expected = set(['%s.restart' % n for n in range_b])
assert set(b_graph.nodes()) == b_expected
both_graph = change.send_to_orchestration(both)
assert set(both_graph.nodes()) == a_expected | b_expected
def test_childs_added_on_stage():
res_0, res_1 = [create_resource(str(n)) for n in range(2)]
res_0.connect(res_1, {'a': 'a'})
ModelMeta.save_all_lazy()
created_log_items = stage_resources(res_0.name, 'run')
assert len(created_log_items) == 1
assert created_log_items[0].resource == res_0.name
staged_log = change.staged_log()
assert len(staged_log) == 2
child_log_item = next(li for li in staged_log
if li.resource == res_1.name)
assert child_log_item.action == 'run'