Simplify the KeystoneToken model

Now that we only really use the validate_v3_token method before
passing data to the token model, we can get rid of a lot of the
v2.0 logic. In the future, the token model shouldn't care about
any sort of version. This moves us in that direction.

Change-Id: I2687750e0c3647c3302c4479af5b2498c8d7351b
This commit is contained in:
Lance Bragstad 2016-09-30 21:54:18 +00:00
parent 52bde3cf08
commit f84dd99965
3 changed files with 61 additions and 235 deletions

View File

@ -46,15 +46,11 @@ class KeystoneToken(dict):
def __init__(self, token_id, token_data):
self.token_data = token_data
if 'access' in token_data:
super(KeystoneToken, self).__init__(**token_data['access'])
self.version = V2
elif 'token' in token_data and 'methods' in token_data['token']:
super(KeystoneToken, self).__init__(**token_data['token'])
self.version = V3
else:
raise exception.UnsupportedTokenVersionException()
self.token_id = token_id
try:
super(KeystoneToken, self).__init__(**token_data['token'])
except KeyError:
raise exception.UnsupportedTokenVersionException()
self.short_id = cms.cms_hash_token(token_id,
mode=CONF.token.hash_algorithm)
@ -75,31 +71,19 @@ class KeystoneToken(dict):
@property
def expires(self):
if self.version is V3:
expires_at = self['expires_at']
else:
expires_at = self['token']['expires']
return _parse_and_normalize_time(expires_at)
return _parse_and_normalize_time(self['expires_at'])
@property
def issued(self):
if self.version is V3:
issued_at = self['issued_at']
else:
issued_at = self['token']['issued_at']
return _parse_and_normalize_time(issued_at)
return _parse_and_normalize_time(self['issued_at'])
@property
def audit_id(self):
if self.version is V3:
return self.get('audit_ids', [None])[0]
return self['token'].get('audit_ids', [None])[0]
return self.get('audit_ids', [None])[0]
@property
def audit_chain_id(self):
if self.version is V3:
return self.get('audit_ids', [None])[-1]
return self['token'].get('audit_ids', [None])[-1]
return self.get('audit_ids', [None])[-1]
@property
def auth_token(self):
@ -116,10 +100,7 @@ class KeystoneToken(dict):
@property
def user_domain_name(self):
try:
if self.version == V3:
return self['user']['domain']['name']
elif 'user' in self:
return "Default"
return self['user']['domain']['name']
except KeyError: # nosec
# Do not raise KeyError, raise UnexpectedError
pass
@ -128,10 +109,7 @@ class KeystoneToken(dict):
@property
def user_domain_id(self):
try:
if self.version == V3:
return self['user']['domain']['id']
elif 'user' in self:
return CONF.identity.default_domain_id
return self['user']['domain']['id']
except KeyError: # nosec
# Do not raise KeyError, raise UnexpectedError
pass
@ -139,33 +117,24 @@ class KeystoneToken(dict):
@property
def domain_id(self):
if self.version is V3:
try:
return self['domain']['id']
except KeyError:
# Do not raise KeyError, raise UnexpectedError
raise exception.UnexpectedError()
# No domain scoped tokens in V2.
raise NotImplementedError()
try:
return self['domain']['id']
except KeyError:
# Do not raise KeyError, raise UnexpectedError
raise exception.UnexpectedError()
@property
def domain_name(self):
if self.version is V3:
try:
return self['domain']['name']
except KeyError:
# Do not raise KeyError, raise UnexpectedError
raise exception.UnexpectedError()
# No domain scoped tokens in V2.
raise NotImplementedError()
try:
return self['domain']['name']
except KeyError:
# Do not raise KeyError, raise UnexpectedError
raise exception.UnexpectedError()
@property
def project_id(self):
try:
if self.version is V3:
return self['project']['id']
else:
return self['token']['tenant']['id']
return self['project']['id']
except KeyError:
# Do not raise KeyError, raise UnexpectedError
raise exception.UnexpectedError()
@ -173,10 +142,7 @@ class KeystoneToken(dict):
@property
def project_name(self):
try:
if self.version is V3:
return self['project']['name']
else:
return self['token']['tenant']['name']
return self['project']['name']
except KeyError:
# Do not raise KeyError, raise UnexpectedError
raise exception.UnexpectedError()
@ -184,10 +150,7 @@ class KeystoneToken(dict):
@property
def project_domain_id(self):
try:
if self.version is V3:
return self['project']['domain']['id']
elif 'tenant' in self['token']:
return CONF.identity.default_domain_id
return self['project']['domain']['id']
except KeyError: # nosec
# Do not raise KeyError, raise UnexpectedError
pass
@ -197,10 +160,7 @@ class KeystoneToken(dict):
@property
def project_domain_name(self):
try:
if self.version is V3:
return self['project']['domain']['name']
if 'tenant' in self['token']:
return 'Default'
return self['project']['domain']['name']
except KeyError: # nosec
# Do not raise KeyError, raise UnexpectedError
pass
@ -209,23 +169,17 @@ class KeystoneToken(dict):
@property
def is_domain(self):
if self.version is V3:
if 'is_domain' in self:
return self['is_domain']
if 'is_domain' in self:
return self['is_domain']
return False
@property
def project_scoped(self):
if self.version is V3:
return 'project' in self
else:
return 'tenant' in self['token']
return 'project' in self
@property
def domain_scoped(self):
if self.version is V3:
return 'domain' in self
return False
return 'domain' in self
@property
def scoped(self):
@ -233,40 +187,23 @@ class KeystoneToken(dict):
@property
def trust_id(self):
if self.version is V3:
return self.get('OS-TRUST:trust', {}).get('id')
else:
return self.get('trust', {}).get('id')
return self.get('OS-TRUST:trust', {}).get('id')
@property
def trust_scoped(self):
if self.version is V3:
return 'OS-TRUST:trust' in self
else:
return 'trust' in self
return 'OS-TRUST:trust' in self
@property
def trustee_user_id(self):
if self.version is V3:
return self.get(
'OS-TRUST:trust', {}).get('trustee_user_id')
else:
return self.get('trust', {}).get('trustee_user_id')
return self.get('OS-TRUST:trust', {}).get('trustee_user_id')
@property
def trustor_user_id(self):
if self.version is V3:
return self.get(
'OS-TRUST:trust', {}).get('trustor_user_id')
else:
return self.get('trust', {}).get('trustor_user_id')
return self.get('OS-TRUST:trust', {}).get('trustor_user_id')
@property
def trust_impersonation(self):
if self.version is V3:
return self.get('OS-TRUST:trust', {}).get('impersonation')
else:
return self.get('trust', {}).get('impersonation')
return self.get('OS-TRUST:trust', {}).get('impersonation')
@property
def oauth_scoped(self):
@ -274,65 +211,55 @@ class KeystoneToken(dict):
@property
def oauth_access_token_id(self):
if self.version is V3 and self.oauth_scoped:
if self.oauth_scoped:
return self['OS-OAUTH1']['access_token_id']
return None
@property
def oauth_consumer_id(self):
if self.version is V3 and self.oauth_scoped:
if self.oauth_scoped:
return self['OS-OAUTH1']['consumer_id']
return None
@property
def role_ids(self):
if self.version is V3:
return [r['id'] for r in self.get('roles', [])]
else:
return self.get('metadata', {}).get('roles', [])
return [r['id'] for r in self.get('roles', [])]
@property
def role_names(self):
if self.version is V3:
return [r['name'] for r in self.get('roles', [])]
else:
return [r['name'] for r in self['user'].get('roles', [])]
return [r['name'] for r in self.get('roles', [])]
@property
def bind(self):
if self.version is V3:
return self.get('bind')
return self.get('token', {}).get('bind')
return self.get('bind')
@property
def is_federated_user(self):
try:
return (self.version is V3 and
constants.FEDERATION in self['user'])
return constants.FEDERATION in self['user']
except KeyError:
raise exception.UnexpectedError()
@property
def federation_group_ids(self):
if self.is_federated_user:
if self.version is V3:
try:
groups = self['user'][constants.FEDERATION].get(
'groups', [])
return [g['id'] for g in groups]
except KeyError:
raise exception.UnexpectedError()
try:
groups = self['user'][constants.FEDERATION].get('groups', [])
return [g['id'] for g in groups]
except KeyError:
raise exception.UnexpectedError()
return []
@property
def federation_idp_id(self):
if self.version is not V3 or not self.is_federated_user:
return None
return self['user'][constants.FEDERATION]['identity_provider']['id']
if self.is_federated_user:
return (
self['user'][constants.FEDERATION]['identity_provider']['id']
)
@property
def federation_protocol_id(self):
if self.version is V3 and self.is_federated_user:
if self.is_federated_user:
return self['user'][constants.FEDERATION]['protocol']['id']
return None
@ -342,6 +269,4 @@ class KeystoneToken(dict):
@property
def methods(self):
if self.version is V3:
return self.get('methods', [])
return []
return self.get('methods', [])

