Add block_storage v3 API support

Existing block_storage v2 API methods were duplicated to v3 API.  v3
API is a superset of v2 API.  All tests are passing with only
modifications to the change the module API version from 2 --> 3.

Change-Id: I8cbabfb547a6b33e45af4fc779c786a23d441c91
This commit is contained in:
Matt Smith 2019-01-13 15:01:50 +00:00 committed by Monty Taylor
parent 0b1adb8664
commit 4dfd5a85b4
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
21 changed files with 1571 additions and 2 deletions

View File

@ -10,7 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.block_storage.v2 import _proxy
from openstack.block_storage.v2 import _proxy as _v2_proxy
from openstack.block_storage.v3 import _proxy as _v3_proxy
from openstack import service_description
@ -18,5 +19,6 @@ class BlockStorageService(service_description.ServiceDescription):
"""The block storage service."""
supported_versions = {
'2': _proxy.Proxy,
'3': _v3_proxy.Proxy,
'2': _v2_proxy.Proxy,
}

View File

View File

@ -0,0 +1,354 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.block_storage.v3 import backup as _backup
from openstack.block_storage.v3 import snapshot as _snapshot
from openstack.block_storage.v3 import stats as _stats
from openstack.block_storage.v3 import type as _type
from openstack.block_storage.v3 import volume as _volume
from openstack import exceptions
from openstack import proxy
from openstack import resource
class Proxy(proxy.Proxy):
def get_snapshot(self, snapshot):
"""Get a single snapshot
:param snapshot: The value can be the ID of a snapshot or a
:class:`~openstack.volume.v3.snapshot.Snapshot`
instance.
:returns: One :class:`~openstack.volume.v3.snapshot.Snapshot`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_snapshot.Snapshot, snapshot)
def snapshots(self, details=True, **query):
"""Retrieve a generator of snapshots
:param bool details: When set to ``False``
:class:`~openstack.block_storage.v3.snapshot.Snapshot`
objects will be returned. The default, ``True``, will cause
:class:`~openstack.block_storage.v3.snapshot.SnapshotDetail`
objects to be returned.
:param kwargs query: Optional query parameters to be sent to limit
the snapshots being returned. Available parameters include:
* name: Name of the snapshot as a string.
* all_projects: Whether return the snapshots in all projects.
* volume_id: volume id of a snapshot.
* status: Value of the status of the snapshot so that you can
filter on "available" for example.
:returns: A generator of snapshot objects.
"""
snapshot = _snapshot.SnapshotDetail if details else _snapshot.Snapshot
return self._list(snapshot, paginated=True, **query)
def create_snapshot(self, **attrs):
"""Create a new snapshot from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.volume.v3.snapshot.Snapshot`,
comprised of the properties on the Snapshot class.
:returns: The results of snapshot creation
:rtype: :class:`~openstack.volume.v3.snapshot.Snapshot`
"""
return self._create(_snapshot.Snapshot, **attrs)
def delete_snapshot(self, snapshot, ignore_missing=True):
"""Delete a snapshot
:param snapshot: The value can be either the ID of a snapshot or a
:class:`~openstack.volume.v3.snapshot.Snapshot`
instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the snapshot does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent snapshot.
:returns: ``None``
"""
self._delete(_snapshot.Snapshot, snapshot,
ignore_missing=ignore_missing)
def get_type(self, type):
"""Get a single type
:param type: The value can be the ID of a type or a
:class:`~openstack.volume.v3.type.Type` instance.
:returns: One :class:`~openstack.volume.v3.type.Type`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_type.Type, type)
def types(self, **query):
"""Retrieve a generator of volume types
:returns: A generator of volume type objects.
"""
return self._list(_type.Type, paginated=False, **query)
def create_type(self, **attrs):
"""Create a new type from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.volume.v3.type.Type`,
comprised of the properties on the Type class.
:returns: The results of type creation
:rtype: :class:`~openstack.volume.v3.type.Type`
"""
return self._create(_type.Type, **attrs)
def delete_type(self, type, ignore_missing=True):
"""Delete a type
:param type: The value can be either the ID of a type or a
:class:`~openstack.volume.v3.type.Type` instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the type does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent type.
:returns: ``None``
"""
self._delete(_type.Type, type, ignore_missing=ignore_missing)
def get_volume(self, volume):
"""Get a single volume
:param volume: The value can be the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:returns: One :class:`~openstack.volume.v3.volume.Volume`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_volume.Volume, volume)
def volumes(self, details=True, **query):
"""Retrieve a generator of volumes
:param bool details: When set to ``False``
:class:`~openstack.block_storage.v3.volume.Volume` objects
will be returned. The default, ``True``, will cause
:class:`~openstack.block_storage.v3.volume.VolumeDetail`
objects to be returned.
:param kwargs query: Optional query parameters to be sent to limit
the volumes being returned. Available parameters include:
* name: Name of the volume as a string.
* all_projects: Whether return the volumes in all projects
* status: Value of the status of the volume so that you can filter
on "available" for example.
:returns: A generator of volume objects.
"""
volume = _volume.VolumeDetail if details else _volume.Volume
return self._list(volume, paginated=True, **query)
def create_volume(self, **attrs):
"""Create a new volume from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.volume.v3.volume.Volume`,
comprised of the properties on the Volume class.
:returns: The results of volume creation
:rtype: :class:`~openstack.volume.v3.volume.Volume`
"""
return self._create(_volume.Volume, **attrs)
def delete_volume(self, volume, ignore_missing=True):
"""Delete a volume
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the volume does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent volume.
:returns: ``None``
"""
self._delete(_volume.Volume, volume, ignore_missing=ignore_missing)
def extend_volume(self, volume, size):
"""Extend a volume
:param volume: The value can be either the ID of a volume or a
:class:`~openstack.volume.v3.volume.Volume` instance.
:param size: New volume size
:returns: None
"""
volume = self._get_resource(_volume.Volume, volume)
volume.extend(self, size)
def backend_pools(self):
"""Returns a generator of cinder Back-end storage pools
:returns A generator of cinder Back-end storage pools objects
"""
return self._list(_stats.Pools, paginated=False)
def backups(self, details=True, **query):
"""Retrieve a generator of backups
:param bool details: When set to ``False``
:class:`~openstack.block_storage.v3.backup.Backup` objects
will be returned. The default, ``True``, will cause
:class:`~openstack.block_storage.v3.backup.BackupDetail`
objects to be returned.
:param dict query: Optional query parameters to be sent to limit the
resources being returned:
* offset: pagination marker
* limit: pagination limit
* sort_key: Sorts by an attribute. A valid value is
name, status, container_format, disk_format, size, id,
created_at, or updated_at. Default is created_at.
The API uses the natural sorting direction of the
sort_key attribute value.
* sort_dir: Sorts by one or more sets of attribute and sort
direction combinations. If you omit the sort direction
in a set, default is desc.
:returns: A generator of backup objects.
"""
if not self._connection.has_service('object-store'):
raise exceptions.SDKException(
'Object-store service is required for block-store backups'
)
backup = _backup.BackupDetail if details else _backup.Backup
return self._list(backup, paginated=True, **query)
def get_backup(self, backup):
"""Get a backup
:param backup: The value can be the ID of a backup
or a :class:`~openstack.block_storage.v3.backup.Backup`
instance.
:returns: Backup instance
:rtype: :class:`~openstack.block_storage.v3.backup.Backup`
"""
if not self._connection.has_service('object-store'):
raise exceptions.SDKException(
'Object-store service is required for block-store backups'
)
return self._get(_backup.Backup, backup)
def create_backup(self, **attrs):
"""Create a new Backup from attributes with native API
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.block_storage.v3.backup.Backup`
comprised of the properties on the Backup class.
:returns: The results of Backup creation
:rtype: :class:`~openstack.block_storage.v3.backup.Backup`
"""
if not self._connection.has_service('object-store'):
raise exceptions.SDKException(
'Object-store service is required for block-store backups'
)
return self._create(_backup.Backup, **attrs)
def delete_backup(self, backup, ignore_missing=True):
"""Delete a CloudBackup
:param backup: The value can be the ID of a backup or a
:class:`~openstack.block_storage.v3.backup.Backup` instance
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be raised when
the zone does not exist.
When set to ``True``, no exception will be set when attempting to
delete a nonexistent zone.
:returns: ``None``
"""
if not self._connection.has_service('object-store'):
raise exceptions.SDKException(
'Object-store service is required for block-store backups'
)
self._delete(_backup.Backup, backup,
ignore_missing=ignore_missing)
def restore_backup(self, backup, volume_id, name):
"""Restore a Backup to volume
:param backup: The value can be the ID of a backup or a
:class:`~openstack.block_storage.v3.backup.Backup` instance
:param volume_id: The ID of the volume to restore the backup to.
:param name: The name for new volume creation to restore.
:returns: Updated backup instance
:rtype: :class:`~openstack.block_storage.v3.backup.Backup`
"""
if not self._connection.has_service('object-store'):
raise exceptions.SDKException(
'Object-store service is required for block-store backups'
)
backup = self._get_resource(_backup.Backup, backup)
return backup.restore(self, volume_id=volume_id, name=name)
def wait_for_status(self, res, status='ACTIVE', failures=None,
interval=2, wait=120):
"""Wait for a resource to be in a particular status.
:param res: The resource to wait on to reach the specified status.
The resource must have a ``status`` attribute.
:type resource: A :class:`~openstack.resource.Resource` object.
:param status: Desired status.
:param failures: Statuses that would be interpreted as failures.
:type failures: :py:class:`list`
:param interval: Number of seconds to wait before to consecutive
checks. Default to 2.
:param wait: Maximum number of seconds to wait before the change.
Default to 120.
:returns: The resource is returned on success.
:raises: :class:`~openstack.exceptions.ResourceTimeout` if transition
to the desired status failed to occur in specified seconds.
:raises: :class:`~openstack.exceptions.ResourceFailure` if the resource
has transited to one of the failure statuses.
:raises: :class:`~AttributeError` if the resource does not have a
``status`` attribute.
"""
failures = ['Error'] if failures is None else failures
return resource.wait_for_status(
self, res, status, failures, interval, wait)
def wait_for_delete(self, res, interval=2, wait=120):
"""Wait for a resource to be deleted.
:param res: The resource to wait on to be deleted.
:type resource: A :class:`~openstack.resource.Resource` object.
:param interval: Number of seconds to wait before to consecutive
checks. Default to 2.
:param wait: Maximum number of seconds to wait before the change.
Default to 120.
:returns: The resource is returned on success.
:raises: :class:`~openstack.exceptions.ResourceTimeout` if transition
to delete failed to occur in the specified seconds.
"""
return resource.wait_for_delete(self, res, interval, wait)

