Add functional tests for manage/unmanage of shares in DHSS=True

This patch adds functional tests for managing/unmanaging
share servers, shares, and snapshots in DHSS=True.

Change-Id: I452c2a99b186f53d737cb7fbd7eabfcfd9b249d6
Partially-implements: bp manage-unmanage-with-share-servers
This commit is contained in:
Lucio Seki 2019-01-24 15:40:20 -02:00 committed by Rodrigo Barbieri
parent b4e00216fb
commit 370569423e
13 changed files with 1131 additions and 151 deletions

View File

@ -15,6 +15,10 @@ STATUS_ERROR = 'error'
STATUS_AVAILABLE = 'available'
STATUS_ERROR_DELETING = 'error_deleting'
STATUS_MIGRATING = 'migrating'
STATUS_MANAGE_ERROR = 'manage_error'
STATUS_MIGRATING_TO = 'migrating_to'
STATUS_CREATING = 'creating'
STATUS_DELETING = 'deleting'
TEMPEST_MANILA_PREFIX = 'tempest-manila'
@ -85,3 +89,13 @@ SHARE_GROUP_TYPE_REQUIRED_KEYS = {
}
MIN_SHARE_ACCESS_METADATA_MICROVERSION = '2.45'
# Share servers
SERVER_STATE_ACTIVE = 'active'
SERVER_STATE_CREATING = 'creating'
SERVER_STATE_DELETING = 'deleting'
SERVER_STATE_ERROR = 'error'
SERVER_STATE_MANAGE_ERROR = 'manage_error'
SERVER_STATE_MANAGE_STARTING = 'manage_starting'
SERVER_STATE_UNMANAGE_ERROR = 'unmanage_error'
SERVER_STATE_UNMANAGE_STARTING = 'unmanage_starting'

View File

