Add new DB schema: sqlachemy and alembic scripts.

Add the implementation of spec(https://review.openstack.org/615462)
This patch only contains new DB schema: sqlachemy and migration
scripts parts, and still lost object and model api parts of all tables
which will be submitted in the following patches. The initial owners
of remaining implementation are zhenghao, yumeng and Coco. This patch
delete the old unit test of object and api, we will add new unit test
once all ovos are ready.

Change-Id: I280b093076c8131a2ab2c925d3e050915a28977b
This commit is contained in:
Coco 2018-12-17 10:34:17 -05:00
parent b12ae00844
commit 13a3d5a4be
14 changed files with 339 additions and 1646 deletions

View File

@ -90,8 +90,8 @@ class ConfigInvalid(CyborgException):
_msg_fmt = _("Invalid configuration file. %(error_msg)s")
class AcceleratorAlreadyExists(CyborgException):
_msg_fmt = _("Accelerator with uuid %(uuid)s already exists.")
class DeviceAlreadyExists(CyborgException):
_msg_fmt = _("Device with uuid %(uuid)s already exists.")
class DeployableAlreadyExists(CyborgException):
@ -148,8 +148,8 @@ class ConfGroupForServiceTypeNotFound(ServiceNotFound):
"%(stype)s.")
class AcceleratorNotFound(NotFound):
_msg_fmt = _("Accelerator %(uuid)s could not be found.")
class DeviceNotFound(NotFound):
_msg_fmt = _("Device %(uuid)s could not be found.")
class DeployableNotFound(NotFound):
@ -165,8 +165,8 @@ class Conflict(CyborgException):
code = http_client.CONFLICT
class DuplicateAcceleratorName(Conflict):
_msg_fmt = _("An accelerator with name %(name)s already exists.")
class DuplicateDeviceName(Conflict):
_msg_fmt = _("A device with name %(name)s already exists.")
class DuplicateDeployableName(Conflict):

View File

@ -41,27 +41,26 @@ class Connection(object):
def __init__(self):
"""Constructor."""
# accelerator
# device
@abc.abstractmethod
def accelerator_create(self, context, values):
"""Create a new accelerator."""
def device_create(self, context, values):
"""Create a new device when device is inserted into the host."""
@abc.abstractmethod
def accelerator_get(self, context, uuid):
"""Get requested accelerator."""
def device_get(self, context, uuid):
"""Get requested device."""
@abc.abstractmethod
def accelerator_list(self, context, limit, marker, sort_key, sort_dir,
project_only):
"""Get requested list of accelerators."""
def device_list(self, context, limit, marker, sort_key, sort_dir):
"""Get requested list of devices."""
@abc.abstractmethod
def accelerator_update(self, context, uuid, values):
"""Update an accelerator."""
def device_update(self, context, uuid, values):
"""Update a device."""
@abc.abstractmethod
def accelerator_delete(self, context, uuid):
"""Delete an accelerator."""
def device_delete(self, context, uuid):
"""Delete a device when device is removed from the host."""
# deployable
@abc.abstractmethod

View File

