Merge "Added attribute object and its unit tests"

This commit is contained in:
Zuul 2018-05-31 02:32:25 +00:00 committed by Gerrit Code Review
commit 2f6f71849e
8 changed files with 630 additions and 40 deletions

View File

@ -95,6 +95,10 @@ class Connection(object):
marker=None, columns_to_join=None):
"""Get requested deployable by filters."""
@abc.abstractmethod
def deployable_get_by_filters_with_attributes(self, context,
filters):
"""Get requested deployable by filters with attributes."""
# attributes
@abc.abstractmethod
def attribute_create(self, context, key, value):
@ -105,8 +109,12 @@ class Connection(object):
"""Get requested attribute."""
@abc.abstractmethod
def attribute_get_by_deployable_uuid(self, context, deployable_uuid):
"""Get requested deployable by deployable uuid."""
def attribute_get_by_deployable_id(self, context, deployable_id):
"""Get requested attribute by attribute id."""
@abc.abstractmethod
def attribute_get_by_filter(self, context, filters):
"""Get requested attribute by kv pair and attribute id."""
@abc.abstractmethod
def attribute_update(self, context, uuid, key, value):

View File

@ -31,7 +31,8 @@ from cyborg.common import exception
from cyborg.common.i18n import _
from cyborg.db import api
from cyborg.db.sqlalchemy import models
from sqlalchemy import or_
from sqlalchemy import and_
_CONTEXT = threading.local()
LOG = log.getLogger(__name__)
@ -248,6 +249,38 @@ class Connection(api.Connection):
if count != 1:
raise exception.DeployableNotFound(uuid=uuid)
def deployable_get_by_filters_with_attributes(self, context,
filters):
exact_match_filter_names = ['uuid', 'name',
'parent_uuid', 'root_uuid',
'pcie_address', 'host',
'board', 'vendor', 'version',
'type', 'assignable', 'instance_uuid',
'availability', 'accelerator_id']
attribute_filters = {}
filters_copy = copy.deepcopy(filters)
for key, value in filters_copy.iteritems():
if key not in exact_match_filter_names:
# This key is not in the deployable regular fields
value = filters.pop(key)
attribute_filters.update({key: value})
query_prefix = model_query(context, models.Deployable)
filters = copy.deepcopy(filters)
# Filter the query
query_prefix = self._exact_deployable_filter_with_attributes(
query_prefix,
filters,
exact_match_filter_names,
attribute_filters
)
if query_prefix is None:
return []
deployables = query_prefix.all()
return deployables
def deployable_get_by_filters(self, context,
filters, sort_key='created_at',
sort_dir='desc', limit=None,
@ -262,6 +295,52 @@ class Connection(api.Connection):
sort_keys=[sort_key],
sort_dirs=[sort_dir])
def _exact_deployable_filter_with_attributes(self, query,
dpl_filters, legal_keys,
attribute_filters):
"""Applies exact match filtering to a deployable query.
Returns the updated query. Modifies dpl_filters argument to remove
dpl_filters consumed.
:param query: query to apply dpl_filters and attribute_filters to
:param dpl_filters: dictionary of filters; values that are lists,
tuples, sets, or frozensets cause an 'IN' test to
be performed, while exact matching ('==' operator)
is used for other values
:param legal_keys: list of keys to apply exact filtering to
:param attribute_filters: dictionary of attribute filters
"""
filter_dict = {}
model = models.Deployable
# Walk through all the keys
for key in legal_keys:
# Skip ones we're not filtering on
if key not in dpl_filters:
continue
# OK, filtering on this key; what value do we search for?
value = dpl_filters.pop(key)
if isinstance(value, (list, tuple, set, frozenset)):
if not value:
return None
# Looking for values in a list; apply to query directly
column_attr = getattr(model, key)
query = query.filter(column_attr.in_(value))
else:
filter_dict[key] = value
# Apply simple exact matches
if filter_dict:
query = query.filter(*[getattr(models.Deployable, k) == v
for k, v in filter_dict.items()])
if attribute_filters:
query = query.outerjoin(models.Attribute)
query = query.filter(or_(*[and_(models.Attribute.key == k,
models.Attribute.value == v)
for k, v in attribute_filters.items()]))
return query
def _exact_deployable_filter(self, query, filters, legal_keys):
"""Applies exact match filtering to a deployable query.
Returns the updated query. Modifies filters argument to remove
@ -334,12 +413,13 @@ class Connection(api.Connection):
deployables = query_prefix.all()
return deployables
def attribute_create(self, context, key, value):
update_fields = {'key': key, 'value': value}
update_fields['uuid'] = uuidutils.generate_uuid()
def attribute_create(self, context, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
if values.get('id'):
values.pop('id', None)
attribute = models.Attribute()
attribute.update(update_fields)
attribute.update(values)
with _session_for_write() as session:
try:
@ -359,15 +439,42 @@ class Connection(api.Connection):
except NoResultFound:
raise exception.AttributeNotFound(uuid=uuid)
def attribute_get_by_deployable_uuid(self, context, deployable_uuid):
def attribute_get_by_deployable_id(self, context, deployable_id):
query = model_query(
context,
models.Attribute).filter_by(deployable_uuid=deployable_uuid)
models.Attribute).filter_by(deployable_id=deployable_id)
try:
return query.all()
except NoResultFound:
raise exception.AttributeNotFound(uuid=uuid)
def attribute_get_by_filter(self, context, filters):
"""Return attributes that matches the filters
"""
query_prefix = model_query(context, models.Attribute)
# Filter the query
query_prefix = self._exact_attribute_by_filter(query_prefix,
filters)
if query_prefix is None:
return []
return query_prefix.all()
def _exact_attribute_by_filter(self, query, filters):
"""Applies exact match filtering to a atrtribute query.
Returns the updated query.
:param filters: The filters specified by a dict of kv pairs
"""
model = models.Attribute
filter_dict = filters
# Apply simple exact matches
query = query.filter(*[getattr(models.Attribute, k) == v
for k, v in filter_dict.items()])
return query
def attribute_update(self, context, uuid, key, value):
return self._do_update_attribute(context, uuid, key, value)

View File

@ -27,3 +27,4 @@ def register_all():
# need to receive it via RPC.
__import__('cyborg.objects.accelerator')
__import__('cyborg.objects.deployable')
__import__('cyborg.objects.attribute')

View File

@ -33,9 +33,9 @@ class Attribute(base.CyborgObject, object_base.VersionedObjectDictCompat):
dbapi = dbapi.get_instance()
fields = {
'id': fields.IntegerField(nullable=False),
'id': object_fields.IntegerField(nullable=False),
'uuid': object_fields.UUIDField(nullable=False),
'deployable_id': fields.IntegerField(nullable=False),
'deployable_id': object_fields.IntegerField(nullable=False),
'key': object_fields.StringField(nullable=False),
'value': object_fields.StringField(nullable=False)
}
@ -47,26 +47,31 @@ class Attribute(base.CyborgObject, object_base.VersionedObjectDictCompat):
values = self.obj_get_changes()
db_attr = self.dbapi.attribute_create(context,
self.key,
self.value)
values)
self._from_db_object(self, db_attr)
@classmethod
def get(cls, context, uuid):
"""Find a DB Deployable and return an Obj Deployable."""
"""Find a DB attribute and return an Obj Deployable."""
db_attr = cls.dbapi.attribute_get(context, uuid)
obj_attr = cls._from_db_object(cls(context), db_attr)
return obj_attr
@classmethod
def attribute_get_by_deployable_uuid(cls, context, deployable_uuid):
"""Get a Deployable by host."""
db_attr = cls.dbapi.attribute_get_by_deployable_uuid(context,
deployable_uuid)
def get_by_deployable_id(cls, context, deployable_id):
"""Get a attribute by deployable_id"""
db_attr = cls.dbapi.attribute_get_by_deployable_id(context,
deployable_id)
return cls._from_db_object_list(db_attr, context)
@classmethod
def get_by_filter(cls, context, filters):
"""Get a attribute by specified filters"""
db_attr = cls.dbapi.attribute_get_by_filter(context, filters)
return cls._from_db_object_list(db_attr, context)
def save(self, context):
"""Update a Deployable record in the DB."""
"""Update a attribute record in the DB."""
updates = self.obj_get_changes()
db_attr = self.dbapi.attribute_update(context,
self.uuid,
@ -75,7 +80,7 @@ class Attribute(base.CyborgObject, object_base.VersionedObjectDictCompat):
self._from_db_object(self, db_attr)
def destroy(self, context):
"""Delete a Deployable from the DB."""
"""Delete a attribute from the DB."""
self.dbapi.attribute_delete(context, self.uuid)
self.obj_reset_changes()

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_log import log as logging
from oslo_versionedobjects import base as object_base
@ -20,7 +21,7 @@ from cyborg.common import exception
from cyborg.db import api as dbapi
from cyborg.objects import base
from cyborg.objects import fields as object_fields
from cyborg.objects.attribute import Attribute
LOG = logging.getLogger(__name__)
@ -78,25 +79,42 @@ class Deployable(base.CyborgObject, object_base.VersionedObjectDictCompat):
db_dep = self.dbapi.deployable_create(context, values)
self._from_db_object(self, db_dep)
del self.attributes_list[:]
@classmethod
def get(cls, context, uuid):
"""Find a DB Deployable and return an Obj Deployable."""
db_dep = cls.dbapi.deployable_get(context, uuid)
obj_dep = cls._from_db_object(cls(context), db_dep)
# retrieve all the attrobutes for this deployable
query = {"deployable_id": obj_dep.id}
attr_get_list = Attribute.get_by_filter(context,
query)
obj_dep.attributes_list = attr_get_list
return obj_dep
@classmethod
def get_by_host(cls, context, host):
"""Get a Deployable by host."""
db_deps = cls.dbapi.deployable_get_by_host(context, host)
query = {"deployable_id": db_deps.id}
attr_get_list = Attribute.get_by_filter(context,
query)
db_deps.attributes_list = attr_get_list
return cls._from_db_object_list(db_deps, context)
@classmethod
def list(cls, context):
"""Return a list of Deployable objects."""
db_deps = cls.dbapi.deployable_list(context)
return cls._from_db_object_list(db_deps, context)
obj_dpl_list = cls._from_db_object_list(db_deps, context)
for obj_dpl in obj_dpl_list:
query = {"deployable_id": obj_dpl.id}
attr_get_list = Attribute.get_by_filter(context,
query)
obj_dpl.attributes_list = attr_get_list
return obj_dpl_list
def save(self, context):
"""Update a Deployable record in the DB."""
@ -106,6 +124,7 @@ class Deployable(base.CyborgObject, object_base.VersionedObjectDictCompat):
def destroy(self, context):
"""Delete a Deployable from the DB."""
del self.attributes_list[:]
self.dbapi.deployable_delete(context, self.uuid)
self.obj_reset_changes()
@ -114,26 +133,57 @@ class Deployable(base.CyborgObject, object_base.VersionedObjectDictCompat):
If the attribute already exists, it will update the value,
otherwise, the vf will be appended to the list
"""
if not isinstance(vf, VirtualFunction) or vf.type != 'vf':
raise exception.InvalidDeployType()
for exist_vf in self.virtual_function_list:
if base.obj_equal_prims(vf, exist_vf):
LOG.warning("The vf already exists")
for exist_attr in self.attributes_list:
if base.obj_equal_prims(attribute, exist_attr):
LOG.warning("The attribute already exists")
return None
attribute.deployable_id = self.id
attribute_copy = copy.deepcopy(attribute)
self.attributes_list.append(attribute_copy)
def delete_attribute(self, context, attribute):
"""remove an attribute from the attributes_list
if the attribute does not exist, ignore it
"""
idx = 0
for exist_attribute in self.attributes_list:
if base.obj_equal_prims(attribute, exist_attribute):
removed_attribute = self.attributes_list.pop(idx)
removed_attribute.destroy(context)
return
idx = idx + 1
LOG.warning("The removing attribute does not exist!")
@classmethod
def get_by_filter(cls, context,
filters, sort_key='created_at',
sort_dir='desc', limit=None,
marker=None, join=None):
filters):
obj_dpl_list = []
db_dpl_list = cls.dbapi.deployable_get_by_filters(context, filters,
sort_key=sort_key,
sort_dir=sort_dir,
limit=limit,
marker=marker,
join_columns=join)
for db_dpl in db_dpl_list:
obj_dpl = cls._from_db_object(cls(context), db_dpl)
obj_dpl_list.append(obj_dpl)
db_dpl_list = cls.dbapi.deployable_get_by_filters_with_attributes(
context,
filters)
if db_dpl_list:
for db_dpl in db_dpl_list:
obj_dpl = cls._from_db_object(cls(context), db_dpl)
query = {"deployable_id": obj_dpl.id}
attr_get_list = Attribute.get_by_filter(context,
query)
obj_dpl.attributes_list = attr_get_list
obj_dpl_list.append(obj_dpl)
return obj_dpl_list
@classmethod
def _from_db_object(cls, obj, db_obj):
"""Converts a deployable to a formal object.
:param obj: An object of the class.
:param db_obj: A DB model of the object
:return: The object of the class with the database entity added
"""
obj = base.CyborgObject._from_db_object(obj, db_obj)
obj.attributes_list = []
return obj