@ -30,7 +30,7 @@ ShareGroup = [
help="The minimum api microversion is configured to be the "
"value of the minimum microversion supported by Manila."),
cfg.StrOpt("max_api_microversion",
default="2.48",
default="2.49",
help="The maximum api microversion is configured to be the "
"value of the latest microversion supported by Manila."),
cfg.StrOpt("region",

View File

@ -436,7 +436,7 @@ class SharesV2Client(shares_client.SharesClient):
def manage_share(self, service_host, protocol, export_path,
share_type_id, name=None, description=None,
is_public=False, version=LATEST_MICROVERSION,
url=None):
url=None, share_server_id=None):
post_body = {
"share": {
"export_path": export_path,
@ -448,6 +448,8 @@ class SharesV2Client(shares_client.SharesClient):
"is_public": is_public,
}
}
if share_server_id is not None:
post_body['share']['share_server_id'] = share_server_id
if url is None:
if utils.is_microversion_gt(version, "2.6"):
url = 'shares/manage'
@ -548,6 +550,8 @@ class SharesV2Client(shares_client.SharesClient):
time.sleep(self.build_interval)
body = self.get_snapshot(snapshot_id, version=version)
snapshot_status = body['status']
if snapshot_status == status:
return
if 'error' in snapshot_status:
raise (share_exceptions.
SnapshotBuildErrorException(snapshot_id=snapshot_id))
@ -596,6 +600,12 @@ class SharesV2Client(shares_client.SharesClient):
self.expected_success(202, resp.status)
return body
def snapshot_reset_state(self, snapshot_id,
status=constants.STATUS_AVAILABLE,
version=LATEST_MICROVERSION):
self.reset_state(snapshot_id, status=status, s_type='snapshots',
version=version)
###############
def revert_to_snapshot(self, share_id, snapshot_id,
@ -1368,6 +1378,64 @@ class SharesV2Client(shares_client.SharesClient):
(sg_snapshot_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
###############
def manage_share_server(self, host, share_network_id, identifier,
driver_options=None, version=LATEST_MICROVERSION):
body = {
'share_server': {
'host': host,
'share_network_id': share_network_id,
'identifier': identifier,
'driver_options': driver_options if driver_options else {},
}
}
body = json.dumps(body)
resp, body = self.post('share-servers/manage', body,
extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def unmanage_share_server(self, share_server_id,
version=LATEST_MICROVERSION):
body = json.dumps({'unmanage': None})
resp, body = self.post('share-servers/%s/action' % share_server_id,
body, extra_headers=True, version=version)
self.expected_success(202, resp.status)
return self._parse_resp(body)
def wait_for_share_server_status(self, server_id, status,
status_attr='status'):
"""Waits for a share to reach a given status."""
body = self.show_share_server(server_id)
server_status = body[status_attr]
start = int(time.time())
while server_status != status:
time.sleep(self.build_interval)
body = self.show_share_server(server_id)
server_status = body[status_attr]
if server_status == status:
return
elif constants.STATUS_ERROR in server_status.lower():
raise share_exceptions.ShareServerBuildErrorException(
server_id=server_id)
if int(time.time()) - start >= self.build_timeout:
message = ("Share server's %(status_attr)s failed to "
"transition to %(status)s within the required "
"time %(seconds)s." %
{"status_attr": status_attr, "status": status,
"seconds": self.build_timeout})
raise exceptions.TimeoutException(message)
def share_server_reset_state(self, share_server_id,
status=constants.SERVER_STATE_ACTIVE,
version=LATEST_MICROVERSION):
self.reset_state(share_server_id, status=status,
s_type='share-servers', version=version)
###############
def migrate_share(self, share_id, host,

View File

@ -75,3 +75,8 @@ class ResourceReleaseFailed(exceptions.TempestException):
class ShareReplicationTypeException(exceptions.TempestException):
message = ("Option backend_replication_type is set to incorrect value: "
"%(replication_type)s")
class ShareServerBuildErrorException(exceptions.TempestException):
message = ("Share server %(server_id)s failed to build and is in ERROR "
"status")

View File

@ -13,19 +13,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import ddt
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@ddt.ddt
class ManageNFSShareTest(base.BaseSharesAdminTest):
protocol = 'nfs'
@ -34,36 +35,23 @@ class ManageNFSShareTest(base.BaseSharesAdminTest):
# won't be deleted.
@classmethod
@testtools.skipIf(
CONF.share.multitenancy_enabled,
"Only for driver_handles_share_servers = False driver mode.")
@testtools.skipUnless(
CONF.share.run_manage_unmanage_tests,
"Manage/unmanage tests are disabled.")
def resource_setup(cls):
super(ManageNFSShareTest, cls).resource_setup()
if cls.protocol not in CONF.share.enable_protocols:
message = "%s tests are disabled" % cls.protocol
raise cls.skipException(message)
# Create share types
utils.skip_if_manage_not_supported_for_version()
super(ManageNFSShareTest, cls).resource_setup()
# Create share type
cls.st_name = data_utils.rand_name("manage-st-name")
cls.st_name_invalid = data_utils.rand_name("manage-st-name-invalid")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': False,
'snapshot_support': six.text_type(
CONF.share.capability_snapshot_support),
'create_share_from_snapshot_support': six.text_type(
CONF.share.capability_create_share_from_snapshot_support)
}
cls.extra_specs_invalid = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': True,
'snapshot_support': six.text_type(
CONF.share.capability_snapshot_support),
'create_share_from_snapshot_support': six.text_type(
CONF.share.capability_create_share_from_snapshot_support),
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
}
cls.st = cls.create_share_type(
@ -71,15 +59,12 @@ class ManageNFSShareTest(base.BaseSharesAdminTest):
cleanup_in_class=True,
extra_specs=cls.extra_specs)
cls.st_invalid = cls.create_share_type(
name=cls.st_name_invalid,
cleanup_in_class=True,
extra_specs=cls.extra_specs_invalid)
def _test_manage(self, is_public=False,
version=CONF.share.max_api_microversion,
check_manage=False):
utils.skip_if_manage_not_supported_for_version(version)
share = self._create_share_for_manage()
name = "Name for 'managed' share that had ID %s" % share['id']
@ -97,16 +82,19 @@ class ManageNFSShareTest(base.BaseSharesAdminTest):
self.assertNotIn(share['id'], share_ids)
# Manage share
managed_share = self.shares_v2_client.manage_share(
service_host=share['host'],
export_path=share['export_locations'][0],
protocol=share['share_proto'],
share_type_id=self.st['share_type']['id'],
name=name,
description=description,
is_public=is_public,
version=version,
)
manage_params = {
'service_host': share['host'],
'export_path': share['export_locations'][0],
'protocol': share['share_proto'],
'share_type_id': self.st['share_type']['id'],
'name': name,
'description': description,
'is_public': is_public,
'version': version,
}
if CONF.share.multitenancy_enabled:
manage_params['share_server_id'] = share['share_server_id']
managed_share = self.shares_v2_client.manage_share(**manage_params)
# Add managed share to cleanup queue
self.method_resources.insert(
@ -115,7 +103,7 @@ class ManageNFSShareTest(base.BaseSharesAdminTest):
# Wait for success
self.shares_v2_client.wait_for_share_status(managed_share['id'],
'available')
constants.STATUS_AVAILABLE)
# Verify data of managed share
self.assertEqual(name, managed_share['name'])
@ -141,31 +129,7 @@ class ManageNFSShareTest(base.BaseSharesAdminTest):
self.assertNotIn('user_id', managed_share)
# Delete share
self.shares_v2_client.delete_share(managed_share['id'])
self.shares_v2_client.wait_for_resource_deletion(
share_id=managed_share['id'])
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.get_share,
managed_share['id'])
def _create_share_for_manage(self):
creation_data = {
'share_type_id': self.st['share_type']['id'],
'share_protocol': self.protocol,
}
share = self.create_share(**creation_data)
share = self.shares_v2_client.get_share(share['id'])
if utils.is_microversion_ge(CONF.share.max_api_microversion, "2.9"):
el = self.shares_v2_client.list_share_export_locations(share["id"])
share["export_locations"] = el
return share
def _unmanage_share_and_wait(self, share):
self.shares_v2_client.unmanage_share(share['id'])
self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
self._delete_share_and_wait(managed_share)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@base.skip_if_microversion_not_supported("2.5")
@ -186,55 +150,6 @@ class ManageNFSShareTest(base.BaseSharesAdminTest):
def test_manage(self):
self._test_manage(check_manage=True)
@testtools.skipUnless(
CONF.share.multitenancy_enabled,
"Will be re-enabled along with the updated tests of Manage-Unmanage "
"with Share Server patch")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_invalid(self):
# Try to manage share with invalid parameters, it should not succeed
# because the scheduler will reject it. If it succeeds, then this test
# case failed. Then, in order to remove the resource from backend, we
# need to manage it again, properly, so we can delete it. Consequently
# the second part of this test also tests that manage operation with a
# proper share type works.
def _delete_share(share_id):
self.shares_v2_client.reset_state(share_id)
self.shares_v2_client.delete_share(share_id)
self.shares_v2_client.wait_for_resource_deletion(share_id=share_id)
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.get_share,
share_id)
share = self._create_share_for_manage()
self._unmanage_share_and_wait(share)
managed_share = self.shares_v2_client.manage_share(
service_host=share['host'],
export_path=share['export_locations'][0],
protocol=share['share_proto'],
share_type_id=self.st_invalid['share_type']['id'])
self.addCleanup(_delete_share, managed_share['id'])
self.shares_v2_client.wait_for_share_status(
managed_share['id'], 'manage_error')
managed_share = self.shares_v2_client.get_share(managed_share['id'])
self.assertEqual(1, int(managed_share['size']))
# Delete resource from backend. We need to manage the share properly
# so it can be removed.
managed_share = self.shares_v2_client.manage_share(
service_host=share['host'],
export_path=share['export_locations'][0],
protocol=share['share_proto'],
share_type_id=self.st['share_type']['id'])
self.addCleanup(_delete_share, managed_share['id'])
self.shares_v2_client.wait_for_share_status(
managed_share['id'], 'available')
class ManageCIFSShareTest(ManageNFSShareTest):
protocol = 'cifs'

