Enforces the use of Credentials (part2)

Multiversion auth part5

Refactor mangers, utils and test base classes to use Credentials instead
of username, password and tenant_name.
Makes changes to tests where needed - some of the tests create their own
managers.

Partially implements: bp multi-keystone-api-version-tests

Change-Id: If05f5704d90390362cebf45e2664f2bfbc72268d
This commit is contained in:
Andrea Frittoli 2014-03-20 10:05:18 +00:00
parent 3ce1f8e6ed
commit 422fbdf0a0
26 changed files with 209 additions and 350 deletions

View File

@ -38,6 +38,8 @@ class BaseComputeTest(tempest.test.BaseTestCase):
cls.set_network_resources()
super(BaseComputeTest, cls).setUpClass()
# TODO(andreaf) WE should care also for the alt_manager here
# but only once client lazy load in the manager is done
os = cls.get_client_manager()
cls.os = os
@ -348,23 +350,19 @@ class BaseV2ComputeAdminTest(BaseV2ComputeTest):
@classmethod
def setUpClass(cls):
super(BaseV2ComputeAdminTest, cls).setUpClass()
admin_username = CONF.compute_admin.username
admin_password = CONF.compute_admin.password
admin_tenant = CONF.compute_admin.tenant_name
if not (admin_username and admin_password and admin_tenant):
msg = ("Missing Compute Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
if (CONF.compute.allow_tenant_isolation or
cls.force_tenant_isolation is True):
creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = creds
cls.os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
cls.os_adm = clients.Manager(credentials=creds,
interface=cls._interface)
else:
cls.os_adm = clients.ComputeAdminManager(interface=cls._interface)
try:
cls.os_adm = clients.ComputeAdminManager(
interface=cls._interface)
except exceptions.InvalidCredentials:
msg = ("Missing Compute Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
class BaseV3ComputeTest(BaseComputeTest):
@ -378,22 +376,18 @@ class BaseV3ComputeAdminTest(BaseV3ComputeTest):
@classmethod
def setUpClass(cls):
super(BaseV3ComputeAdminTest, cls).setUpClass()
admin_username = CONF.compute_admin.username
admin_password = CONF.compute_admin.password
admin_tenant = CONF.compute_admin.tenant_name
if not (admin_username and admin_password and admin_tenant):
msg = ("Missing Compute Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = creds
os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
os_adm = clients.Manager(credentials=creds,
interface=cls._interface)
else:
os_adm = clients.ComputeAdminManager(interface=cls._interface)
try:
cls.os_adm = clients.ComputeAdminManager(
interface=cls._interface)
except exceptions.InvalidCredentials:
msg = ("Missing Compute Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
cls.os_adm = os_adm
cls.servers_admin_client = cls.os_adm.servers_v3_client

View File

@ -43,10 +43,7 @@ class AuthorizationTestJSON(base.BaseV2ComputeTest):
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_alt_creds()
username, tenant_name, password = creds
cls.alt_manager = clients.Manager(username=username,
password=password,
tenant_name=tenant_name)
cls.alt_manager = clients.Manager(credentials=creds)
else:
# Use the alt_XXX credentials in the config file
cls.alt_manager = clients.AltManager()

View File

@ -13,6 +13,7 @@
import datetime
import re
from tempest.api.identity import base
from tempest import auth
from tempest import clients
from tempest.common.utils import data_utils
from tempest import config
@ -88,10 +89,13 @@ class BaseTrustsV3Test(base.BaseIdentityV3AdminTest):
self.assertIsNotNone(self.trustee_user_id)
# Initialize a new client with the trustor credentials
os = clients.Manager(username=self.trustor_username,
password=self.trustor_password,
tenant_name=self.trustor_project_name,
interface=self._interface)
creds = auth.get_credentials(
username=self.trustor_username,
password=self.trustor_password,
tenant_name=self.trustor_project_name)
os = clients.Manager(
credentials=creds,
interface=self._interface)
self.trustor_client = os.identity_v3_client
def cleanup_user_and_roles(self):

View File

@ -14,6 +14,7 @@
# under the License.
from tempest import auth
from tempest import clients
from tempest.common.utils import data_utils
from tempest import config
@ -120,6 +121,14 @@ class DataGenerator(object):
self.projects = []
self.v3_roles = []
@property
def test_credentials(self):
return auth.get_credentials(username=self.test_user,
user_id=self.user['id'],
password=self.test_password,
tenant_name=self.test_tenant,
tenant_id=self.tenant['id'])
def setup_test_user(self):
"""Set up a test user."""
self.setup_test_tenant()

View File

@ -42,11 +42,7 @@ class BaseImageTest(tempest.test.BaseTestCase):
skip_msg = ("%s skipped as glance is not available" % cls.__name__)
raise cls.skipException(skip_msg)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_primary_creds()
username, tenant_name, password = creds
cls.os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name)
cls.os = clients.Manager(cls.isolated_creds.get_primary_creds())
else:
cls.os = clients.Manager()
@ -96,11 +92,7 @@ class BaseV1ImageMembersTest(BaseV1ImageTest):
def setUpClass(cls):
super(BaseV1ImageMembersTest, cls).setUpClass()
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_alt_creds()
username, tenant_name, password = creds
cls.os_alt = clients.Manager(username=username,
password=password,
tenant_name=tenant_name)
cls.os_alt = clients.Manager(cls.isolated_creds.get_alt_creds())
cls.alt_tenant_id = cls.isolated_creds.get_alt_tenant()['id']
else:
cls.os_alt = clients.AltManager()
@ -139,12 +131,8 @@ class BaseV2MemberImageTest(BaseV2ImageTest):
super(BaseV2MemberImageTest, cls).setUpClass()
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_alt_creds()
username, tenant_name, password = creds
cls.os_alt = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
cls.alt_tenant_id = cls.isolated_creds.get_alt_tenant()['id']
cls.os_alt = clients.Manager(creds)
cls.alt_tenant_id = cls.isolated_creds.get_alt_creds().tenant_id
else:
cls.os_alt = clients.AltManager()
alt_tenant_name = cls.os_alt.credentials['tenant_name']

View File

@ -38,9 +38,9 @@ class LoadBalancerAdminTestJSON(base.BaseAdminNetworkTest):
cls.force_tenant_isolation = True
manager = cls.get_client_manager()
cls.client = manager.network_client
username, tenant_name, passwd = cls.isolated_creds.get_primary_creds()
primary_creds = cls.isolated_creds.get_primary_creds()
cls.tenant_id = cls.os_adm.identity_client.get_tenant_by_name(
tenant_name)['id']
primary_creds.tenant_name)['id']
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.pool = cls.create_pool(data_utils.rand_name('pool-'),

View File

@ -352,11 +352,7 @@ class BaseAdminNetworkTest(BaseNetworkTest):
raise cls.skipException(msg)
if (CONF.compute.allow_tenant_isolation or
cls.force_tenant_isolation is True):
creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = creds
cls.os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
cls.os_adm = clients.Manager(cls.isolated_creds.get_admin_creds(),
interface=cls._interface)
else:
cls.os_adm = clients.ComputeAdminManager(interface=cls._interface)

View File

@ -38,23 +38,12 @@ class BaseObjectTest(tempest.test.BaseTestCase):
cls.__name__, network_resources=cls.network_resources)
if CONF.compute.allow_tenant_isolation:
# Get isolated creds for normal user
creds = cls.isolated_creds.get_primary_creds()
username, tenant_name, password = creds
cls.os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name)
cls.os = clients.Manager(cls.isolated_creds.get_primary_creds())
# Get isolated creds for admin user
admin_creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = admin_creds
cls.os_admin = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name)
cls.os_admin = clients.Manager(
cls.isolated_creds.get_admin_creds())
# Get isolated creds for alt user
alt_creds = cls.isolated_creds.get_alt_creds()
alt_username, alt_tenant, alt_password = alt_creds
cls.os_alt = clients.Manager(username=alt_username,
password=alt_password,
tenant_name=alt_tenant)
cls.os_alt = clients.Manager(cls.isolated_creds.get_alt_creds())
# Add isolated users to operator role so that they can create a
# container in swift.
cls._assign_member_role()
@ -92,8 +81,8 @@ class BaseObjectTest(tempest.test.BaseTestCase):
@classmethod
def _assign_member_role(cls):
primary_user = cls.isolated_creds.get_primary_user()
alt_user = cls.isolated_creds.get_alt_user()
primary_creds = cls.isolated_creds.get_primary_creds()
alt_creds = cls.isolated_creds.get_alt_creds()
swift_role = CONF.object_storage.operator_role
try:
resp, roles = cls.os_admin.identity_client.list_roles()
@ -101,9 +90,9 @@ class BaseObjectTest(tempest.test.BaseTestCase):
except StopIteration:
msg = "No role named %s found" % swift_role
raise exceptions.NotFound(msg)
for user in [primary_user, alt_user]:
cls.os_admin.identity_client.assign_user_role(user['tenantId'],
user['id'],
for creds in [primary_creds, alt_creds]:
cls.os_admin.identity_client.assign_user_role(creds.tenant_id,
creds.user_id,
role['id'])
@classmethod

View File

@ -35,10 +35,7 @@ class AccountQuotasTest(base.BaseObjectTest):
cls.data.setup_test_user()
cls.os_reselleradmin = clients.Manager(
cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
cls.os_reselleradmin = clients.Manager(cls.data.test_credentials)
# Retrieve the ResellerAdmin role id
reseller_role_id = None

View File

@ -35,10 +35,7 @@ class AccountQuotasNegativeTest(base.BaseObjectTest):
cls.data.setup_test_user()
cls.os_reselleradmin = clients.Manager(
cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
cls.os_reselleradmin = clients.Manager(cls.data.test_credentials)
# Retrieve the ResellerAdmin role id
reseller_role_id = None

View File

@ -67,9 +67,7 @@ class AccountTest(base.BaseObjectTest):
self.data.setup_test_user()
os_test_user = clients.Manager(
self.data.test_user,
self.data.test_password,
self.data.test_tenant)
self.data.test_credentials)
# Retrieve the id of an operator role of object storage
test_role_id = None

View File

@ -28,9 +28,7 @@ class AccountNegativeTest(base.BaseObjectTest):
# create user
self.data.setup_test_user()
test_os = clients.Manager(self.data.test_user,
self.data.test_password,
self.data.test_tenant)
test_os = clients.Manager(self.data.test_credentials)
test_auth_provider = test_os.auth_provider
# Get auth for the test user
test_auth_provider.auth_data

View File

@ -24,9 +24,7 @@ class ObjectTestACLs(base.BaseObjectTest):
def setUpClass(cls):
super(ObjectTestACLs, cls).setUpClass()
cls.data.setup_test_user()
test_os = clients.Manager(cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
test_os = clients.Manager(cls.data.test_credentials)
cls.test_auth_data = test_os.auth_provider.auth_data
@classmethod

View File

@ -26,9 +26,7 @@ class ObjectACLsNegativeTest(base.BaseObjectTest):
def setUpClass(cls):
super(ObjectACLsNegativeTest, cls).setUpClass()
cls.data.setup_test_user()
test_os = clients.Manager(cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
test_os = clients.Manager(cls.data.test_credentials)
cls.test_auth_data = test_os.auth_provider.auth_data
@classmethod

View File

@ -29,10 +29,7 @@ class CrossdomainTest(base.BaseObjectTest):
# endpoint and test the healthcheck feature.
cls.data.setup_test_user()
cls.os_test_user = clients.Manager(
cls.data.test_user,
cls.data.test_password,
cls.data.test_tenant)
cls.os_test_user = clients.Manager(cls.data.test_credentials)
cls.xml_start = '<?xml version="1.0"?>\n' \
'<!DOCTYPE cross-domain-policy SYSTEM ' \

View File

@ -136,11 +136,7 @@ class BaseVolumeV1AdminTest(BaseVolumeV1Test):
"in configuration.")
raise cls.skipException(msg)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = creds
cls.os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
cls.os_adm = clients.Manager(cls.isolated_creds.get_admin_creds(),
interface=cls._interface)
else:
cls.os_adm = clients.AdminManager(interface=cls._interface)

View File

@ -32,21 +32,13 @@ class VolumesTransfersTest(base.BaseVolumeV1Test):
# Add another tenant to test volume-transfer
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_alt_creds()
username, tenant_name, password = creds
cls.os_alt = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
cls.os_alt = clients.Manager(cls.isolated_creds.get_alt_creds(),
interface=cls._interface)
cls.alt_tenant_id = cls.isolated_creds.get_alt_tenant()['id']
# Add admin tenant to cleanup resources
adm_creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = adm_creds
cls.os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
cls.os_adm = clients.Manager(cls.isolated_creds.get_admin_creds(),
interface=cls._interface)
else:
cls.os_alt = clients.AltManager()
alt_tenant_name = cls.os_alt.credentials['tenant_name']

View File

@ -16,6 +16,7 @@
import keystoneclient.exceptions
import keystoneclient.v2_0.client
from tempest import auth
from tempest.common.rest_client import NegativeRestClient
from tempest import config
from tempest import exceptions
@ -196,22 +197,12 @@ class Manager(manager.Manager):
Top level manager for OpenStack tempest clients
"""
def __init__(self, username=None, password=None, tenant_name=None,
interface='json', service=None):
"""
We allow overriding of the credentials used within the various
client classes managed by the Manager object. Left as None, the
standard username/password/tenant_name is used.
:param username: Override of the username
:param password: Override of the password
:param tenant_name: Override of the tenant name
"""
def __init__(self, credentials=None, interface='json', service=None):
# Set interface and client type first
self.interface = interface
self.client_type = 'tempest'
# super cares for credentials validation
super(Manager, self).__init__(
username=username, password=password, tenant_name=tenant_name)
super(Manager, self).__init__(credentials=credentials)
if self.interface == 'xml':
self.certificates_client = CertificatesClientXML(
@ -368,10 +359,10 @@ class Manager(manager.Manager):
raise exceptions.InvalidConfiguration(msg)
# TODO(andreaf) EC2 client still do their auth, v2 only
ec2_client_args = (self.credentials.get('username'),
self.credentials.get('password'),
ec2_client_args = (self.credentials.username,
self.credentials.password,
CONF.identity.uri,
self.credentials.get('tenant_name'))
self.credentials.tenant_name)
# common clients
self.account_client = AccountClient(self.auth_provider)
@ -402,11 +393,10 @@ class AltManager(Manager):
"""
def __init__(self, interface='json', service=None):
super(AltManager, self).__init__(CONF.identity.alt_username,
CONF.identity.alt_password,
CONF.identity.alt_tenant_name,
interface=interface,
service=service)
super(AltManager, self).__init__(
credentials=auth.get_default_credentials('alt_user'),
interface=interface,
service=service)
class AdminManager(Manager):
@ -417,11 +407,10 @@ class AdminManager(Manager):
"""
def __init__(self, interface='json', service=None):
super(AdminManager, self).__init__(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name,
interface=interface,
service=service)
super(AdminManager, self).__init__(
credentials=auth.get_default_credentials('identity_admin'),
interface=interface,
service=service)
class ComputeAdminManager(Manager):
@ -433,11 +422,10 @@ class ComputeAdminManager(Manager):
def __init__(self, interface='json', service=None):
base = super(ComputeAdminManager, self)
base.__init__(CONF.compute_admin.username,
CONF.compute_admin.password,
CONF.compute_admin.tenant_name,
interface=interface,
service=service)
base.__init__(
credentials=auth.get_default_credentials('compute_admin'),
interface=interface,
service=service)
class OfficialClientManager(manager.Manager):
@ -452,47 +440,32 @@ class OfficialClientManager(manager.Manager):
IRONICCLIENT_VERSION = '1'
SAHARACLIENT_VERSION = '1.1'
def __init__(self, username, password, tenant_name):
def __init__(self, credentials):
# FIXME(andreaf) Auth provider for client_type 'official' is
# not implemented yet, setting to 'tempest' for now.
self.client_type = 'tempest'
self.interface = None
# super cares for credentials validation
super(OfficialClientManager, self).__init__(
username=username, password=password, tenant_name=tenant_name)
super(OfficialClientManager, self).__init__(credentials=credentials)
self.baremetal_client = self._get_baremetal_client()
self.compute_client = self._get_compute_client(username,
password,
tenant_name)
self.identity_client = self._get_identity_client(username,
password,
tenant_name)
self.compute_client = self._get_compute_client(credentials)
self.identity_client = self._get_identity_client(credentials)
self.image_client = self._get_image_client()
self.network_client = self._get_network_client()
self.volume_client = self._get_volume_client(username,
password,
tenant_name)
self.volume_client = self._get_volume_client(credentials)
self.object_storage_client = self._get_object_storage_client(
username,
password,
tenant_name)
credentials)
self.orchestration_client = self._get_orchestration_client(
username,
password,
tenant_name)
credentials)
self.data_processing_client = self._get_data_processing_client(
username,
password,
tenant_name)
credentials)
def _get_roles(self):
keystone_admin = self._get_identity_client(
CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name)
admin_credentials = auth.get_default_credentials('identity_admin')
keystone_admin = self._get_identity_client(admin_credentials)
username = self.credentials['username']
tenant_name = self.credentials['tenant_name']
username = self.credentials.username
tenant_name = self.credentials.tenant_name
user_id = keystone_admin.users.find(name=username).id
tenant_id = keystone_admin.tenants.find(name=tenant_name).id
@ -501,20 +474,20 @@ class OfficialClientManager(manager.Manager):
return [r.name for r in roles]
def _get_compute_client(self, username, password, tenant_name):
def _get_compute_client(self, credentials):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
if not CONF.service_available.nova:
return None
import novaclient.client
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
region = CONF.identity.region
client_args = (username, password, tenant_name, auth_url)
client_args = (credentials.username, credentials.password,
credentials.tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = CONF.compute.catalog_type
@ -542,7 +515,7 @@ class OfficialClientManager(manager.Manager):
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_volume_client(self, username, password, tenant_name):
def _get_volume_client(self, credentials):
if not CONF.service_available.cinder:
return None
import cinderclient.client
@ -551,25 +524,23 @@ class OfficialClientManager(manager.Manager):
endpoint_type = CONF.volume.endpoint_type
dscv = CONF.identity.disable_ssl_certificate_validation
return cinderclient.client.Client(self.CINDERCLIENT_VERSION,
username,
password,
tenant_name,
credentials.username,
credentials.password,
credentials.tenant_name,
auth_url,
region_name=region,
endpoint_type=endpoint_type,
insecure=dscv,
http_log_debug=True)
def _get_object_storage_client(self, username, password, tenant_name):
def _get_object_storage_client(self, credentials):
if not CONF.service_available.swift:
return None
import swiftclient
auth_url = CONF.identity.uri
# add current tenant to swift operator role group.
keystone_admin = self._get_identity_client(
CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name)
admin_credentials = auth.get_default_credentials('identity_admin')
keystone_admin = self._get_identity_client(admin_credentials)
# enable test user to operate swift by adding operator role to him.
roles = keystone_admin.roles.list()
@ -586,26 +557,18 @@ class OfficialClientManager(manager.Manager):
endpoint_type = CONF.object_storage.endpoint_type
os_options = {'endpoint_type': endpoint_type}
return swiftclient.Connection(auth_url, username, password,
tenant_name=tenant_name,
return swiftclient.Connection(auth_url, credentials.username,
credentials.password,
tenant_name=credentials.tenant_name,
auth_version='2',
os_options=os_options)
def _get_orchestration_client(self, username=None, password=None,
tenant_name=None):
def _get_orchestration_client(self, credentials):
if not CONF.service_available.heat:
return None
import heatclient.client
if not username:
username = CONF.identity.admin_username
if not password:
password = CONF.identity.admin_password
if not tenant_name:
tenant_name = CONF.identity.tenant_name
self._validate_credentials(username, password, tenant_name)
keystone = self._get_identity_client(username, password, tenant_name)
keystone = self._get_identity_client(credentials)
region = CONF.identity.region
endpoint_type = CONF.orchestration.endpoint_type
token = keystone.auth_token
@ -622,22 +585,22 @@ class OfficialClientManager(manager.Manager):
return heatclient.client.Client(self.HEATCLIENT_VERSION,
endpoint,
token=token,
username=username,
password=password)
username=credentials.username,
password=credentials.password)
def _get_identity_client(self, username, password, tenant_name):
def _get_identity_client(self, credentials):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
self._validate_credentials(username, password, tenant_name)
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
return keystoneclient.v2_0.client.Client(
username=credentials.username,
password=credentials.password,
tenant_name=credentials.tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_baremetal_client(self):
# ironic client is currently intended to by used by admin users
@ -654,9 +617,9 @@ class OfficialClientManager(manager.Manager):
service_type = CONF.baremetal.catalog_type
endpoint_type = CONF.baremetal.endpoint_type
creds = {
'os_username': self.credentials['username'],
'os_password': self.credentials['password'],
'os_tenant_name': self.credentials['tenant_name']
'os_username': self.credentials.username,
'os_password': self.credentials.password,
'os_tenant_name': self.credentials.tenant_name
}
try:
@ -680,41 +643,39 @@ class OfficialClientManager(manager.Manager):
if not CONF.service_available.neutron:
return None
import neutronclient.v2_0.client
username = CONF.identity.admin_username
password = CONF.identity.admin_password
tenant_name = CONF.identity.admin_tenant_name
self._validate_credentials(username, password, tenant_name)
credentials = auth.get_default_credentials('identity_admin')
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
endpoint_type = CONF.network.endpoint_type
return neutronclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
endpoint_type=endpoint_type,
auth_url=auth_url,
insecure=dscv)
return neutronclient.v2_0.client.Client(
username=credentials.username,
password=credentials.password,
tenant_name=credentials.tenant_name,
endpoint_type=endpoint_type,
auth_url=auth_url,
insecure=dscv)
def _get_data_processing_client(self, username, password, tenant_name):
def _get_data_processing_client(self, credentials):
if not CONF.service_available.sahara:
# Sahara isn't available
return None
import saharaclient.client
self._validate_credentials(username, password, tenant_name)
endpoint_type = CONF.data_processing.endpoint_type
catalog_type = CONF.data_processing.catalog_type
auth_url = CONF.identity.uri
client = saharaclient.client.Client(self.SAHARACLIENT_VERSION,
username, password,
project_name=tenant_name,
endpoint_type=endpoint_type,
service_type=catalog_type,
auth_url=auth_url)
client = saharaclient.client.Client(
self.SAHARACLIENT_VERSION,
credentials.username,
credentials.password,
project_name=credentials.tenant_name,
endpoint_type=endpoint_type,
service_type=catalog_type,
auth_url=auth_url)
return client

View File

@ -14,9 +14,6 @@
import netaddr
import keystoneclient.v2_0.client as keystoneclient
import neutronclient.v2_0.client as neutronclient
from tempest import auth
from tempest import clients
from tempest.common.utils import data_utils
@ -44,24 +41,6 @@ class IsolatedCreds(object):
self.identity_admin_client, self.network_admin_client = (
self._get_admin_clients())
def _get_official_admin_clients(self):
username = CONF.identity.admin_username
password = CONF.identity.admin_password
tenant_name = CONF.identity.admin_tenant_name
auth_url = CONF.identity.uri
dscv = CONF.identity.disable_ssl_certificate_validation
identity_client = keystoneclient.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
network_client = neutronclient.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
return identity_client, network_client
def _get_admin_clients(self):
"""
Returns a tuple with instances of the following admin clients (in this
@ -71,11 +50,11 @@ class IsolatedCreds(object):
"""
if self.tempest_client:
os = clients.AdminManager(interface=self.interface)
admin_clients = (os.identity_client,
os.network_client,)
else:
admin_clients = self._get_official_admin_clients()
return admin_clients
os = clients.OfficialClientManager(
auth.get_default_credentials('identity_admin')
)
return os.identity_client, os.network_client
def _create_tenant(self, name, description):
if self.tempest_client:
@ -388,13 +367,13 @@ class IsolatedCreds(object):
else:
return credentials
def get_primary_creds(self, old_style=True):
def get_primary_creds(self, old_style=False):
return self.get_credentials('primary', old_style)
def get_admin_creds(self, old_style=True):
def get_admin_creds(self, old_style=False):
return self.get_credentials('admin', old_style)
def get_alt_creds(self, old_style=True):
def get_alt_creds(self, old_style=False):
return self.get_credentials('alt', old_style)
def _clear_isolated_router(self, router_id, router_name):

View File

@ -29,7 +29,7 @@ class Manager(object):
and a client object for a test case to use in performing actions.
"""
def __init__(self, username=None, password=None, tenant_name=None):
def __init__(self, credentials=None):
"""
We allow overriding of the credentials used within the various
client classes managed by the Manager object. Left as None, the
@ -38,29 +38,18 @@ class Manager(object):
:param credentials: Override of the credentials
"""
self.auth_version = CONF.identity.auth_version
# FIXME(andreaf) Change Manager __init__ to accept a credentials dict
if username is None or password is None:
# Tenant None is a valid use case
self.credentials = self.get_default_credentials()
if credentials is None:
self.credentials = auth.get_default_credentials('user')
else:
self.credentials = dict(username=username, password=password,
tenant_name=tenant_name)
if self.auth_version == 'v3':
self.credentials['domain_name'] = 'Default'
self.credentials = credentials
# Check if passed or default credentials are valid
if not self.credentials.is_valid():
raise exceptions.InvalidCredentials()
# Creates an auth provider for the credentials
self.auth_provider = self.get_auth_provider(self.credentials)
# FIXME(andreaf) unused
self.client_attr_names = []
# we do this everywhere, have it be part of the super class
def _validate_credentials(self, username, password, tenant_name):
if None in (username, password, tenant_name):
msg = ("Missing required credentials. "
"username: %(u)s, password: %(p)s, "
"tenant_name: %(t)s" %
{'u': username, 'p': password, 't': tenant_name})
raise exceptions.InvalidConfiguration(msg)
@classmethod
def get_auth_provider_class(cls, auth_version):
if auth_version == 'v2':
@ -68,13 +57,6 @@ class Manager(object):
else:
return auth.KeystoneV3AuthProvider
def get_default_credentials(self):
return dict(
username=CONF.identity.username,
password=CONF.identity.password,
tenant_name=CONF.identity.tenant_name
)
def get_auth_provider(self, credentials):
if credentials is None:
raise exceptions.InvalidCredentials(

View File

@ -24,6 +24,7 @@ from neutronclient.common import exceptions as exc
from novaclient import exceptions as nova_exceptions
from tempest.api.network import common as net_common
from tempest import auth
from tempest import clients
from tempest.common import isolated_creds
from tempest.common.utils import data_utils
@ -65,10 +66,8 @@ class OfficialClientTest(tempest.test.BaseTestCase):
cls.__name__, tempest_client=False,
network_resources=cls.network_resources)
username, password, tenant_name = cls.credentials()
cls.manager = clients.OfficialClientManager(
username, password, tenant_name)
credentials=cls.credentials())
cls.compute_client = cls.manager.compute_client
cls.image_client = cls.manager.image_client
cls.baremetal_client = cls.manager.baremetal_client
@ -82,27 +81,27 @@ class OfficialClientTest(tempest.test.BaseTestCase):
cls.os_resources = []
@classmethod
def _get_credentials(cls, get_creds, prefix):
def _get_credentials(cls, get_creds, ctype):
if CONF.compute.allow_tenant_isolation:
username, tenant_name, password = get_creds()
creds = get_creds()
else:
username = getattr(CONF.identity, prefix + 'username')
password = getattr(CONF.identity, prefix + 'password')
tenant_name = getattr(CONF.identity, prefix + 'tenant_name')
return username, password, tenant_name
creds = auth.get_default_credentials(ctype)
return creds
@classmethod
def credentials(cls):
return cls._get_credentials(cls.isolated_creds.get_primary_creds, '')
return cls._get_credentials(cls.isolated_creds.get_primary_creds,
'user')
@classmethod
def alt_credentials(cls):
return cls._get_credentials(cls.isolated_creds.get_alt_creds, 'alt_')
return cls._get_credentials(cls.isolated_creds.get_alt_creds,
'alt_user')
@classmethod
def admin_credentials(cls):
return cls._get_credentials(cls.isolated_creds.get_admin_creds,
'admin_')
'identity_admin')
@staticmethod
def cleanup_resource(resource, test_name):
@ -541,13 +540,7 @@ class NetworkScenarioTest(OfficialClientTest):
@classmethod
def setUpClass(cls):
super(NetworkScenarioTest, cls).setUpClass()
if CONF.compute.allow_tenant_isolation:
cls.tenant_id = cls.isolated_creds.get_primary_tenant().id
else:
cls.tenant_id = cls.manager._get_identity_client(
CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name).tenant_id
cls.tenant_id = cls.manager.identity_client.tenant_id
def _create_network(self, tenant_id, namestart='network-smoke-'):
name = data_utils.rand_name(namestart)
@ -1053,10 +1046,10 @@ class OrchestrationScenarioTest(OfficialClientTest):
@classmethod
def credentials(cls):
username = CONF.identity.admin_username
password = CONF.identity.admin_password
tenant_name = CONF.identity.tenant_name
return username, password, tenant_name
admin_creds = auth.get_default_credentials('identity_admin')
creds = auth.get_default_credentials('user')
admin_creds.tenant_name = creds.tenant_name
return admin_creds
def _load_template(self, base_file, file_name):
filepath = os.path.join(os.path.dirname(os.path.realpath(base_file)),

View File

@ -98,17 +98,15 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
access point
"""
def __init__(self, tenant_id, tenant_user, tenant_pass, tenant_name):
self.manager = clients.OfficialClientManager(
tenant_user,
tenant_pass,
tenant_name
)
def __init__(self, credentials):
self.manager = clients.OfficialClientManager(credentials)
# Credentials from manager are filled with both names and IDs
self.creds = self.manager.credentials
self.keypair = None
self.tenant_id = tenant_id
self.tenant_name = tenant_name
self.tenant_user = tenant_user
self.tenant_pass = tenant_pass
self.tenant_id = credentials.tenant_id
self.tenant_name = credentials.tenant_name
self.tenant_user = credentials.username
self.tenant_pass = credentials.password
self.network = None
self.subnet = None
self.router = None
@ -140,19 +138,17 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
@classmethod
def setUpClass(cls):
super(TestSecurityGroupsBasicOps, cls).setUpClass()
alt_creds = cls.alt_credentials()
cls.alt_tenant_id = cls.manager._get_identity_client(
*alt_creds
).tenant_id
cls.alt_creds = cls.alt_credentials()
cls.alt_manager = clients.OfficialClientManager(cls.alt_creds)
cls.alt_tenant_id = cls.alt_manager.identity_client.tenant_id
cls.check_preconditions()
# TODO(mnewby) Consider looking up entities as needed instead
# of storing them as collections on the class.
cls.floating_ips = {}
cls.tenants = {}
cls.primary_tenant = cls.TenantProperties(cls.tenant_id,
*cls.credentials())
cls.alt_tenant = cls.TenantProperties(cls.alt_tenant_id,
*alt_creds)
creds = cls.credentials()
cls.primary_tenant = cls.TenantProperties(creds)
cls.alt_tenant = cls.TenantProperties(cls.alt_creds)
for tenant in [cls.primary_tenant, cls.alt_tenant]:
cls.tenants[tenant.tenant_id] = tenant
cls.floating_ip_access = not CONF.network.public_router_id

View File

@ -21,6 +21,7 @@ import unicodedata
import testscenarios
import testtools
from tempest import auth
from tempest import clients
from tempest.common.utils import misc
from tempest import config
@ -39,9 +40,8 @@ class ImageUtils(object):
self.non_ssh_image_pattern = \
CONF.input_scenario.non_ssh_image_regex
# Setup clients
ocm = clients.OfficialClientManager(CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name)
ocm = clients.OfficialClientManager(
auth.get_default_credentials('user'))
self.client = ocm.compute_client
def ssh_user(self, image_id):
@ -99,9 +99,8 @@ class InputScenarioUtils(object):
digit=string.digits)
def __init__(self):
ocm = clients.OfficialClientManager(CONF.identity.username,
CONF.identity.password,
CONF.identity.tenant_name)
ocm = clients.OfficialClientManager(
auth.get_default_credentials('user', fill_in=False))
self.client = ocm.compute_client
self.image_pattern = CONF.input_scenario.image_regex
self.flavor_pattern = CONF.input_scenario.flavor_regex

View File

@ -19,6 +19,7 @@ import time
from six import moves
from tempest import auth
from tempest import clients
from tempest.common import ssh
from tempest.common.utils import data_utils
@ -147,9 +148,10 @@ def stress_openstack(tests, duration, max_runs=None, stop_on_error=False):
password,
tenant['id'],
"email")
manager = clients.Manager(username=username,
password="pass",
tenant_name=tenant_name)
creds = auth.get_credentials(username=username,
password=password,
tenant_name=tenant_name)
manager = clients.Manager(credentials=creds)
test_obj = importutils.import_class(test['action'])
test_run = test_obj(manager, max_runs, stop_on_error)

View File

@ -307,26 +307,18 @@ class BaseTestCase(BaseDeps):
cls.__name__, network_resources=cls.network_resources)
force_tenant_isolation = getattr(cls, 'force_tenant_isolation', None)
if (CONF.compute.allow_tenant_isolation or
force_tenant_isolation):
if CONF.compute.allow_tenant_isolation or force_tenant_isolation:
creds = cls.isolated_creds.get_primary_creds()
username, tenant_name, password = creds
if getattr(cls, '_interface', None):
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
os = clients.Manager(credentials=creds,
interface=cls._interface,
service=cls._service)
elif interface:
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
os = clients.Manager(credentials=creds,
interface=interface,
service=cls._service)
else:
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
os = clients.Manager(credentials=creds,
service=cls._service)
else:
if getattr(cls, '_interface', None):

View File

@ -17,6 +17,7 @@ from mock import patch
import neutronclient.v2_0.client as neutronclient
from oslo.config import cfg
from tempest import clients
from tempest.common import http
from tempest.common import isolated_creds
from tempest import config
@ -52,6 +53,12 @@ class TestTenantIsolation(base.TestCase):
def test_official_client(self):
self.useFixture(mockpatch.PatchObject(keystoneclient.Client,
'authenticate'))
self.useFixture(mockpatch.PatchObject(clients.OfficialClientManager,
'_get_image_client'))
self.useFixture(mockpatch.PatchObject(clients.OfficialClientManager,
'_get_object_storage_client'))
self.useFixture(mockpatch.PatchObject(clients.OfficialClientManager,
'_get_orchestration_client'))
iso_creds = isolated_creds.IsolatedCreds('test class',
tempest_client=False)
self.assertTrue(isinstance(iso_creds.identity_admin_client,
@ -142,7 +149,7 @@ class TestTenantIsolation(base.TestCase):
self.addCleanup(user_mock.stop)
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role') as user_mock:
admin_creds = iso_creds.get_admin_creds(old_style=False)
admin_creds = iso_creds.get_admin_creds()
user_mock.assert_called_once_with('1234', '1234', '1234')
self.assertEqual(admin_creds.username, 'fake_admin_user')
self.assertEqual(admin_creds.tenant_name, 'fake_admin_tenant')
@ -176,7 +183,7 @@ class TestTenantIsolation(base.TestCase):
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
iso_creds.get_admin_creds(old_style=False)
iso_creds.get_admin_creds()
user_mock = self.patch(
'tempest.services.identity.json.identity_client.'
'IdentityClientJSON.delete_user')
@ -290,7 +297,7 @@ class TestTenantIsolation(base.TestCase):
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
iso_creds.get_admin_creds(old_style=False)
iso_creds.get_admin_creds()
self.patch('tempest.services.identity.json.identity_client.'
'IdentityClientJSON.delete_user')
self.patch('tempest.services.identity.json.identity_client.'
@ -353,7 +360,7 @@ class TestTenantIsolation(base.TestCase):
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClientJSON.'
'add_router_interface_with_subnet_id')
username, tenant_name, password = iso_creds.get_alt_creds()
iso_creds.get_alt_creds()
router_interface_mock.called_once_with('1234', '1234')
network = iso_creds.get_alt_network()
subnet = iso_creds.get_alt_subnet()
@ -384,7 +391,7 @@ class TestTenantIsolation(base.TestCase):
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
username, tenant_name, password = iso_creds.get_admin_creds()
iso_creds.get_admin_creds()
router_interface_mock.called_once_with('1234', '1234')
network = iso_creds.get_admin_network()
subnet = iso_creds.get_admin_subnet()
@ -419,7 +426,7 @@ class TestTenantIsolation(base.TestCase):
'delete_router')
router_mock = router.start()
username, tenant_name, password = iso_creds.get_primary_creds()
iso_creds.get_primary_creds()
self.assertEqual(net_mock.mock_calls, [])
self.assertEqual(subnet_mock.mock_calls, [])
self.assertEqual(router_mock.mock_calls, [])