Add AttachHandle and ControlpathID objects

Add AttachHandle and ControlpathID objects and related DB APIs.

Change-Id: I9f5a0d63ca1a181c4a328881951a93b5356e1d3a
Stroy: 2004248
This commit is contained in:
Xinran WANG 2019-01-14 11:42:34 +08:00 committed by Yumeng Bao
parent c17b4280a1
commit ebe865a5ba
10 changed files with 694 additions and 56 deletions

View File

@ -86,6 +86,14 @@ class CyborgException(Exception):
return unicode(self.args[0])
class AttachHandleAlreadyExists(CyborgException):
_msg_fmt = _("AttachHandle with uuid %(uuid)s already exists.")
class ControlpathIDAlreadyExists(CyborgException):
_msg_fmt = _("ControlpathID with uuid %(uuid)s already exists.")
class ConfigInvalid(CyborgException):
_msg_fmt = _("Invalid configuration file. %(error_msg)s")
@ -147,6 +155,14 @@ class ServiceNotFound(NotFound):
msg_fmt = _("Service %(service_id)s could not be found.")
class AttachHandleNotFound(NotFound):
_msg_fmt = _("AttachHandle %(uuid)s could not be found.")
class ControlpathIDNotFound(NotFound):
_msg_fmt = _("ControlpathID %(uuid)s could not be found.")
class ConfGroupForServiceTypeNotFound(ServiceNotFound):
msg_fmt = _("No conf group name could be found for service type "
"%(stype)s.")

View File

