191 lines
7.3 KiB
Python
191 lines
7.3 KiB
Python
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
|
|
|
|
|
|
from tempest import clients
|
|
from tempest import config
|
|
from tempest.lib import auth
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
|
|
from manila_tempest_tests.common import waiters
|
|
|
|
# Tempest configuration object. The RBAC helpers in this module read the
# ``enforce_scope`` and ``share`` option groups from it.
CONF = config.CONF
|
|
|
|
|
|
class ShareRbacBaseTests(object):
    """Mixin with shared helpers for the manila secure-RBAC tests.

    The class derives from ``object`` and does not define everything it
    uses: the concrete test class it is mixed into is expected to provide
    ``skipException``, ``addClassResourceCleanup``, the ``assert*``
    methods, ``shares_v2_client``, ``admin_shares_v2_client``, ``client``
    and a ``protocol`` attribute.
    """

    identity_version = 'v3'
    protocols = ['nfs', 'cifs', 'glusterfs', 'hdfs', 'cephfs', 'maprfs']

    @classmethod
    def skip_checks(cls):
        """Skip the test class when the deployment cannot run RBAC tests.

        Raises ``cls.skipException`` when ``[enforce_scope] manila`` is not
        set, when no default share type name is configured, or when none of
        ``cls.protocols`` appears in ``CONF.share.enable_protocols``.
        """
        super(ShareRbacBaseTests, cls).skip_checks()
        if not CONF.enforce_scope.manila:
            raise cls.skipException(
                "Tempest is not configured to enforce_scope for manila, "
                "skipping RBAC tests. To enable these tests set "
                "`tempest.conf [enforce_scope] manila=True`."
            )
        if not CONF.share.default_share_type_name:
            raise cls.skipException("Secure rbac tests require a default "
                                    "share type")
        if not any(p in CONF.share.enable_protocols for p in cls.protocols):
            # ``protocol`` is not defined on this mixin; fall back to the
            # full protocol list so that a missing attribute cannot turn
            # the intended skip into an AttributeError.
            protocol = getattr(cls, 'protocol', None) or ', '.join(
                cls.protocols)
            message = "%s tests are disabled" % protocol
            raise cls.skipException(message)

    @classmethod
    def delete_resource(cls, client, **kwargs):
        """Delete one resource and wait for the deletion, ignoring NotFound.

        :param client: client object exposing ``delete_<resource>`` and
            ``wait_for_resource_deletion``.
        :param kwargs: a single ``<resource>_id=<id>`` pair. The resource
            name is the key without its last ``_``-separated part; the
            abbreviations ``st`` and ``sn`` are expanded to ``share_type``
            and ``share_network``. The unmodified kwargs are forwarded to
            ``client.wait_for_resource_deletion``.
        """
        key_names = {
            'st': 'share_type',
            'sn': 'share_network',
        }
        key, resource_id = list(kwargs.items())[0]
        key = key.rsplit('_', 1)[0]
        resource_name = key_names.get(key, key)

        del_action = getattr(client, 'delete_{}'.format(resource_name))
        test_utils.call_and_ignore_notfound_exc(
            del_action, resource_id)
        test_utils.call_and_ignore_notfound_exc(
            client.wait_for_resource_deletion, **kwargs)

    @classmethod
    def create_share(cls, client, share_type_id, size=None, name=None,
                     metadata=None):
        """Create a share, schedule its cleanup and wait for 'available'.

        :param client: client used to create (and later delete) the share.
        :param share_type_id: ID of the share type to create the share with.
        :param size: share size; defaults to ``CONF.share.share_size``.
        :param name: share name; a random one is generated when omitted.
        :param metadata: share metadata; defaults to an empty dict.
        :returns: the ``share`` body returned by ``client.create_share``.
        """
        kwargs = {
            'share_protocol': cls.protocol,
            'size': size or CONF.share.share_size,
            'name': name or data_utils.rand_name('share'),
            'share_type_id': share_type_id,
            'metadata': metadata or {},
        }
        share = client.create_share(**kwargs)['share']
        # Register the cleanup before waiting: if the wait below raises,
        # the share must still be scheduled for deletion.
        cls.addClassResourceCleanup(
            cls.delete_resource, client,
            share_id=share['id'])
        waiters.wait_for_resource_status(client, share['id'], 'available')
        return share

    @classmethod
    def create_snapshot(cls, client, share_id, name=None, metadata=None):
        """Create a snapshot, schedule its cleanup and wait for 'available'.

        :param client: client used to create (and later delete) the
            snapshot.
        :param share_id: ID of the share to snapshot.
        :param name: snapshot name; a random one is generated when omitted.
        :param metadata: passed through unchanged to
            ``client.create_snapshot``.
        :returns: the ``snapshot`` body returned by
            ``client.create_snapshot``.
        """
        name = name or data_utils.rand_name('snapshot')
        snapshot = client.create_snapshot(
            share_id, name=name, metadata=metadata)['snapshot']
        # Register the cleanup before waiting so that a failed or timed-out
        # wait does not leave the snapshot behind.
        cls.addClassResourceCleanup(
            cls.delete_resource, client, snapshot_id=snapshot['id'])
        waiters.wait_for_resource_status(
            client, snapshot['id'], 'available', resource_name='snapshot')
        return snapshot

    @classmethod
    def create_share_network(cls, client, name=None):
        """Create a share network and schedule its cleanup.

        :param client: client used to create (and later delete) the
            share network.
        :param name: network name; a random one is generated when omitted.
        :returns: the ``share_network`` body returned by the client.
        """
        name = name or data_utils.rand_name('share_network')
        share_network = client.create_share_network(name=name)['share_network']

        cls.addClassResourceCleanup(
            cls.delete_resource, client, sn_id=share_network['id'])
        return share_network

    @classmethod
    def create_share_type(cls):
        """Create a share type with the admin client and schedule cleanup.

        The only extra spec set is ``driver_handles_share_servers``, taken
        from ``CONF.share.multitenancy_enabled``.

        :returns: the ``share_type`` body returned by the client.
        """
        name = data_utils.rand_name('share-type')
        extra_specs = {
            'driver_handles_share_servers': CONF.share.multitenancy_enabled,
        }
        share_type = cls.admin_shares_v2_client.create_share_type(
            name=name, extra_specs=extra_specs)['share_type']
        cls.addClassResourceCleanup(
            cls.delete_resource, cls.admin_shares_v2_client,
            st_id=share_type['id'])
        return share_type

    @classmethod
    def create_share_group_type(cls, share_types, is_public=True,
                                group_specs=None):
        """Create a share group type with the admin client.

        :param share_types: passed through as ``share_types`` to
            ``create_share_group_type``.
        :param is_public: passed through as ``is_public``.
        :param group_specs: passed through as ``group_specs``.
        :returns: the ``share_group_type`` body returned by the client.
        """
        name = data_utils.rand_name('share-group-type')
        share_group_type = (
            cls.admin_shares_v2_client.create_share_group_type(
                name=name, share_types=share_types, is_public=is_public,
                group_specs=group_specs))['share_group_type']
        cls.addClassResourceCleanup(
            cls.delete_resource, cls.admin_shares_v2_client,
            share_group_type_id=share_group_type['id'])
        return share_group_type

    @classmethod
    def get_share_type(cls):
        """Return the default share type as seen by ``shares_v2_client``."""
        return cls.shares_v2_client.get_default_share_type()['share_type']

    def do_request(self, method, expected_status=200, client=None, **payload):
        """Call ``method`` on a client and check the outcome.

        :param method: name of the client method to call.
        :param expected_status: either the expected HTTP status code, or a
            class (an exception type) that the call is expected to raise.
        :param client: client to use; defaults to ``self.client``.
        :param payload: keyword arguments forwarded to the client method.
        :returns: the client response when a status code is expected;
            ``None`` when an exception is expected.
        """
        if not client:
            client = self.client
        # ``type(Exception)`` is ``type``: any class passed here is treated
        # as the exception the call must raise.
        if isinstance(expected_status, type(Exception)):
            self.assertRaises(expected_status,
                              getattr(client, method),
                              **payload)
        else:
            response = getattr(client, method)(**payload)
            self.assertEqual(response.response.status, expected_status)
            return response

    @classmethod
    def setup_user_client(cls, client, project_id=None):
        """Set up project user with its own client.

        This is useful for testing protection of resources in separate
        projects.
        NOTE(lkuchlan): Tempest creates 'project_member' and 'project_reader'
        dynamic credentials in different projects. So this method is also
        necessary for testing protection of resources in a specific project.

        A new user is created and given the ``member`` role on the project.
        When ``project_id`` is not given, a new project is created as well.
        Cleanups are registered for the user and for a project created here.

        :param client: client manager whose ``identity_v3`` clients are
            used to create the user, the project and the role assignment.
        :param project_id: existing project to add the user to (optional).
        :returns: a ``clients.Manager`` built from the new user's
            credentials.
        """

        projects_client = client.identity_v3.ProjectsClient()
        users_client = client.identity_v3.UsersClient()
        roles_client = client.identity_v3.RolesClient()

        user_dict = {
            'name': data_utils.rand_name('user'),
            'password': data_utils.rand_password(),
        }
        user_id = users_client.create_user(
            **user_dict)['user']['id']
        cls.addClassResourceCleanup(users_client.delete_user, user_id)

        if not project_id:
            project_id = projects_client.create_project(
                data_utils.rand_name())['project']['id']
            cls.addClassResourceCleanup(
                projects_client.delete_project,
                project_id)

        member_role_id = roles_client.list_roles(
            name='member')['roles'][0]['id']
        roles_client.create_user_role_on_project(
            project_id, user_id, member_role_id)
        creds = auth.KeystoneV3Credentials(
            user_id=user_id,
            password=user_dict['password'],
            project_id=project_id)
        auth_provider = clients.get_auth_provider(creds)
        creds = auth_provider.fill_credentials()
        client = clients.Manager(credentials=creds)
        return client
|