@ -0,0 +1,177 @@
"""new_db_schema
Revision ID: ede4e3f1a232
Revises: d6f033d8fa5b
Create Date: 2018-11-27 22:00:52.080713
"""
# revision identifiers, used by Alembic.
revision = 'ede4e3f1a232'
down_revision = 'd6f033d8fa5b'
from alembic import op
import sqlalchemy as sa
# TODO: The enum value should be further discussed.
state = sa.Enum('Initial', 'Bound', 'BindFailed', name='state')
substate = sa.Enum('Initial', name='substate')
attach_type = sa.Enum('PCI', 'MDEV', name='attach_type')
control_type = sa.Enum('PCI', name='control_type')
def upgrade():
# drop old table: deployable, accelerator
op.drop_table('attributes')
op.drop_table('deployables')
op.drop_table('accelerators')
op.create_table(
'devices',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('type', sa.String(length=255), nullable=False),
sa.Column('vendor', sa.String(length=255), nullable=False),
sa.Column('model', sa.String(length=255), nullable=False),
sa.Column('std_board_info', sa.Text(), nullable=True),
sa.Column('vendor_board_info', sa.Text(), nullable=True),
sa.Column('hostname', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_devices0uuid'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)
op.create_table(
'deployables',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('parent_id', sa.Integer(),
sa.ForeignKey('deployables.id', ondelete='CASCADE'),
nullable=True),
sa.Column('root_id', sa.Integer(),
sa.ForeignKey('deployables.id', ondelete='CASCADE'),
nullable=True),
sa.Column('name', sa.String(length=32), nullable=False),
sa.Column('num_accelerators', sa.Integer(), nullable=False),
sa.Column('device_id', sa.Integer(),
sa.ForeignKey('devices.id', ondelete="RESTRICT"),
nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_deployables0uuid'),
sa.Index('deployables_parent_id_idx', 'parent_id'),
sa.Index('deployables_root_id_idx', 'root_id'),
sa.Index('deployables_device_id_idx', 'device_id'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)
op.create_table(
'attributes',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('deployable_id', sa.Integer(),
sa.ForeignKey('deployables.id', ondelete="RESTRICT"),
nullable=False),
sa.Column('key', sa.Text(), nullable=False),
sa.Column('value', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_attributes0uuid'),
sa.Index('attributes_deployable_id_idx', 'deployable_id'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)
op.create_table(
'controlpath_ids',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('device_id', sa.Integer(),
sa.ForeignKey('devices.id', ondelete="RESTRICT"),
nullable=False),
sa.Column('cpid_type', control_type, nullable=False),
sa.Column('cpid_info', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_controlpath_ids0uuid'),
sa.Index('controlpath_ids_device_id_idx', 'device_id'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)
op.create_table(
'attach_handles',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('deployable_id', sa.Integer(),
sa.ForeignKey('deployables.id', ondelete="RESTRICT"),
nullable=False),
sa.Column('cpid_id', sa.Integer(),
sa.ForeignKey('controlpath_ids.id', ondelete="RESTRICT"),
nullable=False),
sa.Column('attach_type', attach_type, nullable=False),
sa.Column('attach_info', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_attach_handles0uuid'),
sa.Index('attach_handles_deployable_id_idx', 'deployable_id'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)
op.create_table(
'device_profiles',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('profile_json', sa.Text(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid', name='uniq_device_profiles0uuid'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)
op.create_table(
'external_accelerator_requests',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('uuid', sa.String(length=36), nullable=False),
sa.Column('project_id', sa.String(length=255), nullable=False),
sa.Column('state', state, nullable=False, default='Initial'),
sa.Column('substate', substate, nullable=False, default='Initial'),
sa.Column('device_profile_id', sa.Integer(),
sa.ForeignKey('device_profiles.id', ondelete="RESTRICT"),
nullable=False),
sa.Column('hostname', sa.String(length=255), nullable=True),
sa.Column('device_rp_uuid', sa.String(length=36), nullable=True),
sa.Column('device_instance_uuid', sa.String(length=36),
nullable=True),
sa.Column('attach_handle_id', sa.Integer(),
sa.ForeignKey('attach_handles.id', ondelete="RESTRICT"),
nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uuid',
name='uniq_external_accelerator_requests0uuid'),
sa.Index('external_accelerator_requests_project_id_idx', 'project_id'),
sa.Index('external_accelerator_requests_device_profile_id_idx',
'device_profile_id'),
sa.Index('external_accelerator_requests_device_rp_uuid_idx',
'device_rp_uuid'),
sa.Index('external_accelerator_requests_device_instance_uuid_idx',
'device_instance_uuid'),
sa.Index('external_accelerator_requests_attach_handle_id_idx',
'attach_handle_id'),
mysql_ENGINE='InnoDB',
mysql_DEFAULT_CHARSET='UTF8'
)

View File

@ -127,69 +127,69 @@ class Connection(api.Connection):
def __init__(self):
pass
def accelerator_create(self, context, values):
def device_create(self, context, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
accelerator = models.Accelerator()
accelerator.update(values)
device = models.Device()
device.update(values)
with _session_for_write() as session:
try:
session.add(accelerator)
session.add(device)
session.flush()
except db_exc.DBDuplicateEntry:
raise exception.AcceleratorAlreadyExists(uuid=values['uuid'])
return accelerator
raise exception.DeviceAlreadyExists(uuid=values['uuid'])
return device
def accelerator_get(self, context, uuid):
def device_get(self, context, uuid):
query = model_query(
context,
models.Accelerator).filter_by(uuid=uuid)
models.Device).filter_by(uuid=uuid)
try:
return query.one()
except NoResultFound:
raise exception.AcceleratorNotFound(uuid=uuid)
raise exception.DeviceNotFound(uuid=uuid)
def accelerator_list(self, context, limit, marker, sort_key, sort_dir,
project_only):
query = model_query(context, models.Accelerator,
def device_list(self, context, limit, marker, sort_key, sort_dir,
project_only):
query = model_query(context, models.Device,
project_only=project_only)
return _paginate_query(context, models.Accelerator, limit, marker,
return _paginate_query(context, models.Device, limit, marker,
sort_key, sort_dir, query)
def accelerator_update(self, context, uuid, values):
def device_update(self, context, uuid, values):
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing Accelerator.")
msg = _("Cannot overwrite UUID for an existing Device.")
raise exception.InvalidParameterValue(err=msg)
try:
return self._do_update_accelerator(context, uuid, values)
return self._do_update_device(context, uuid, values)
except db_exc.DBDuplicateEntry as e:
if 'name' in e.columns:
raise exception.DuplicateAcceleratorName(name=values['name'])
raise exception.DuplicateDeviceName(name=values['name'])
@oslo_db_api.retry_on_deadlock
def _do_update_accelerator(self, context, uuid, values):
def _do_update_device(self, context, uuid, values):
with _session_for_write():
query = model_query(context, models.Accelerator)
query = model_query(context, models.Device)
query = add_identity_filter(query, uuid)
try:
ref = query.with_lockmode('update').one()
except NoResultFound:
raise exception.AcceleratorNotFound(uuid=uuid)
raise exception.DeviceNotFound(uuid=uuid)
ref.update(values)
return ref
@oslo_db_api.retry_on_deadlock
def accelerator_delete(self, context, uuid):
def device_delete(self, context, uuid):
with _session_for_write():
query = model_query(context, models.Accelerator)
query = model_query(context, models.Device)
query = add_identity_filter(query, uuid)
count = query.delete()
if count != 1:
raise exception.AcceleratorNotFound(uuid=uuid)
raise exception.DeviceNotFound(uuid=uuid)
def deployable_create(self, context, values):
if not values.get('uuid'):
@ -265,12 +265,8 @@ class Connection(api.Connection):
filters):
exact_match_filter_names = ['uuid', 'name',
'parent_uuid', 'root_uuid',
'address', 'host',
'board', 'vendor', 'version',
'type', 'interface_type', 'assignable',
'instance_uuid', 'availability',
'accelerator_id']
'parent_id', 'root_id',
'num_accelerators', 'device_id']
attribute_filters = {}
filters_copy = copy.deepcopy(filters)
for key, value in filters_copy.items():
@ -407,12 +403,8 @@ class Connection(api.Connection):
filters = copy.deepcopy(filters)
exact_match_filter_names = ['uuid', 'name',
'parent_uuid', 'root_uuid',
'address', 'host',
'board', 'vendor', 'version',
'type', 'interface_type', 'assignable',
'instance_uuid', 'availability',
'accelerator_id']
'parent_id', 'root_id',
'num_accelerators', 'device_id']
# Filter the query
query_prefix = self._exact_deployable_filter(query_prefix,
@ -653,14 +645,13 @@ class Connection(api.Connection):
def _sync_acc_res(self, context, resource, project_id):
"""Quota sync funciton"""
res_in_use = self._accelerator_data_get_for_project(context, resource,
project_id)
res_in_use = self._device_data_get_for_project(context, resource,
project_id)
return {resource: res_in_use}
def _accelerator_data_get_for_project(self, context, resource, project_id):
def _device_data_get_for_project(self, context, resource, project_id):
"""Return the number of resource which is being used by a project"""
query = model_query(context, models.Accelerator).\
filter_by(project_id=project_id).filter_by(device_type=resource)
query = model_query(context, models.Device).filter_by(type=resource)
return query.count()

View File

@ -20,7 +20,7 @@ from oslo_db.sqlalchemy import models
from oslo_utils import timeutils
import six.moves.urllib.parse as urlparse
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, Boolean, ForeignKey, Index
from sqlalchemy import Column, String, Integer, Enum, ForeignKey, Index
from sqlalchemy import Text
from sqlalchemy import schema
from sqlalchemy import DateTime
@ -67,27 +67,23 @@ class CyborgBase(models.TimestampMixin, models.ModelBase):
Base = declarative_base(cls=CyborgBase)
class Accelerator(Base):
"""Represents the accelerators."""
class Device(Base):
"""Represents the devices."""
__tablename__ = 'accelerators'
__tablename__ = 'devices'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_accelerators0uuid'),
schema.UniqueConstraint('uuid', name='uniq_devices0uuid'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
name = Column(String(255), nullable=False)
description = Column(String(255), nullable=True)
project_id = Column(String(36), nullable=True)
user_id = Column(String(36), nullable=True)
device_type = Column(String(255), nullable=False)
acc_type = Column(String(255), nullable=True)
acc_capability = Column(String(255), nullable=True)
vendor_id = Column(String(255), nullable=False)
product_id = Column(String(255), nullable=False)
remotable = Column(Integer, nullable=False)
type = Column(String(255), nullable=False)
vendor = Column(String(255), nullable=False)
model = Column(String(255), nullable=False)
std_board_info = Column(Text, nullable=True)
vendor_board_info = Column(Text, nullable=True)
hostname = Column(String(255), nullable=False)
class Deployable(Base):
@ -96,32 +92,20 @@ class Deployable(Base):
__tablename__ = 'deployables'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_deployables0uuid'),
Index('deployables_parent_uuid_idx', 'parent_uuid'),
Index('deployables_root_uuid_idx', 'root_uuid'),
Index('deployables_accelerator_id_idx', 'accelerator_id'),
Index('deployables_parent_id_idx', 'parent_id'),
Index('deployables_root_id_idx', 'root_id'),
Index('deployables_device_id_idx', 'device_id'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
name = Column(String(255), nullable=False)
parent_uuid = Column(String(36),
ForeignKey('deployables.uuid'), nullable=True)
root_uuid = Column(String(36),
ForeignKey('deployables.uuid'), nullable=True)
address = Column(String(255), nullable=False)
host = Column(String(255), nullable=False)
board = Column(String(255), nullable=False)
vendor = Column(String(255), nullable=False)
version = Column(String(255), nullable=False)
type = Column(String(255), nullable=False)
interface_type = Column(String(255), nullable=False)
assignable = Column(Boolean, nullable=False)
instance_uuid = Column(String(36), nullable=True)
availability = Column(String(255), nullable=False)
accelerator_id = Column(Integer,
ForeignKey('accelerators.id', ondelete="CASCADE"),
nullable=False)
parent_id = Column(Integer, ForeignKey('deployables.id'), nullable=True)
root_id = Column(Integer, ForeignKey('deployables.id'), nullable=True)
name = Column(String(32), nullable=False)
num_accelerators = Column(Integer, nullable=False)
device_id = Column(Integer, ForeignKey('devices.id', ondelete="RESTRICT"),
nullable=False)
class Attribute(Base):
@ -135,12 +119,105 @@ class Attribute(Base):
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
deployable_id = Column(Integer,
ForeignKey('deployables.id', ondelete="CASCADE"),
ForeignKey('deployables.id', ondelete="RESTRICT"),
nullable=False)
key = Column(Text, nullable=False)
value = Column(Text, nullable=False)
class ControlpathID(Base):
"""Identifier for the Device when driver reporting to agent, IDs is
needed especially when multiple PFs exist in one Devices."""
__tablename__ = 'controlpath_ids'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_controlpath_ids0uuid'),
Index('controlpath_ids_device_id_idx', 'device_id'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
device_id = Column(Integer,
ForeignKey('devices.id', ondelete="RESTRICT"),
nullable=False)
cpid_type = Column(Enum('PCI', name='control_type'), nullable=False)
cpid_info = Column(Text, nullable=False)
class AttachHandle(Base):
"""Reprensents device's VFs and PFs which can be attached to a VM."""
__tablename__ = 'attach_handles'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_attach_handles0uuid'),
Index('attach_handles_deployable_id_idx', 'deployable_id'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
deployable_id = Column(Integer,
ForeignKey('deployables.id', ondelete="RESTRICT"),
nullable=False)
cpid_id = Column(Integer,
ForeignKey('controlpath_ids.id', ondelete="RESTRICT"),
nullable=False)
attach_type = Column(Enum('PCI', 'MDEV', name='attach_type'),
nullable=False)
attach_info = Column(Text, nullable=False)
class DeviceProfile(Base):
"""Represents users' specific requirements."""
__tablename__ = 'device_profiles'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_device_profiles0uuid'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
name = Column(String(255), nullable=False)
profile_json = Column(Text, nullable=False)
class ExternalAcceleratorRequest(Base):
"""Represents external nova requests for attach related operations."""
__tablename__ = 'external_accelerator_requests'
__table_args__ = (
schema.UniqueConstraint('uuid', name='uniq_ext_arqs0uuid'),
Index('external_accelerator_requests_project_id_idx', 'project_id'),
Index('external_accelerator_requests_device_profile_id_idx',
'device_profile_id'),
Index('external_accelerator_requests_device_rp_uuid_idx',
'device_rp_uuid'),
Index('external_accelerator_requests_device_instance_uuid_idx',
'device_instance_uuid'),
Index('external_accelerator_requests_attach_handle_id_idx',
'attach_handle_id'),
table_args()
)
id = Column(Integer, primary_key=True)
uuid = Column(String(36), nullable=False)
project_id = Column(String(255), nullable=False)
state = Column(Enum('Initial', 'Bound', 'BindFailed', name='state'),
nullable=False)
substate = Column(Enum('Initial', name='substate'), nullable=False),
device_profile_id = Column(Integer, ForeignKey('device_profiles.id',
ondelete="RESTRICT"),
nullable=False)
hostname = Column(String(255), nullable=True)
device_rp_uuid = Column(String(36), nullable=True)
device_instance_uuid = Column(String(36), nullable=True)
attach_handle_id = Column(Integer(), ForeignKey('attach_handles.id',
ondelete="RESTRICT"),
nullable=True)
class QuotaUsage(Base):
"""Represents the current usage for a given resource."""

View File

@ -1,164 +0,0 @@
# Copyright 2017 Huawei Technologies Co.,LTD.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
from oslo_utils import timeutils
from six.moves import http_client
from cyborg.api.controllers.v1.accelerators import Accelerator
from cyborg.conductor import rpcapi
from cyborg.tests.unit.api.controllers.v1 import base as v1_test
from cyborg.tests.unit.db import utils as db_utils
from cyborg.tests.unit.objects import utils as obj_utils
def gen_post_body(**kw):
return db_utils.get_test_accelerator(**kw)
def _rpcapi_accelerator_create(context, obj_acc):
"""Fake used to mock out the conductor RPCAPI's accelerator_create method.
Performs creation of the accelerator object and returns the created
accelerator as-per the real method.
"""
obj_acc.create(context)
return obj_acc
class TestPost(v1_test.APITestV1):
ACCELERATOR_UUID = '10efe63d-dfea-4a37-ad94-4116fba50981'
def setUp(self):
super(TestPost, self).setUp()
self.headers = self.gen_headers(self.context)
p = mock.patch.object(rpcapi.ConductorAPI, 'accelerator_create')
self.mock_create = p.start()
self.mock_create.side_effect = _rpcapi_accelerator_create
self.addCleanup(p.stop)
@mock.patch('oslo_utils.uuidutils.generate_uuid')
def test_post(self, mock_uuid):
mock_uuid.return_value = self.ACCELERATOR_UUID
body = gen_post_body(name='post_accelerator')
response = self.post_json('/accelerators', body, headers=self.headers)
self.assertEqual(http_client.CREATED, response.status_int)
response = response.json
self.assertEqual(self.ACCELERATOR_UUID, response['uuid'])
self.assertEqual(body['name'], response['name'])
self.mock_create.assert_called_once_with(mock.ANY, mock.ANY)
class TestList(v1_test.APITestV1):
def setUp(self):
super(TestList, self).setUp()
self.accs = []
for i in range(3):
acc = obj_utils.create_test_accelerator(self.context)
self.accs.append(acc)
self.acc = self.accs[0]
self.context.tenant = self.acc.project_id
self.headers = self.gen_headers(self.context)
def test_get_one(self):
data = self.get_json('/accelerators/%s' % self.acc.uuid,
headers=self.headers)
self.assertEqual(self.acc.uuid, data['uuid'])
for attr in Accelerator._wsme_attributes:
self.assertIn(attr.name, data)
def test_get_all(self):
data = self.get_json('/accelerators', headers=self.headers)
self.assertEqual(3, len(data['accelerators']))
data_uuids = [d['uuid'] for d in data['accelerators']]
acc_uuids = [acc.uuid for acc in self.accs]
self.assertItemsEqual(acc_uuids, data_uuids)
def _rpcapi_accelerator_update(context, obj_acc):
"""Fake used to mock out the conductor RPCAPI's accelerator_update method.
Performs update of the accelerator object and returns the updated
accelerator as-per the real method.
"""
obj_acc.save(context)
return obj_acc
class TestPatch(v1_test.APITestV1):
def setUp(self):
super(TestPatch, self).setUp()
self.acc = obj_utils.create_test_accelerator(self.context)
self.context.tenant = self.acc.project_id
self.headers = self.gen_headers(self.context)
p = mock.patch.object(rpcapi.ConductorAPI, 'accelerator_update')
self.mock_update = p.start()
self.mock_update.side_effect = _rpcapi_accelerator_update
self.addCleanup(p.stop)
@mock.patch.object(timeutils, 'utcnow')
def test_patch(self, mock_utcnow):
test_time = datetime.datetime(2012, 12, 12, 12, 12)
mock_utcnow.return_value = test_time
description = 'new-description'
response = self.patch_json('/accelerators/%s' % self.acc.uuid,
[{'path': '/description',
'value': description,
'op': 'replace'}],
headers=self.headers)
self.assertEqual(http_client.OK, response.status_code)
data = self.get_json('/accelerators/%s' % self.acc.uuid,
headers=self.headers)
self.assertEqual(description, data['description'])
return_updated_at = timeutils.parse_isotime(
data['updated_at']).replace(tzinfo=None)
self.assertEqual(test_time, return_updated_at)
self.mock_update.assert_called_once_with(mock.ANY, mock.ANY)
def _rpcapi_accelerator_delete(context, obj_acc):
"""Fake used to mock out the conductor RPCAPI's accelerator_delete method.
Performs deletion of the accelerator object as-per the real method.
"""
obj_acc.destroy(context)
class TestDelete(v1_test.APITestV1):
def setUp(self):
super(TestDelete, self).setUp()
self.acc = obj_utils.create_test_accelerator(self.context)
self.context.tenant = self.acc.project_id
self.headers = self.gen_headers(self.context)
p = mock.patch.object(rpcapi.ConductorAPI, 'accelerator_delete')
self.mock_delete = p.start()
self.mock_delete.side_effect = _rpcapi_accelerator_delete
self.addCleanup(p.stop)
def test_delete(self):
response = self.delete('/accelerators/%s' % self.acc.uuid,
headers=self.headers)
self.assertEqual(http_client.NO_CONTENT, response.status_code)
self.mock_delete.assert_called_once_with(mock.ANY, mock.ANY)

View File

@ -1,93 +0,0 @@
# Copyright 2018 Lenovo, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from six.moves import http_client
from cyborg.api.controllers.v1.deployables import Deployable
from cyborg.tests.unit.api.controllers.v1 import base as v1_test
from cyborg.tests.unit import fake_deployable
from cyborg.tests.unit.objects import utils as obj_utils
class TestDeployableController(v1_test.APITestV1):
def setUp(self):
super(TestDeployableController, self).setUp()
self.headers = self.gen_headers(self.context)
self.deployable_uuids = ['10efe63d-dfea-4a37-ad94-4116fba50981',
'10efe63d-dfea-4a37-ad94-4116fba50982']
@mock.patch('cyborg.objects.Deployable.get')
def test_get_one(self, mock_get_dep):
dep_uuid = self.deployable_uuids[0]
mock_get_dep.return_value = fake_deployable.\
fake_deployable_obj(self.context, uuid=dep_uuid)
data = self.get_json('/accelerators/deployables/%s' % dep_uuid,
headers=self.headers)
self.assertEqual(dep_uuid, data['uuid'])
for attr in Deployable._wsme_attributes:
self.assertIn(attr.name, data)
mock_get_dep.assert_called_once_with(mock.ANY, dep_uuid)
@mock.patch('cyborg.objects.Deployable.list')
def test_get_all(self, mock_list_dep):
fake_deps = []
for uuid in self.deployable_uuids:
fake_dep = fake_deployable.fake_deployable_obj(self.context,
uuid=uuid)
fake_deps.append(fake_dep)
mock_list_dep.return_value = fake_deps
data = self.get_json('/accelerators/deployables',
headers=self.headers)
self.assertEqual(len(self.deployable_uuids), len(data['deployables']))
mock_list_dep.assert_called_once()
@mock.patch('cyborg.conductor.rpcapi.ConductorAPI.deployable_update')
@mock.patch('cyborg.objects.Deployable.get')
def test_patch(self, mock_get_dep, mock_deployable_update):
self.headers['X-Roles'] = 'admin'
self.headers['Content-Type'] = 'application/json'
dep_uuid = self.deployable_uuids[0]
fake_dep = fake_deployable.fake_deployable_obj(self.context,
uuid=dep_uuid)
mock_get_dep.return_value = fake_dep
instance_uuid = '10efe63d-dfea-4a37-ad94-4116fba50981'
fake_dep.instance_uuid = instance_uuid
mock_deployable_update.return_value = fake_dep
response = self.patch_json('/accelerators/deployables/%s' % dep_uuid,
[{'path': '/instance_uuid',
'value': instance_uuid,
'op': 'replace'}],
headers=self.headers)
self.assertEqual(http_client.OK, response.status_code)
data = response.json_body
self.assertEqual(instance_uuid, data['instance_uuid'])
mock_deployable_update.assert_called_once()
def test_get_all_with_sort(self):
dps = []
for uuid in self.deployable_uuids:
dp = obj_utils.create_test_deployable(self.context,
uuid=uuid)
dps.append(dp)
data = self.get_json('/accelerators/deployables?'
'filters.field=sort_key&filters.value=created_at'
'&filters.field=sort_dir&filters.value=desc',
headers=self.headers)
self.assertEqual(self.deployable_uuids[1],
data['deployables'][0]['uuid'])
self.assertEqual(self.deployable_uuids[0],
data['deployables'][1]['uuid'])

View File

@ -1,104 +0,0 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
import netaddr
from oslo_db import exception as db_exc
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_context import context
from cyborg import db
from cyborg.common import exception
from cyborg import objects
from cyborg.objects import base
from cyborg import tests as test
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit.objects import test_objects
from cyborg.tests.unit.db.base import DbTestCase
class _TestAcceleratorObject(DbTestCase):
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=2)
return db_acc
@mock.patch.object(db.api.Connection, 'accelerator_create')
def test_create(self, mock_create):
mock_create.return_value = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**mock_create.return_value)
acc.create(self.context)
self.assertEqual(self.fake_accelerator['id'], acc.id)
@mock.patch.object(db.api.Connection, 'accelerator_get')
def test_get(self, mock_get):
mock_get.return_value = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**mock_get.return_value)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc['uuid'])
self.assertEqual(acc_get.uuid, acc.uuid)
@mock.patch.object(db.api.Connection, 'accelerator_update')
def test_save(self, mock_save):
mock_save.return_value = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**mock_save.return_value)
acc.create(self.context)
acc.name = 'test_save'
acc.save(self.context)
acc_get = objects.Accelerator.get(self.context, acc['uuid'])
self.assertEqual(acc_get.name, 'test_save')
@mock.patch.object(db.api.Connection, 'accelerator_delete')
def test_destroy(self, mock_destroy):
mock_destroy.return_value = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**mock_destroy.return_value)
acc.create(self.context)
self.assertEqual(self.fake_accelerator['id'], acc.id)
acc.destroy(self.context)
self.assertRaises(exception.AcceleratorNotFound,
objects.Accelerator.get, self.context,
acc['uuid'])
class TestAcceleratorObject(test_objects._LocalTest,
_TestAcceleratorObject):
def _test_save_objectfield_fk_constraint_fails(self, foreign_key,
expected_exception):
error = db_exc.DBReferenceError('table', 'constraint', foreign_key,
'key_table')
# Prevent lazy-loading any fields, results in InstanceNotFound
accelerator = fake_accelerator.fake_accelerator_obj(self.context)
fields_with_save_methods = [field for field in accelerator.fields
if hasattr(accelerator,
'_save_%s' % field)]
for field in fields_with_save_methods:
@mock.patch.object(accelerator, '_save_%s' % field)
@mock.patch.object(accelerator, 'obj_attr_is_set')
def _test(mock_is_set, mock_save_field):
mock_is_set.return_value = True
mock_save_field.side_effect = error
accelerator.obj_reset_changes(fields=[field])
accelerator._changed_fields.add(field)
self.assertRaises(expected_exception, accelerator.save)
accelerator.obj_reset_changes(fields=[field])
_test()

View File

@ -1,221 +0,0 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cyborg.common import exception
from cyborg import objects
from cyborg.tests.unit import fake_attribute
from cyborg.tests.unit import fake_deployable
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit.db.base import DbTestCase
class _TestDeployableObject(DbTestCase):
@property
def fake_deployable(self):
db_deploy = fake_deployable.fake_db_deployable(id=1)
return db_deploy
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=2)
return db_acc
@property
def fake_attribute(self):
db_attr = fake_attribute.fake_db_attribute(id=2)
return db_attr
def test_create(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
self.assertEqual(db_attr['uuid'], attr.uuid)
def test_get(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_get = objects.Attribute.get(self.context, attr.uuid)
self.assertEqual(db_attr['uuid'], attr_get.uuid)
def test_get_by_deployable_uuid(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_get = objects.Attribute.get_by_deployable_id(
self.context, dpl_get.id)[0]
self.assertEqual(db_attr['uuid'], attr_get.uuid)
def test_save(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_get = objects.Attribute.get(self.context, attr.uuid)
attr_get.set_key_value_pair("test_key", "test_val")
attr_get.save(self.context)
attr_get_2 = objects.Attribute.get(self.context, attr_get.uuid)
self.assertEqual(attr_get_2.key, "test_key")
self.assertEqual(attr_get_2.value, "test_val")
def test_destroy(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
self.assertEqual(db_attr['uuid'], attr.uuid)
attr.destroy(self.context)
self.assertRaises(exception.AttributeNotFound,
objects.Attribute.get, self.context,
attr.uuid)
def test_get_by_filter(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr_filter = {"key": "attr_key", "value": "attr_val"}
attr_get = objects.Attribute.get_by_filter(
self.context, attr_filter)[0]
self.assertEqual(db_attr['uuid'], attr_get.uuid)
attr_filter = {"key": "attr_key", "value": "attr_val2"}
attr_get_list = objects.Attribute.get_by_filter(
self.context, attr_filter)
self.assertEqual(len(attr_get_list), 0)
def test_create_exists(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
attr = objects.Attribute(context=self.context,
**db_attr)
attr.deployable_id = dpl_get.id
attr.create(self.context)
attr2 = objects.Attribute(context=self.context,
**db_attr)
attr2.deployable_id = dpl_get.id
self.assertRaises(exception.AttributeAlreadyExists,
attr2.create, self.context)

View File

@ -1,293 +0,0 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_db import exception as db_exc
from cyborg.common import exception
from cyborg import objects
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit import fake_deployable
from cyborg.tests.unit import fake_attribute
from cyborg.tests.unit.objects import test_objects
from cyborg.tests.unit.db.base import DbTestCase
class _TestDeployableObject(DbTestCase):
@property
def fake_deployable(self):
db_deploy = fake_deployable.fake_db_deployable(id=1)
return db_deploy
@property
def fake_deployable2(self):
db_deploy = fake_deployable.fake_db_deployable(id=2)
return db_deploy
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=2)
return db_acc
@property
def fake_attribute(self):
db_attr = fake_attribute.fake_db_attribute(id=2)
return db_attr
@property
def fake_attribute2(self):
db_attr = fake_attribute.fake_db_attribute(id=3)
return db_attr
@property
def fake_attribute3(self):
db_attr = fake_attribute.fake_db_attribute(id=4)
return db_attr
def test_create(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
self.assertEqual(db_dpl['uuid'], dpl.uuid)
def test_get(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
self.assertEqual(dpl_get.uuid, dpl.uuid)
def test_get_by_filter(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
query = {"uuid": dpl['uuid']}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(dpl_get_list[0].uuid, dpl.uuid)
def test_save(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl.host = 'test_save'
dpl.save(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
self.assertEqual(dpl_get.host, 'test_save')
def test_destroy(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
self.assertEqual(db_dpl['uuid'], dpl.uuid)
dpl.destroy(self.context)
self.assertRaises(exception.DeployableNotFound,
objects.Deployable.get, self.context,
dpl.uuid)
def test_add_attribute(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
dpl.add_attribute(self.context, db_attr['key'], db_attr['value'])
dpl.save(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
self.assertEqual(len(dpl_get.attributes_list), 1)
self.assertEqual(dpl_get.attributes_list[0].key,
db_attr['key'])
self.assertEqual(dpl_get.attributes_list[0].value,
db_attr['value'])
dpl.add_attribute(self.context, db_attr['key'], 'change_value')
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
self.assertEqual(len(dpl_get.attributes_list), 1)
self.assertEqual(dpl_get.attributes_list[0].key,
db_attr['key'])
self.assertEqual(dpl_get.attributes_list[0].value,
'change_value')
def test_delete_attribute(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_attr = self.fake_attribute
dpl_get.add_attribute(self.context, db_attr['key'], db_attr['value'])
dpl_get.save(self.context)
dpl_get = objects.Deployable.get(self.context, dpl_get.uuid)
self.assertEqual(len(dpl_get.attributes_list), 1)
dpl_get.delete_attribute(self.context, dpl_get.attributes_list[0])
self.assertEqual(len(dpl_get.attributes_list), 0)
def test_get_by_filter_with_attributes(self):
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
db_dpl = self.fake_deployable
dpl = objects.Deployable(context=self.context,
**db_dpl)
dpl.accelerator_id = acc_get.id
dpl.create(self.context)
dpl_get = objects.Deployable.get(self.context, dpl.uuid)
db_dpl2 = self.fake_deployable2
dpl2 = objects.Deployable(context=self.context,
**db_dpl2)
dpl2.accelerator_id = acc_get.id
dpl2.create(self.context)
dpl2_get = objects.Deployable.get(self.context, dpl2.uuid)
db_attr = self.fake_attribute
db_attr2 = self.fake_attribute2
db_attr3 = self.fake_attribute3
dpl.add_attribute(self.context, 'attr_key', 'attr_val')
dpl.save(self.context)
dpl2.add_attribute(self.context, 'test_key', 'test_val')
dpl2.save(self.context)
dpl2.add_attribute(self.context, 'test_key3', 'test_val3')
dpl2.save(self.context)
query = {"attr_key": "attr_val"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl.uuid)
query = {"test_key": "test_val"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
query = {"test_key": "test_val", "test_key3": "test_val3"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
query = {"host": "host_name", "test_key3": "test_val3"}
dpl_get_list = objects.Deployable.get_by_filter(self.context, query)
self.assertEqual(len(dpl_get_list), 1)
self.assertEqual(dpl_get_list[0].uuid, dpl2.uuid)
def test_get_by_host(self):
dep1 = self.fake_deployable
dep2 = self.fake_deployable2
fake_hostname = 'host_name'
dep_obj1 = objects.Deployable(context=self.context,
**dep1)
dep_obj2 = objects.Deployable(context=self.context,
**dep2)
dep_obj1.create(self.context)
dep_obj2.create(self.context)
dep_obj1.save(self.context)
dep_obj2.save(self.context)
dep_objs = objects.Deployable.get_by_host(self.context, fake_hostname)
self.assertEqual(dep_objs[0].host, fake_hostname)
self.assertEqual(dep_objs[1].host, fake_hostname)
class TestDeployableObject(test_objects._LocalTest,
_TestDeployableObject):
def _test_save_objectfield_fk_constraint_fails(self, foreign_key,
expected_exception):
error = db_exc.DBReferenceError('table', 'constraint', foreign_key,
'key_table')
# Prevent lazy-loading any fields, results in InstanceNotFound
deployable = fake_deployable.fake_deployable_obj(self.context)
fields_with_save_methods = [field for field in deployable.fields
if hasattr(deployable, '_save_%s' % field)]
for field in fields_with_save_methods:
@mock.patch.object(deployable, '_save_%s' % field)
@mock.patch.object(deployable, 'obj_attr_is_set')
def _test(mock_is_set, mock_save_field):
mock_is_set.return_value = True
mock_save_field.side_effect = error
deployable.obj_reset_changes(fields=[field])
deployable._changed_fields.add(field)
self.assertRaises(expected_exception, deployable.save)
deployable.obj_reset_changes(fields=[field])
_test()

View File

@ -1,227 +0,0 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import datetime
import inspect
import os
import fixtures
import mock
from oslo_log import log
from oslo_utils import timeutils
from oslo_versionedobjects import base as ovo_base
from oslo_versionedobjects import exception as ovo_exc
from oslo_versionedobjects import fixture
import six
from oslo_context import context
from cyborg.common import exception
from cyborg import objects
from cyborg.objects import base
from cyborg.objects import fields
from cyborg import tests as test
LOG = log.getLogger(__name__)
class MyOwnedObject(base.CyborgPersistentObject, base.CyborgObject):
VERSION = '1.0'
fields = {'baz': fields.IntegerField()}
class MyObj(base.CyborgPersistentObject, base.CyborgObject,
base.CyborgObjectDictCompat):
VERSION = '1.6'
fields = {'foo': fields.IntegerField(default=1),
'bar': fields.StringField(),
'missing': fields.StringField(),
'readonly': fields.IntegerField(read_only=True),
'rel_object': fields.ObjectField('MyOwnedObject', nullable=True),
'rel_objects': fields.ListOfObjectsField('MyOwnedObject',
nullable=True),
'mutable_default': fields.ListOfStringsField(default=[]),
}
@staticmethod
def _from_db_object(context, obj, db_obj):
self = MyObj()
self.foo = db_obj['foo']
self.bar = db_obj['bar']
self.missing = db_obj['missing']
self.readonly = 1
self._context = context
return self
def obj_load_attr(self, attrname):
setattr(self, attrname, 'loaded!')
def query(cls, context):
obj = cls(context=context, foo=1, bar='bar')
obj.obj_reset_changes()
return obj
def marco(self):
return 'polo'
def _update_test(self):
self.bar = 'updated'
def save(self):
self.obj_reset_changes()
def refresh(self):
self.foo = 321
self.bar = 'refreshed'
self.obj_reset_changes()
def modify_save_modify(self):
self.bar = 'meow'
self.save()
self.foo = 42
self.rel_object = MyOwnedObject(baz=42)
def obj_make_compatible(self, primitive, target_version):
super(MyObj, self).obj_make_compatible(primitive, target_version)
# NOTE(danms): Simulate an older version that had a different
# format for the 'bar' attribute
if target_version == '1.1' and 'bar' in primitive:
primitive['bar'] = 'old%s' % primitive['bar']
class RandomMixInWithNoFields(object):
"""Used to test object inheritance using a mixin that has no fields."""
pass
@base.CyborgObjectRegistry.register_if(False)
class TestSubclassedObject(RandomMixInWithNoFields, MyObj):
fields = {'new_field': fields.StringField()}
class TestObjToPrimitive(test.base.TestCase):
def test_obj_to_primitive_list(self):
@base.CyborgObjectRegistry.register_if(False)
class MyObjElement(base.CyborgObject):
fields = {'foo': fields.IntegerField()}
def __init__(self, foo):
super(MyObjElement, self).__init__()
self.foo = foo
@base.CyborgObjectRegistry.register_if(False)
class MyList(base.ObjectListBase, base.CyborgObject):
fields = {'objects': fields.ListOfObjectsField('MyObjElement')}
mylist = MyList()
mylist.objects = [MyObjElement(1), MyObjElement(2), MyObjElement(3)]
self.assertEqual([1, 2, 3],
[x['foo'] for x in base.obj_to_primitive(mylist)])
def test_obj_to_primitive_dict(self):
base.CyborgObjectRegistry.register(MyObj)
myobj = MyObj(foo=1, bar='foo')
self.assertEqual({'foo': 1, 'bar': 'foo'},
base.obj_to_primitive(myobj))
def test_obj_to_primitive_recursive(self):
base.CyborgObjectRegistry.register(MyObj)
class MyList(base.ObjectListBase, base.CyborgObject):
fields = {'objects': fields.ListOfObjectsField('MyObj')}
mylist = MyList(objects=[MyObj(), MyObj()])
for i, value in enumerate(mylist):
value.foo = i
self.assertEqual([{'foo': 0}, {'foo': 1}],
base.obj_to_primitive(mylist))
def test_obj_to_primitive_with_ip_addr(self):
@base.CyborgObjectRegistry.register_if(False)
class TestObject(base.CyborgObject):
fields = {'addr': fields.IPAddressField(),
'cidr': fields.IPNetworkField()}
obj = TestObject(addr='1.2.3.4', cidr='1.1.1.1/16')
self.assertEqual({'addr': '1.2.3.4', 'cidr': '1.1.1.1/16'},
base.obj_to_primitive(obj))
def compare_obj(test, obj, db_obj, subs=None, allow_missing=None,
comparators=None):
"""Compare a CyborgObject and a dict-like database object.
This automatically converts TZ-aware datetimes and iterates over
the fields of the object.
:param:test: The TestCase doing the comparison
:param:obj: The CyborgObject to examine
:param:db_obj: The dict-like database object to use as reference
:param:subs: A dict of objkey=dbkey field substitutions
:param:allow_missing: A list of fields that may not be in db_obj
:param:comparators: Map of comparator functions to use for certain fields
"""
if subs is None:
subs = {}
if allow_missing is None:
allow_missing = []
if comparators is None:
comparators = {}
for key in obj.fields:
if key in allow_missing and not obj.obj_attr_is_set(key):
continue
obj_val = getattr(obj, key)
db_key = subs.get(key, key)
db_val = db_obj[db_key]
if isinstance(obj_val, datetime.datetime):
obj_val = obj_val.replace(tzinfo=None)
if key in comparators:
comparator = comparators[key]
comparator(db_val, obj_val)
else:
test.assertEqual(db_val, obj_val)
class _BaseTestCase(test.base.TestCase):
def setUp(self):
super(_BaseTestCase, self).setUp()
self.user_id = 'fake-user'
self.project_id = 'fake-project'
self.context = context.RequestContext(self.user_id, self.project_id)
base.CyborgObjectRegistry.register(MyObj)
base.CyborgObjectRegistry.register(MyOwnedObject)
def compare_obj(self, obj, db_obj, subs=None, allow_missing=None,
comparators=None):
compare_obj(self, obj, db_obj, subs=subs, allow_missing=allow_missing,
comparators=comparators)
def str_comparator(self, expected, obj_val):
"""Compare an object field to a string in the db by performing
a simple coercion on the object field value.
"""
self.assertEqual(expected, str(obj_val))
class _LocalTest(_BaseTestCase):
def setUp(self):
super(_LocalTest, self).setUp()

View File

@ -1,186 +0,0 @@
import mock
import netaddr
from oslo_db import exception as db_exc
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_context import context
from cyborg import db
from cyborg.common import exception
from cyborg import objects
from cyborg.objects import base
from cyborg import tests as test
from cyborg.tests.unit import fake_physical_function
from cyborg.tests.unit import fake_virtual_function
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit.objects import test_objects
from cyborg.tests.unit.db.base import DbTestCase
class _TestPhysicalFunctionObject(DbTestCase):
@property
def fake_physical_function(self):
db_pf = fake_physical_function.fake_db_physical_function(id=1)
return db_pf
@property
def fake_virtual_function(self):
db_vf = fake_virtual_function.fake_db_virtual_function(id=3)
return db_vf
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=2)
return db_acc
def test_create(self):
db_pf = self.fake_physical_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
self.assertEqual(db_pf['uuid'], pf.uuid)
def test_get(self):
db_pf = self.fake_physical_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
self.assertEqual(pf_get.uuid, pf.uuid)
def test_get_by_filter(self):
db_acc = self.fake_accelerator
db_pf = self.fake_physical_function
db_vf = self.fake_virtual_function
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = pf_get.accelerator_id
vf.create(self.context)
vf_get = objects.VirtualFunction.get(self.context, vf.uuid)
pf_get.add_vf(vf_get)
pf_get.save(self.context)
query = {"vendor": pf['vendor']}
pf_get_list = objects.PhysicalFunction.get_by_filter(self.context,
query)
self.assertEqual(len(pf_get_list), 1)
self.assertEqual(pf_get_list[0].uuid, pf.uuid)
self.assertEqual(objects.PhysicalFunction, type(pf_get_list[0]))
self.assertEqual(objects.VirtualFunction,
type(pf_get_list[0].virtual_function_list[0]))
self.assertEqual(pf_get_list[0].virtual_function_list[0].uuid,
vf.uuid)
def test_save(self):
db_pf = self.fake_physical_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
pf_get.host = 'test_save'
pf_get.save(self.context)
pf_get_2 = objects.PhysicalFunction.get(self.context, pf.uuid)
self.assertEqual(pf_get_2.host, 'test_save')
def test_destroy(self):
db_pf = self.fake_physical_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
self.assertEqual(db_pf['uuid'], pf_get.uuid)
pf_get.destroy(self.context)
self.assertRaises(exception.DeployableNotFound,
objects.PhysicalFunction.get, self.context,
pf_get['uuid'])
def test_add_vf(self):
db_pf = self.fake_physical_function
db_vf = self.fake_virtual_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = pf_get.accelerator_id
vf.create(self.context)
vf_get = objects.VirtualFunction.get(self.context, vf.uuid)
pf_get.add_vf(vf_get)
pf_get.save(self.context)
pf_get_2 = objects.PhysicalFunction.get(self.context, pf.uuid)
self.assertEqual(db_vf['uuid'],
pf_get_2.virtual_function_list[0].uuid)
class TestPhysicalFunctionObject(test_objects._LocalTest,
_TestPhysicalFunctionObject):
def _test_save_objectfield_fk_constraint_fails(self, foreign_key,
expected_exception):
error = db_exc.DBReferenceError('table', 'constraint', foreign_key,
'key_table')
# Prevent lazy-loading any fields, results in InstanceNotFound
pf = fake_physical_function.physical_function_obj(self.context)
fields_with_save_methods = [field for field in pf.fields
if hasattr(pf, '_save_%s' % field)]
for field in fields_with_save_methods:
@mock.patch.object(pf, '_save_%s' % field)
@mock.patch.object(pf, 'obj_attr_is_set')
def _test(mock_is_set, mock_save_field):
mock_is_set.return_value = True
mock_save_field.side_effect = error
pf.obj_reset_changes(fields=[field])
pf._changed_fields.add(field)
self.assertRaises(expected_exception, pf.save)
pf.obj_reset_changes(fields=[field])
_test()

View File

@ -1,202 +0,0 @@
# Copyright 2018 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
import netaddr
from oslo_db import exception as db_exc
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from oslo_context import context
from cyborg import db
from cyborg.common import exception
from cyborg import objects
from cyborg.objects import base
from cyborg import tests as test
from cyborg.tests.unit import fake_physical_function
from cyborg.tests.unit import fake_virtual_function
from cyborg.tests.unit import fake_accelerator
from cyborg.tests.unit.objects import test_objects
from cyborg.tests.unit.db.base import DbTestCase
class _TestVirtualFunctionObject(DbTestCase):
@property
def fake_accelerator(self):
db_acc = fake_accelerator.fake_db_accelerator(id=1)
return db_acc
@property
def fake_virtual_function(self):
db_vf = fake_virtual_function.fake_db_virtual_function(id=2)
return db_vf
@property
def fake_physical_function(self):
db_pf = fake_physical_function.fake_db_physical_function(id=3)
return db_pf
def test_create(self):
db_acc = self.fake_accelerator
db_vf = self.fake_virtual_function
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = acc_get.id
vf.create(self.context)
self.assertEqual(db_vf['uuid'], vf.uuid)
def test_get(self):
db_vf = self.fake_virtual_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = acc_get.id
vf.create(self.context)
vf_get = objects.VirtualFunction.get(self.context, vf.uuid)
self.assertEqual(vf_get.uuid, vf.uuid)
def test_get_by_filter(self):
db_acc = self.fake_accelerator
db_pf = self.fake_physical_function
db_vf = self.fake_virtual_function
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = pf_get.accelerator_id
vf.create(self.context)
vf_get = objects.VirtualFunction.get(self.context, vf.uuid)
pf_get.add_vf(vf_get)
pf_get.save(self.context)
query = {"vendor": pf_get['vendor']}
vf_get_list = objects.VirtualFunction.get_by_filter(self.context,
query)
self.assertEqual(len(vf_get_list), 1)
self.assertEqual(vf_get_list[0].uuid, vf.uuid)
self.assertEqual(objects.VirtualFunction, type(vf_get_list[0]))
self.assertEqual(1, 1)
def test_get_by_filter2(self):
db_acc = self.fake_accelerator
db_pf = self.fake_physical_function
db_vf = self.fake_virtual_function
db_pf2 = self.fake_physical_function
db_vf2 = self.fake_virtual_function
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
pf = objects.PhysicalFunction(context=self.context,
**db_pf)
pf.accelerator_id = acc_get.id
pf.create(self.context)
pf_get = objects.PhysicalFunction.get(self.context, pf.uuid)
pf2 = objects.PhysicalFunction(context=self.context,
**db_pf2)
pf2.accelerator_id = acc_get.id
pf2.create(self.context)
pf_get2 = objects.PhysicalFunction.get(self.context, pf2.uuid)
query = {"uuid": pf2.uuid}
pf_get_list = objects.PhysicalFunction.get_by_filter(self.context,
query)
self.assertEqual(1, 1)
def test_save(self):
db_vf = self.fake_virtual_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = acc_get.id
vf.create(self.context)
vf_get = objects.VirtualFunction.get(self.context, vf.uuid)
vf_get.host = 'test_save'
vf_get.save(self.context)
vf_get_2 = objects.VirtualFunction.get(self.context, vf.uuid)
self.assertEqual(vf_get_2.host, 'test_save')
def test_destroy(self):
db_vf = self.fake_virtual_function
db_acc = self.fake_accelerator
acc = objects.Accelerator(context=self.context,
**db_acc)
acc.create(self.context)
acc_get = objects.Accelerator.get(self.context, acc.uuid)
vf = objects.VirtualFunction(context=self.context,
**db_vf)
vf.accelerator_id = acc_get.id
vf.create(self.context)
vf_get = objects.VirtualFunction.get(self.context, vf.uuid)
self.assertEqual(db_vf['uuid'], vf_get.uuid)
vf_get.destroy(self.context)
self.assertRaises(exception.DeployableNotFound,
objects.VirtualFunction.get, self.context,
vf_get['uuid'])
class TestVirtualFunctionObject(test_objects._LocalTest,
_TestVirtualFunctionObject):
def _test_save_objectfield_fk_constraint_fails(self, foreign_key,
expected_exception):
error = db_exc.DBReferenceError('table', 'constraint', foreign_key,
'key_table')
# Prevent lazy-loading any fields, results in InstanceNotFound
vf = fake_virtual_function.virtual_function_obj(self.context)
fields_with_save_methods = [field for field in vf.fields
if hasattr(vf, '_save_%s' % field)]
for field in fields_with_save_methods:
@mock.patch.object(vf, '_save_%s' % field)
@mock.patch.object(vf, 'obj_attr_is_set')
def _test(mock_is_set, mock_save_field):
mock_is_set.return_value = True
mock_save_field.side_effect = error
vf.obj_reset_changes(fields=[field])
vf._changed_fields.add(field)
self.assertRaises(expected_exception, vf.save)
vf.obj_reset_changes(fields=[field])
_test()

View File

@ -1,61 +0,0 @@
# Copyright 2017 Huawei Technologies Co.,LTD.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Cyborg object test utilities."""
from cyborg import objects
from cyborg.tests.unit.db import utils as db_utils
def get_test_accelerator(ctxt, **kw):
"""Return an Accelerator object with appropriate attributes.
NOTE: The object leaves the attributes marked as changed, such
that a create() could be used to commit it to the DB.
"""
test_acc = db_utils.get_test_accelerator(**kw)
obj_acc = objects.Accelerator(ctxt, **test_acc)
return obj_acc
def create_test_accelerator(ctxt, **kw):
"""Create and return a test accelerator object.
Create an accelerator in the DB and return an Accelerator object with
appropriate attributes.
"""
acc = get_test_accelerator(ctxt, **kw)
acc.create(ctxt)
return acc
def get_test_deployable(ctxt, **kw):
"""Return an Deployable object with appropriate attributes.
NOTE: The object leaves the attributes marked as changed, such
that a create() could be used to commit it to the DB.
"""
test_dp = db_utils.get_test_deployable(**kw)
return objects.Deployable(ctxt, **test_dp)
def create_test_deployable(ctxt, **kw):
"""Create and return a test deployable object.
Create an deployable in the DB and return an Deployable object with
appropriate attributes.
"""
dp = get_test_deployable(ctxt, **kw)
return dp.create(ctxt)