View File

@ -0,0 +1,306 @@
# Copyright 2019 NetApp Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
class ManageNFSShareNegativeTest(base.BaseSharesAdminTest):
protocol = 'nfs'
# NOTE(lseki): be careful running these tests using generic driver
# because cinder volumes will stay attached to service Nova VM and
# won't be deleted.
@classmethod
@testtools.skipUnless(
CONF.share.run_manage_unmanage_tests,
"Manage/unmanage tests are disabled.")
def resource_setup(cls):
if cls.protocol not in CONF.share.enable_protocols:
message = "%s tests are disabled" % cls.protocol
raise cls.skipException(message)
utils.skip_if_manage_not_supported_for_version()
super(ManageNFSShareNegativeTest, cls).resource_setup()
# Create share type
cls.st_name = data_utils.rand_name("manage-st-name")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
'snapshot_support': CONF.share.capability_snapshot_support,
}
cls.st = cls.create_share_type(
name=cls.st_name,
cleanup_in_class=True,
extra_specs=cls.extra_specs)
def _manage_share_for_cleanup_and_wait(self, params,
state=constants.STATUS_AVAILABLE):
# Manage the share, schedule its deletion upon tearDown and wait for
# the expected state.
# Return the managed share object.
managed_share = self.shares_v2_client.manage_share(**params)
self.addCleanup(self._reset_state_and_delete_share,
managed_share)
self.shares_v2_client.wait_for_share_status(
managed_share['id'], state)
return managed_share
def _get_manage_params_from_share(self, share, invalid_params=None):
valid_params = {
'service_host': share['host'],
'protocol': share['share_proto'],
'share_type_id': share['share_type'],
}
if CONF.share.multitenancy_enabled:
valid_params['share_server_id'] = share['share_server_id']
if utils.is_microversion_ge(CONF.share.max_api_microversion, "2.9"):
el = self.shares_v2_client.list_share_export_locations(share["id"])
valid_params['export_path'] = el[0]['path']
if invalid_params:
valid_params.update(invalid_params)
return valid_params
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_invalid_param_raises_exception(self):
# Try to manage share with invalid parameters, it should not succeed
# because the api will reject it. If it succeeds, then this test case
# failed. Then, in order to remove the resource from backend, we need
# to manage it again, properly, so we can delete it. Consequently the
# second part of this test also tests that manage operation with a
# proper share type that works.
share = self._create_share_for_manage()
valid_params = self._get_manage_params_from_share(share)
self._unmanage_share_and_wait(share)
test_set = [
('service_host', 'invalid_host#invalid_pool', lib_exc.NotFound),
('share_type_id', 'invalid_share_type_id', lib_exc.NotFound),
]
if CONF.share.multitenancy_enabled:
test_set.append(
('share_server_id', 'invalid_server_id', lib_exc.BadRequest)
)
for invalid_key, invalid_value, expected_exception in test_set:
# forge a bad param
invalid_params = valid_params.copy()
invalid_params.update({
invalid_key: invalid_value
})
# the attempt to manage with bad param should fail and raise an
# exception
self.assertRaises(
expected_exception,
self.shares_v2_client.manage_share,
**invalid_params
)
# manage it properly and schedule cleanup upon tearDown
self._manage_share_for_cleanup_and_wait(valid_params)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_invalid_param_manage_error(self):
# Try to manage share with invalid parameters, it should not succeed.
# If it succeeds, then this test case failed. Then, in order to remove
# the resource from backend, we need to manage it again, properly, so
# we can delete it. Consequently the second part of this test also
# tests that manage operation with a proper share type works.
share = self._create_share_for_manage()
valid_params = self._get_manage_params_from_share(share)
self._unmanage_share_and_wait(share)
for invalid_key, invalid_value in (
('export_path', 'invalid_export'),
('protocol', 'invalid_protocol'),
):
# forge a bad param
invalid_params = valid_params.copy()
invalid_params.update({invalid_key: invalid_value})
# the attempt to manage the share with invalid params should fail
# and leave it in manage_error state
invalid_share = self.shares_v2_client.manage_share(
**invalid_params
)
self.shares_v2_client.wait_for_share_status(
invalid_share['id'], constants.STATUS_MANAGE_ERROR)
# cleanup
self._unmanage_share_and_wait(invalid_share)
# manage it properly and schedule cleanup upon tearDown
self._manage_share_for_cleanup_and_wait(valid_params)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_share_duplicate(self):
share = self._create_share_for_manage()
manage_params = self._get_manage_params_from_share(share)
self._unmanage_share_and_wait(share)
# manage the share for the first time
managed_share = self._manage_share_for_cleanup_and_wait(manage_params)
# update managed share's reference
managed_share = self.shares_v2_client.get_share(managed_share['id'])
manage_params = self._get_manage_params_from_share(managed_share)
# the second attempt to manage the same share should fail
self.assertRaises(
lib_exc.Conflict,
self.shares_v2_client.manage_share,
**manage_params
)
@testtools.skipUnless(CONF.share.multitenancy_enabled,
'Multitenancy tests are disabled.')
@utils.skip_if_microversion_not_supported("2.49")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_share_without_share_server_id(self):
share = self._create_share_for_manage()
manage_params = self._get_manage_params_from_share(share)
share_server_id = manage_params.pop('share_server_id')
self._unmanage_share_and_wait(share)
self.assertRaises(
lib_exc.BadRequest,
self.shares_v2_client.manage_share,
**manage_params)
manage_params['share_server_id'] = share_server_id
self._manage_share_for_cleanup_and_wait(manage_params)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_delete_share_in_manage_error(self):
share = self._create_share_for_manage()
valid_params = self._get_manage_params_from_share(share)
# forge bad param to have a share in manage_error state
invalid_params = valid_params.copy()
invalid_params.update({'export_path': 'invalid'})
invalid_share = self.shares_v2_client.manage_share(**invalid_params)
self.shares_v2_client.wait_for_share_status(
invalid_share['id'], constants.STATUS_MANAGE_ERROR)
self._unmanage_share_and_wait(share)
# the attempt to delete a share in manage_error should raise an
# exception
self.assertRaises(
lib_exc.Forbidden,
self.shares_v2_client.delete_share,
invalid_share['id']
)
# cleanup
self.shares_v2_client.unmanage_share(invalid_share['id'])
self._manage_share_for_cleanup_and_wait(valid_params)
@testtools.skipUnless(CONF.share.run_snapshot_tests,
'Snapshot tests are disabled.')
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_unmanage_share_with_snapshot(self):
# A share with snapshot cannot be unmanaged
share = self._create_share_for_manage()
snap = self.create_snapshot_wait_for_active(share["id"])
snap = self.shares_v2_client.get_snapshot(snap['id'])
self.assertRaises(
lib_exc.Forbidden,
self.shares_v2_client.unmanage_share,
share['id']
)
# cleanup
self._delete_snapshot_and_wait(snap)
self._delete_share_and_wait(share)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_unmanage_share_transitional_state(self):
# A share in transitional state cannot be unmanaged
share = self._create_share_for_manage()
for state in (constants.STATUS_CREATING,
constants.STATUS_DELETING,
constants.STATUS_MIGRATING,
constants.STATUS_MIGRATING_TO):
self.shares_v2_client.reset_state(share['id'], state)
self.assertRaises(
lib_exc.Forbidden,
self.shares_v2_client.unmanage_share,
share['id']
)
# cleanup
self._reset_state_and_delete_share(share)
@testtools.skipUnless(CONF.share.multitenancy_enabled,
'Multitenancy tests are disabled.')
@utils.skip_if_microversion_not_supported("2.48")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_unmanage_share_with_server_unsupported(self):
share = self._create_share_for_manage()
self.assertRaises(
lib_exc.Forbidden,
self.shares_v2_client.unmanage_share,
share['id'], version="2.48")
self._delete_share_and_wait(share)
class ManageCIFSShareNegativeTest(ManageNFSShareNegativeTest):
protocol = 'cifs'
class ManageGLUSTERFSShareNegativeTest(ManageNFSShareNegativeTest):
protocol = 'glusterfs'
class ManageHDFSShareNegativeTest(ManageNFSShareNegativeTest):
protocol = 'hdfs'
class ManageCephFSShareNegativeTest(ManageNFSShareNegativeTest):
protocol = 'cephfs'
class ManageMapRFSShareNegativeTest(ManageNFSShareNegativeTest):
protocol = 'maprfs'

