Use auth data to fill credentials

Provide the ability to automatically fill in credentials
details e.g. IDs if names where provided.
Verified via unit test.

Partially implements: bp multi-keystone-api-version-tests

Change-Id: I505e5024754fe1b912104ce4d5d3206f4cedd6d8
This commit is contained in:
Andrea Frittoli 2014-03-20 08:36:23 +00:00
parent b1b04bbd18
commit 2095d2493a
4 changed files with 202 additions and 18 deletions

View File

@ -80,6 +80,17 @@ class AuthProvider(object):
def _get_auth(self):
raise NotImplementedError
def _fill_credentials(self, auth_data_body):
raise NotImplementedError
def fill_credentials(self):
"""
Fill credentials object with data from auth
"""
auth_data = self.get_auth()
self._fill_credentials(auth_data[1])
return self.credentials
@classmethod
def check_credentials(cls, credentials):
"""
@ -89,20 +100,35 @@ class AuthProvider(object):
@property
def auth_data(self):
if self.cache is None or self.is_expired(self.cache):
self.cache = self._get_auth()
return self.cache
return self.get_auth()
@auth_data.deleter
def auth_data(self):
self.clear_auth()
def get_auth(self):
"""
Returns auth from cache if available, else auth first
"""
if self.cache is None or self.is_expired(self.cache):
self.set_auth()
return self.cache
def set_auth(self):
"""
Forces setting auth, ignores cache if it exists.
Refills credentials
"""
self.cache = self._get_auth()
self._fill_credentials(self.cache[1])
def clear_auth(self):
"""
Can be called to clear the access cache so that next request
will fetch a new token and base_url.
"""
self.cache = None
self.credentials.reset()
def is_expired(self, auth_data):
raise NotImplementedError
@ -244,6 +270,18 @@ class KeystoneV2AuthProvider(KeystoneAuthProvider):
else:
raise NotImplementedError
def _fill_credentials(self, auth_data_body):
tenant = auth_data_body['token']['tenant']
user = auth_data_body['user']
if self.credentials.tenant_name is None:
self.credentials.tenant_name = tenant['name']
if self.credentials.tenant_id is None:
self.credentials.tenant_id = tenant['id']
if self.credentials.username is None:
self.credentials.username = user['name']
if self.credentials.user_id is None:
self.credentials.user_id = user['id']
def base_url(self, filters, auth_data=None):
"""
Filters can be:
@ -320,6 +358,39 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
else:
raise NotImplementedError
def _fill_credentials(self, auth_data_body):
# project or domain, depending on the scope
project = auth_data_body.get('project', None)
domain = auth_data_body.get('domain', None)
# user is always there
user = auth_data_body['user']
# Set project fields
if project is not None:
if self.credentials.project_name is None:
self.credentials.project_name = project['name']
if self.credentials.project_id is None:
self.credentials.project_id = project['id']
if self.credentials.project_domain_id is None:
self.credentials.project_domain_id = project['domain']['id']
if self.credentials.project_domain_name is None:
self.credentials.project_domain_name = \
project['domain']['name']
# Set domain fields
if domain is not None:
if self.credentials.domain_id is None:
self.credentials.domain_id = domain['id']
if self.credentials.domain_name is None:
self.credentials.domain_name = domain['name']
# Set user fields
if self.credentials.username is None:
self.credentials.username = user['name']
if self.credentials.user_id is None:
self.credentials.user_id = user['id']
if self.credentials.user_domain_id is None:
self.credentials.user_domain_id = user['domain']['id']
if self.credentials.user_domain_name is None:
self.credentials.user_domain_name = user['domain']['name']
def base_url(self, filters, auth_data=None):
"""
Filters can be:
@ -387,15 +458,15 @@ class KeystoneV3AuthProvider(KeystoneAuthProvider):
datetime.datetime.utcnow()
def get_default_credentials(credential_type):
def get_default_credentials(credential_type, fill_in=True):
"""
Returns configured credentials of the specified type
based on the configured auth_version
"""
return get_credentials(credential_type=credential_type)
return get_credentials(fill_in=fill_in, credential_type=credential_type)
def get_credentials(credential_type=None, **kwargs):
def get_credentials(credential_type=None, fill_in=True, **kwargs):
"""
Builds a credentials object based on the configured auth_version
@ -415,14 +486,20 @@ def get_credentials(credential_type=None, **kwargs):
"""
if CONF.identity.auth_version == 'v2':
credential_class = KeystoneV2Credentials
auth_provider_class = KeystoneV2AuthProvider
elif CONF.identity.auth_version == 'v3':
credential_class = KeystoneV3Credentials
auth_provider_class = KeystoneV3AuthProvider
else:
raise exceptions.InvalidConfiguration('Unsupported auth version')
if credential_type is not None:
creds = credential_class.get_default(credential_type)
else:
creds = credential_class(**kwargs)
# Fill in the credentials fields that were not specified
if fill_in:
auth_provider = auth_provider_class(creds)
creds = auth_provider.fill_credentials()
return creds
@ -451,6 +528,7 @@ class Credentials(object):
Additional attributes can still be set afterwards if tests need
to do so.
"""
self._initial = kwargs
self._apply_credentials(kwargs)
def _apply_credentials(self, attr):
@ -511,6 +589,14 @@ class Credentials(object):
def is_valid(self):
raise NotImplementedError
def reset(self):
# First delete all known attributes
for key in self.ATTRIBUTES:
if getattr(self, key) is not None:
delattr(self, key)
# Then re-apply initial setup
self._apply_credentials(self._initial)
class KeystoneV2Credentials(Credentials):

