[03/12] Introduce initial version of the new DbStore

This patch adds DbStore2, based on what we described in NB refactor
spec. It takes into account indexes and is model agnostic (no need to
define new methods for each new model). Once we're done porting all
models from the old code to new, DbStore2 will supercede DbStore and can
be renamed back.

Partially-Implements: blueprint refactor-nb-api
Change-Id: Ie3851a8ee3f46c55d766e3b112402bc1089fb72c
This commit is contained in:
Dima Kuznetsov 2017-01-09 10:44:42 +02:00
parent 4dce57fea6
commit 76f11e053b
4 changed files with 524 additions and 3 deletions

249
dragonflow/db/db_store2.py Normal file
View File

@ -0,0 +1,249 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from dragonflow._i18n import _LE
from dragonflow.utils import radix_tree
ANY = radix_tree.ANY
MISSING = None
class _IndexCache(object):
'''A cache for a specific index of a model.
This index class is responsible for keeping up-to-date key to object ID
mapping, and providing ability to query this mapping.
Internally a tree with sets on the leafs is used. We need a collection
because we might have more than one object per key (e.g. many ports per
topic. Set was chosen due to fast insert/delete but it forces us to
store immutable objects so we resort to storing IDs (strings).
The ID to object translation is done by model cache.
'''
def __init__(self, index):
self._index = index
self._tree = radix_tree.RadixTree(len(index))
# We save ID->key mapping for updating (object might have changed)
# and deletion (object might contain just the ID)
self._keys = {}
def delete(self, obj):
key = self._keys.pop(obj.id)
self._tree.delete(key, obj.id)
def update(self, obj):
new_key = self._get_key(obj)
old_key = self._keys.get(obj.id)
# Re-insert into cache only if key changed
if old_key == new_key:
return
if old_key is not None:
self.delete(obj)
self._keys[obj.id] = new_key
self._tree.set(new_key, obj.id)
def get_all(self, obj):
return self._tree.get_all(self._get_key(obj))
def _get_key(self, obj):
key = []
for f in self._index:
if obj.field_is_set(f):
value = getattr(obj, f)
else:
value = MISSING
key.append(value)
return tuple(key)
def _take_one(iterable):
try:
return next(iterable)
except StopIteration:
return None
class _ModelCache(object):
'''A cache for all instances of a model
This class stores all the instances (that were added to DbStore) of a
specific model, and maintains up to date indexes for the elements to allow
quick querying.
'''
def __init__(self, model):
self._objs = {}
self._indexes = {}
self._id_index = model.get_indexes()['id']
indexes = model.get_indexes()
for index in indexes.values():
if index == self._id_index:
continue
self._indexes[index] = _IndexCache(index)
def _get_by_id(self, obj_id):
return self._objs[obj_id]
def delete(self, obj):
for index in self._indexes.values():
index.delete(obj)
del self._objs[obj.id]
def update(self, obj):
for index in self._indexes.values():
index.update(obj)
self._objs[obj.id] = obj
def get_one(self, obj, index):
if index not in (None, self._id_index):
keys = self.get_keys(obj, index)
obj_id = _take_one(keys)
if obj_id is not None and _take_one(keys) is not None:
raise ValueError(_LE('More than one result available'))
else:
obj_id = obj.id
try:
return self._get_by_id(obj_id)
except KeyError:
return None
def get_keys(self, obj, index):
# No index, return all keys
if index is None:
return self._objs.keys()
elif index == ('id',):
return iter((obj.id,))
else:
return self._indexes[index].get_all(obj)
def get_all(self, obj, index):
ids = self.get_keys(obj, index)
return (self._get_by_id(id_) for id_ in ids)
class DbStore2(object):
def __init__(self):
self._cache = {}
def _get_cache(self, model):
try:
return self._cache[model]
except KeyError:
cache = _ModelCache(model)
self._cache[model] = cache
return cache
def get_one(self, obj, index=None):
"""Retrieve an object from cache by ID or by a provided index. If
several objects match the query, an ValueError is raised.
>>> db_store.get(Lport(id='id1'))
Lport(...)
>>> db_store.get(Lport(unique_key=1),
index=Lport.get_indexes()['unique_key'])
Lport(...)
"""
model = type(obj)
return self._get_cache(model).get_one(obj, index)
def get_all(self, obj, index=None):
"""Get all objects of a specific model, matching a specific index
lookup.
>>> db_store.get_all(Lport(topic='topic1'),
index=Lport.get_indexes()['topic'])
(Lport(...), Lport(...), ...)
"""
if type(obj) == type:
model = obj
obj = model()
else:
model = type(obj)
return self._get_cache(model).get_all(obj, index)
def get_keys(self, obj, index=None):
'''Returns IDs for all objects matching the query. If index is ommited,
we assume result should contain all object of the model.
>>> db_store.get_keys(Lport(topic='topic1'),
index=Lport.get_indexes()['topic'])
('id1', 'id2', 'id3', ...)
'''
if type(obj) == type:
model = obj
obj = model()
else:
model = type(obj)
return tuple(self._get_cache(model).get_keys(obj, index))
def delete(self, obj):
"""Deletes the object provided from the cache, by removing it from all
the indexes, a partial object can be provided, since we retrieve the
stored object by ID from the cache (to make sure we remove it by
correct keys)
>>> db_store.delete(Lport(id=lport_id))
"""
self._get_cache(type(obj)).delete(obj)
def update(self, obj):
"""Sets or updates an object int the cache. This will remove the older
version from all the indexes and populate them with the new object
"""
self._get_cache(type(obj)).update(obj)
def __contains__(self, elem):
return self.get_one(elem) == elem
def get_all_by_topic(self, model, topic=None):
return self.get_all(
model(topic=topic),
index=model.get_indexes()['topic'],
)
def get_keys_by_topic(self, model, topic=None):
return self.get_keys(
model(topic=topic),
index=model.get_indexes()['topic'],
)
_instance = None
_instance_lock = threading.Lock()
def get_instance():
global _instance
with _instance_lock:
if _instance is None:
_instance = DbStore2()
return _instance

