# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
|
|
|
|
from tempest.common import waiters
|
|
from tempest import config
|
|
from tempest.lib.common import api_microversion_fixture
|
|
from tempest.lib.common import api_version_utils
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
from tempest.lib.decorators import cleanup_order
|
|
from tempest import test
|
|
|
|
# Module-level handle to the tempest configuration object; the test class
# below reads its enforce_scope, service_available and volume options.
CONF = config.CONF
|
|
|
|
|
|
class VolumeV3RbacBaseTests(
    api_version_utils.BaseMicroversionTest, test.BaseTestCase
):
    """Base class for volume (Cinder) v3 RBAC tests.

    Bundles the skip checks, microversion selection and the helpers used to
    create and delete volumes, snapshots and backups in RBAC test cases.
    """

    identity_version = 'v3'

    @classmethod
    def skip_checks(cls):
        """Skip the tests when the deployment cannot run them.

        Raises:
            cls.skipException: if ``[enforce_scope] cinder`` is not enabled
                or if ``[service_available] cinder`` is not enabled.

        The class and configured microversion bounds are additionally handed
        to ``api_version_utils.check_skip_with_microversion``.
        """
        super().skip_checks()
        if not CONF.enforce_scope.cinder:
            raise cls.skipException(
                "Tempest is not configured to enforce_scope for cinder, "
                "skipping RBAC tests. To enable these tests set "
                "`tempest.conf [enforce_scope] cinder=True`."
            )
        if not CONF.service_available.cinder:
            skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
            raise cls.skipException(skip_msg)
        api_version_utils.check_skip_with_microversion(
            cls.min_microversion, cls.max_microversion,
            CONF.volume.min_microversion, CONF.volume.max_microversion)

    @classmethod
    def setup_credentials(cls):
        """Declare network resources, then set up credentials."""
        # NOTE(review): called without arguments; presumably this requests
        # that no network resources be provisioned -- confirm against
        # tempest's BaseTestCase.set_network_resources.
        cls.set_network_resources()
        super().setup_credentials()

    def setUp(self):
        """Install the volume microversion fixture for each test."""
        super().setUp()
        # ``request_microversion`` is assigned in ``resource_setup``.
        self.useFixture(api_microversion_fixture.APIMicroversionFixture(
            volume_microversion=self.request_microversion))

    @classmethod
    def resource_setup(cls):
        """Select the microversion the tests of this class will request."""
        super().resource_setup()
        cls.request_microversion = (
            api_version_utils.select_request_microversion(
                cls.min_microversion,
                CONF.volume.min_microversion))

    def do_request(self, method, expected_status=200, client=None, **payload):
        """Perform an API call and verify its outcome.

        Args:
            method: Name of the client method to call
            expected_status: Expected HTTP status code, or a class (normally
                an exception class) that the call is expected to raise
            client: Client object to use; ``self.client`` when not given
            payload: Keyword arguments forwarded to the API call

        Returns:
            The client response when a status code was expected, None when
            an exception was expected.
        """
        if not client:
            client = self.client
        api_call = getattr(client, method)

        # ``type(Exception)`` is ``type``, so any class passed as
        # ``expected_status`` selects the "call must raise" path.
        if isinstance(expected_status, type(Exception)):
            self.assertRaises(expected_status, api_call, **payload)
            return None

        response = api_call(**payload)
        self.assertEqual(response.response.status, expected_status)
        return response

    @cleanup_order
    def create_volume(self, client, **kwargs):
        """Create a test volume and wait for it to become available.

        Args:
            client: Client object used to create, poll and later delete
                the volume
            kwargs: Extra parameters for the volume-create call. ``size``
                defaults to ``CONF.volume.volume_size`` and ``name`` to a
                random name; values supplied by the caller are kept.

        Returns:
            ID of the created volume
        """
        if 'size' not in kwargs:
            kwargs['size'] = CONF.volume.volume_size
        if 'name' not in kwargs:
            kwargs['name'] = data_utils.rand_name(
                VolumeV3RbacBaseTests.__name__ + '-Volume'
            )

        volume_id = client.create_volume(**kwargs)['volume']['id']
        # Register the cleanup before waiting so the volume is still removed
        # when the wait below fails.
        # NOTE(review): ``self.cleanup`` is not defined in this class; it is
        # presumably provided by the ``cleanup_order`` decorator -- confirm.
        self.cleanup(
            test_utils.call_and_ignore_notfound_exc, func=self.delete_resource,
            client=client, volume_id=volume_id
        )
        waiters.wait_for_volume_resource_status(
            client=client, resource_id=volume_id, status='available'
        )

        return volume_id

    @cleanup_order
    def create_snapshot(self, client, volume_id, cleanup=True, **kwargs):
        """Create a test snapshot and wait for it to become available.

        Args:
            client: Client object used to create, poll and later delete
                the snapshot
            volume_id: ID of the volume to snapshot
            cleanup: Whether to register deletion of the snapshot
            kwargs: Extra parameters for the snapshot-create call. ``name``
                defaults to a random name; a caller-supplied value is kept.

        Returns:
            ID of the created snapshot
        """
        if 'name' not in kwargs:
            kwargs['name'] = data_utils.rand_name(
                VolumeV3RbacBaseTests.__name__ + '-Snapshot'
            )

        snapshot_id = client.create_snapshot(
            volume_id=volume_id, **kwargs)['snapshot']['id']
        if cleanup:
            # Registered before the wait so a snapshot that never becomes
            # available is still deleted.
            self.cleanup(
                test_utils.call_and_ignore_notfound_exc,
                func=self.delete_resource,
                client=client, snapshot_id=snapshot_id
            )
        waiters.wait_for_volume_resource_status(
            client=client, resource_id=snapshot_id, status='available'
        )

        return snapshot_id

    @classmethod
    def delete_resource(cls, client, **kwargs):
        """Delete a resource with the given client and wait until it is gone.

        Exactly one keyword argument is expected. The part of its name
        before the first underscore selects the client's ``delete_<name>``
        method and its value is the resource ID. Only the first keyword
        argument is used.

        Args:
            client: Client object

        Keyword Args:
            snapshot_id: ID of a snapshot
            volume_id: ID of a volume
            backup_id: ID of a backup
        """
        key, resource_id = list(kwargs.items())[0]
        # e.g. 'volume_id' -> 'volume' -> client.delete_volume
        resource_name = key.split('_')[0]

        del_action = getattr(client, f'delete_{resource_name}')
        test_utils.call_and_ignore_notfound_exc(del_action, resource_id)
        test_utils.call_and_ignore_notfound_exc(
            client.wait_for_resource_deletion, resource_id)

    @classmethod
    def create_backup(
        cls, volume_id, backup_client=None, add_cleanup=True, **kwargs
    ):
        """Create a test backup and wait for it to become available.

        Args:
            volume_id: ID of the volume to back up
            backup_client: Client object to use; ``cls.backups_client``
                when None
            add_cleanup: Whether to register class-level deletion of the
                backup
            kwargs: Extra parameters for the backup-create call. ``name``
                defaults to a random name prefixed with the class name.

        Returns:
            The ``backup`` entry of the create-backup response
        """
        if backup_client is None:
            backup_client = cls.backups_client
        if 'name' not in kwargs:
            # ``cls`` already is the class in a classmethod; ``cls.__class__``
            # would be its metaclass and produce the prefix 'type'.
            kwargs['name'] = data_utils.rand_name(cls.__name__ + '-Backup')

        backup = backup_client.create_backup(
            volume_id=volume_id, **kwargs
        )['backup']
        if add_cleanup:
            cls.addClassResourceCleanup(
                test_utils.call_and_ignore_notfound_exc,
                cls.delete_resource,
                client=backup_client,
                backup_id=backup['id']
            )
        waiters.wait_for_volume_resource_status(
            backup_client, backup['id'], 'available'
        )
        return backup
|