Add security service to share networks

This patch adds dynamic security services configurations
when provided in the new manila-tempest-plugin MultiOpt.
With this patch the manila-tempest-plugin will be
able to perform tests using real security services
configurations provided by the administrator in a new
config option called 'security_service'.

Change-Id: I544d415f51cd9fa9daae0010dd9d9c5d0dde516b
Closes-Bug: #1699856
Signed-off-by: Goutham Pacha Ravi <gouthampravi@gmail.com>
This commit is contained in:
debeltrami 2020-05-11 18:27:30 +00:00
parent 1ba5e647ae
commit 1753a59b43
6 changed files with 286 additions and 80 deletions

View File

@ -14,6 +14,7 @@
# under the License.
from oslo_config import cfg
from oslo_config import types
service_option = cfg.BoolOpt("manila",
default=True,
@ -174,6 +175,24 @@ ShareGroup = [
help="Whether to suppress errors with clean up operation "
"or not. There are cases when we may want to skip "
"such errors and catch only test errors."),
cfg.MultiOpt("security_service",
item_type=types.Dict(),
secret=True,
help="This option enables specifying security service "
"parameters needed to create security services "
"dynamically in order to run the tempest tests. "
"The configured security service must be reachable by "
"the project share networks created by the tests. So, "
"ideally project networks must be able to route to the "
"network where the pre-existing security services has "
"been deployed. The set of parameters that can be "
"configured is the same used in the security service "
"creation. You can repeat this option many times, and "
"each entry takes the standard dict config parameters: "
"security_service = "
"ss_type:<ldap, kerberos or active_directory>, "
"ss_dns_ip:value, ss_user:value, ss_password=value, "
"ss_domain:value, ss_server:value"),
# Switching ON/OFF test suites filtered by features
cfg.BoolOpt("run_quota_tests",
@ -272,5 +291,4 @@ ShareGroup = [
cfg.BoolOpt("run_ipv6_tests",
default=False,
help="Enable or disable running IPv6 tests."),
]

View File

@ -439,11 +439,6 @@ class MigrationOppositeDriverModesNFSTest(MigrationBase):
def test_migration_opposite_driver_modes(self, force_host_assisted):
self._check_migration_enabled(force_host_assisted)
share = self.create_share(self.protocol,
share_type_id=self.share_type_id)
share = self.shares_v2_client.get_share(share['id'])
share, dest_pool = self._setup_migration(share, opposite=True)
if not CONF.share.multitenancy_enabled:
# If currently configured is DHSS=False,
# then we need it for DHSS=True
@ -457,6 +452,12 @@ class MigrationOppositeDriverModesNFSTest(MigrationBase):
# then we must pass None for DHSS=False
new_share_network_id = None
share = self.create_share(self.protocol,
share_type_id=self.share_type_id,
cleanup_in_class=False)
share = self.shares_v2_client.get_share(share['id'])
share, dest_pool = self._setup_migration(share, opposite=True)
old_share_network_id = share['share_network_id']
old_share_type_id = share['share_type']
new_share_type_id = self.new_type_opposite['share_type']['id']

View File

@ -45,14 +45,17 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
# create share type
cls.share_type = cls._create_share_type()
cls.share_type_id = cls.share_type['id']
# create share
cls.share = cls.create_share(share_type_id=cls.share_type_id)
# create share in this new share network
cls.share = cls.create_share(
share_type_id=cls.share_type_id)
cls.share_network = cls.shares_v2_client.get_share_network(
cls.shares_v2_client.share_network_id)
if not cls.share_network["name"]:
sn_id = cls.share_network["id"]
cls.share_network = cls.shares_v2_client.update_share_network(
sn_id, name="sn_%s" % sn_id)
cls.sn_name_and_id = [
cls.share_network["name"],
cls.share_network["id"],
@ -222,6 +225,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
# TODO(vponomaryov): attach security-services too. If any exist from
# donor share-network.
new_sn = self.create_share_network(
add_security_services=True,
neutron_net_id=self.share_net_info['neutron_net_id'],
neutron_subnet_id=self.share_net_info['neutron_subnet_id'])
@ -285,6 +289,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
# Get network and subnet from existing share_network and reuse it
# to be able to delete share_server after test ends.
new_sn = self.create_share_network(
add_security_services=True,
neutron_net_id=self.share_net_info['neutron_net_id'],
neutron_subnet_id=self.share_net_info['neutron_subnet_id'])
share = self.create_share(

321
manila_tempest_tests/tests/api/base.py Normal file → Executable file
View File

@ -14,6 +14,8 @@
# under the License.
import copy
import inspect
import ipaddress
import re
import traceback
@ -26,6 +28,7 @@ from tempest import config
from tempest.lib.common import cred_client
from tempest.lib.common import dynamic_creds
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions
from tempest import test
@ -195,14 +198,25 @@ class BaseSharesTest(test.BaseTestCase):
# Initialise share clients for test credentials
cls.shares_client = os.share_v1.SharesClient()
cls.shares_v2_client = os.share_v2.SharesV2Client()
cls.admin_nc = cls.admin_snc = cls.admin_rc = cls.admin_net_cred = None
# Initialise network clients for test credentials
if CONF.service_available.neutron:
cls.networks_client = os.network.NetworksClient()
cls.subnets_client = os.network.SubnetsClient()
if CONF.share.multitenancy_enabled and (
CONF.auth.use_dynamic_credentials):
# Get admin credentials so we can create neutron networks
# dynamically if/when needed
(cls.admin_nc, cls.admin_snc,
cls.admin_rc, cls.admin_net_cred) = (
cls.get_network_clients_with_isolated_creds())
else:
cls.networks_client = None
cls.subnets_client = None
cls.project_network_cidr = CONF.network.project_network_cidr
cls.public_network_id = CONF.network.public_network_id
if CONF.identity.auth_version == 'v3':
project_id = os.auth_provider.auth_data[1]['project']['id']
else:
@ -233,8 +247,93 @@ class BaseSharesTest(test.BaseTestCase):
@classmethod
def resource_cleanup(cls):
cls.clear_resources(cls.class_resources)
if cls.admin_net_cred:
cls.admin_net_cred.clear_creds()
super(BaseSharesTest, cls).resource_cleanup()
@classmethod
def provide_and_associate_security_services(
cls, shares_client, share_network_id, cleanup_in_class=True):
"""Creates a security service and associates to a share network.
This method creates security services based on the Multiopt
defined in tempest configuration named security_service. When this
configuration is not provided, the method will return None.
After the security service creation, this method also associates
the security service to a share network.
:param shares_client: shares client, which requires the provisioning
:param share_network_id: id of the share network to associate the
security service
:param cleanup_in_class: if the security service and the association
will be removed in the method teardown or class teardown
:returns: None -- if the security service configuration is not
defined
"""
ss_configs = CONF.share.security_service
if not ss_configs:
return
for ss_config in ss_configs:
ss_name = "ss_autogenerated_by_tempest_%s" % (
ss_config.get("ss_type"))
ss_params = {
"name": ss_name,
"dns_ip": ss_config.get("ss_dns_ip"),
"server": ss_config.get("ss_server"),
"domain": ss_config.get("ss_domain"),
"user": ss_config.get("ss_user"),
"password": ss_config.get("ss_password")
}
ss_type = ss_config.get("ss_type")
security_service = cls.create_security_service(
ss_type,
client=shares_client,
cleanup_in_class=cleanup_in_class,
**ss_params)
cls.add_sec_service_to_share_network(
shares_client, share_network_id,
security_service["id"],
cleanup_in_class=cleanup_in_class)
@classmethod
def add_sec_service_to_share_network(
cls, client, share_network_id,
security_service_id, cleanup_in_class=True):
"""Associates a security service to a share network.
This method associates a security service provided by
the security service configuration with a specific
share network.
:param share_network_id: the share network id to be
associate with a given security service
:param security_service_id: the security service id
to be associate with a given share network
:param cleanup_in_class: if the resources will be
dissociate in the method teardown or class teardown
"""
client.add_sec_service_to_share_network(
share_network_id,
security_service_id)
resource = {
"type": "dissociate_security_service",
"id": security_service_id,
"extra_params": {
"share_network_id": share_network_id
},
"client": client,
}
if cleanup_in_class:
cls.class_resources.insert(0, resource)
else:
cls.method_resources.insert(0, resource)
@classmethod
@network_synchronized
def provide_share_network(cls, shares_client, networks_client,
@ -253,88 +352,153 @@ class BaseSharesTest(test.BaseTestCase):
"""
sc = shares_client
search_word = "reusable"
sn_name = "autogenerated_by_tempest_%s" % search_word
sn_name = "autogenerated_by_tempest"
if (not ignore_multitenancy_config and
not CONF.share.multitenancy_enabled):
# Assumed usage of a single-tenant driver
share_network_id = None
return None
else:
if sc.share_network_id:
# Share-network already exists, use it
share_network_id = sc.share_network_id
return sc.share_network_id
elif not CONF.share.create_networks_when_multitenancy_enabled:
share_network_id = None
# Try get suitable share-network
share_networks = sc.list_share_networks_with_detail()
for sn in share_networks:
net_info = (
utils.share_network_get_default_subnet(sn)
if utils.share_network_subnets_are_supported() else sn)
if net_info is None:
continue
if(net_info["neutron_net_id"] is None and
net_info["neutron_subnet_id"] is None and
sn["name"] and search_word in sn["name"]):
share_network_id = sn["id"]
break
# Create new share-network if one was not found
if share_network_id is None:
sn_desc = "This share-network was created by tempest"
sn = sc.create_share_network(name=sn_name,
description=sn_desc)
share_network_id = sn["id"]
# We need a new share network, but don't need to associate
# any neutron networks to it - this configuration is used
# when manila is configured with "StandaloneNetworkPlugin"
# or "NeutronSingleNetworkPlugin" where all tenants share
# a single backend network where shares are exported.
sn_desc = "This share-network was created by tempest"
sn = cls.create_share_network(cleanup_in_class=True,
add_security_services=True,
name=sn_name,
description=sn_desc)
return sn['id']
else:
net_id = subnet_id = share_network_id = None
# Search for networks, created in previous runs
service_net_name = "share-service"
networks = networks_client.list_networks()
if "networks" in networks.keys():
networks = networks["networks"]
for network in networks:
if (service_net_name in network["name"] and
sc.tenant_id == network['tenant_id']):
net_id = network["id"]
if len(network["subnets"]) > 0:
subnet_id = network["subnets"][0]
break
net_id = subnet_id = None
# Retrieve non-public network list owned by the tenant
search_opts = {'tenant_id': sc.tenant_id, 'shared': False}
tenant_networks = (
networks_client.list_networks(
**search_opts).get('networks', [])
)
tenant_networks_with_subnet = (
[n for n in tenant_networks if n['subnets']]
)
if tenant_networks_with_subnet:
net_id = tenant_networks_with_subnet[0]['id']
subnet_id = tenant_networks_with_subnet[0]['subnets'][0]
# Create suitable network
if net_id is None or subnet_id is None:
ic = cls._get_dynamic_creds(service_net_name)
net_data = ic._create_network_resources(sc.tenant_id)
network, subnet, router = net_data
net_id = network["id"]
subnet_id = subnet["id"]
# Try get suitable share-network
share_networks = sc.list_share_networks_with_detail()
for sn in share_networks:
net_info = (
utils.share_network_get_default_subnet(sn)
if utils.share_network_subnets_are_supported()
else sn)
if net_info is None:
continue
if (net_id == net_info["neutron_net_id"] and
subnet_id == net_info["neutron_subnet_id"] and
sn["name"] and search_word in sn["name"]):
share_network_id = sn["id"]
break
network, subnet, router = (
cls.provide_network_resources_for_tenant_id(
sc.tenant_id)
)
net_id = network['network']['id']
subnet_id = subnet['subnet']['id']
# Create suitable share-network
if share_network_id is None:
sn_desc = "This share-network was created by tempest"
sn = sc.create_share_network(name=sn_name,
description=sn_desc,
neutron_net_id=net_id,
neutron_subnet_id=subnet_id)
share_network_id = sn["id"]
sn_desc = "This share-network was created by tempest"
sn = cls.create_share_network(cleanup_in_class=True,
add_security_services=True,
name=sn_name,
description=sn_desc,
neutron_net_id=net_id,
neutron_subnet_id=subnet_id)
return share_network_id
return sn['id']
@classmethod
def provide_network_resources_for_tenant_id(cls, tenant_id):
"""Used for creating neutron network resources.
This method creates a suitable network, subnet and router
to be used when providing a new share network in the tempest.
The tempest conf project_network_cidr is very important
in order to create a reachable network. Also, this method will
cleanup the neutron resources in the class teardown.
:param tenant_id: tenant_id to be used for network resources creation
:returns network, subnet, router: the neutron resources created
"""
network = cls.admin_nc.create_network(
tenant_id=tenant_id,
name="tempest-net")
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
cls.admin_nc.delete_network,
network['network']['id'])
subnet = cls.admin_snc.create_subnet(
network_id=network['network']['id'],
tenant_id=tenant_id,
cidr=str(cls.project_network_cidr),
name="tempest-subnet",
ip_version=(ipaddress.ip_network(
six.text_type(cls.project_network_cidr)).version))
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
cls.admin_snc.delete_subnet,
subnet['subnet']['id'])
router = None
if cls.public_network_id:
kwargs = {'name': "tempest-router",
'tenant_id': tenant_id,
'external_gateway_info': cls.public_network_id}
body = cls.routers_client.create_router(**kwargs)
router = body['router']
cls.admin_rc.add_router_interface(
router['id'],
subnet_id=subnet['subnet']['id'])
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
cls.admin_rc.delete_router, router)
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
cls.admin_rc.remove_router_interface,
router['id'],
subnet_id=subnet['subnet']['id'])
return network, subnet, router
@classmethod
def get_network_clients_with_isolated_creds(cls,
name=None,
type_of_creds='admin'):
"""Creates isolated creds and provide network clients.
:param name: name, will be used for naming ic and related stuff
:param type_of_creds: defines the type of creds to be created
:returns: NetworksClient, SubnetsClient, RoutersClient,
Isolated Credentials
"""
if name is None:
# Get name of test method
name = inspect.stack()[1][3]
if len(name) > 32:
name = name[0:32]
# Choose type of isolated creds
ic = cls._get_dynamic_creds(name)
if "admin" in type_of_creds:
creds = ic.get_admin_creds().credentials
elif "alt" in type_of_creds:
creds = ic.get_alt_creds().credentials
else:
creds = ic.get_credentials(type_of_creds).credentials
ic.type_of_creds = type_of_creds
# create client with isolated creds
os = clients.Clients(creds)
net_client = os.network.NetworksClient()
subnet_client = os.network.SubnetsClient()
router_client = os.network.RoutersClient()
return net_client, subnet_client, router_client, ic
@classmethod
def _create_share(cls, share_protocol=None, size=None, name=None,
@ -734,7 +898,9 @@ class BaseSharesTest(test.BaseTestCase):
@classmethod
def create_share_network(cls, client=None,
cleanup_in_class=False, **kwargs):
cleanup_in_class=False,
add_security_services=True, **kwargs):
if client is None:
client = cls.shares_client
share_network = client.create_share_network(**kwargs)
@ -743,15 +909,23 @@ class BaseSharesTest(test.BaseTestCase):
"id": share_network["id"],
"client": client,
}
if cleanup_in_class:
cls.class_resources.insert(0, resource)
else:
cls.method_resources.insert(0, resource)
if add_security_services:
cls.provide_and_associate_security_services(
client, share_network["id"], cleanup_in_class=cleanup_in_class)
return share_network
@classmethod
def create_share_network_subnet(cls, client=None,
cleanup_in_class=False, **kwargs):
def create_share_network_subnet(cls,
client=None,
cleanup_in_class=False,
**kwargs):
if client is None:
client = cls.shares_v2_client
share_network_subnet = client.create_subnet(**kwargs)
@ -907,6 +1081,11 @@ class BaseSharesTest(test.BaseTestCase):
res_id != CONF.share.share_network_id):
client.delete_share_network(res_id)
client.wait_for_resource_deletion(sn_id=res_id)
elif res["type"] == "dissociate_security_service":
sn_id = res["extra_params"]["share_network_id"]
client.remove_sec_service_from_share_network(
sn_id=sn_id, ss_id=res_id
)
elif res["type"] == "security_service":
client.delete_security_service(res_id)
client.wait_for_resource_deletion(ss_id=res_id)

View File

@ -77,6 +77,7 @@ class SecurityServiceListMixin(object):
fresh_sn = []
for i in range(2):
sn = self.create_share_network(
add_security_services=False,
neutron_net_id=sn["neutron_net_id"],
neutron_subnet_id=sn["neutron_subnet_id"])
fresh_sn.append(sn)

View File

@ -31,7 +31,9 @@ class SecurityServicesMappingTest(base.BaseSharesTest):
# create share network
data = self.generate_share_network_data()
self.sn = self.create_share_network(client=self.cl, **data)
self.sn = self.create_share_network(client=self.cl,
add_security_services=False,
**data)
self.assertDictContainsSubset(data, self.sn)
# create security service