View File

@ -22,7 +22,9 @@ from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@ -168,6 +170,9 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
"updated_at",
"backend_details",
]
if utils.is_microversion_ge(CONF.share.max_api_microversion, "2.49"):
keys.append("is_auto_deletable")
keys.append("identifier")
# all expected keys are present
for key in keys:
self.assertIn(key, server.keys())
@ -261,3 +266,65 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
if delete_share_network:
self.shares_v2_client.wait_for_resource_deletion(
sn_id=new_sn["id"])
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@utils.skip_if_microversion_not_supported("2.49")
def test_share_server_reset_state(self):
# Get network and subnet from existing share_network and reuse it
# to be able to delete share_server after test ends.
new_sn = self.create_share_network(
neutron_net_id=self.share_network['neutron_net_id'],
neutron_subnet_id=self.share_network['neutron_subnet_id'])
share = self.create_share(
share_type_id=self.share_type_id,
share_network_id=new_sn['id']
)
share = self.shares_v2_client.get_share(share['id'])
# obtain share server
share_server = self.shares_v2_client.show_share_server(
share['share_server_id']
)
for state in (constants.SERVER_STATE_ACTIVE,
constants.SERVER_STATE_CREATING,
constants.SERVER_STATE_DELETING,
constants.SERVER_STATE_ERROR,
constants.SERVER_STATE_MANAGE_ERROR,
constants.SERVER_STATE_MANAGE_STARTING,
constants.SERVER_STATE_UNMANAGE_ERROR,
constants.SERVER_STATE_UNMANAGE_STARTING):
# leave it in a new state
self.shares_v2_client.share_server_reset_state(
share_server['id'],
status=state,
)
self.shares_v2_client.wait_for_share_server_status(
share_server['id'],
status=state
)
# bring the share server back in the active state
self.shares_v2_client.share_server_reset_state(
share_server['id'],
status=constants.SERVER_STATE_ACTIVE,
)
self.shares_v2_client.wait_for_share_server_status(
share_server['id'],
status=constants.SERVER_STATE_ACTIVE
)
# delete share
self.shares_v2_client.delete_share(share["id"])
self.shares_v2_client.wait_for_resource_deletion(
share_id=share["id"]
)
# delete share network. This will trigger share server deletion
self.shares_v2_client.delete_share_network(new_sn["id"])
self.shares_v2_client.wait_for_resource_deletion(
sn_id=new_sn['id'])