View File

@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
from openstack import utils
class Backup(resource.Resource):
"""Volume Backup"""
resource_key = "backup"
resources_key = "backups"
base_path = "/backups"
_query_mapping = resource.QueryParameters(
'all_tenants', 'limit', 'marker',
'sort_key', 'sort_dir')
# capabilities
allow_fetch = True
allow_create = True
allow_delete = True
allow_list = True
allow_get = True
#: Properties
#: backup availability zone
availability_zone = resource.Body("availability_zone")
#: The container backup in
container = resource.Body("container")
#: The date and time when the resource was created.
created_at = resource.Body("created_at")
#: data timestamp
#: The time when the data on the volume was first saved.
#: If it is a backup from volume, it will be the same as created_at
#: for a backup. If it is a backup from a snapshot,
#: it will be the same as created_at for the snapshot.
data_timestamp = resource.Body('data_timestamp')
#: backup description
description = resource.Body("description")
#: Backup fail reason
fail_reason = resource.Body("fail_reason")
#: Force backup
force = resource.Body("force", type=bool)
#: has_dependent_backups
#: If this value is true, there are other backups depending on this backup.
has_dependent_backups = resource.Body('has_dependent_backups', type=bool)
#: Indicates whether the backup mode is incremental.
#: If this value is true, the backup mode is incremental.
#: If this value is false, the backup mode is full.
is_incremental = resource.Body("is_incremental", type=bool)
#: A list of links associated with this volume. *Type: list*
links = resource.Body("links", type=list)
#: backup name
name = resource.Body("name")
#: backup object count
object_count = resource.Body("object_count", type=int)
#: The size of the volume, in gibibytes (GiB).
size = resource.Body("size", type=int)
#: The UUID of the source volume snapshot.
snapshot_id = resource.Body("snapshot_id")
#: backup status
#: values: creating, available, deleting, error, restoring, error_restoring
status = resource.Body("status")
#: The date and time when the resource was updated.
updated_at = resource.Body("updated_at")
#: The UUID of the volume.
volume_id = resource.Body("volume_id")
def restore(self, session, volume_id=None, name=None):
"""Restore current backup to volume
:param session: openstack session
:param volume_id: The ID of the volume to restore the backup to.
:param name: The name for new volume creation to restore.
:return:
"""
url = utils.urljoin(self.base_path, self.id, "restore")
body = {"restore": {"volume_id": volume_id, "name": name}}
response = session.post(url,
json=body)
self._translate_response(response)
return self
class BackupDetail(Backup):
"""Volume Backup with Details"""
base_path = "/backups/detail"
# capabilities
allow_list = True
#: Properties

