[Tempest] Refactor api/tests/admin/test_share_servers module

Doing following things:
 - Use ddt module removing code duplication.
 - Use v2 APIs instead of deprecated v1.
 - Use correct 'share_server_id' for
   'test_show_share_server_details' test to avoid concurrency issues.
   For more details, see closed b_u_g.
 - Use only 'active' status for listing servers with "status" filter.
   To avoid concurrency in case we pick server with different
   status than "active" and that may be changed before second call.

This commit is combination of two cherry-pick'ed commits:
 - 6e4e8fbfb4
 - 46d499a85c

Change-Id: If4d6029f2250e80c1eec0debb1b09a805d997028
Closes-Bug: #1663865
This commit is contained in:
Valeriy Ponomaryov 2017-02-13 13:25:38 +02:00
parent eb2e91d937
commit 40bbbfd8c6
1 changed files with 56 additions and 75 deletions

View File

@ -15,9 +15,11 @@
import re
import ddt
import six
from tempest import config
from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.tests.api import base
@ -25,21 +27,21 @@ from manila_tempest_tests.tests.api import base
CONF = config.CONF
@testtools.skipUnless(
CONF.share.multitenancy_enabled,
'Share servers can be tested only with multitenant drivers.')
@ddt.ddt
class ShareServersAdminTest(base.BaseSharesAdminTest):
@classmethod
def resource_setup(cls):
super(ShareServersAdminTest, cls).resource_setup()
if not CONF.share.multitenancy_enabled:
msg = ("Share servers can be tested only with multitenant drivers."
" Skipping.")
raise cls.skipException(msg)
cls.share = cls.create_share()
cls.share_network = cls.shares_client.get_share_network(
cls.shares_client.share_network_id)
cls.share_network = cls.shares_v2_client.get_share_network(
cls.shares_v2_client.share_network_id)
if not cls.share_network["name"]:
sn_id = cls.share_network["id"]
cls.share_network = cls.shares_client.update_share_network(
cls.share_network = cls.shares_v2_client.update_share_network(
sn_id, name="sn_%s" % sn_id)
cls.sn_name_and_id = [
cls.share_network["name"],
@ -52,7 +54,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_without_filters(self):
servers = self.shares_client.list_share_servers()
servers = self.shares_v2_client.list_share_servers()
self.assertGreater(len(servers), 0)
keys = [
"id",
@ -84,10 +86,9 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_host_filter(self):
# Get list of share servers and remember 'host' name
servers = self.shares_client.list_share_servers()
servers = self.shares_v2_client.list_share_servers()
# Remember name of server that was used by this test suite
# to be sure it will be still existing.
host = ""
for server in servers:
if server["share_network_name"] in self.sn_name_and_id:
if not server["host"]:
@ -96,47 +97,32 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
raise lib_exc.InvalidContentType(message=msg)
host = server["host"]
break
if not host:
else:
msg = ("Appropriate server was not found. Its share_network_data"
": '%s'. List of servers: '%s'.") % (self.sn_name_and_id,
str(servers))
six.text_type(servers))
raise lib_exc.NotFound(message=msg)
search_opts = {"host": host}
servers = self.shares_client.list_share_servers(search_opts)
servers = self.shares_v2_client.list_share_servers(search_opts)
self.assertGreater(len(servers), 0)
for server in servers:
self.assertEqual(server["host"], host)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_status_filter(self):
# Get list of share servers
servers = self.shares_client.list_share_servers()
# Remember status of server that was used by this test suite
# to be sure it will be still existing.
status = ""
for server in servers:
if server["share_network_name"] in self.sn_name_and_id:
if not server["status"]:
msg = ("Server '%s' has wrong value for status - "
"'%s'.") % (server["id"], server["host"])
raise lib_exc.InvalidContentType(message=msg)
status = server["status"]
break
if not status:
msg = ("Appropriate server was not found. Its share_network_data"
": '%s'. List of servers: '%s'.") % (self.sn_name_and_id,
str(servers))
raise lib_exc.NotFound(message=msg)
search_opts = {"status": status}
servers = self.shares_client.list_share_servers(search_opts)
search_opts = {"status": "active"}
servers = self.shares_v2_client.list_share_servers(search_opts)
# At least 1 share server should exist always - the one created
# for this class.
self.assertGreater(len(servers), 0)
for server in servers:
self.assertEqual(server["status"], status)
self.assertEqual(server["status"], "active")
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_project_id_filter(self):
search_opts = {"project_id": self.share_network["project_id"]}
servers = self.shares_client.list_share_servers(search_opts)
servers = self.shares_v2_client.list_share_servers(search_opts)
# Should exist, at least, one share server, used by this test suite.
self.assertGreater(len(servers), 0)
for server in servers:
@ -146,7 +132,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_share_network_name_filter(self):
search_opts = {"share_network": self.share_network["name"]}
servers = self.shares_client.list_share_servers(search_opts)
servers = self.shares_v2_client.list_share_servers(search_opts)
# Should exist, at least, one share server, used by this test suite.
self.assertGreater(len(servers), 0)
for server in servers:
@ -156,7 +142,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_list_share_servers_with_share_network_id_filter(self):
search_opts = {"share_network": self.share_network["id"]}
servers = self.shares_client.list_share_servers(search_opts)
servers = self.shares_v2_client.list_share_servers(search_opts)
# Should exist, at least, one share server, used by this test suite.
self.assertGreater(len(servers), 0)
for server in servers:
@ -165,8 +151,9 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_show_share_server(self):
share = self.shares_client.get_share(self.share["id"])
server = self.shares_client.show_share_server(share["share_server_id"])
share = self.shares_v2_client.get_share(self.share["id"])
server = self.shares_v2_client.show_share_server(
share["share_server_id"])
keys = [
"id",
"host",
@ -180,35 +167,35 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
# all expected keys are present
for key in keys:
self.assertIn(key, server.keys())
# 'created_at' is valid date
self.assertTrue(self.date_re.match(server["created_at"]))
# 'updated_at' is valid date if set
if server["updated_at"]:
self.assertTrue(self.date_re.match(server["updated_at"]))
# Host is not empty
self.assertGreater(len(server["host"]), 0)
# Id is not empty
self.assertGreater(len(server["id"]), 0)
# Project id is not empty
self.assertGreater(len(server["project_id"]), 0)
# Status is not empty
self.assertGreater(len(server["status"]), 0)
# share_network_name is not empty
self.assertGreater(len(server["share_network_name"]), 0)
# backend_details should be a dict
# veriy that values for following keys are not empty
for k in ('host', 'id', 'project_id', 'status', 'share_network_name'):
self.assertGreater(len(server[k]), 0)
# 'backend_details' should be a dict
self.assertIsInstance(server["backend_details"], dict)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_show_share_server_details(self):
servers = self.shares_client.list_share_servers()
details = self.shares_client.show_share_server_details(
servers[0]["id"])
share = self.shares_v2_client.get_share(self.share['id'])
details = self.shares_v2_client.show_share_server_details(
share['share_server_id'])
# If details are present they and their values should be only strings
for k, v in details.iteritems():
for k, v in details.items():
self.assertIsInstance(k, six.string_types)
self.assertIsInstance(v, six.string_types)
def _delete_share_server(self, delete_share_network):
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@ddt.data(True, False)
def test_delete_share_server(self, delete_share_network):
# Get network and subnet from existing share_network and reuse it
# to be able to delete share_server after test ends.
# TODO(vponomaryov): attach security-services too. If any exist from
@ -221,8 +208,8 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
self.create_share(share_network_id=new_sn['id'])
# List share servers, filtered by share_network_id
search_opts = {"share_network": new_sn["id"]}
servers = self.shares_client.list_share_servers(search_opts)
servers = self.shares_v2_client.list_share_servers(
{"share_network": new_sn["id"]})
# There can be more than one share server for share network when retry
# was used and share was created successfully not from first time.
@ -233,42 +220,36 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
self.assertEqual(new_sn["id"], serv["share_network_id"])
# List shares by share server id
params = {"share_server_id": serv["id"]}
shares = self.shares_client.list_shares_with_detail(params)
shares = self.shares_v2_client.list_shares_with_detail(
{"share_server_id": serv["id"]})
for s in shares:
self.assertEqual(new_sn["id"], s["share_network_id"])
# Delete shares, so we will have share server without shares
for s in shares:
self.shares_client.delete_share(s["id"])
self.shares_v2_client.delete_share(s["id"])
# Wait for shares deletion
for s in shares:
self.shares_client.wait_for_resource_deletion(share_id=s["id"])
self.shares_v2_client.wait_for_resource_deletion(
share_id=s["id"])
# List shares by share server id, we expect empty list
params = {"share_server_id": serv["id"]}
empty = self.shares_client.list_shares_with_detail(params)
empty = self.shares_v2_client.list_shares_with_detail(
{"share_server_id": serv["id"]})
self.assertEqual(0, len(empty))
if delete_share_network:
# Delete share network, it should trigger share server deletion
self.shares_client.delete_share_network(new_sn["id"])
self.shares_v2_client.delete_share_network(new_sn["id"])
else:
# Delete share server
self.shares_client.delete_share_server(serv["id"])
self.shares_v2_client.delete_share_server(serv["id"])
# Wait for share server deletion
self.shares_client.wait_for_resource_deletion(server_id=serv["id"])
self.shares_v2_client.wait_for_resource_deletion(
server_id=serv["id"])
if delete_share_network:
self.shares_client.wait_for_resource_deletion(
self.shares_v2_client.wait_for_resource_deletion(
sn_id=new_sn["id"])
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_delete_share_server(self):
self._delete_share_server(False)
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_delete_share_server_by_deletion_of_share_network(self):
self._delete_share_server(True)