969 lines
28 KiB
Python
969 lines
28 KiB
Python
# Copyright 2015 Mirantis, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from functools import total_ordering
|
|
from functools import wraps
|
|
|
|
import time
|
|
import uuid
|
|
import weakref
|
|
|
|
from collections import defaultdict
|
|
from random import getrandbits
|
|
import six
|
|
from threading import RLock
|
|
|
|
from solar.dblayer.conflict_resolution import dblayer_conflict_resolver
|
|
from solar.dblayer.lfu_cache import LFUCache
|
|
from solar.utils import get_local
|
|
|
|
|
|
class DBLayerException(Exception):
|
|
pass
|
|
|
|
|
|
class DBLayerNotFound(DBLayerException):
|
|
def __init__(self, spec):
|
|
self.message = 'Object {} not found in db layer'.format(spec)
|
|
|
|
def __str__(self):
|
|
return str(self.message)
|
|
|
|
|
|
class DBLayerNoRiakObj(DBLayerException):
|
|
pass
|
|
|
|
|
|
class NONE(object):
|
|
"""A None like type"""
|
|
|
|
|
|
class SingleIndexCache(object):
|
|
def __init__(self):
|
|
self.lock = RLock()
|
|
self.cached_vals = []
|
|
|
|
def __enter__(self):
|
|
self.lock.acquire()
|
|
return self
|
|
|
|
def fill(self, values):
|
|
self.cached_vals = values
|
|
|
|
def wipe(self):
|
|
self.cached_vals = []
|
|
|
|
def get_index(self, real_funct, ind_name, **kwargs):
|
|
kwargs.setdefault('max_results', 999999)
|
|
if not self.cached_vals:
|
|
recvs = real_funct(ind_name, **kwargs).results
|
|
self.fill(recvs)
|
|
|
|
def filter(self, startkey, endkey, max_results=1):
|
|
c = self.cached_vals
|
|
for (curr_val, obj_key) in c:
|
|
if max_results == 0:
|
|
break
|
|
if curr_val >= startkey:
|
|
if curr_val <= endkey:
|
|
max_results -= 1
|
|
yield (curr_val, obj_key)
|
|
else:
|
|
break
|
|
|
|
def __exit__(self, *args, **kwargs):
|
|
self.lock.release()
|
|
|
|
|
|
class SingleClassCache(object):
|
|
|
|
__slots__ = ['obj_cache', 'db_ch_state',
|
|
'lazy_save', 'origin_class',
|
|
'refs']
|
|
|
|
def __init__(self, origin_class):
|
|
self.obj_cache = LFUCache(origin_class, 200)
|
|
self.db_ch_state = {'index': set()}
|
|
self.lazy_save = set()
|
|
self.refs = defaultdict(weakref.WeakValueDictionary)
|
|
self.origin_class = origin_class
|
|
|
|
|
|
class ClassCache(object):
|
|
def __init__(self, *args, **kwargs):
|
|
self._l = RLock()
|
|
|
|
def __get__(self, inst, owner):
|
|
# th = current_thread()
|
|
with self._l:
|
|
l = Model._local
|
|
# better don't duplicate class names
|
|
cache_name = owner.__name__
|
|
try:
|
|
cache_id = l.cache_id
|
|
except AttributeError:
|
|
cache_id = uuid.UUID(int=getrandbits(128), version=4).hex
|
|
setattr(l, 'cache_id', cache_id)
|
|
if getattr(l, 'cache_id_cmp', None) != cache_id:
|
|
# new cache
|
|
setattr(l, 'cache_id_cmp', cache_id)
|
|
c = SingleClassCache(owner)
|
|
setattr(l, '_model_caches', {})
|
|
l._model_caches[cache_name] = c
|
|
try:
|
|
# already had this owner in cache
|
|
return l._model_caches[cache_name]
|
|
except KeyError:
|
|
# old cache but first time this owner
|
|
c = SingleClassCache(owner)
|
|
l._model_caches[cache_name] = c
|
|
return c
|
|
|
|
|
|
def clear_cache():
|
|
# th = current_thread()
|
|
l = Model._local
|
|
cache_id = uuid.UUID(int=getrandbits(128), version=4).hex
|
|
setattr(l, 'cache_id_cmp', cache_id)
|
|
|
|
|
|
def get_bucket(_, owner, mcs):
|
|
name = owner.get_bucket_name()
|
|
if owner.bucket_type:
|
|
bucket_type = mcs.riak_client.bucket_type(owner.bucket_type)
|
|
else:
|
|
# using default bucket type
|
|
bucket_type = mcs.riak_client
|
|
bucket = bucket_type.bucket(name)
|
|
if owner.bucket_properties:
|
|
bucket.set_properties(owner.bucket_properties)
|
|
if getattr(owner, 'conflict_resolver', None):
|
|
bucket.resolver = owner.conflict_resolver
|
|
else:
|
|
bucket.resolver = dblayer_conflict_resolver
|
|
return bucket
|
|
|
|
|
|
def changes_state_for(_type):
|
|
def _inner1(f):
|
|
@wraps(f)
|
|
def _inner2(obj, *args, **kwargs):
|
|
obj._c.db_ch_state['index'].add(obj.key)
|
|
obj.save_lazy()
|
|
return f(obj, *args, **kwargs)
|
|
|
|
return _inner2
|
|
|
|
return _inner1
|
|
|
|
|
|
def clears_state_for(_type):
|
|
def _inner1(f):
|
|
@wraps(f)
|
|
def _inner2(obj, *args, **kwargs):
|
|
try:
|
|
obj._c.db_ch_state[_type].remove(obj.key)
|
|
except KeyError:
|
|
pass
|
|
return f(obj, *args, **kwargs)
|
|
|
|
return _inner2
|
|
|
|
return _inner1
|
|
|
|
|
|
def requires_clean_state(_type):
|
|
def _inner1(f):
|
|
@wraps(f)
|
|
def _inner2(obj, *args, **kwargs):
|
|
check_state_for(_type, obj)
|
|
return f(obj, *args, **kwargs)
|
|
|
|
return _inner2
|
|
|
|
return _inner1
|
|
|
|
|
|
def check_state_for(_type, obj):
|
|
with obj._lock:
|
|
state = obj._c.db_ch_state.get(_type)
|
|
if state:
|
|
if True:
|
|
# TODO: solve it
|
|
obj.save_all_lazy()
|
|
state = obj._c.db_ch_state.get(_type)
|
|
if not state:
|
|
return
|
|
raise Exception("Dirty state, save all %r objects first" %
|
|
obj.__class__)
|
|
|
|
|
|
@total_ordering
|
|
class StrInt(object):
|
|
|
|
precision = 3
|
|
positive_char = 'p'
|
|
negative_char = 'n'
|
|
format_size = 10 + precision
|
|
|
|
def __init__(self, val=None):
|
|
self._val = self._make_val(val)
|
|
|
|
def __str__(self):
|
|
return self._val.__str__()
|
|
|
|
def __repr__(self):
|
|
return "<%s: %r>" % (self.__class__.__name__, self._val)
|
|
|
|
@classmethod
|
|
def p_max(cls):
|
|
return cls(int('9' * cls.format_size))
|
|
|
|
@classmethod
|
|
def p_min(cls):
|
|
return cls(1)
|
|
|
|
@classmethod
|
|
def n_max(cls):
|
|
return -cls.p_max()
|
|
|
|
@classmethod
|
|
def n_min(cls):
|
|
return -cls.p_min()
|
|
|
|
def __neg__(self):
|
|
time_ = self.int_val()
|
|
ret = self.__class__(-time_)
|
|
return ret
|
|
|
|
@classmethod
|
|
def greater(cls, inst):
|
|
if isinstance(inst, cls):
|
|
return cls(inst._val + 'g')
|
|
return cls(inst + 'g')
|
|
|
|
@classmethod
|
|
def to_hex(cls, value):
|
|
char = cls.positive_char
|
|
if value < 0:
|
|
value = int('9' * cls.format_size) + value
|
|
char = cls.negative_char
|
|
f = '%s%%.%dx' % (char, cls.format_size - 2)
|
|
value = f % value
|
|
if value[-1] == 'L':
|
|
value = value[:-1]
|
|
return value
|
|
|
|
@classmethod
|
|
def from_hex(cls, value):
|
|
v = int(value[1:], 16)
|
|
if value[0] == cls.negative_char:
|
|
v -= int('9' * cls.format_size)
|
|
return v
|
|
|
|
def int_val(self):
|
|
return self.from_hex(self._val)
|
|
|
|
@classmethod
|
|
def from_simple(cls, value):
|
|
return cls(value)
|
|
|
|
@classmethod
|
|
def to_simple(cls, value):
|
|
return value._val
|
|
|
|
@classmethod
|
|
def _make_val(cls, val):
|
|
if val is None:
|
|
val = time.time()
|
|
if isinstance(val, (long, int, float)):
|
|
if isinstance(val, float):
|
|
val = int(val * (10 ** cls.precision))
|
|
val = cls.to_hex(val)
|
|
elif isinstance(val, cls):
|
|
val = val._val
|
|
return val
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, basestring):
|
|
first_ch = other[0]
|
|
if first_ch not in (self.positive_char, self.negative_char):
|
|
raise Exception("Cannot compare %r with %r" % (self, other))
|
|
else:
|
|
other = self.from_simple(other)
|
|
if not isinstance(other, self.__class__):
|
|
raise Exception("Cannot compare %r with %r" % (self, other))
|
|
so = other._val[0]
|
|
ss = self._val[0]
|
|
son = so == other.negative_char
|
|
ssn = ss == self.negative_char
|
|
if son != ssn:
|
|
return False
|
|
return self._val[1:] == other._val[1:]
|
|
|
|
def __gt__(self, other):
|
|
if isinstance(other, basestring):
|
|
first_ch = other[0]
|
|
if first_ch not in (self.positive_char, self.negative_char):
|
|
raise Exception("Cannot compare %r with %r" % (self, other))
|
|
else:
|
|
other = self.from_simple(other)
|
|
if not isinstance(other, self.__class__):
|
|
raise Exception("Cannot compare %r with %r" % (self, other))
|
|
so = other._val[0]
|
|
ss = self._val[0]
|
|
if ss == self.positive_char and so == other.negative_char:
|
|
return -1
|
|
elif ss == self.negative_char and so == other.positive_char:
|
|
return 1
|
|
else:
|
|
return other._val[1:] < self._val[1:]
|
|
|
|
|
|
class Replacer(object):
|
|
def __init__(self, name, fget, *args):
|
|
self.name = name
|
|
self.fget = fget
|
|
self.args = args
|
|
|
|
def __get__(self, instance, owner):
|
|
val = self.fget(instance, owner, *self.args)
|
|
if instance is not None:
|
|
setattr(instance, self.name, val)
|
|
else:
|
|
setattr(owner, self.name, val)
|
|
return val
|
|
|
|
|
|
class FieldBase(object):
|
|
def __init__(self, fname, default):
|
|
self._fname = fname
|
|
self._default = default
|
|
|
|
@property
|
|
def fname(self):
|
|
return self._fname
|
|
|
|
@fname.setter
|
|
def fname(self, value):
|
|
if self._fname is None:
|
|
self._fname = value
|
|
else:
|
|
raise Exception("fname already set")
|
|
|
|
@property
|
|
def default(self):
|
|
if self._default is NONE:
|
|
return self._default
|
|
if callable(self._default):
|
|
return self._default()
|
|
return self._default
|
|
|
|
|
|
class Field(FieldBase):
|
|
|
|
# in from_dict, when you set value to None,
|
|
# then types that are *not* there are set to NONE
|
|
_not_nullable_types = {int, float, long, str, unicode, basestring}
|
|
_simple_types = {int, float, long, str, unicode, basestring, list, tuple,
|
|
dict}
|
|
|
|
def __init__(self, _type, fname=None, default=NONE):
|
|
if _type == str:
|
|
_type = basestring
|
|
self._type = _type
|
|
super(Field, self).__init__(fname=fname, default=default)
|
|
|
|
def __get__(self, instance, owner):
|
|
if instance is None:
|
|
return self
|
|
val = instance._data_container[self.fname]
|
|
if self._type in self._simple_types:
|
|
return val
|
|
else:
|
|
return self._type.from_simple(val)
|
|
|
|
def __set__(self, instance, value):
|
|
if not isinstance(value, self._type):
|
|
raise Exception("Invalid type %r for %r, expected %r" %
|
|
(type(value), self.fname, self._type))
|
|
if self._type not in self._simple_types:
|
|
value = self._type.to_simple(value)
|
|
instance._field_changed(self)
|
|
instance._data_container[self.fname] = value
|
|
return value
|
|
|
|
def __str__(self):
|
|
return "<%s:%r>" % (self.__class__.__name__, self.fname)
|
|
|
|
__repr__ = __str__
|
|
|
|
|
|
class IndexedField(Field):
|
|
def __set__(self, instance, value):
|
|
value = super(IndexedField, self).__set__(instance, value)
|
|
instance._set_index('{}_bin'.format(self.fname), value)
|
|
return value
|
|
|
|
def _filter(self, startkey, endkey=None, **kwargs):
|
|
if isinstance(startkey,
|
|
self._type) and self._type not in self._simple_types:
|
|
startkey = self._type.to_simple(startkey)
|
|
if isinstance(endkey,
|
|
self._type) and self._type not in self._simple_types:
|
|
endkey = self._type.to_simple(endkey)
|
|
kwargs.setdefault('max_results', 1000000)
|
|
res = self._declared_in._get_index('{}_bin'.format(self.fname),
|
|
startkey=startkey,
|
|
endkey=endkey,
|
|
**kwargs).results
|
|
return res
|
|
|
|
def filter(self, *args, **kwargs):
|
|
kwargs['return_terms'] = False
|
|
res = self._filter(*args, **kwargs)
|
|
return res
|
|
|
|
|
|
class IndexFieldWrp(object):
|
|
def __init__(self, field_obj, instance):
|
|
self._field_obj = field_obj
|
|
self._instance = instance
|
|
self._c = self._instance._c
|
|
|
|
@property
|
|
def fname(self):
|
|
return self._field_obj.fname
|
|
|
|
def __str__(self):
|
|
return "<%s for field %s>" % (self.__class__.__name__, self._field_obj)
|
|
|
|
def _get_field_val(self, name):
|
|
return self._instance._data_container[self.fname][name]
|
|
|
|
def __getitem__(self, name):
|
|
return self._get_field_val(name)
|
|
|
|
def __setitem__(self, name, value):
|
|
inst = self._instance
|
|
inst._add_index('%s_bin' % self.fname, '{}|{}'.format(name, value))
|
|
|
|
def __delitem__(self, name):
|
|
inst = self._instance
|
|
del inst._data_container[self.fname][name]
|
|
indexes = inst._riak_object.indexes
|
|
|
|
# TODO: move this to backend layer
|
|
for ind_name, ind_value in indexes:
|
|
if ind_name == ('%s_bin' % self.fname):
|
|
if ind_value.startswith('{}|'.format(name)):
|
|
inst._remove_index(ind_name, ind_value)
|
|
break
|
|
|
|
|
|
class IndexField(FieldBase):
|
|
|
|
_wrp_class = IndexFieldWrp
|
|
|
|
def __init__(self, fname=None, default=NONE):
|
|
super(IndexField, self).__init__(fname, default)
|
|
|
|
def _on_no_inst(self, instance, owner):
|
|
return self
|
|
|
|
def __get__(self, instance, owner):
|
|
if instance is None:
|
|
return self._on_no_inst(instance, owner)
|
|
cached = getattr(instance, '_real_obj_%s' % self.fname, None)
|
|
if cached:
|
|
return cached
|
|
obj = self._wrp_class(self, instance)
|
|
setattr(instance, '_real_obj_%s' % self.fname, obj)
|
|
return obj
|
|
|
|
def __set__(self, instance, value):
|
|
wrp = getattr(instance, self.fname)
|
|
instance._data_container[self.fname] = self.default
|
|
for f_name, f_value in value.iteritems():
|
|
wrp[f_name] = f_value
|
|
|
|
def _parse_key(self, k):
|
|
if '=' in k:
|
|
val, subval = k.split('=', 1)
|
|
if subval is None:
|
|
subval = ''
|
|
if not isinstance(subval, basestring):
|
|
subval = str(subval)
|
|
return '{}|{}'.format(val, subval)
|
|
|
|
def filter(self, startkey, endkey=None, **kwargs):
|
|
startkey = self._parse_key(startkey)
|
|
if endkey is None:
|
|
if startkey.endswith('*'):
|
|
startkey = startkey[:-1]
|
|
endkey = startkey + '~'
|
|
else:
|
|
endkey = startkey + ' '
|
|
kwargs.setdefault('max_results', 1000000)
|
|
kwargs['return_terms'] = False
|
|
res = self._declared_in._get_index('{}_bin'.format(self.fname),
|
|
startkey=startkey,
|
|
endkey=endkey,
|
|
**kwargs).results
|
|
return list(res)
|
|
|
|
|
|
class CompositeIndexFieldWrp(IndexFieldWrp):
|
|
def reset(self):
|
|
index = []
|
|
for f in self._field_obj.fields:
|
|
index.append(self._instance._data_container.get(f, ''))
|
|
index = '|'.join(index)
|
|
|
|
index_to_del = []
|
|
for index_name, index_val in self._instance._riak_object.indexes:
|
|
if index_name == '%s_bin' % self.fname:
|
|
if index != index_val:
|
|
index_to_del.append((index_name, index_val))
|
|
|
|
for index_name, index_val in index_to_del:
|
|
self._instance._remove_index(index_name, index_val)
|
|
|
|
self._instance._add_index('%s_bin' % self.fname, index)
|
|
|
|
|
|
class CompositeIndexField(IndexField):
|
|
|
|
_wrp_class = CompositeIndexFieldWrp
|
|
|
|
def __init__(self, fields=(), *args, **kwargs):
|
|
super(CompositeIndexField, self).__init__(*args, **kwargs)
|
|
self.fields = fields
|
|
|
|
def _parse_key(self, startkey):
|
|
vals = [startkey[f] for f in self.fields if f in startkey]
|
|
return '|'.join(vals) + '*'
|
|
|
|
|
|
class ModelMeta(type):
|
|
|
|
_defined_models = set()
|
|
|
|
def __new__(mcs, name, bases, attrs):
|
|
cls = super(ModelMeta, mcs).__new__(mcs, name, bases, attrs)
|
|
model_fields = set((name
|
|
for (name, attr) in attrs.items()
|
|
if isinstance(attr, FieldBase) and
|
|
not name.startswith('_')))
|
|
for f in model_fields:
|
|
field = getattr(cls, f)
|
|
if hasattr(field, 'fname') and field.fname is None:
|
|
setattr(field, 'fname', f)
|
|
setattr(field, 'gname', f)
|
|
# need to set declared_in because `with_tag`
|
|
# no need to wrap descriptor with another object then
|
|
setattr(field, '_declared_in', cls)
|
|
|
|
for base in bases:
|
|
try:
|
|
model_fields_base = base._model_fields
|
|
except AttributeError:
|
|
continue
|
|
else:
|
|
for given in model_fields_base:
|
|
model_fields.add(given)
|
|
|
|
cls._model_fields = [getattr(cls, x) for x in model_fields]
|
|
|
|
if bases == (object, ):
|
|
return cls
|
|
|
|
if issubclass(cls, NestedModel):
|
|
return cls
|
|
|
|
cls.bucket = Replacer('bucket', get_bucket, mcs)
|
|
mcs._defined_models.add(cls)
|
|
return cls
|
|
|
|
@classmethod
|
|
def setup(mcs, riak_client):
|
|
if hasattr(mcs, 'riak_client'):
|
|
raise DBLayerException("Setup already done")
|
|
mcs.riak_client = riak_client
|
|
|
|
@classmethod
|
|
def remove_all(mcs):
|
|
for model in mcs._defined_models:
|
|
model.delete_all()
|
|
|
|
@classmethod
|
|
def save_all_lazy(mcs, result=True):
|
|
for cls in mcs._defined_models:
|
|
for to_save in cls._c.lazy_save:
|
|
try:
|
|
to_save.save()
|
|
except DBLayerException:
|
|
continue
|
|
cls._c.lazy_save.clear()
|
|
|
|
@classmethod
|
|
def find_non_empty_lazy_saved(mcs):
|
|
for cls in mcs._defined_models:
|
|
if cls._c.lazy_save:
|
|
return cls._c.lazy_save
|
|
return None
|
|
|
|
@classmethod
|
|
def session_end(mcs, result=True):
|
|
mcs.save_all_lazy()
|
|
mcs.riak_client.session_end(result)
|
|
|
|
@classmethod
|
|
def session_start(mcs):
|
|
mcs.riak_client.session_start()
|
|
|
|
|
|
class NestedField(FieldBase):
|
|
def __init__(self, _class, fname=None, default=NONE, hash_key=None):
|
|
self._class = _class
|
|
self._hash_key = hash_key
|
|
super(NestedField, self).__init__(fname=fname, default=default)
|
|
|
|
def __get__(self, instance, owner):
|
|
if instance is None:
|
|
return self
|
|
cached = getattr(instance, '_real_obj_%s' % self.fname, None)
|
|
if cached:
|
|
return cached
|
|
if self._hash_key is not None:
|
|
obj = NestedModelHash(self, instance, self._class, self._hash_key)
|
|
else:
|
|
obj = self._class(self, instance)
|
|
setattr(instance, '_real_obj_%s' % self.fname, obj)
|
|
return obj
|
|
|
|
def __set__(self, instance, value):
|
|
obj = getattr(instance, self.fname)
|
|
obj.from_dict(value)
|
|
|
|
def __delete__(self, instance):
|
|
obj = getattr(instance, self.fname)
|
|
obj.delete()
|
|
|
|
|
|
@six.add_metaclass(ModelMeta)
|
|
class NestedModel(object):
|
|
|
|
_nested_value = None
|
|
|
|
def __init__(self, field, parent):
|
|
self._field = field
|
|
self._parent = parent
|
|
|
|
def from_dict(self, data):
|
|
for field in self._model_fields:
|
|
fname = field.fname
|
|
gname = field.gname
|
|
val = data.get(fname, NONE)
|
|
default = field.default
|
|
if val is NONE and default is not NONE:
|
|
setattr(self, gname, default)
|
|
elif val is not NONE:
|
|
setattr(self, gname, val)
|
|
return
|
|
|
|
@property
|
|
def _data_container(self):
|
|
pdc = self._parent._data_container
|
|
try:
|
|
ppdc = pdc[self._field.fname]
|
|
except KeyError:
|
|
ppdc = pdc[self._field.fname] = {}
|
|
if self._field._hash_key is None:
|
|
return ppdc
|
|
else:
|
|
try:
|
|
ret = ppdc[self._nested_value]
|
|
except KeyError:
|
|
ret = ppdc[self._nested_value] = {}
|
|
return ret
|
|
|
|
def _field_changed(self, field):
|
|
return self._parent._modified_fields.add(self._field.fname)
|
|
|
|
def delete(self):
|
|
if self._field._hash_key is None:
|
|
del self._parent._data_container[self._field.fname]
|
|
|
|
|
|
class NestedModelHash(object):
|
|
def __init__(self, field, parent, _class, hash_key):
|
|
self._field = field
|
|
self._parent = parent
|
|
self._class = _class
|
|
self._hash_key = hash_key
|
|
self._cache = {}
|
|
|
|
def __getitem__(self, name):
|
|
try:
|
|
return self._cache[name]
|
|
except KeyError:
|
|
obj = self._class(self._field, self._parent)
|
|
obj._nested_value = name
|
|
self._cache[name] = obj
|
|
return obj
|
|
|
|
def __setitem__(self, name, value):
|
|
obj = self[name]
|
|
return obj.from_dict(value)
|
|
|
|
def __delitem__(self, name):
|
|
obj = self[name]
|
|
obj.delete()
|
|
del self._cache[name]
|
|
|
|
def from_dict(self, data):
|
|
hk = data[self._hash_key]
|
|
self[hk] = data
|
|
|
|
|
|
@six.add_metaclass(ModelMeta)
|
|
class Model(object):
|
|
|
|
|
|
_c = ClassCache()
|
|
|
|
_key = None
|
|
_new = None
|
|
_real_riak_object = None
|
|
|
|
_changed = False
|
|
|
|
_local = get_local()()
|
|
_lock = RLock() # for class objs
|
|
|
|
bucket_properties = {}
|
|
bucket_type = None
|
|
|
|
def __init__(self, key=None):
|
|
self._modified_fields = set()
|
|
# TODO: that _indexes_changed should be smarter
|
|
self._indexes_changed = False
|
|
self.key = key
|
|
self._lock = RLock()
|
|
|
|
@property
|
|
def key(self):
|
|
return self._key
|
|
|
|
@key.setter
|
|
def key(self, value):
|
|
if self._key is None:
|
|
self._key = value
|
|
else:
|
|
raise Exception("Can't set key again")
|
|
|
|
@property
|
|
def _riak_object(self):
|
|
if self._real_riak_object is None:
|
|
raise DBLayerNoRiakObj("You cannot access _riak_object now")
|
|
return self._real_riak_object
|
|
|
|
@_riak_object.setter
|
|
def _riak_object(self, value):
|
|
if self._real_riak_object is not None:
|
|
raise DBLayerException("Already have _riak_object")
|
|
self._real_riak_object = value
|
|
|
|
@property
|
|
def _data_container(self):
|
|
return self._riak_object.data
|
|
|
|
@changes_state_for('index')
|
|
def _set_index(self, name, value):
|
|
self._indexes_changed = True
|
|
return self._riak_object.set_index(name, value)
|
|
|
|
@changes_state_for('index')
|
|
def _add_index(self, *args, **kwargs):
|
|
self._indexes_changed = True
|
|
return self._riak_object.add_index(*args, **kwargs)
|
|
|
|
@changes_state_for('index')
|
|
def _remove_index(self, *args, **kwargs):
|
|
self._indexes_changed = True
|
|
return self._riak_object.remove_index(*args, **kwargs)
|
|
|
|
@classmethod
|
|
def _get_index(cls, *args, **kwargs):
|
|
return cls.bucket.get_index(*args, **kwargs)
|
|
|
|
@property
|
|
def _bucket(self):
|
|
return self._riak_object.bucket
|
|
|
|
@classmethod
|
|
def get_bucket_name(cls):
|
|
# XXX: should be changed and more smart
|
|
return cls.__name__
|
|
|
|
def _field_changed(self, field):
|
|
self._modified_fields.add(field.fname)
|
|
|
|
def changed(self):
|
|
if self._modified_fields:
|
|
return True
|
|
return self._indexes_changed
|
|
|
|
def to_dict(self):
|
|
d = dict(self._riak_object.data)
|
|
d['key'] = self.key
|
|
return d
|
|
|
|
def __str__(self):
|
|
if self._riak_object is None:
|
|
return "<%s not initialized>" % (self.__class__.__name__)
|
|
return "<%s %s:%s>" % (self.__class__.__name__,
|
|
self._riak_object.bucket.name, self.key)
|
|
|
|
def __hash__(self):
|
|
return hash(self.key)
|
|
|
|
@classmethod
|
|
def new(cls, key, data):
|
|
return cls.from_dict(key, data)
|
|
|
|
@classmethod
|
|
def get_or_create(cls, key):
|
|
try:
|
|
return cls.get(key)
|
|
except DBLayerNotFound:
|
|
return cls.new(key, {})
|
|
|
|
@classmethod
|
|
def from_riakobj(cls, riak_obj):
|
|
obj = cls(riak_obj.key)
|
|
obj._riak_object = riak_obj
|
|
if obj._new is None:
|
|
obj._new = False
|
|
cache = cls._c.obj_cache
|
|
cache.set(riak_obj.key, obj)
|
|
# cache may adjust object
|
|
return cache.get(riak_obj.key)
|
|
|
|
@classmethod
|
|
def _pre_from_dict_check(cls, key, data=None):
|
|
if isinstance(key, dict) and data is None:
|
|
data = key
|
|
try:
|
|
key = data['key']
|
|
except KeyError:
|
|
raise DBLayerException("No key specified")
|
|
if key and 'key' in data and data['key'] != key:
|
|
raise DBLayerException("Different key values detected")
|
|
# shouldn't be needed, but may cover some weird usecase
|
|
# when inproperly using from_dict, because it then leads to conflicts
|
|
if key in cls._c.obj_cache:
|
|
raise DBLayerException("Object already exists in cache"
|
|
" cannot create second")
|
|
return key, data
|
|
|
|
@classmethod
|
|
def from_dict(cls, key, data=None):
|
|
key, data = cls._pre_from_dict_check(key, data)
|
|
data['key'] = key
|
|
|
|
with cls._c.obj_cache._lock:
|
|
if key in cls._c.obj_cache:
|
|
return cls._c.obj_cache.get(key)
|
|
riak_obj = cls.bucket.new(key, data={})
|
|
obj = cls.from_riakobj(riak_obj)
|
|
obj._new = True
|
|
|
|
for field in cls._model_fields:
|
|
# if field is cls._pkey_field:
|
|
# continue # pkey already set
|
|
fname = field.fname
|
|
gname = field.gname
|
|
val = data.get(fname, NONE)
|
|
default = field.default
|
|
if val is None and field._type not in field._not_nullable_types:
|
|
val = NONE
|
|
if val is NONE and default is not NONE:
|
|
setattr(obj, gname, default)
|
|
elif val is not NONE:
|
|
setattr(obj, gname, val)
|
|
return obj
|
|
|
|
@classmethod
|
|
def get(cls, key):
|
|
try:
|
|
return cls._c.obj_cache.get(key)
|
|
except KeyError:
|
|
riak_object = cls.bucket.get(key)
|
|
if not riak_object.exists:
|
|
raise DBLayerNotFound(key)
|
|
else:
|
|
return cls.from_riakobj(riak_object)
|
|
|
|
@classmethod
|
|
def multi_get(cls, keys):
|
|
# TODO: parallel execution
|
|
ret = map(cls.get, keys)
|
|
return ret
|
|
|
|
def _reset_state(self):
|
|
self._new = False
|
|
self._modified_fields.clear()
|
|
self._indexes_changed = False
|
|
|
|
@classmethod
|
|
def save_all_lazy(cls):
|
|
for to_save in set(cls._c.lazy_save):
|
|
try:
|
|
to_save.save()
|
|
except DBLayerException:
|
|
continue
|
|
cls._c.lazy_save.clear()
|
|
|
|
@clears_state_for('index')
|
|
def save(self, force=False, **kwargs):
|
|
with self._lock:
|
|
if self.changed() or force or self._new:
|
|
res = self._riak_object.store(**kwargs)
|
|
self._reset_state()
|
|
return res
|
|
else:
|
|
raise DBLayerException("No changes")
|
|
|
|
def save_lazy(self):
|
|
self._c.lazy_save.add(self)
|
|
|
|
@classmethod
|
|
def delete_all(cls):
|
|
cls.riak_client.delete_all(cls)
|
|
|
|
def delete(self):
|
|
ls = self._c.lazy_save
|
|
try:
|
|
ls.remove(self)
|
|
except KeyError:
|
|
pass
|
|
try:
|
|
del self._c.obj_cache[self.key]
|
|
except KeyError:
|
|
pass
|
|
self._riak_object.delete()
|
|
return self
|