View File

@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import format
from openstack import resource
class Snapshot(resource.Resource):
resource_key = "snapshot"
resources_key = "snapshots"
base_path = "/snapshots"
_query_mapping = resource.QueryParameters(
'name', 'status', 'volume_id', all_projects='all_tenants')
# capabilities
allow_fetch = True
allow_create = True
allow_delete = True
allow_commit = True
allow_list = True
# Properties
#: A ID representing this snapshot.
id = resource.Body("id")
#: Name of the snapshot. Default is None.
name = resource.Body("name")
#: The current status of this snapshot. Potential values are creating,
#: available, deleting, error, and error_deleting.
status = resource.Body("status")
#: Description of snapshot. Default is None.
description = resource.Body("description")
#: The timestamp of this snapshot creation.
created_at = resource.Body("created_at")
#: Metadata associated with this snapshot.
metadata = resource.Body("metadata", type=dict)
#: The ID of the volume this snapshot was taken of.
volume_id = resource.Body("volume_id")
#: The size of the volume, in GBs.
size = resource.Body("size", type=int)
#: Indicate whether to create snapshot, even if the volume is attached.
#: Default is ``False``. *Type: bool*
is_forced = resource.Body("force", type=format.BoolStr)
class SnapshotDetail(Snapshot):
base_path = "/snapshots/detail"
# capabilities
allow_fetch = False
allow_create = False
allow_delete = False
allow_commit = False
allow_list = True
#: The percentage of completeness the snapshot is currently at.
progress = resource.Body("os-extended-snapshot-attributes:progress")
#: The project ID this snapshot is associated with.
project_id = resource.Body("os-extended-snapshot-attributes:project_id")

View File

@ -0,0 +1,31 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
class Pools(resource.Resource):
resource_key = "pool"
resources_key = "pools"
base_path = "/scheduler-stats/get_pools?detail=True"
# capabilities
allow_fetch = False
allow_create = False
allow_delete = False
allow_list = True
# Properties
#: The Cinder name for the pool
name = resource.Body("name")
#: returns a dict with information about the pool
capabilities = resource.Body("capabilities", type=dict)

View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
class Type(resource.Resource):
resource_key = "volume_type"
resources_key = "volume_types"
base_path = "/types"
# capabilities
allow_fetch = True
allow_create = True
allow_delete = True
allow_list = True
_query_mapping = resource.QueryParameters("is_public")
# Properties
#: A ID representing this type.
id = resource.Body("id")
#: Name of the type.
name = resource.Body("name")
#: A dict of extra specifications. "capabilities" is a usual key.
extra_specs = resource.Body("extra_specs", type=dict)
#: a private volume-type. *Type: bool*
is_public = resource.Body('os-volume-type-access:is_public', type=bool)