@ -123,6 +123,7 @@ class Connection(object):
def attribute_delete(self, context, uuid):
"""Delete an attribute."""
# quota
@abc.abstractmethod
def quota_reserve(self, context, resources, deltas, expire,
until_refresh, max_age, project_id=None,
@ -153,3 +154,63 @@ class Connection(object):
@abc.abstractmethod
def extarq_get(self, context, uuid):
"""Get requested extarq."""
# attach_handle
@abc.abstractmethod
def attach_handle_create(self, context, values):
"""Create a new attach_handle"""
@abc.abstractmethod
def attach_handle_get_by_uuid(self, context, uuid):
"""Get requested attach_handle"""
@abc.abstractmethod
def attach_handle_get_by_id(self, context, id):
"""Get requested attach_handle"""
@abc.abstractmethod
def attach_handle_get_by_filters(self, context,
filters, sort_key='created_at',
sort_dir='desc', limit=None,
marker=None, columns_to_join=None):
"""Get requested deployable by filters."""
@abc.abstractmethod
def attach_handle_list(self, context):
"""Get requested list of attach_handles"""
@abc.abstractmethod
def attach_handle_delete(self, context, uuid):
"""Delete an attach_handle"""
@abc.abstractmethod
def attach_handle_update(self, context, uuid, values):
"""Update an attach_handle"""
# control_path_id
@abc.abstractmethod
def control_path_create(self, context, values):
"""Create a new control path id"""
@abc.abstractmethod
def control_path_get_by_uuid(self, context, uuid):
"""Get requested control path id"""
@abc.abstractmethod
def control_path_get_by_filters(self, context,
filters, sort_key='created_at',
sort_dir='desc', limit=None,
marker=None, columns_to_join=None):
"""Get requested deployable by filters."""
@abc.abstractmethod
def control_path_list(self, context):
"""Get requested list of control path ids"""
@abc.abstractmethod
def control_path_delete(self, context, uuid):
"""Delete a control path id"""
@abc.abstractmethod
def control_path_update(self, context, uuid, values):
"""Update a control path id"""

View File

@ -127,6 +127,213 @@ class Connection(api.Connection):
def __init__(self):
pass
def attach_handle_create(self, context, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
attach_handle = models.AttachHandle()
attach_handle.update(values)
with _session_for_write() as session:
try:
session.add(attach_handle)
session.flush()
except db_exc.DBDuplicateEntry:
raise exception.AttachHandleAlreadyExists(uuid=values['uuid'])
return attach_handle
def attach_handle_get_by_uuid(self, context, uuid):
query = model_query(
context,
models.AttachHandle).filter_by(uuid=uuid)
try:
return query.one()
except NoResultFound:
raise exception.AttachHandleNotFound(uuid=uuid)
def attach_handle_get_by_id(self, context, id):
query = model_query(
context,
models.AttachHandle).filter_by(id=id)
try:
return query.one()
except NoResultFound:
raise exception.NotFound()
def attach_handle_get_by_filters(self, context,
filters, sort_key='created_at',
sort_dir='desc', limit=None,
marker=None, join_columns=None):
"""Return attach_handle that match all filters sorted by the given
keys. Deleted attach_handle will be returned by default, unless
there's a filter that says otherwise.
"""
if limit == 0:
return []
query_prefix = model_query(context, models.AttachHandle)
filters = copy.deepcopy(filters)
exact_match_filter_names = ['uuid', 'id', 'deployable_id']
# Filter the query
query_prefix = self._exact_filter(models.AttachHandle, query_prefix,
filters, exact_match_filter_names)
if query_prefix is None:
return []
return _paginate_query(context, models.AttachHandle, limit, marker,
sort_key, sort_dir, query_prefix)
def _exact_filter(self, model, query, filters, legal_keys=[]):
"""Applies exact match filtering to a deployable query.
Returns the updated query. Modifies filters argument to remove
filters consumed.
:param model: DB model
:param query: query to apply filters to
:param filters: dictionary of filters; values that are lists,
tuples, sets, or frozensets cause an 'IN' test to
be performed, while exact matching ('==' operator)
is used for other values
:param legal_keys: list of keys to apply exact filtering to
"""
filter_dict = {}
# Walk through all the keys
for key in legal_keys:
# Skip ones we're not filtering on
if key not in filters:
continue
# OK, filtering on this key; what value do we search for?
value = filters.pop(key)
if isinstance(value, (list, tuple, set, frozenset)):
if not value:
return None
# Looking for values in a list; apply to query directly
column_attr = getattr(model, key)
query = query.filter(column_attr.in_(value))
else:
filter_dict[key] = value
# Apply simple exact matches
if filter_dict:
query = query.filter(*[getattr(model, k) == v
for k, v in filter_dict.items()])
return query
def attach_handle_list(self, context):
query = model_query(context, models.AttachHandle)
return _paginate_query(context, models.AttachHandle)
def attach_handle_update(self, context, uuid, values):
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing AttachHandle.")
raise exception.InvalidParameterValue(err=msg)
return self._do_update_attach_handle(context, uuid, values)
@oslo_db_api.retry_on_deadlock
def _do_update_attach_handle(self, context, uuid, values):
with _session_for_write():
query = model_query(context, models.AttachHandle)
query = add_identity_filter(query, uuid)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.AttachHandleNotFound(uuid=uuid)
ref.update(values)
return ref
@oslo_db_api.retry_on_deadlock
def attach_handle_delete(self, context, uuid):
with _session_for_write():
query = model_query(context, models.AttachHandle)
query = add_identity_filter(query, uuid)
count = query.delete()
if count != 1:
raise exception.AttachHandleNotFound(uuid=uuid)
def control_path_create(self, context, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
control_path_id = models.ControlpathID()
control_path_id.update(values)
with _session_for_write() as session:
try:
session.add(control_path_id)
session.flush()
except db_exc.DBDuplicateEntry:
raise exception.ControlpathIDAlreadyExists(uuid=values['uuid'])
return control_path_id
def control_path_get_by_uuid(self, context, uuid):
query = model_query(
context,
models.ControlpathID).filter_by(uuid=uuid)
try:
return query.one()
except NoResultFound:
raise exception.ControlpathIDNotFound(uuid=uuid)
def control_path_get_by_filters(self, context,
filters, sort_key='created_at',
sort_dir='desc', limit=None,
marker=None, join_columns=None):
"""Return attach_handle that match all filters sorted by the given
keys. Deleted attach_handle will be returned by default, unless
there's a filter that says otherwise.
"""
if limit == 0:
return []
query_prefix = model_query(context, models.AttachHandle)
filters = copy.deepcopy(filters)
exact_match_filter_names = ['uuid', 'id', 'deployable_id']
# Filter the query
query_prefix = self._exact_filter(models.ControlpathID, query_prefix,
filters, exact_match_filter_names)
if query_prefix is None:
return []
return _paginate_query(context, models.ControlpathID, limit, marker,
sort_key, sort_dir, query_prefix)
def control_path_list(self, context):
query = model_query(context, models.ControlpathID)
return _paginate_query(context, models.ControlpathID)
def control_path_update(self, context, uuid, values):
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing ControlpathID.")
raise exception.InvalidParameterValue(err=msg)
return self._do_update_control_path(context, uuid, values)
@oslo_db_api.retry_on_deadlock
def _do_update_control_path(self, context, uuid, values):
with _session_for_write():
query = model_query(context, models.ControlpathID)
query = add_identity_filter(query, uuid)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.ControlpathIDNotFound(uuid=uuid)
ref.update(values)
return ref
@oslo_db_api.retry_on_deadlock
def control_path_delete(self, context, uuid):
with _session_for_write():
query = model_query(context, models.ControlpathID)
query = add_identity_filter(query, uuid)
count = query.delete()
if count != 1:
raise exception.ControlpathNotFound(uuid=uuid)
def device_create(self, context, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
@ -350,44 +557,6 @@ class Connection(api.Connection):
for k, v in attribute_filters.items()]))
return query
def _exact_deployable_filter(self, query, filters, legal_keys):
"""Applies exact match filtering to a deployable query.
Returns the updated query. Modifies filters argument to remove
filters consumed.
:param query: query to apply filters to
:param filters: dictionary of filters; values that are lists,
tuples, sets, or frozensets cause an 'IN' test to
be performed, while exact matching ('==' operator)
is used for other values
:param legal_keys: list of keys to apply exact filtering to
"""
filter_dict = {}
model = models.Deployable
# Walk through all the keys
for key in legal_keys:
# Skip ones we're not filtering on
if key not in filters:
continue
# OK, filtering on this key; what value do we search for?
value = filters.pop(key)
if isinstance(value, (list, tuple, set, frozenset)):
if not value:
return None
# Looking for values in a list; apply to query directly
column_attr = getattr(model, key)
query = query.filter(column_attr.in_(value))
else:
filter_dict[key] = value
# Apply simple exact matches
if filter_dict:
query = query.filter(*[getattr(models.Deployable, k) == v
for k, v in filter_dict.items()])
return query
def deployable_get_by_filters_sort(self, context, filters, limit=None,
marker=None, join_columns=None,
sort_key=None, sort_dir=None):
@ -407,9 +576,9 @@ class Connection(api.Connection):
'num_accelerators', 'device_id']
# Filter the query
query_prefix = self._exact_deployable_filter(query_prefix,
filters,
exact_match_filter_names)
query_prefix = self._exact_filter(models.Deployable, query_prefix,
filters,
exact_match_filter_names)
if query_prefix is None:
return []
return _paginate_query(context, models.Deployable, limit, marker,
@ -453,26 +622,26 @@ class Connection(api.Connection):
query_prefix = model_query(context, models.Attribute)
# Filter the query
query_prefix = self._exact_attribute_by_filter(query_prefix,
filters)
query_prefix = self._exact_filter(models.Attribute, query_prefix,
filters)
if query_prefix is None:
return []
return query_prefix.all()
def _exact_attribute_by_filter(self, query, filters):
"""Applies exact match filtering to a atrtribute query.
Returns the updated query.
:param filters: The filters specified by a dict of kv pairs
"""
model = models.Attribute
filter_dict = filters
# Apply simple exact matches
query = query.filter(*[getattr(models.Attribute, k) == v
for k, v in filter_dict.items()])
return query
# def _exact_attribute_by_filter(self, query, filters):
# """Applies exact match filtering to a atrtribute query.
# Returns the updated query.
# :param filters: The filters specified by a dict of kv pairs
# """
#
# model = models.Attribute
# filter_dict = filters
#
# # Apply simple exact matches
# query = query.filter(*[getattr(models.Attribute, k) == v
# for k, v in filter_dict.items()])
# return query
def attribute_update(self, context, uuid, key, value):
return self._do_update_attribute(context, uuid, key, value)

View File

@ -30,3 +30,5 @@ def register_all():
__import__('cyborg.objects.attribute')
__import__('cyborg.objects.arq')
__import__('cyborg.objects.ext_arq')
__import__('cyborg.objects.attach_handle')
__import__('cyborg.objects.control_path')

View File

@ -0,0 +1,92 @@
# Copyright 2019 Intel, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_versionedobjects import base as object_base
from cyborg.db import api as dbapi
from cyborg.objects import base
from cyborg.objects import fields as object_fields
LOG = logging.getLogger(__name__)
ATTACH_TYPE = ["PCI", "MDEV"]
@base.CyborgObjectRegistry.register
class AttachHandle(base.CyborgObject, object_base.VersionedObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
dbapi = dbapi.get_instance()
fields = {
'id': object_fields.IntegerField(nullable=False),
'uuid': object_fields.UUIDField(nullable=False),
'deployable_id': object_fields.IntegerField(nullable=False),
'attach_type': object_fields.EnumField(valid_values=ATTACH_TYPE,
nullable=False),
# attach_info should be JSON here.
'attach_info': object_fields.StringField(nullable=False)
}
def create(self, context):
"""Create a AttachHandle record in the DB."""
values = self.obj_get_changes()
db_ah = self.dbapi.attach_handle_create(context, values)
self._from_db_object(self, db_ah)
@classmethod
def get(cls, context, uuid):
"""Find a DB AttachHandle and return an Obj AttachHandle."""
db_ah = cls.dbapi.attach_handle_get_by_uuid(context, uuid)
obj_ah = cls._from_db_object(cls(context), db_ah)
return obj_ah
@classmethod
def get_by_id(cls, context, id):
"""Find a DB AttachHandle by ID and return an Obj AttachHandle."""
db_ah = cls.dbapi.attach_handle_get_by_id(context, id)
obj_ah = cls._from_db_object(cls(context), db_ah)
return obj_ah
@classmethod
def list(cls, context, filters={}):
"""Return a list of AttachHandle objects."""
if filters:
sort_dir = filters.pop('sort_dir', 'desc')
sort_key = filters.pop('sort_key', 'create_at')
limit = filters.pop('limit', None)
marker = filters.pop('marker_obj', None)
db_ahs = cls.dbapi.attach_handle_get_by_filters(context, filters,
sort_dir=sort_dir,
sort_key=sort_key,
limit=limit,
marker=marker)
else:
db_ahs = cls.dbapi.attach_handle_list(context)
obj_ah_list = cls._from_db_object_list(db_ahs, context)
return obj_ah_list
def save(self, context):
"""Update an AttachHandle record in the DB"""
updates = self.obj_get_changes()
db_ahs = self.dbapi.attach_handle_update(context, self.uuid, updates)
self._from_db_object(self, db_ahs)
def destroy(self, context):
"""Delete a AttachHandle from the DB."""
self.dbapi.attach_handle_delete(context, self.uuid)
self.obj_reset_changes()

View File

@ -0,0 +1,84 @@
# Copyright 2019 Intel, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_versionedobjects import base as object_base
from cyborg.db import api as dbapi
from cyborg.objects import base
from cyborg.objects import fields as object_fields
LOG = logging.getLogger(__name__)
CPID_TYPE = ["PCI", "MDEV"]
@base.CyborgObjectRegistry.register
class ControlpathID(base.CyborgObject, object_base.VersionedObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
dbapi = dbapi.get_instance()
fields = {
'id': object_fields.IntegerField(nullable=False),
'uuid': object_fields.UUIDField(nullable=False),
'deployable_id': object_fields.IntegerField(nullable=False),
'cpid_type': object_fields.EnumField(valid_values=CPID_TYPE,
nullable=False),
'cpid_info': object_fields.StringField(nullable=False)
}
def create(self, context):
"""Create a ControlPathID record in the DB."""
values = self.obj_get_changes()
db_cp = self.dbapi.control_path_create(context, values)
self._from_db_object(self, db_cp)
@classmethod
def get(cls, context, uuid):
"""Find a DB ControlpathID and return an Obj ControlpathID."""
db_cp = cls.dbapi.control_path_get_by_uuid(context, uuid)
obj_cp = cls._from_db_object(cls(context), db_cp)
return obj_cp
@classmethod
def list(cls, context, filters={}):
"""Return a list of ControlpathID objects."""
if filters:
sort_dir = filters.pop('sort_dir', 'desc')
sort_key = filters.pop('sort_key', 'create_at')
limit = filters.pop('limit', None)
marker = filters.pop('marker_obj', None)
db_cps = cls.dbapi.control_path_get_by_filters(context, filters,
sort_dir=sort_dir,
sort_key=sort_key,
limit=limit,
marker=marker)
else:
db_cps = cls.dbapi.control_path_list(context)
obj_cp_list = cls._from_db_object_list(db_cps, context)
return obj_cp_list
def save(self, context):
"""Update an ControlpathID record in the DB"""
updates = self.obj_get_changes()
db_cps = self.dbapi.control_path_update(context, self.uuid, updates)
self._from_db_object(self, db_cps)
def destroy(self, context):
"""Delete a ControlpathID from the DB."""
self.dbapi.control_path_delete(context, self.uuid)
self.obj_reset_changes()

View File

@ -18,6 +18,7 @@ from oslo_versionedobjects import fields as object_fields
from cyborg.common import constants
# Import fields from oslo_versionedobjects
EnumField = object_fields.EnumField
IntegerField = object_fields.IntegerField
UUIDField = object_fields.UUIDField
StringField = object_fields.StringField

View File

@ -86,3 +86,27 @@ def get_test_arq(**kwargs):
'updated_at': kwargs.get('updated_at', None),
'substate': kwargs.get('substate', 'Initial'),
}
def get_test_attach_handle(**kw):
return {
'uuid': kw.get('uuid', '10efe63d-dfea-4a37-ad94-4116fba5098'),
'id': kw.get('id', 1),
'deployable_id': kw.get('deployable_id', 1),
'attach_type': kw.get('attach_type', "PCI"),
'attach_info': kw.get('attach_info', "attach_info"),
'created_at': kw.get('create_at', None),
'updated_at': kw.get('updated_at', None),
}
def get_test_control_path(**kw):
return {
'uuid': kw.get('uuid', '10efe63d-dfea-4a37-ad94-4116fba5098'),
'id': kw.get('id', 1),
'deployable_id': kw.get('deployable_id', 1),
'cpid_type': kw.get('cpid_type', "PCI"),
'cpid_info': kw.get('cpid_info', "cpid_info"),
'created_at': kw.get('create_at', None),
'updated_at': kw.get('updated_at', None),
}

View File

@ -0,0 +1,99 @@
# Copyright 2019 Intel, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cyborg import objects
from cyborg.tests.unit.db import base
from cyborg.tests.unit.db import utils
class TestAttachHandleObject(base.DbTestCase):
def setUp(self):
super(TestAttachHandleObject, self).setUp()
self.fake_attach_handle = utils.get_test_attach_handle()
def test_get(self):
uuid = self.fake_attach_handle['uuid']
with mock.patch.object(self.dbapi, 'attach_handle_get_by_uuid',
autospec=True) as mock_attach_handle_get:
mock_attach_handle_get.return_value = self.fake_attach_handle
attach_handle = objects.AttachHandle.get(self.context, uuid)
mock_attach_handle_get.assert_called_once_with(self.context, uuid)
self.assertEqual(self.context, attach_handle._context)
def test_get_by_id(self):
id = self.fake_attach_handle['id']
with mock.patch.object(self.dbapi, 'attach_handle_get_by_id',
autospec=True) as mock_attach_handle_get:
mock_attach_handle_get.return_value = self.fake_attach_handle
attach_handle = objects.AttachHandle.get_by_id(self.context, id)
mock_attach_handle_get.assert_called_once_with(self.context, id)
self.assertEqual(self.context, attach_handle._context)
def test_list(self):
with mock.patch.object(self.dbapi, 'attach_handle_list',
autospec=True) as mock_attach_handle_list:
mock_attach_handle_list.return_value = [self.fake_attach_handle]
attach_handles = objects.AttachHandle.list(self.context)
self.assertEqual(1, mock_attach_handle_list.call_count)
self.assertEqual(1, len(attach_handles))
self.assertIsInstance(attach_handles[0], objects.AttachHandle)
self.assertEqual(self.context, attach_handles[0]._context)
def test_create(self):
with mock.patch.object(self.dbapi, 'attach_handle_create',
autospec=True) as mock_attach_handle_create:
mock_attach_handle_create.return_value = self.fake_attach_handle
attach_handle = objects.AttachHandle(self.context,
**self.fake_attach_handle)
attach_handle.create(self.context)
mock_attach_handle_create.assert_called_once_with(
self.context, self.fake_attach_handle)
self.assertEqual(self.context, attach_handle._context)
def test_destroy(self):
uuid = self.fake_attach_handle['uuid']
with mock.patch.object(self.dbapi, 'attach_handle_get_by_uuid',
autospec=True) as mock_attach_handle_get:
mock_attach_handle_get.return_value = self.fake_attach_handle
with mock.patch.object(self.dbapi, 'attach_handle_delete',
autospec=True) as mock_attach_handle_delete:
attach_handle = objects.AttachHandle.get(self.context, uuid)
attach_handle.destroy(self.context)
mock_attach_handle_delete.assert_called_once_with(self.context,
uuid)
self.assertEqual(self.context, attach_handle._context)
def test_update(self):
uuid = self.fake_attach_handle['uuid']
with mock.patch.object(self.dbapi, 'attach_handle_get_by_uuid',
autospec=True) as mock_attach_handle_get:
mock_attach_handle_get.return_value = self.fake_attach_handle
with mock.patch.object(self.dbapi, 'attach_handle_update',
autospec=True) as mock_attach_handle_update:
fake = self.fake_attach_handle
fake["attach_info"] = "new_attach_info"
mock_attach_handle_update.return_value = fake
attach_handle = objects.AttachHandle.get(self.context, uuid)
attach_handle.attach_info = 'new_attach_info'
attach_handle.save(self.context)
mock_attach_handle_get.assert_called_once_with(self.context,
uuid)
mock_attach_handle_update.assert_called_once_with(
self.context, uuid,
{'attach_info': 'new_attach_info'})
self.assertEqual(self.context, attach_handle._context)

View File

@ -0,0 +1,90 @@
# Copyright 2019 Intel, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cyborg import objects
from cyborg.tests.unit.db import base
from cyborg.tests.unit.db import utils
class TestControlpathIDObject(base.DbTestCase):
def setUp(self):
super(TestControlpathIDObject, self).setUp()
self.fake_control_path = utils.get_test_control_path()
def test_get(self):
uuid = self.fake_control_path['uuid']
with mock.patch.object(self.dbapi, 'control_path_get_by_uuid',
autospec=True) as mock_control_path_get:
mock_control_path_get.return_value = self.fake_control_path
control_path = objects.ControlpathID.get(self.context, uuid)
mock_control_path_get.assert_called_once_with(self.context, uuid)
self.assertEqual(self.context, control_path._context)
def test_list(self):
with mock.patch.object(self.dbapi, 'control_path_list',
autospec=True) as mock_control_path_list:
mock_control_path_list.return_value = [self.fake_control_path]
control_paths = objects.ControlpathID.list(self.context)
self.assertEqual(1, mock_control_path_list.call_count)
self.assertEqual(1, len(control_paths))
self.assertIsInstance(control_paths[0], objects.ControlpathID)
self.assertEqual(self.context, control_paths[0]._context)
def test_create(self):
with mock.patch.object(self.dbapi, 'control_path_create',
autospec=True) as mock_control_path_create:
mock_control_path_create.return_value = self.fake_control_path
control_path = objects.ControlpathID(self.context,
**self.fake_control_path)
control_path.create(self.context)
mock_control_path_create.assert_called_once_with(
self.context, self.fake_control_path)
self.assertEqual(self.context, control_path._context)
def test_destroy(self):
uuid = self.fake_control_path['uuid']
with mock.patch.object(self.dbapi, 'control_path_get_by_uuid',
autospec=True) as mock_control_path_get:
mock_control_path_get.return_value = self.fake_control_path
with mock.patch.object(self.dbapi, 'control_path_delete',
autospec=True) as mock_control_path_delete:
control_path = objects.ControlpathID.get(self.context, uuid)
control_path.destroy(self.context)
mock_control_path_delete.assert_called_once_with(self.context,
uuid)
self.assertEqual(self.context, control_path._context)
def test_update(self):
uuid = self.fake_control_path['uuid']
with mock.patch.object(self.dbapi, 'control_path_get_by_uuid',
autospec=True) as mock_control_path_get:
mock_control_path_get.return_value = self.fake_control_path
with mock.patch.object(self.dbapi, 'control_path_update',
autospec=True) as mock_control_path_update:
fake = self.fake_control_path
fake["cpid_info"] = "new_cpid_info"
mock_control_path_update.return_value = fake
control_path = objects.ControlpathID.get(self.context, uuid)
control_path.cpid_info = 'new_cpid_info'
control_path.save(self.context)
mock_control_path_get.assert_called_once_with(self.context,
uuid)
mock_control_path_update.assert_called_once_with(
self.context, uuid,
{'cpid_info': 'new_cpid_info'})
self.assertEqual(self.context, control_path._context)