Merge "Allow unique keys to be used with get_object"

This commit is contained in:
Jenkins 2016-07-12 16:31:01 +00:00 committed by Gerrit Code Review
commit ed58510023
4 changed files with 178 additions and 9 deletions

View File

@ -177,3 +177,13 @@ class HasStandardAttributes(object):
# for all other modifications or when relevant children are being
# modified (e.g. fixed_ips change should bump port revision)
self.standard_attr.revision_number += 1
def get_unique_keys(model):
try:
constraints = model.__table__.constraints
except AttributeError:
constraints = []
return [[c.name for c in constraint.columns]
for constraint in constraints
if isinstance(constraint, sa.UniqueConstraint)]

View File

@ -212,6 +212,21 @@ class DeclarativeObject(abc.ABCMeta):
cls.fields_no_update += base.primary_keys
# avoid duplicate entries
cls.fields_no_update = list(set(cls.fields_no_update))
# generate unique_keys from the model
model = getattr(cls, 'db_model', None)
if model and not getattr(cls, 'unique_keys', None):
cls.unique_keys = []
obj_field_names = set(cls.fields.keys())
model_to_obj_translation = {
v: k for (k, v) in cls.fields_need_translation.items()}
for model_unique_key in model_base.get_unique_keys(model):
obj_unique_key = [model_to_obj_translation.get(key, key)
for key in model_unique_key]
if obj_field_names.issuperset(obj_unique_key):
cls.unique_keys.append(obj_unique_key)
if (hasattr(cls, 'has_standard_attributes') and
cls.has_standard_attributes()):
standardattributes.add_standard_attributes(cls)
@ -227,6 +242,11 @@ class NeutronDbObject(NeutronObject):
primary_keys = ['id']
# 'unique_keys' is a list of unique keys that can be used with get_object
# instead of 'primary_keys' (e.g. [['key1'], ['key2a', 'key2b']]).
# By default 'unique_keys' will be inherited from the 'db_model'
unique_keys = []
# this is a dict to store the association between the foreign key and the
# corresponding key in the main table, e.g. port extension have 'port_id'
# as foreign key, that is associated with the key 'id' of the table Port,
@ -327,8 +347,10 @@ class NeutronDbObject(NeutronObject):
:param kwargs: multiple keys defined by key=value pairs
:return: single object of NeutronDbObject class
"""
missing_keys = set(cls.primary_keys).difference(kwargs.keys())
if missing_keys:
lookup_keys = set(kwargs.keys())
all_keys = itertools.chain([cls.primary_keys], cls.unique_keys)
if not any(lookup_keys.issuperset(keys) for keys in all_keys):
missing_keys = set(cls.primary_keys).difference(lookup_keys)
raise NeutronPrimaryKeyMissing(object_class=cls.__class__,
missing_keys=missing_keys)

View File

@ -0,0 +1,50 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import sqlalchemy as sa
from neutron.db import model_base
from neutron.tests import base as test_base
class GetUniqueKeysTestCase(test_base.BaseTestCase):
def test_with_unique_constraints(self):
model = mock.Mock()
metadata = sa.MetaData()
model.__table__ = sa.Table(
"test_table", metadata,
sa.Column("a", sa.Integer, unique=True),
sa.Column("b", sa.Integer),
sa.Column("c", sa.Integer),
sa.Column("d", sa.Integer),
sa.UniqueConstraint("c", "d"))
expected = {("a",), ("c", "d")}
observed = {tuple(sorted(key)) for key in
model_base.get_unique_keys(model)}
self.assertEqual(expected, observed)
def test_without_unique_constraints(self):
model = mock.Mock()
metadata = sa.MetaData()
model.__table__ = sa.Table(
"test_table", metadata,
sa.Column("a", sa.Integer),
sa.Column("b", sa.Integer))
self.assertEqual([], model_base.get_unique_keys(model))
def test_not_a_model(self):
self.assertEqual([], model_base.get_unique_keys(None))

View File

@ -29,6 +29,7 @@ from neutron.common import constants
from neutron.common import utils as common_utils
from neutron import context
from neutron.db import db_base_plugin_v2
from neutron.db import model_base
from neutron.db import models_v2
from neutron.objects import base
from neutron.objects import common_types
@ -148,6 +149,28 @@ class FakeNeutronObjectCompositePrimaryKey(base.NeutronDbObject):
synthetic_fields = ['obj_field']
@obj_base.VersionedObjectRegistry.register_if(False)
class FakeNeutronObjectUniqueKey(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = FakeModel
primary_keys = ['id', 'id2']
unique_keys = [['unique_key'], ['id2']]
fields = {
'id': obj_fields.UUIDField(),
'id2': obj_fields.UUIDField(),
'unique_key': obj_fields.StringField(),
'field1': obj_fields.StringField(),
'obj_field': obj_fields.ObjectField('FakeSmallNeutronObject',
nullable=True)
}
synthetic_fields = ['obj_field']
@obj_base.VersionedObjectRegistry.register_if(False)
class FakeNeutronObjectRenamedField(base.NeutronDbObject):
"""
@ -316,12 +339,14 @@ class _BaseObjectTestCase(object):
return obj_cls.modify_fields_to_db(fields)
@classmethod
def generate_object_keys(cls, obj_cls):
def generate_object_keys(cls, obj_cls, field_names=None):
if field_names is None:
field_names = obj_cls.primary_keys
keys = {}
for field, field_obj in obj_cls.fields.items():
if field in obj_cls.primary_keys:
generator = FIELD_TYPE_VALUE_GENERATOR_MAP[type(field_obj)]
keys[field] = generator()
for field in field_names:
field_obj = obj_cls.fields[field]
generator = FIELD_TYPE_VALUE_GENERATOR_MAP[type(field_obj)]
keys[field] = generator()
return keys
def get_updatable_fields(self, fields):
@ -363,12 +388,38 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
self.assertIsNone(obj)
def test_get_object_missing_primary_key(self):
obj_keys = self.generate_object_keys(self._test_class)
obj_keys.popitem()
non_unique_fields = (set(self._test_class.fields.keys()) -
set(self._test_class.primary_keys) -
set(itertools.chain.from_iterable(
self._test_class.unique_keys)))
obj_keys = self.generate_object_keys(self._test_class,
non_unique_fields)
self.assertRaises(base.NeutronPrimaryKeyMissing,
self._test_class.get_object,
self.context, **obj_keys)
def test_get_object_unique_key(self):
if not self._test_class.unique_keys:
self.skipTest('No unique keys found in test class %r' %
self._test_class)
for unique_keys in self._test_class.unique_keys:
with mock.patch.object(obj_db_api, 'get_object',
return_value=self.db_obj) \
as get_object_mock:
with mock.patch.object(obj_db_api, 'get_objects',
side_effect=self.fake_get_objects):
obj_keys = self.generate_object_keys(self._test_class,
unique_keys)
obj = self._test_class.get_object(self.context,
**obj_keys)
self.assertTrue(self._is_test_class(obj))
self.assertEqual(self.obj_fields[0],
get_obj_db_fields(obj))
get_object_mock.assert_called_once_with(
self.context, self._test_class.db_model,
**self._test_class.modify_fields_to_db(obj_keys))
def _get_synthetic_fields_get_objects_calls(self, db_objs):
mock_calls = []
for db_obj in db_objs:
@ -666,6 +717,42 @@ class BaseDbObjectCompositePrimaryKeyTestCase(BaseObjectIfaceTestCase):
_test_class = FakeNeutronObjectCompositePrimaryKey
class BaseDbObjectUniqueKeysTestCase(BaseObjectIfaceTestCase):
_test_class = FakeNeutronObjectUniqueKey
class UniqueKeysTestCase(test_base.BaseTestCase):
def test_class_creation(self):
m_get_unique_keys = mock.patch.object(model_base, 'get_unique_keys')
with m_get_unique_keys as get_unique_keys:
get_unique_keys.return_value = [['field1'],
['field2', 'db_field3']]
@obj_base.VersionedObjectRegistry.register_if(False)
class UniqueKeysTestObject(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = FakeModel
primary_keys = ['id']
fields = {
'id': obj_fields.UUIDField(),
'field1': obj_fields.UUIDField(),
'field2': obj_fields.UUIDField(),
'field3': obj_fields.UUIDField(),
}
fields_need_translation = {'field3': 'db_field3'}
expected = {('field1',), ('field2', 'field3')}
observed = {tuple(sorted(key))
for key in UniqueKeysTestObject.unique_keys}
self.assertEqual(expected, observed)
class BaseDbObjectCompositePrimaryKeyWithIdTestCase(BaseObjectIfaceTestCase):
_test_class = FakeNeutronObjectCompositePrimaryKeyWithId