View File

@ -109,6 +109,10 @@ class TestBaseAuthProvider(BaseAuthTestsSetUp):
self.assertIsNone(self.auth_provider.alt_part)
self.assertIsNone(self.auth_provider.alt_auth_data)
def test_fill_credentials(self):
self.assertRaises(NotImplementedError,
self.auth_provider.fill_credentials)
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
_endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
@ -133,6 +137,13 @@ class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
def _get_token_from_fake_identity(self):
return fake_identity.TOKEN
def _get_from_fake_identity(self, attr):
access = fake_identity.IDENTITY_V2_RESPONSE['access']
if attr == 'user_id':
return access['user']['id']
elif attr == 'tenant_id':
return access['token']['tenant']['id']
def _test_request_helper(self, filters, expected):
url, headers, body = self.auth_provider.auth_request('GET',
self.target_url,
@ -220,6 +231,13 @@ class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
del cred[attr]
self.assertFalse(self.auth_provider.check_credentials(cred))
def test_fill_credentials(self):
self.auth_provider.fill_credentials()
creds = self.auth_provider.credentials
for attr in ['user_id', 'tenant_id']:
self.assertEqual(self._get_from_fake_identity(attr),
getattr(creds, attr))
def _test_base_url_helper(self, expected_url, filters,
auth_data=None):
@ -340,9 +358,20 @@ class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
access['expires_at'] = date_as_string
return token, access
def _get_from_fake_identity(self, attr):
token = fake_identity.IDENTITY_V3_RESPONSE['token']
if attr == 'user_id':
return token['user']['id']
elif attr == 'project_id':
return token['project']['id']
elif attr == 'user_domain_id':
return token['user']['domain']['id']
elif attr == 'project_domain_id':
return token['project']['domain']['id']
def test_check_credentials_missing_attribute(self):
# reset credentials to fresh ones
self.credentials = fake_credentials.FakeKeystoneV3Credentials()
self.credentials.reset()
for attr in ['username', 'password', 'user_domain_name',
'project_domain_name']:
cred = copy.copy(self.credentials)
@ -352,7 +381,7 @@ class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
def test_check_domain_credentials_missing_attribute(self):
# reset credentials to fresh ones
self.credentials = fake_credentials.FakeKeystoneV3Credentials()
self.credentials.reset()
domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
for attr in ['username', 'password', 'user_domain_name']:
cred = copy.copy(domain_creds)
@ -360,6 +389,14 @@ class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
self.assertFalse(self.auth_provider.check_credentials(cred),
"Credentials should be invalid without %s" % attr)
def test_fill_credentials(self):
self.auth_provider.fill_credentials()
creds = self.auth_provider.credentials
for attr in ['user_id', 'project_id', 'user_domain_id',
'project_domain_id']:
self.assertEqual(self._get_from_fake_identity(attr),
getattr(creds, attr))
# Overwrites v2 test
def test_base_url_to_get_admin_endpoint(self):
self.filters = {

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo.config import cfg
from tempest import auth
@ -42,6 +44,10 @@ class CredentialsTests(base.TestCase):
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
def test_create(self):
creds = self._get_credentials()
self.assertEqual(self.attributes, creds._initial)
def test_create_invalid_attr(self):
self.assertRaises(exceptions.InvalidCredentials,
self._get_credentials,
@ -79,19 +85,44 @@ class KeystoneV2CredentialsTests(CredentialsTests):
self.stubs.Set(http.ClosingHttp, 'request', self.identity_response)
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
def _verify_credentials(self, credentials_class, creds_dict):
creds = auth.get_credentials(**creds_dict)
# Check the right version of credentials has been returned
self.assertIsInstance(creds, credentials_class)
# Check the id attributes are filled in
attributes = [x for x in creds.ATTRIBUTES if (
'_id' in x and x != 'domain_id')]
for attr in attributes:
self.assertIsNone(getattr(creds, attr))
def _verify_credentials(self, credentials_class, filled=True,
creds_dict=None):
def _check(credentials):
# Check the right version of credentials has been returned
self.assertIsInstance(credentials, credentials_class)
# Check the id attributes are filled in
attributes = [x for x in credentials.ATTRIBUTES if (
'_id' in x and x != 'domain_id')]
for attr in attributes:
if filled:
self.assertIsNotNone(getattr(credentials, attr))
else:
self.assertIsNone(getattr(credentials, attr))
if creds_dict is None:
for ctype in auth.Credentials.TYPES:
creds = auth.get_default_credentials(credential_type=ctype,
fill_in=filled)
_check(creds)
else:
creds = auth.get_credentials(fill_in=filled, **creds_dict)
_check(creds)
def test_get_default_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class)
def test_get_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(self.credentials_class, self.attributes)
self._verify_credentials(credentials_class=self.credentials_class,
creds_dict=self.attributes)
def test_get_credentials_not_filled(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class,
filled=False,
creds_dict=self.attributes)
def test_is_valid(self):
creds = self._get_credentials()
@ -113,6 +144,30 @@ class KeystoneV2CredentialsTests(CredentialsTests):
# are defined as fake_* in fake_config.py
self.assertEqual(getattr(creds, attr), 'fake_' + attr)
def test_reset_all_attributes(self):
creds = self._get_credentials()
initial_creds = copy.deepcopy(creds)
set_attr = creds.__dict__.keys()
missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
# Set all unset attributes, then reset
for attr in missing_attr:
setattr(creds, attr, 'fake' + attr)
creds.reset()
# Check reset credentials are same as initial ones
self.assertEqual(creds, initial_creds)
def test_reset_single_attribute(self):
creds = self._get_credentials()
initial_creds = copy.deepcopy(creds)
set_attr = creds.__dict__.keys()
missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
# Set one unset attributes, then reset
for attr in missing_attr:
setattr(creds, attr, 'fake' + attr)
creds.reset()
# Check reset credentials are same as initial ones
self.assertEqual(creds, initial_creds)
class KeystoneV3CredentialsTests(KeystoneV2CredentialsTests):
attributes = {

View File

@ -17,6 +17,7 @@ from mock import patch
import neutronclient.v2_0.client as neutronclient
from oslo.config import cfg
from tempest.common import http
from tempest.common import isolated_creds
from tempest import config
from tempest import exceptions
@ -27,6 +28,8 @@ from tempest.services.network.json import network_client as json_network_client
from tempest.services.network.xml import network_client as xml_network_client
from tempest.tests import base
from tempest.tests import fake_config
from tempest.tests import fake_http
from tempest.tests import fake_identity
class TestTenantIsolation(base.TestCase):
@ -35,6 +38,9 @@ class TestTenantIsolation(base.TestCase):
super(TestTenantIsolation, self).setUp()
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
self.fake_http = fake_http.fake_httplib2(return_type=200)
self.stubs.Set(http.ClosingHttp, 'request',
fake_identity._fake_v2_response)
def test_tempest_client(self):
iso_creds = isolated_creds.IsolatedCreds('test class')