View File

@ -1,6 +1,3 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,9 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from jsonmodels import fields
import mock
from dragonflow.db import db_store
from dragonflow.db import db_store2
from dragonflow.db import model_framework
from dragonflow.tests import base as tests_base
@ -203,3 +203,122 @@ class TestDbStore(tests_base.BaseTestCase):
self.db_store.delete_chassis('chassis2')
self.assertIsNone(self.db_store.get_chassis('chassis2'))
@model_framework.construct_nb_db_model(indexes={'id': 'id', 'topic': 'topic'})
class ModelTest(model_framework.ModelBase):
id = fields.StringField()
topic = fields.StringField()
extra_field = fields.StringField()
class TestDbStore2(tests_base.BaseTestCase):
def setUp(self):
super(TestDbStore2, self).setUp()
# Skip singleton instance to have clean state for each test
self.db_store = db_store2.DbStore2()
def test_store_retrieve(self):
o1 = ModelTest(id='id1', topic='topic')
self.db_store.update(o1)
self.assertEqual(o1, self.db_store.get_one(ModelTest(id='id1')))
self.assertIn(o1, self.db_store)
self.assertIsNone(self.db_store.get_one(ModelTest(id='id2')))
def test_store_update(self):
o1 = ModelTest(id='id1', topic='topic')
self.db_store.update(o1)
o1_old = o1
o1 = ModelTest(id='id1', topic='topic', extra_field='foo')
self.db_store.update(o1)
self.assertEqual(o1, self.db_store.get_one(ModelTest(id='id1')))
self.assertIn(o1, self.db_store)
self.assertNotIn(o1_old, self.db_store)
def test_store_delete(self):
o1 = ModelTest(id='id1', topic='topic')
self.db_store.update(o1)
self.db_store.delete(o1)
self.assertNotIn(o1, self.db_store)
def test_get_all(self):
o1 = ModelTest(id='id1', topic='topic')
o2 = ModelTest(id='id2', topic='topic')
self.db_store.update(o1)
self.db_store.update(o2)
self.assertItemsEqual((o1, o2), self.db_store.get_all(ModelTest))
self.assertItemsEqual(
(o1, o2),
self.db_store.get_all(ModelTest(id=db_store2.ANY)),
)
self.assertItemsEqual(
(o1,),
self.db_store.get_all(
ModelTest(id='id1'),
index=ModelTest.get_indexes()['id'],
),
)
def test_get_all_by_topic(self):
o1 = ModelTest(id='id1', topic='topic')
o2 = ModelTest(id='id2', topic='topic1')
o3 = ModelTest(id='id3', topic='topic')
self.db_store.update(o1)
self.db_store.update(o2)
self.db_store.update(o3)
self.assertItemsEqual(
(o1, o3),
self.db_store.get_all_by_topic(ModelTest, topic='topic'),
)
self.assertItemsEqual(
(o2,),
self.db_store.get_all_by_topic(ModelTest, topic='topic1'),
)
def test_get_keys(self):
self.db_store.update(ModelTest(id='id1', topic='topic'))
self.db_store.update(ModelTest(id='id2', topic='topic'))
self.assertItemsEqual(
('id1', 'id2'),
self.db_store.get_keys(ModelTest),
)
def test_get_keys_by_topic(self):
self.db_store.update(ModelTest(id='id1', topic='topic'))
self.db_store.update(ModelTest(id='id2', topic='topic1'))
self.db_store.update(ModelTest(id='id3', topic='topic'))
self.assertItemsEqual(
('id1', 'id3'),
self.db_store.get_keys_by_topic(ModelTest, topic='topic'),
)
self.assertItemsEqual(
('id2',),
self.db_store.get_keys_by_topic(ModelTest, topic='topic1'),
)
def test_key_changed(self):
mt = ModelTest(id='id1', topic='topic')
self.db_store.update(mt)
mt.topic = 'topic2'
self.db_store.update(mt)
self.assertItemsEqual(
('id1',),
self.db_store.get_keys_by_topic(ModelTest, topic='topic2'),
)
self.assertItemsEqual(
(),
self.db_store.get_keys_by_topic(ModelTest, topic='topic'),
)

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from dragonflow.tests import base as tests_base
from dragonflow.utils import radix_tree
ANY = radix_tree.ANY
class TestRadixTree(tests_base.BaseTestCase):
def test_create(self):
for i in range(10):
radix_tree.RadixTree(i)
def test_store(self):
rt = radix_tree.RadixTree(2)
rt.set((1, 2), object())
def test_retrieve_full_index(self):
rt = radix_tree.RadixTree(2)
rt.set((1, 2), 'a')
rt.set((1, 2), 'b')
self.assertItemsEqual({'a', 'b'}, rt.get_all((1, 2)))
rt.set((2, 2), 'c')
self.assertItemsEqual({'a', 'b'}, rt.get_all((1, 2)))
def test_retrieve_full_index_with_none(self):
rt = radix_tree.RadixTree(2)
rt.set((None, 2), 'a')
rt.set((None, 1), 'b')
self.assertItemsEqual({'a'}, rt.get_all((None, 2)))
def test_retrieve_partial_index(self):
rt = radix_tree.RadixTree(2)
rt.set((1, 2), 'a')
rt.set((1, 3), 'b')
rt.set((2, 3), 'c')
self.assertItemsEqual({'a', 'b'}, rt.get_all((1, ANY)))
def test_retrieve_partial_index2(self):
rt = radix_tree.RadixTree(4)
rt.set((1, 1, 1, 2), 'a')
rt.set((1, 2, 3, 2), 'b')
rt.set((1, 2, None, 2), 'c')
rt.set((2, 2, None, 2), 'd')
self.assertItemsEqual({'a', 'b', 'c'},
rt.get_all((1, ANY, ANY, 2)))
def test_delete(self):
rt = radix_tree.RadixTree(2)
rt.set((1, 2), 'a')
rt.set((1, 2), 'b')
rt.delete((1, 2), 'a')
self.assertItemsEqual({'b'}, rt.get_all((1, 2)))