View File

@ -38,7 +38,6 @@ class TestKeystoneTokenModel(core.TestCase):
def test_token_model_v3(self):
token_data = token_model.KeystoneToken(uuid.uuid4().hex,
self.v3_sample_token)
self.assertIs(token_model.V3, token_data.version)
expires = timeutils.normalize_time(timeutils.parse_isotime(
self.v3_sample_token['token']['expires_at']))
issued = timeutils.normalize_time(timeutils.parse_isotime(
@ -140,109 +139,6 @@ class TestKeystoneTokenModel(core.TestCase):
self.assertEqual(federation_data['identity_provider']['id'],
token_data.federation_idp_id)
def test_token_model_v2_federated_user(self):
token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
token_data=self.v2_sample_token)
federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
'protocol': {'id': 'saml2'},
'groups': [{'id': uuid.uuid4().hex}
for x in range(1, 5)]}
self.assertFalse(token_data.is_federated_user)
self.assertEqual([], token_data.federation_group_ids)
self.assertIsNone(token_data.federation_protocol_id)
self.assertIsNone(token_data.federation_idp_id)
token_data['user'][federation_constants.FEDERATION] = federation_data
# Federated users should not exist in V2, the data should remain empty
self.assertFalse(token_data.is_federated_user)
self.assertEqual([], token_data.federation_group_ids)
self.assertIsNone(token_data.federation_protocol_id)
self.assertIsNone(token_data.federation_idp_id)
def test_token_model_v2(self):
token_data = token_model.KeystoneToken(uuid.uuid4().hex,
self.v2_sample_token)
self.assertIs(token_model.V2, token_data.version)
expires = timeutils.normalize_time(timeutils.parse_isotime(
self.v2_sample_token['access']['token']['expires']))
issued = timeutils.normalize_time(timeutils.parse_isotime(
self.v2_sample_token['access']['token']['issued_at']))
self.assertEqual(expires, token_data.expires)
self.assertEqual(issued, token_data.issued)
self.assertEqual(self.v2_sample_token['access']['user']['id'],
token_data.user_id)
self.assertEqual(self.v2_sample_token['access']['user']['name'],
token_data.user_name)
self.assertEqual(CONF.identity.default_domain_id,
token_data.user_domain_id)
self.assertEqual('Default', token_data.user_domain_name)
self.assertEqual(CONF.identity.default_domain_id,
token_data.project_domain_id)
self.assertEqual('Default',
token_data.project_domain_name)
self.assertEqual(self.v2_sample_token['access']['trust']['id'],
token_data.trust_id)
self.assertEqual(
self.v2_sample_token['access']['trust']['trustor_user_id'],
token_data.trustor_user_id)
self.assertEqual(
self.v2_sample_token['access']['trust']['impersonation'],
token_data.trust_impersonation)
self.assertEqual(
self.v2_sample_token['access']['trust']['trustee_user_id'],
token_data.trustee_user_id)
# Project Scoped Token
self.assertEqual(
self.v2_sample_token['access']['token']['tenant']['id'],
token_data.project_id)
self.assertEqual(
self.v2_sample_token['access']['token']['tenant']['name'],
token_data.project_name)
self.assertTrue(token_data.project_scoped)
self.assertTrue(token_data.scoped)
self.assertTrue(token_data.trust_scoped)
self.assertEqual(
[r['name']
for r in self.v2_sample_token['access']['user']['roles']],
token_data.role_names)
token_data['token'].pop('tenant')
self.assertFalse(token_data.scoped)
self.assertFalse(token_data.project_scoped)
self.assertFalse(token_data.domain_scoped)
self.assertRaises(exception.UnexpectedError, getattr, token_data,
'project_id')
self.assertRaises(exception.UnexpectedError, getattr, token_data,
'project_name')
self.assertRaises(exception.UnexpectedError, getattr, token_data,
'project_domain_id')
self.assertRaises(exception.UnexpectedError, getattr, token_data,
'project_domain_id')
# No Domain Scoped tokens in V2
self.assertRaises(NotImplementedError, getattr, token_data,
'domain_id')
self.assertRaises(NotImplementedError, getattr, token_data,
'domain_name')
token_data['domain'] = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.assertRaises(NotImplementedError, getattr, token_data,
'domain_id')
self.assertRaises(NotImplementedError, getattr, token_data,
'domain_name')
self.assertFalse(token_data.domain_scoped)
token_data['token']['audit_ids'] = [uuid.uuid4().hex]
self.assertEqual(token_data.audit_chain_id,
token_data['token']['audit_ids'][0])
token_data['token']['audit_ids'].append(uuid.uuid4().hex)
self.assertEqual(token_data.audit_chain_id,
token_data['token']['audit_ids'][1])
self.assertEqual(token_data.audit_id,
token_data['token']['audit_ids'][0])
del token_data['token']['audit_ids']
self.assertIsNone(token_data.audit_id)
self.assertIsNone(token_data.audit_chain_id)
def test_token_model_unknown(self):
self.assertRaises(exception.UnsupportedTokenVersionException,
token_model.KeystoneToken,
@ -255,10 +151,6 @@ class TestKeystoneTokenModel(core.TestCase):
self.v2_sample_token['access']['domain'] = domain
self.v3_sample_token['token']['domain'] = domain
# V2 Tokens Cannot be domain scoped, this should work
token_model.KeystoneToken(token_id=uuid.uuid4().hex,
token_data=self.v2_sample_token)
self.assertRaises(exception.UnexpectedError,
token_model.KeystoneToken,
token_id=uuid.uuid4().hex,

View File

@ -179,11 +179,20 @@ class Auth(controller.V2Controller):
v3_token_data = self.token_provider_api.validate_v3_token(
old_token
)
# NOTE(lbragstad): Even though we are not using the v2.0 token
# reference after we translate it in v3_to_v2_token(), we still
# need to perform that check. We have to do this because
# v3_to_v2_token will ensure we don't use specific tokens only
# attainable via v3 to get new tokens on v2.0. For example, an
# exception would be raised if we passed a federated token to
# v3_to_v2_token, because federated tokens aren't supported by
# v2.0 (the same applies to OAuth tokens, domain-scoped tokens,
# etc..).
v2_helper = providers.common.V2TokenDataHelper()
v2_token = v2_helper.v3_to_v2_token(v3_token_data, old_token)
v2_helper.v3_to_v2_token(v3_token_data, old_token)
token_model_ref = token_model.KeystoneToken(
token_id=old_token,
token_data=v2_token
token_data=v3_token_data
)
except exception.NotFound as e:
raise exception.Unauthorized(e)