View File

@ -0,0 +1,125 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import format
from openstack import resource
from openstack import utils
class Volume(resource.Resource):
resource_key = "volume"
resources_key = "volumes"
base_path = "/volumes"
_query_mapping = resource.QueryParameters(
'name', 'status', 'project_id', all_projects='all_tenants')
# capabilities
allow_fetch = True
allow_create = True
allow_delete = True
allow_commit = True
allow_list = True
# Properties
#: A ID representing this volume.
id = resource.Body("id")
#: The name of this volume.
name = resource.Body("name")
#: A list of links associated with this volume. *Type: list*
links = resource.Body("links", type=list)
#: The availability zone.
availability_zone = resource.Body("availability_zone")
#: To create a volume from an existing volume, specify the ID of
#: the existing volume. If specified, the volume is created with
#: same size of the source volume.
source_volume_id = resource.Body("source_volid")
#: The volume description.
description = resource.Body("description")
#: To create a volume from an existing snapshot, specify the ID of
#: the existing volume snapshot. If specified, the volume is created
#: in same availability zone and with same size of the snapshot.
snapshot_id = resource.Body("snapshot_id")
#: The size of the volume, in GBs. *Type: int*
size = resource.Body("size", type=int)
#: The ID of the image from which you want to create the volume.
#: Required to create a bootable volume.
image_id = resource.Body("imageRef")
#: The name of the associated volume type.
volume_type = resource.Body("volume_type")
#: Enables or disables the bootable attribute. You can boot an
#: instance from a bootable volume. *Type: bool*
is_bootable = resource.Body("bootable", type=format.BoolStr)
#: One or more metadata key and value pairs to associate with the volume.
metadata = resource.Body("metadata")
#: One or more metadata key and value pairs about image
volume_image_metadata = resource.Body("volume_image_metadata")
#: One of the following values: creating, available, attaching, in-use
#: deleting, error, error_deleting, backing-up, restoring-backup,
#: error_restoring. For details on these statuses, see the
#: Block Storage API documentation.
status = resource.Body("status")
#: TODO(briancurtin): This is currently undocumented in the API.
attachments = resource.Body("attachments")
#: The timestamp of this volume creation.
created_at = resource.Body("created_at")
def _action(self, session, body):
"""Preform volume actions given the message body."""
# NOTE: This is using Volume.base_path instead of self.base_path
# as both Volume and VolumeDetail instances can be acted on, but
# the URL used is sans any additional /detail/ part.
url = utils.urljoin(Volume.base_path, self.id, 'action')
headers = {'Accept': ''}
return session.post(url, json=body, headers=headers)
def extend(self, session, size):
"""Extend a volume size."""
body = {'os-extend': {'new_size': size}}
self._action(session, body)
class VolumeDetail(Volume):
base_path = "/volumes/detail"
# capabilities
allow_fetch = False
allow_create = False
allow_delete = False
allow_commit = False
allow_list = True
#: The volume's current back-end.
host = resource.Body("os-vol-host-attr:host")
#: The project ID associated with current back-end.
project_id = resource.Body("os-vol-tenant-attr:tenant_id")
#: The status of this volume's migration (None means that a migration
#: is not currently in progress).
migration_status = resource.Body("os-vol-mig-status-attr:migstat")
#: The volume ID that this volume's name on the back-end is based on.
migration_id = resource.Body("os-vol-mig-status-attr:name_id")
#: Status of replication on this volume.
replication_status = resource.Body("replication_status")
#: Extended replication status on this volume.
extended_replication_status = resource.Body(
"os-volume-replication:extended_status")
#: ID of the consistency group.
consistency_group_id = resource.Body("consistencygroup_id")
#: Data set by the replication driver
replication_driver_data = resource.Body(
"os-volume-replication:driver_data")
#: ``True`` if this volume is encrypted, ``False`` if not.
#: *Type: bool*
is_encrypted = resource.Body("encrypted", type=format.BoolStr)

View File

@ -0,0 +1,33 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from openstack.tests.functional import base
class BaseBlockStorageTest(base.BaseFunctionalTest):
@classmethod
def setUpClass(cls):
super(BaseBlockStorageTest, cls).setUpClass()
cls._wait_for_timeout = int(os.getenv(
'OPENSTACKSDK_FUNC_TEST_TIMEOUT_BLOCK_STORAGE',
cls._wait_for_timeout))
def setUp(self):
super(BaseBlockStorageTest, self).setUp()
self._set_user_cloud(block_storage_api_version='3')
self._set_operator_cloud(block_storage_api_version='3')
if not self.user_cloud.has_service('block-storage'):
self.skipTest('block-storage service not supported by cloud')

View File