View File

@ -0,0 +1,90 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import itertools
ANY = None
class RadixTree(object):
'''A constant depth radix tree written (originally) for indexing in DbStore
This implementation stores several (or none) items for each path
'''
def __init__(self, depth):
self._depth = depth
tree_type = set
for _ in range(depth):
tree_type = functools.partial(collections.defaultdict, tree_type)
self._root = tree_type()
def set(self, path, value):
'''Stores value at given path
'''
self._traverse_to_leaf_set(path).add(value)
def delete(self, path, value):
'''Deletes a value from a given path'''
traces = []
node = self._root
# Clean the dicts if path now empty
for key in path:
traces.append((key, node))
node = node[key]
node.discard(value)
for key, node in reversed(traces):
if key in node:
break
del node[key]
def _traverse_to_leaf_set(self, path):
node = self._root
for item in path:
node = node[item]
return node
def get_all(self, path):
'''Get all items matching the path provided, if None is present in the
path, it is treated as a wildcard
>>> t.get_all((None, None,))
(<all values>)
>>> t.get_all((1, 2))
(<All values at path 1, 2>)
'''
nodes = [self._root]
for segment in path:
next_nodes = []
for node in nodes:
if segment is ANY:
next_nodes.extend(node.values())
else:
if segment in node:
next_nodes.append(node[segment])
nodes = next_nodes
return itertools.chain(*nodes)