View File

@ -0,0 +1,59 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from cyborg import objects
from cyborg.objects import fields
def fake_db_attribute(**updates):
attr_uuid = uuidutils.generate_uuid()
db_attribute = {
'id': 0,
'uuid': attr_uuid,
'deployable_id': 1,
'key': 'attr_key',
'value': 'attr_val'
}
for name, field in objects.Attribute.fields.items():
if name in db_attribute:
continue
if field.nullable:
db_attribute[name] = None
elif field.default != fields.UnspecifiedDefault:
db_attribute[name] = field.default
else:
raise Exception('fake_db_attribute needs help with %s' % name)
if updates:
db_attribute.update(updates)
return db_attribute
def fake_attribute_obj(context, obj_attr_class=None, **updates):
if obj_attr_class is None:
obj_attr_class = objects.Attribute
expected_attrs = updates.pop('expected_attrs', None)
attribute = obj_attr_class._from_db_object(context,
obj_attr_class(),
fake_db_deployable(**updates),
expected_attrs=expected_attrs)
attribute.obj_reset_changes()
return attribute

View File

@ -0,0 +1,207 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
import netaddr
from oslo_db import exception as db_exc
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_context import context
from cyborg import db
from cyborg.common import exception
from cyborg import objects
from cyborg.objects import base
from cyborg import tests as test
from cyborg.tests.unit import fake_attribute
from cyborg.tests.unit import fake_deployable
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit.objects import test_objects
from cyborg.tests.unit.db.base import DbTestCase
class _TestDeployableObject(DbTestCase):
@property
def fake_deployable(self):
db_deploy = fake_deployable.fake_db_deployable(id=1)
return db_deploy
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=2)
return db_acc
@property
def fake_attribute(self):
db_attr = fake_attribute.fake_db_attribute(id=2)
return db_attr
def test_create(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
self.assertEqual(db_attr['uuid'], attr.uuid)
def test_get(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_get = objects.Attribute.get(self.context, attr.uuid)
self.assertEqual(db_attr['uuid'], attr_get.uuid)
def test_get_by_deployable_uuid(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_get = objects.Attribute.get_by_deployable_id(
self.context, dpl_get.id)[0]
self.assertEqual(db_attr['uuid'], attr_get.uuid)
def test_save(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_get = objects.Attribute.get(self.context, attr.uuid)
attr_get.set_key_value_pair("test_key", "test_val")
attr_get.save(self.context)
attr_get_2 = objects.Attribute.get(self.context, attr_get.uuid)
self.assertEqual(attr_get_2.key, "test_key")
self.assertEqual(attr_get_2.value, "test_val")
def test_destroy(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
self.assertEqual(db_attr['uuid'], attr.uuid)
attr.destroy(self.context)
self.assertRaises(exception.AttributeNotFound,
objects.Attribute.get, self.context,
attr.uuid)
def test_get_by_filter(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_filter = {"key": "attr_key", "value": "attr_val"}
attr_get = objects.Attribute.get_by_filter(
self.context, attr_filter)[0]
self.assertEqual(db_attr['uuid'], attr_get.uuid)
attr_filter = {"key": "attr_key", "value": "attr_val2"}
attr_get_list = objects.Attribute.get_by_filter(
self.context, attr_filter)
self.assertEqual(len(attr_get_list), 0)

View File

@ -28,6 +28,7 @@ from cyborg.objects import base
from cyborg import tests as test
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit import fake_deployable
from cyborg.tests.unit import fake_attribute
from cyborg.tests.unit.objects import test_objects
from cyborg.tests.unit.db.base import DbTestCase
@ -38,11 +39,31 @@ class _TestDeployableObject(DbTestCase):
db_deploy = fake_deployable.fake_db_deployable(id=1)
return db_deploy
@property
def fake_deployable2(self):
db_deploy = fake_deployable.fake_db_deployable(id=2)
return db_deploy
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=2)
return db_acc
@property
def fake_attribute(self):
db_attr = fake_attribute.fake_db_attribute(id=2)
return db_attr
@property
def fake_attribute2(self):
db_attr = fake_attribute.fake_db_attribute(id=3)
return db_attr
@property
def fake_attribute3(self):
db_attr = fake_attribute.fake_db_attribute(id=4)
return db_attr
def test_create(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
@ -125,6 +146,138 @@ class _TestDeployableObject(DbTestCase):
objects.Deployable.get, self.context,
dpl.uuid)
def test_add_attribute(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
dpl.add_attribute(attr)
dpl.save(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
self.assertEqual(len(dpl_get.attributes_list), 1)
self.assertEqual(dpl_get.attributes_list[0].id, attr.id)
def test_delete_attribute(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
dpl_get.add_attribute(attr)
dpl_get.save(self.context)
dpl_get = objects.Deployable.get(self.context, dpl_get.uuid)
self.assertEqual(len(dpl_get.attributes_list), 1)
self.assertEqual(dpl_get.attributes_list[0].id, attr.id)
dpl_get.delete_attribute(self.context, dpl_get.attributes_list[0])
self.assertEqual(len(dpl_get.attributes_list), 0)
self.assertRaises(exception.AttributeNotFound,
objects.Attribute.get, self.context,
attr.uuid)
def test_get_by_filter_with_attributes(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_dpl2 = self.fake_deployable2
dpl2 = objects.Deployable(context=self.context,
**db_dpl2)
dpl2.accelerator_id = acc_get.id
dpl2.create(self.context)
dpl2_get = objects.Deployable.get(self.context, dpl2.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
db_attr2 = self.fake_attribute2
attr2 = objects.Attribute(context=self.context,
**db_attr2)
attr2.deployable_id = dpl2_get.id
attr2.create(self.context)
db_attr3 = self.fake_attribute3
attr3 = objects.Attribute(context=self.context,
**db_attr3)
attr3.deployable_id = dpl2_get.id
attr3.create(self.context)
dpl.add_attribute(attr)
dpl.save(self.context)
dpl2.add_attribute(attr2)
dpl2.save(self.context)
dpl2.add_attribute(attr3)
dpl2.save(self.context)
query = {"attr_key": "attr_val"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 2)
self.assertEqual(dpl_get_list[0].uuid, dpl.uuid)
attr2.set_key_value_pair("test_key", "test_val")
attr2.save(self.context)
attr3.set_key_value_pair("test_key3", "test_val3")
attr3.save(self.context)
query = {"test_key": "test_val"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
query = {"test_key": "test_val", "test_key3": "test_val3"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
query = {"host": "host_name", "test_key3": "test_val3"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
class TestDeployableObject(test_objects._LocalTest,
_TestDeployableObject):