@ -0,0 +1,68 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.block_storage.v3 import volume as _volume
from openstack.block_storage.v3 import backup as _backup
from openstack.tests.functional.block_storage.v3 import base
class TestBackup(base.BaseBlockStorageTest):
def setUp(self):
super(TestBackup, self).setUp()
if not self.user_cloud.has_service('object-store'):
self.skipTest('Object service is requred, but not available')
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
self.BACKUP_NAME = self.getUniqueString()
self.BACKUP_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
assert isinstance(volume, _volume.Volume)
self.VOLUME_ID = volume.id
backup = self.user_cloud.block_storage.create_backup(
name=self.BACKUP_NAME,
volume_id=volume.id)
self.user_cloud.block_storage.wait_for_status(
backup,
status='available',
failures=['error'],
interval=5,
wait=self._wait_for_timeout)
assert isinstance(backup, _backup.Backup)
self.assertEqual(self.BACKUP_NAME, backup.name)
self.BACKUP_ID = backup.id
def tearDown(self):
sot = self.user_cloud.block_storage.delete_backup(
self.BACKUP_ID,
ignore_missing=False)
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestBackup, self).tearDown()
def test_get(self):
sot = self.user_cloud.block_storage.get_backup(self.BACKUP_ID)
self.assertEqual(self.BACKUP_NAME, sot.name)

View File

@ -0,0 +1,68 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.block_storage.v3 import snapshot as _snapshot
from openstack.block_storage.v3 import volume as _volume
from openstack.tests.functional.block_storage.v3 import base
class TestSnapshot(base.BaseBlockStorageTest):
def setUp(self):
super(TestSnapshot, self).setUp()
self.SNAPSHOT_NAME = self.getUniqueString()
self.SNAPSHOT_ID = None
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
assert isinstance(volume, _volume.Volume)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
snapshot = self.user_cloud.block_storage.create_snapshot(
name=self.SNAPSHOT_NAME,
volume_id=self.VOLUME_ID)
self.user_cloud.block_storage.wait_for_status(
snapshot,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
assert isinstance(snapshot, _snapshot.Snapshot)
self.assertEqual(self.SNAPSHOT_NAME, snapshot.name)
self.SNAPSHOT_ID = snapshot.id
def tearDown(self):
snapshot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
sot = self.user_cloud.block_storage.delete_snapshot(
snapshot, ignore_missing=False)
self.user_cloud.block_storage.wait_for_delete(
snapshot, interval=2, wait=self._wait_for_timeout)
self.assertIsNone(sot)
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestSnapshot, self).tearDown()
def test_get(self):
sot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
self.assertEqual(self.SNAPSHOT_NAME, sot.name)

View File

@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.block_storage.v3 import type as _type
from openstack.tests.functional.block_storage.v3 import base
class TestType(base.BaseBlockStorageTest):
def setUp(self):
super(TestType, self).setUp()
self.TYPE_NAME = self.getUniqueString()
self.TYPE_ID = None
sot = self.operator_cloud.block_storage.create_type(
name=self.TYPE_NAME)
assert isinstance(sot, _type.Type)
self.assertEqual(self.TYPE_NAME, sot.name)
self.TYPE_ID = sot.id
def tearDown(self):
sot = self.operator_cloud.block_storage.delete_type(
self.TYPE_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestType, self).tearDown()
def test_get(self):
sot = self.operator_cloud.block_storage.get_type(self.TYPE_ID)
self.assertEqual(self.TYPE_NAME, sot.name)

View File

@ -0,0 +1,50 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.block_storage.v3 import volume as _volume
from openstack.tests.functional.block_storage.v3 import base
class TestVolume(base.BaseBlockStorageTest):
def setUp(self):
super(TestVolume, self).setUp()
if not self.user_cloud.has_service('block-storage'):
self.skipTest('block-storage service not supported by cloud')
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=self._wait_for_timeout)
assert isinstance(volume, _volume.Volume)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
def tearDown(self):
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestVolume, self).tearDown()
def test_get(self):
sot = self.user_cloud.block_storage.get_volume(self.VOLUME_ID)
self.assertEqual(self.VOLUME_NAME, sot.name)

View File