View File

@ -0,0 +1,140 @@
# Copyright 2019 NetApp Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import data_utils
import testtools
from testtools import testcase as tc
from manila_tempest_tests.tests.api import base
CONF = config.CONF
@base.skip_if_microversion_lt("2.49")
@testtools.skipUnless(
CONF.share.multitenancy_enabled,
'Multitenancy tests are disabled.')
@testtools.skipUnless(
CONF.share.run_manage_unmanage_tests,
'Manage/unmanage tests are disabled.')
class ManageShareServersTest(base.BaseSharesAdminTest):
@classmethod
def resource_setup(cls):
super(ManageShareServersTest, cls).resource_setup()
# create share type
cls.st_name = data_utils.rand_name("manage-st-name")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
}
cls.share_type = cls.create_share_type(
name=cls.st_name,
cleanup_in_class=True,
extra_specs=cls.extra_specs)
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_manage_share_server(self):
# create a new share network to make sure that a new share server
# will be created
original_share_network = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id
)
share_network = self.create_share_network(
neutron_net_id=original_share_network['neutron_net_id'],
neutron_subnet_id=original_share_network['neutron_subnet_id'],
cleanup_in_class=True
)
# create share
share = self.create_share(
share_type_id=self.share_type['share_type']['id'],
share_network_id=share_network['id']
)
share = self.shares_v2_client.get_share(share['id'])
el = self.shares_v2_client.list_share_export_locations(share['id'])
share['export_locations'] = el
share_server = self.shares_v2_client.show_share_server(
share['share_server_id']
)
keys = [
"id",
"host",
"project_id",
"status",
"share_network_name",
"created_at",
"updated_at",
"backend_details",
"is_auto_deletable",
"identifier",
]
# all expected keys are present
for key in keys:
self.assertIn(key, share_server)
# check that the share server is initially auto-deletable
self.assertIs(True, share_server["is_auto_deletable"])
self.assertIsNotNone(share_server["identifier"])
self._unmanage_share_and_wait(share)
# Starting from microversion 2.49, any share server that has ever had
# an unmanaged share will never be auto-deleted.
share_server = self.shares_v2_client.show_share_server(
share_server['id']
)
self.assertIs(False, share_server['is_auto_deletable'])
# unmanage share server and manage it again
self._unmanage_share_server_and_wait(share_server)
managed_share_server = self._manage_share_server(share_server)
managed_share = self._manage_share(
share,
name="managed share that had ID %s" % share['id'],
description="description for managed share",
share_server_id=managed_share_server['id']
)
# check managed share server
managed_share_server = self.shares_v2_client.show_share_server(
managed_share_server['id']
)
# all expected keys are present in the managed share server
for key in keys:
self.assertIn(key, managed_share_server)
# check that managed share server is used by the managed share
self.assertEqual(
managed_share['share_server_id'],
managed_share_server['id']
)
# check that the managed share server is still not auto-deletable
self.assertIs(False, managed_share_server["is_auto_deletable"])
# delete share
self._delete_share_and_wait(managed_share)
# delete share server
self._delete_share_server_and_wait(managed_share_server['id'])

View File