@ -0,0 +1,121 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from keystoneauth1 import adapter
from openstack.tests.unit import base
from openstack.block_storage.v3 import backup
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
BACKUP = {
"availability_zone": "az1",
"container": "volumebackups",
"created_at": "2018-04-02T10:35:27.000000",
"updated_at": "2018-04-03T10:35:27.000000",
"description": 'description',
"fail_reason": 'fail reason',
"id": FAKE_ID,
"name": "backup001",
"object_count": 22,
"size": 1,
"status": "available",
"volume_id": "e5185058-943a-4cb4-96d9-72c184c337d6",
"is_incremental": True,
"has_dependent_backups": False
}
DETAILS = {
}
BACKUP_DETAIL = copy.copy(BACKUP)
BACKUP_DETAIL.update(DETAILS)
class TestBackup(base.TestCase):
def setUp(self):
super(TestBackup, self).setUp()
self.sess = mock.Mock(spec=adapter.Adapter)
self.sess.get = mock.Mock()
self.sess.default_microversion = mock.Mock(return_value='')
def test_basic(self):
sot = backup.Backup(BACKUP)
self.assertEqual("backup", sot.resource_key)
self.assertEqual("backups", sot.resources_key)
self.assertEqual("/backups", sot.base_path)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertTrue(sot.allow_get)
self.assertTrue(sot.allow_fetch)
self.assertDictEqual(
{
"all_tenants": "all_tenants",
"limit": "limit",
"marker": "marker",
"sort_dir": "sort_dir",
"sort_key": "sort_key"
},
sot._query_mapping._mapping
)
def test_create(self):
sot = backup.Backup(**BACKUP)
self.assertEqual(BACKUP["id"], sot.id)
self.assertEqual(BACKUP["name"], sot.name)
self.assertEqual(BACKUP["status"], sot.status)
self.assertEqual(BACKUP["container"], sot.container)
self.assertEqual(BACKUP["availability_zone"], sot.availability_zone)
self.assertEqual(BACKUP["created_at"], sot.created_at)
self.assertEqual(BACKUP["updated_at"], sot.updated_at)
self.assertEqual(BACKUP["description"], sot.description)
self.assertEqual(BACKUP["fail_reason"], sot.fail_reason)
self.assertEqual(BACKUP["volume_id"], sot.volume_id)
self.assertEqual(BACKUP["object_count"], sot.object_count)
self.assertEqual(BACKUP["is_incremental"], sot.is_incremental)
self.assertEqual(BACKUP["size"], sot.size)
self.assertEqual(BACKUP["has_dependent_backups"],
sot.has_dependent_backups)
class TestBackupDetail(base.TestCase):
def test_basic(self):
sot = backup.BackupDetail(BACKUP_DETAIL)
self.assertIsInstance(sot, backup.Backup)
self.assertEqual("/backups/detail", sot.base_path)
def test_create(self):
sot = backup.Backup(**BACKUP_DETAIL)
self.assertEqual(BACKUP_DETAIL["id"], sot.id)
self.assertEqual(BACKUP_DETAIL["name"], sot.name)
self.assertEqual(BACKUP_DETAIL["status"], sot.status)
self.assertEqual(BACKUP_DETAIL["container"], sot.container)
self.assertEqual(BACKUP_DETAIL["availability_zone"],
sot.availability_zone)
self.assertEqual(BACKUP_DETAIL["created_at"], sot.created_at)
self.assertEqual(BACKUP_DETAIL["updated_at"], sot.updated_at)
self.assertEqual(BACKUP_DETAIL["description"], sot.description)
self.assertEqual(BACKUP_DETAIL["fail_reason"], sot.fail_reason)
self.assertEqual(BACKUP_DETAIL["volume_id"], sot.volume_id)
self.assertEqual(BACKUP_DETAIL["object_count"], sot.object_count)
self.assertEqual(BACKUP_DETAIL["is_incremental"], sot.is_incremental)
self.assertEqual(BACKUP_DETAIL["size"], sot.size)
self.assertEqual(BACKUP_DETAIL["has_dependent_backups"],
sot.has_dependent_backups)

View File

@ -0,0 +1,171 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from openstack import exceptions
from openstack.block_storage.v3 import _proxy
from openstack.block_storage.v3 import backup
from openstack.block_storage.v3 import snapshot
from openstack.block_storage.v3 import stats
from openstack.block_storage.v3 import type
from openstack.block_storage.v3 import volume
from openstack.tests.unit import test_proxy_base
class TestVolumeProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super(TestVolumeProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
def test_snapshot_get(self):
self.verify_get(self.proxy.get_snapshot, snapshot.Snapshot)
def test_snapshots_detailed(self):
self.verify_list(self.proxy.snapshots, snapshot.SnapshotDetail,
paginated=True,
method_kwargs={"details": True, "query": 1},
expected_kwargs={"query": 1})
def test_snapshots_not_detailed(self):
self.verify_list(self.proxy.snapshots, snapshot.Snapshot,
paginated=True,
method_kwargs={"details": False, "query": 1},
expected_kwargs={"query": 1})
def test_snapshot_create_attrs(self):
self.verify_create(self.proxy.create_snapshot, snapshot.Snapshot)
def test_snapshot_delete(self):
self.verify_delete(self.proxy.delete_snapshot,
snapshot.Snapshot, False)
def test_snapshot_delete_ignore(self):
self.verify_delete(self.proxy.delete_snapshot,
snapshot.Snapshot, True)
def test_type_get(self):
self.verify_get(self.proxy.get_type, type.Type)
def test_types(self):
self.verify_list(self.proxy.types, type.Type, paginated=False)
def test_type_create_attrs(self):
self.verify_create(self.proxy.create_type, type.Type)
def test_type_delete(self):
self.verify_delete(self.proxy.delete_type, type.Type, False)
def test_type_delete_ignore(self):
self.verify_delete(self.proxy.delete_type, type.Type, True)
def test_volume_get(self):
self.verify_get(self.proxy.get_volume, volume.Volume)
def test_volumes_detailed(self):
self.verify_list(self.proxy.volumes, volume.VolumeDetail,
paginated=True,
method_kwargs={"details": True, "query": 1},
expected_kwargs={"query": 1})
def test_volumes_not_detailed(self):
self.verify_list(self.proxy.volumes, volume.Volume,
paginated=True,
method_kwargs={"details": False, "query": 1},
expected_kwargs={"query": 1})
def test_volume_create_attrs(self):
self.verify_create(self.proxy.create_volume, volume.Volume)
def test_volume_delete(self):
self.verify_delete(self.proxy.delete_volume, volume.Volume, False)
def test_volume_delete_ignore(self):
self.verify_delete(self.proxy.delete_volume, volume.Volume, True)
def test_volume_extend(self):
self._verify("openstack.block_storage.v3.volume.Volume.extend",
self.proxy.extend_volume,
method_args=["value", "new-size"],
expected_args=["new-size"])
def test_backend_pools(self):
self.verify_list(self.proxy.backend_pools, stats.Pools,
paginated=False)
def test_backups_detailed(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self.verify_list(self.proxy.backups, backup.BackupDetail,
paginated=True,
method_kwargs={"details": True, "query": 1},
expected_kwargs={"query": 1})
def test_backups_not_detailed(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self.verify_list(self.proxy.backups, backup.Backup,
paginated=True,
method_kwargs={"details": False, "query": 1},
expected_kwargs={"query": 1})
def test_backup_get(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self.verify_get(self.proxy.get_backup, backup.Backup)
def test_backup_delete(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self.verify_delete(self.proxy.delete_backup, backup.Backup, False)
def test_backup_delete_ignore(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self.verify_delete(self.proxy.delete_backup, backup.Backup, True)
def test_backup_create_attrs(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self.verify_create(self.proxy.create_backup, backup.Backup)
def test_backup_restore(self):
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=True)
self._verify2(
'openstack.block_storage.v3.backup.Backup.restore',
self.proxy.restore_backup,
method_args=['volume_id'],
method_kwargs={'volume_id': 'vol_id', 'name': 'name'},
expected_args=[self.proxy],
expected_kwargs={'volume_id': 'vol_id', 'name': 'name'}
)
def test_backup_no_swift(self):
"""Ensure proxy method raises exception if swift is not available
"""
# NOTE: mock has_service
self.proxy._connection = mock.Mock()
self.proxy._connection.has_service = mock.Mock(return_value=False)
self.assertRaises(
exceptions.SDKException,
self.proxy.restore_backup,
'backup',
'volume_id',
'name')

View File

@ -0,0 +1,94 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.tests.unit import base
from openstack.block_storage.v3 import snapshot
FAKE_ID = "ffa9bc5e-1172-4021-acaf-cdcd78a9584d"
SNAPSHOT = {
"status": "creating",
"description": "Daily backup",
"created_at": "2015-03-09T12:14:57.233772",
"metadata": {},
"volume_id": "5aa119a8-d25b-45a7-8d1b-88e127885635",
"size": 1,
"id": FAKE_ID,
"name": "snap-001",
"force": "true",
}
DETAILS = {
"os-extended-snapshot-attributes:progress": "100%",
"os-extended-snapshot-attributes:project_id":
"0c2eba2c5af04d3f9e9d0d410b371fde"
}
DETAILED_SNAPSHOT = SNAPSHOT.copy()
DETAILED_SNAPSHOT.update(**DETAILS)
class TestSnapshot(base.TestCase):
def test_basic(self):
sot = snapshot.Snapshot(SNAPSHOT)
self.assertEqual("snapshot", sot.resource_key)
self.assertEqual("snapshots", sot.resources_key)
self.assertEqual("/snapshots", sot.base_path)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"status": "status",
"all_projects": "all_tenants",
"volume_id": "volume_id",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
def test_create_basic(self):
sot = snapshot.Snapshot(**SNAPSHOT)
self.assertEqual(SNAPSHOT["id"], sot.id)
self.assertEqual(SNAPSHOT["status"], sot.status)
self.assertEqual(SNAPSHOT["created_at"], sot.created_at)
self.assertEqual(SNAPSHOT["metadata"], sot.metadata)
self.assertEqual(SNAPSHOT["volume_id"], sot.volume_id)
self.assertEqual(SNAPSHOT["size"], sot.size)
self.assertEqual(SNAPSHOT["name"], sot.name)
self.assertTrue(sot.is_forced)
class TestSnapshotDetail(base.TestCase):
def test_basic(self):
sot = snapshot.SnapshotDetail(DETAILED_SNAPSHOT)
self.assertIsInstance(sot, snapshot.Snapshot)
self.assertEqual("/snapshots/detail", sot.base_path)
self.assertFalse(sot.allow_fetch)
self.assertFalse(sot.allow_commit)
self.assertFalse(sot.allow_create)
self.assertFalse(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_create_detailed(self):
sot = snapshot.SnapshotDetail(**DETAILED_SNAPSHOT)
self.assertEqual(
DETAILED_SNAPSHOT["os-extended-snapshot-attributes:progress"],
sot.progress)
self.assertEqual(
DETAILED_SNAPSHOT["os-extended-snapshot-attributes:project_id"],
sot.project_id)

View File

@ -0,0 +1,48 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.tests.unit import base
from openstack.block_storage.v3 import type
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
TYPE = {
"extra_specs": {
"capabilities": "gpu"
},
"id": FAKE_ID,
"name": "SSD"
}
class TestType(base.TestCase):
def test_basic(self):
sot = type.Type(**TYPE)
self.assertEqual("volume_type", sot.resource_key)
self.assertEqual("volume_types", sot.resources_key)
self.assertEqual("/types", sot.base_path)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertFalse(sot.allow_commit)
def test_new(self):
sot = type.Type.new(id=FAKE_ID)
self.assertEqual(FAKE_ID, sot.id)
def test_create(self):
sot = type.Type(**TYPE)
self.assertEqual(TYPE["id"], sot.id)
self.assertEqual(TYPE["extra_specs"], sot.extra_specs)
self.assertEqual(TYPE["name"], sot.name)

View File

@ -0,0 +1,153 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from openstack.tests.unit import base
from openstack.block_storage.v3 import volume
FAKE_ID = "6685584b-1eac-4da6-b5c3-555430cf68ff"
IMAGE_METADATA = {
'container_format': 'bare',
'min_ram': '64', 'disk_format': u'qcow2',
'image_name': 'TestVM',
'image_id': '625d4f2c-cf67-4af3-afb6-c7220f766947',
'checksum': '64d7c1cd2b6f60c92c14662941cb7913',
'min_disk': '0', u'size': '13167616'
}
VOLUME = {
"status": "creating",
"name": "my_volume",
"attachments": [],
"availability_zone": "nova",
"bootable": "false",
"created_at": "2015-03-09T12:14:57.233772",
"description": "something",
"volume_type": "some_type",
"snapshot_id": "93c2e2aa-7744-4fd6-a31a-80c4726b08d7",
"source_volid": None,
"imageRef": "some_image",
"metadata": {},
"volume_image_metadata": IMAGE_METADATA,
"id": FAKE_ID,
"size": 10
}
DETAILS = {
"os-vol-host-attr:host": "127.0.0.1",
"os-vol-tenant-attr:tenant_id": "some tenant",
"os-vol-mig-status-attr:migstat": "done",
"os-vol-mig-status-attr:name_id": "93c2e2aa-7744-4fd6-a31a-80c4726b08d7",
"replication_status": "nah",
"os-volume-replication:extended_status": "really nah",
"consistencygroup_id": "123asf-asdf123",
"os-volume-replication:driver_data": "ahasadfasdfasdfasdfsdf",
"snapshot_id": "93c2e2aa-7744-4fd6-a31a-80c4726b08d7",
"encrypted": "false",
}
VOLUME_DETAIL = copy.copy(VOLUME)
VOLUME_DETAIL.update(DETAILS)
class TestVolume(base.TestCase):
def setUp(self):
super(TestVolume, self).setUp()
self.resp = mock.Mock()
self.resp.body = None
self.resp.json = mock.Mock(return_value=self.resp.body)
self.sess = mock.Mock()
self.sess.post = mock.Mock(return_value=self.resp)
def test_basic(self):
sot = volume.Volume(VOLUME)
self.assertEqual("volume", sot.resource_key)
self.assertEqual("volumes", sot.resources_key)
self.assertEqual("/volumes", sot.base_path)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({"name": "name",
"status": "status",
"all_projects": "all_tenants",
"project_id": "project_id",
"limit": "limit",
"marker": "marker"},
sot._query_mapping._mapping)
def test_create(self):
sot = volume.Volume(**VOLUME)
self.assertEqual(VOLUME["id"], sot.id)
self.assertEqual(VOLUME["status"], sot.status)
self.assertEqual(VOLUME["attachments"], sot.attachments)
self.assertEqual(VOLUME["availability_zone"], sot.availability_zone)
self.assertFalse(sot.is_bootable)
self.assertEqual(VOLUME["created_at"], sot.created_at)
self.assertEqual(VOLUME["description"], sot.description)
self.assertEqual(VOLUME["volume_type"], sot.volume_type)
self.assertEqual(VOLUME["snapshot_id"], sot.snapshot_id)
self.assertEqual(VOLUME["source_volid"], sot.source_volume_id)
self.assertEqual(VOLUME["metadata"], sot.metadata)
self.assertEqual(VOLUME["volume_image_metadata"],
sot.volume_image_metadata)
self.assertEqual(VOLUME["size"], sot.size)
self.assertEqual(VOLUME["imageRef"], sot.image_id)
def test_extend(self):
sot = volume.Volume(**VOLUME)
self.assertIsNone(sot.extend(self.sess, '20'))
url = 'volumes/%s/action' % FAKE_ID
body = {"os-extend": {"new_size": "20"}}
headers = {'Accept': ''}
self.sess.post.assert_called_with(url, json=body, headers=headers)
class TestVolumeDetail(base.TestCase):
def test_basic(self):
sot = volume.VolumeDetail(VOLUME_DETAIL)
self.assertIsInstance(sot, volume.Volume)
self.assertEqual("/volumes/detail", sot.base_path)
self.assertFalse(sot.allow_fetch)
self.assertFalse(sot.allow_commit)
self.assertFalse(sot.allow_create)
self.assertFalse(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_create(self):
sot = volume.VolumeDetail(**VOLUME_DETAIL)
self.assertEqual(VOLUME_DETAIL["os-vol-host-attr:host"], sot.host)
self.assertEqual(VOLUME_DETAIL["os-vol-tenant-attr:tenant_id"],
sot.project_id)
self.assertEqual(VOLUME_DETAIL["os-vol-mig-status-attr:migstat"],
sot.migration_status)
self.assertEqual(VOLUME_DETAIL["os-vol-mig-status-attr:name_id"],
sot.migration_id)
self.assertEqual(VOLUME_DETAIL["replication_status"],
sot.replication_status)
self.assertEqual(
VOLUME_DETAIL["os-volume-replication:extended_status"],
sot.extended_replication_status)
self.assertEqual(VOLUME_DETAIL["consistencygroup_id"],
sot.consistency_group_id)
self.assertEqual(VOLUME_DETAIL["os-volume-replication:driver_data"],
sot.replication_driver_data)
self.assertFalse(sot.is_encrypted)

View File

@ -0,0 +1,4 @@
---
features:
- |
Added support for block storage v3.