@ -0,0 +1,320 @@
# Copyright 2019 NetApp Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests import share_exceptions
from manila_tempest_tests.tests.api import base
CONF = config.CONF
@base.skip_if_microversion_lt("2.49")
@testtools.skipUnless(
CONF.share.multitenancy_enabled,
'Multitenancy tests are disabled')
@testtools.skipUnless(
CONF.share.run_manage_unmanage_tests,
'Manage/unmanage tests are disabled.')
@ddt.ddt
class ManageShareServersNegativeTest(base.BaseSharesAdminTest):
@classmethod
def resource_setup(cls):
super(ManageShareServersNegativeTest, cls).resource_setup()
# create share type
cls.st_name = data_utils.rand_name("manage-st-name")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
}
cls.share_type = cls.create_share_type(
name=cls.st_name,
cleanup_in_class=True,
extra_specs=cls.extra_specs)
cls.original_share_network = cls.shares_v2_client.get_share_network(
cls.shares_v2_client.share_network_id)
def _create_share_with_new_share_network(self):
share_network = self.create_share_network(
neutron_net_id=self.original_share_network['neutron_net_id'],
neutron_subnet_id=self.original_share_network['neutron_subnet_id'],
cleanup_in_class=True
)
share = self.create_share(
share_type_id=self.share_type['share_type']['id'],
share_network_id=share_network['id']
)
return self.shares_v2_client.get_share(share['id'])
@ddt.data(
('host', 'invalid_host'),
('share_network_id', 'invalid_share_network_id'),
)
@ddt.unpack
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_share_server_invalid_params(self, param, invalid_value):
# create share
share = self._create_share_with_new_share_network()
el = self.shares_v2_client.list_share_export_locations(share['id'])
share['export_locations'] = el
share_server = self.shares_v2_client.show_share_server(
share['share_server_id']
)
self._unmanage_share_and_wait(share)
self._unmanage_share_server_and_wait(share_server)
# forge invalid params
invalid_params = share_server.copy()
invalid_params[param] = invalid_value
# try to manage in the wrong way
self.assertRaises(
lib_exc.BadRequest,
self._manage_share_server,
share_server,
invalid_params
)
# manage in the correct way
managed_share_server = self._manage_share_server(share_server)
managed_share = self._manage_share(
share,
name="managed share that had ID %s" % share['id'],
description="description for managed share",
share_server_id=managed_share_server['id']
)
# delete share
self._delete_share_and_wait(managed_share)
# delete share server
self._delete_share_server_and_wait(managed_share_server['id'])
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_delete_share_server_invalid_state(self):
# create share
share = self._create_share_with_new_share_network()
for state in (constants.SERVER_STATE_MANAGE_STARTING,
constants.SERVER_STATE_CREATING,
constants.SERVER_STATE_DELETING):
# leave it in the wrong state
self.shares_v2_client.share_server_reset_state(
share['share_server_id'],
status=state,
)
# try to delete
self.assertRaises(
lib_exc.Forbidden,
self.shares_v2_client.delete_share_server,
share['share_server_id'],
)
# put it in the correct state
self.shares_v2_client.share_server_reset_state(
share['share_server_id'],
status=constants.SERVER_STATE_ACTIVE,
)
self.shares_v2_client.wait_for_share_server_status(
share['share_server_id'],
constants.SERVER_STATE_ACTIVE,
)
# delete share
self._delete_share_and_wait(share)
# delete share server
self._delete_share_server_and_wait(share['share_server_id'])
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_unmanage_share_server_invalid_state(self):
# create share
share = self._create_share_with_new_share_network()
for state in (constants.SERVER_STATE_MANAGE_STARTING,
constants.SERVER_STATE_CREATING,
constants.SERVER_STATE_DELETING):
# leave it in the wrong state
self.shares_v2_client.share_server_reset_state(
share['share_server_id'],
status=state,
)
# try to unmanage
self.assertRaises(
lib_exc.BadRequest,
self.shares_v2_client.unmanage_share_server,
share['share_server_id'],
)
# put it in the correct state
self.shares_v2_client.share_server_reset_state(
share['share_server_id'],
status=constants.SERVER_STATE_ACTIVE,
)
self.shares_v2_client.wait_for_share_server_status(
share['share_server_id'],
constants.SERVER_STATE_ACTIVE,
)
# delete share
self._delete_share_and_wait(share)
# delete share server
self._delete_share_server_and_wait(share['share_server_id'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_share_server_reset_state_invalid_state(self):
# create share
share = self.create_share(
share_type_id=self.share_type['share_type']['id'])
share = self.shares_v2_client.get_share(share['id'])
# try to change it to wrong state
self.assertRaises(
lib_exc.BadRequest,
self.shares_v2_client.share_server_reset_state,
share['share_server_id'],
status='invalid_state',
)
# delete share
self._delete_share_and_wait(share)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_unmanage_share_server_with_share(self):
# create share
share = self.create_share(
share_type_id=self.share_type['share_type']['id'])
share = self.shares_v2_client.get_share(share['id'])
# try to unmanage
self.assertRaises(
lib_exc.BadRequest,
self.shares_v2_client.unmanage_share_server,
share['share_server_id'],
)
# delete share
self._delete_share_and_wait(share)
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_share_server_invalid_identifier(self):
# create share
share = self._create_share_with_new_share_network()
el = self.shares_v2_client.list_share_export_locations(share['id'])
share['export_locations'] = el
share_server = self.shares_v2_client.show_share_server(
share['share_server_id']
)
self._unmanage_share_and_wait(share)
self._unmanage_share_server_and_wait(share_server)
# forge invalid params
invalid_params = share_server.copy()
invalid_params['identifier'] = 'invalid_id'
self.assertRaises(
share_exceptions.ShareServerBuildErrorException,
self._manage_share_server,
invalid_params
)
# manage in the correct way
managed_share_server = self._manage_share_server(share_server)
managed_share_server = self.shares_v2_client.show_share_server(
managed_share_server['id']
)
managed_share = self._manage_share(
share,
name="managed share that had ID %s" % share['id'],
description="description for managed share",
share_server_id=managed_share_server['id']
)
# delete share
self._delete_share_and_wait(managed_share)
# delete share server
self._delete_share_server_and_wait(managed_share_server['id'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_share_server_double_manage(self):
# create share
share = self.create_share(
share_type_id=self.share_type['share_type']['id'])
share = self.shares_v2_client.get_share(share['id'])
share_server = self.shares_v2_client.show_share_server(
share['share_server_id'])
# try with more data around the identifier
invalid_params = share_server.copy()
invalid_params['identifier'] = (
'foo_' + share_server['identifier'] + '_bar')
self.assertRaises(
lib_exc.BadRequest,
self._manage_share_server,
invalid_params)
# try with part of the identifier
invalid_params['identifier'] = share_server['identifier'].split("-")[2]
self.assertRaises(
lib_exc.BadRequest,
self._manage_share_server,
invalid_params)
# try with same identifier but underscores
invalid_params['identifier'] = (
share_server['identifier'].replace("-", "_"))
self.assertRaises(
lib_exc.BadRequest,
self._manage_share_server,
invalid_params)
# delete share
self._delete_share_and_wait(share)

View File

@ -14,13 +14,13 @@
# under the License.
import ddt
import six
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
@ -36,27 +36,24 @@ class ManageNFSSnapshotTest(base.BaseSharesAdminTest):
@classmethod
@base.skip_if_microversion_lt("2.12")
@testtools.skipIf(
CONF.share.multitenancy_enabled,
"Only for driver_handles_share_servers = False driver mode.")
@testtools.skipUnless(
CONF.share.run_manage_unmanage_snapshot_tests,
"Manage/unmanage snapshot tests are disabled.")
def resource_setup(cls):
super(ManageNFSSnapshotTest, cls).resource_setup()
if cls.protocol not in CONF.share.enable_protocols:
message = "%s tests are disabled" % cls.protocol
raise cls.skipException(message)
utils.skip_if_manage_not_supported_for_version()
super(ManageNFSSnapshotTest, cls).resource_setup()
# Create share type
cls.st_name = data_utils.rand_name("tempest-manage-st-name")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': False,
'snapshot_support': six.text_type(
CONF.share.capability_snapshot_support),
'create_share_from_snapshot_support': six.text_type(
CONF.share.capability_create_share_from_snapshot_support)
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
'snapshot_support': CONF.share.capability_snapshot_support,
}
cls.st = cls.create_share_type(
@ -76,6 +73,8 @@ class ManageNFSSnapshotTest(base.BaseSharesAdminTest):
snapshot['id'])
description = "Description for 'managed' snapshot"
utils.skip_if_manage_not_supported_for_version(version)
# Manage snapshot
share_id = snapshot['share_id']
snapshot = self.shares_v2_client.manage_snapshot(
@ -96,8 +95,10 @@ class ManageNFSSnapshotTest(base.BaseSharesAdminTest):
'client': self.shares_v2_client})
# Wait for success
self.shares_v2_client.wait_for_snapshot_status(snapshot['id'],
'available')
self.shares_v2_client.wait_for_snapshot_status(
snapshot['id'],
constants.STATUS_AVAILABLE
)
# Verify manage snapshot API response
expected_keys = ["status", "links", "share_id", "name",
@ -135,6 +136,8 @@ class ManageNFSSnapshotTest(base.BaseSharesAdminTest):
version as well as versions 2.12 (when the API was introduced) and
2.16.
"""
utils.skip_if_manage_not_supported_for_version(version)
# Skip in case specified version is not supported
self.skip_if_microversion_not_supported(version)

View File

@ -20,7 +20,9 @@ from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@ -30,27 +32,25 @@ class ManageNFSSnapshotNegativeTest(base.BaseSharesAdminTest):
@classmethod
@base.skip_if_microversion_lt("2.12")
@testtools.skipIf(
CONF.share.multitenancy_enabled,
"Only for driver_handles_share_servers = False driver mode.")
@testtools.skipUnless(
CONF.share.run_manage_unmanage_snapshot_tests,
"Manage/unmanage snapshot tests are disabled.")
def resource_setup(cls):
super(ManageNFSSnapshotNegativeTest, cls).resource_setup()
if cls.protocol not in CONF.share.enable_protocols:
message = "%s tests are disabled" % cls.protocol
raise cls.skipException(message)
utils.skip_if_manage_not_supported_for_version()
super(ManageNFSSnapshotNegativeTest, cls).resource_setup()
# Create share type
cls.st_name = data_utils.rand_name("tempest-manage-st-name")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': False,
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
'snapshot_support': six.text_type(
CONF.share.capability_snapshot_support),
'create_share_from_snapshot_support': six.text_type(
CONF.share.capability_create_share_from_snapshot_support),
}
cls.st = cls.create_share_type(
@ -66,12 +66,13 @@ class ManageNFSSnapshotNegativeTest(base.BaseSharesAdminTest):
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_not_found(self):
# Manage snapshot fails
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.manage_snapshot,
'fake-share-id',
'fake-vol-snap-id',
driver_options={})
# Manage non-existing snapshot fails
self.assertRaises(
lib_exc.NotFound,
self.shares_v2_client.manage_snapshot,
'fake-share-id',
'fake-provider-location',
)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_already_exists(self):
@ -79,24 +80,74 @@ class ManageNFSSnapshotNegativeTest(base.BaseSharesAdminTest):
# Create snapshot
snap = self.create_snapshot_wait_for_active(self.share['id'])
get_snap = self.shares_v2_client.get_snapshot(snap['id'])
self.assertEqual(self.share['id'], get_snap['share_id'])
self.assertIsNotNone(get_snap['provider_location'])
snap = self.shares_v2_client.get_snapshot(snap['id'])
self.assertEqual(self.share['id'], snap['share_id'])
self.assertIsNotNone(snap['provider_location'])
# Manage snapshot fails
self.assertRaises(lib_exc.Conflict,
self.shares_v2_client.manage_snapshot,
self.share['id'],
get_snap['provider_location'],
driver_options={})
self.assertRaises(
lib_exc.Conflict,
self.shares_v2_client.manage_snapshot,
self.share['id'],
snap['provider_location']
)
# Delete snapshot
self.shares_v2_client.delete_snapshot(get_snap['id'])
self._delete_snapshot_and_wait(snap)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_manage_invalid_provider_location(self):
# Manage a snapshot with wrong provider location fails
# Create snapshot
snap = self.create_snapshot_wait_for_active(self.share['id'])
snap = self.shares_v2_client.get_snapshot(snap['id'])
# Unmanage snapshot
self.shares_v2_client.unmanage_snapshot(snap['id'])
self.shares_client.wait_for_resource_deletion(
snapshot_id=get_snap['id'])
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.get_snapshot,
get_snap['id'])
snapshot_id=snap['id']
)
# Manage snapshot with invalid provider location leaves it in
# manage_error state
invalid_snap = self.shares_v2_client.manage_snapshot(
self.share['id'],
'invalid_provider_location',
driver_options={}
)
self.shares_v2_client.wait_for_snapshot_status(
invalid_snap['id'],
constants.STATUS_MANAGE_ERROR
)
self.shares_v2_client.unmanage_snapshot(invalid_snap['id'])
# Manage it properly and delete
managed_snap = self.shares_v2_client.manage_snapshot(
self.share['id'],
snap['provider_location']
)
self.shares_v2_client.wait_for_snapshot_status(
managed_snap['id'],
constants.STATUS_AVAILABLE
)
self._delete_snapshot_and_wait(managed_snap)
@testtools.skipUnless(CONF.share.multitenancy_enabled,
'Multitenancy tests are disabled.')
@utils.skip_if_microversion_not_supported("2.48")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_unmanage_snapshot_with_server_unsupported(self):
share = self._create_share_for_manage()
snap = self.create_snapshot_wait_for_active(share["id"])
self.assertRaises(
lib_exc.Forbidden,
self.shares_v2_client.unmanage_snapshot,
snap['id'], version="2.48")
self._delete_snapshot_and_wait(snap)
self._delete_share_and_wait(share)
class ManageCIFSSnapshotNegativeTest(ManageNFSSnapshotNegativeTest):

View File

@ -1115,6 +1115,88 @@ class BaseSharesAdminTest(BaseSharesTest):
name=share_group_type_name, share_types=[cls.share_type_id],
client=cls.admin_shares_v2_client)
def _create_share_for_manage(self):
creation_data = {
'share_type_id': self.st['share_type']['id'],
'share_protocol': self.protocol,
}
share = self.create_share(**creation_data)
share = self.shares_v2_client.get_share(share['id'])
if utils.is_microversion_ge(CONF.share.max_api_microversion, "2.9"):
el = self.shares_v2_client.list_share_export_locations(share["id"])
share["export_locations"] = el
return share
def _unmanage_share_and_wait(self, share):
self.shares_v2_client.unmanage_share(share['id'])
self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
def _reset_state_and_delete_share(self, share):
self.shares_v2_client.reset_state(share['id'])
self._delete_share_and_wait(share)
def _delete_snapshot_and_wait(self, snap):
self.shares_v2_client.delete_snapshot(snap['id'])
self.shares_v2_client.wait_for_resource_deletion(
snapshot_id=snap['id']
)
self.assertRaises(exceptions.NotFound,
self.shares_v2_client.get_snapshot,
snap['id'])
def _delete_share_and_wait(self, share):
self.shares_v2_client.delete_share(share['id'])
self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
self.assertRaises(exceptions.NotFound,
self.shares_v2_client.get_share,
share['id'])
def _manage_share(self, share, name, description, share_server_id):
managed_share = self.shares_v2_client.manage_share(
service_host=share['host'],
export_path=share['export_locations'][0],
protocol=share['share_proto'],
share_type_id=self.share_type['share_type']['id'],
name=name,
description=description,
share_server_id=share_server_id
)
self.shares_v2_client.wait_for_share_status(
managed_share['id'], constants.STATUS_AVAILABLE
)
return managed_share
def _unmanage_share_server_and_wait(self, server):
self.shares_v2_client.unmanage_share_server(server['id'])
self.shares_v2_client.wait_for_resource_deletion(
server_id=server['id']
)
def _manage_share_server(self, share_server, fields=None):
params = fields or {}
managed_share_server = self.shares_v2_client.manage_share_server(
params.get('host', share_server['host']),
params.get('share_network_id', share_server['share_network_id']),
params.get('identifier', share_server['identifier']),
)
self.shares_v2_client.wait_for_share_server_status(
managed_share_server['id'],
constants.SERVER_STATE_ACTIVE,
)
return managed_share_server
def _delete_share_server_and_wait(self, share_server_id):
self.shares_v2_client.delete_share_server(
share_server_id
)
self.shares_v2_client.wait_for_resource_deletion(
server_id=share_server_id)
class BaseSharesMixedTest(BaseSharesTest):
"""Base test case class for all Shares API tests with all user roles."""

View File

@ -168,3 +168,12 @@ def get_configured_extra_specs(variation=None):
CONF.share.capability_create_share_from_snapshot_support)
return extra_specs
def skip_if_manage_not_supported_for_version(
version=CONF.share.max_api_microversion):
if (is_microversion_lt(version, "2.49")
and CONF.share.multitenancy_enabled):
raise testtools.TestCase.skipException(
"Share manage tests with multitenancy are disabled for "
"microversion < 2.49")