Make all token provider behave the same with trusts

Change-Id: I0a4fefe34a0c6912200d256e7bc3cbef66b34a16
This commit is contained in:
Lance Bragstad 2016-08-03 17:28:35 +00:00
parent 7a160c2589
commit 3efd271fbc
5 changed files with 106 additions and 66 deletions

View File

@ -240,15 +240,15 @@ def build_token_values_v2(access, default_domain_id):
token_values['identity_domain_id'] = default_domain_id
token_values['assignment_domain_id'] = default_domain_id
trust = token_data.get('trust')
trust = access.get('trust')
if trust is None:
token_values['trust_id'] = None
token_values['trustor_id'] = None
token_values['trustee_id'] = None
else:
token_values['trust_id'] = trust['id']
token_values['trustor_id'] = trust['trustor_id']
token_values['trustee_id'] = trust['trustee_id']
token_values['trustor_id'] = trust['trustor_user_id']
token_values['trustee_id'] = trust['trustee_user_id']
token_values['consumer_id'] = None
token_values['access_token_id'] = None

View File

@ -1114,6 +1114,13 @@ class AuthWithTrust(object):
self.make_request(), v3_req_with_trust)
return token_auth_response
def test_validate_v3_trust_scoped_token_against_v2_succeeds(self):
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee)
trust_token = auth_response.headers['X-Subject-Token']
self.controller.validate_token(self.make_request(is_admin=True),
trust_token)
def test_create_v3_token_from_trust(self):
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee)
@ -1176,27 +1183,44 @@ class AuthWithTrust(object):
request_body = _build_user_auth(token={'id': trust_token_id},
tenant_id=self.tenant_bar['id'])
self.assertRaises(
exception.Unauthorized,
exception.Forbidden,
self.controller.authenticate, self.make_request(), request_body)
def test_delete_trust_revokes_token(self):
# NOTE(lbragstad): This test doens't really make much sense because we
# can't validate trust-scoped tokens against the v2.0 API.
unscoped_token = self.get_unscoped_token(self.trustor['name'])
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
request = self._create_auth_request(
unscoped_token['access']['token']['id'])
self.fetch_v2_token_from_trust(new_trust)
trust_id = new_trust['id']
tokens = self.token_provider_api._persistence._list_tokens(
self.trustor['id'],
trust_id=trust_id)
self.assertEqual(1, len(tokens))
self.trust_controller.delete_trust(request, trust_id=trust_id)
tokens = self.token_provider_api._persistence._list_tokens(
self.trustor['id'],
trust_id=trust_id)
self.assertEqual(0, len(tokens))
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_time:
# NOTE(lbragstad): The freezegun package will attempt to patch all
# things in Python that issue a time. In some cases, by the time a
# test gets into the context manager of freezegun, the
# oslo_utils.timeutils package could have an unpatched version of
# whatever it gets it's time from. Here we are going to pull out
# our big hammer and use both freezegun and oslo_utils
# set_time_override function to make sure that any datetimes
# keystone asks for are under control of the context manager. If we
# don't do this, we could end up with situations where timeutils
# can give unpatched datetimes outside of the context we are
# expecting, which leads to debugging frustrating race conditions.
timeutils.set_time_override(frozen_time.time_to_freeze)
unscoped_token = self.get_unscoped_token(self.trustor['name'])
new_trust = self.create_trust(self.sample_data,
self.trustor['name'])
request = self._create_auth_request(
unscoped_token['access']['token']['id'])
trust_token_resp = self.fetch_v2_token_from_trust(new_trust)
trust_scoped_token_id = trust_token_resp['access']['token']['id']
self.controller.validate_token(
self.make_request(is_admin=True),
token_id=trust_scoped_token_id
)
trust_id = new_trust['id']
frozen_time.tick(delta=datetime.timedelta(seconds=1))
self.trust_controller.delete_trust(request, trust_id=trust_id)
self.assertRaises(
exception.TokenNotFound,
self.controller.validate_token,
self.make_request(is_admin=True),
token_id=trust_scoped_token_id
)
def test_token_from_trust_with_no_role_fails(self):
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
@ -1285,6 +1309,13 @@ class AuthWithTrust(object):
exception.Unauthorized,
self.controller.authenticate, self.make_request(), request_body)
def test_validate_trust_scoped_token_against_v2(self):
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
trust_token_resp = self.fetch_v2_token_from_trust(new_trust)
trust_scoped_token_id = trust_token_resp['access']['token']['id']
self.controller.validate_token(self.make_request(is_admin=True),
token_id=trust_scoped_token_id)
class UUIDAuthWithTrust(AuthWithTrust, AuthTest):
@ -1312,36 +1343,20 @@ class FernetAuthWithTrust(AuthWithTrust, AuthTest):
msg = 'The Fernet token provider does not support token persistence'
self.skipTest(msg)
def test_delete_trust_revokes_token(self):
# NOTE(lbragstad): This test doens't really make much sense because we
# can't validate trust-scoped tokens against the v2.0 API. This was
# originally validating that UUID tokens were removed from the backend
# when a trust was deleted. Fernet tokens aren't persisted in the
# backend, so I guess the equivalent test through the API is to make
# sure a trust-scoped token isn't valid after a trust is deleted.
unscoped_token = self.get_unscoped_token(self.trustor['name'])
new_trust = self.create_trust(self.sample_data, self.trustor['name'])
request = self._create_auth_request(
unscoped_token['access']['token']['id'])
trust_token_resp = self.fetch_v2_token_from_trust(new_trust)
trust_scoped_token_id = trust_token_resp['access']['token']['id']
# TODO(lbragstad): Make this a valid operation in the future?
self.assertRaises(
exception.Unauthorized,
self.controller.validate_token,
self.make_request(is_admin=True),
token_id=trust_scoped_token_id
)
trust_id = new_trust['id']
self.trust_controller.delete_trust(request, trust_id=trust_id)
self.assertRaises(
exception.Unauthorized,
self.controller.validate_token,
self.make_request(is_admin=True),
token_id=trust_scoped_token_id
)
def test_trust_get_token_fails_if_trustee_disabled(self):
# NOTE(lbragtad) But why does the Fernet token provider behave
# differently than the UUID provider?!
# I'm so happy you asked! It turns out that the v2.0 token controllers
# actually assert that the actors of a trust are enabled. If they
# aren't enabled, the controller will raise a Forbidden exception. This
# is exactly what happens in the Fernet case. The UUID token provider
# will fail to find a token after a user has been disabled because the
# token provider registers a callback to prune all tokens for a user
# when a user is disabled. The inconsistency is that the v2.0 token
# controller will except a TokenNotFound exception and raise an
# Unauthorized in it's place. This explains why this is inconsistent
# API behavior for the same test depending on which token provider is
# configured.
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_time:
new_trust = self.create_trust(self.sample_data,

View File

@ -1187,9 +1187,9 @@ class TokenAPITests(object):
self.token_provider_api.validate_token,
trust_scoped_token)
def test_v2_validate_trust_scoped_token(self):
# Test that validating an trust scoped token in v2.0 returns
# unauthorized.
def test_validate_trust_token_on_v2_fails_outside_default_domain(self):
# NOTE(lbragstad): This fails validation against the v2.0 API because
# the actors of the trust are not within the default domain.
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
self.assertRaises(exception.Unauthorized,

View File

@ -246,10 +246,10 @@ class Manager(manager.Manager):
# that makes sense for the request.
v3_token_ref = self.validate_non_persistent_token(token_id)
v2_token_data_helper = providers.common.V2TokenDataHelper()
token = v2_token_data_helper.v3_to_v2_token(v3_token_ref)
token = v2_token_data_helper.v3_to_v2_token(v3_token_ref,
token_id=token_id)
# these are common things that happen regardless of token provider
token['access']['token']['id'] = token_id
self._token_belongs_to(token, belongs_to)
self._is_valid_token(token)
return token

View File

@ -32,11 +32,12 @@ LOG = log.getLogger(__name__)
CONF = keystone.conf.CONF
@dependency.requires('catalog_api', 'resource_api', 'assignment_api')
@dependency.requires('catalog_api', 'resource_api', 'assignment_api',
'trust_api', 'identity_api')
class V2TokenDataHelper(object):
"""Create V2 token data."""
def v3_to_v2_token(self, v3_token_data):
def v3_to_v2_token(self, v3_token_data, token_id=None):
"""Convert v3 token data into v2.0 token data.
This method expects a dictionary generated from
@ -65,6 +66,7 @@ class V2TokenDataHelper(object):
token['expires'] = v3_token.get('expires_at')
token['issued_at'] = v3_token.get('issued_at')
token['audit_ids'] = v3_token.get('audit_ids')
token['id'] = token_id
if 'project' in v3_token:
# v3 token_data does not contain all tenant attributes
@ -81,9 +83,38 @@ class V2TokenDataHelper(object):
user = common_controller.V2Controller.v3_to_v2_user(v3_user)
if 'OS-TRUST:trust' in v3_token:
msg = ('Unable to validate trust-scoped tokens using version v2.0 '
'API.')
raise exception.Unauthorized(msg)
v3_trust = v3_token['OS-TRUST:trust']
# if token is scoped to trust, both trustor and trustee must
# be in the default domain. Furthermore, the delegated project
# must also be in the default domain
msg = _('Non-default domain is not supported')
if CONF.trust.enabled:
try:
trust_ref = self.trust_api.get_trust(v3_trust['id'])
except exception.TrustNotFound:
raise exception.TokenNotFound(token_id=token_id)
trustee_user_ref = self.identity_api.get_user(
trust_ref['trustee_user_id'])
if (trustee_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
trustor_user_ref = self.identity_api.get_user(
trust_ref['trustor_user_id'])
if (trustor_user_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
project_ref = self.resource_api.get_project(
trust_ref['project_id'])
if (project_ref['domain_id'] !=
CONF.identity.default_domain_id):
raise exception.Unauthorized(msg)
token_data['trust'] = {
'impersonation': v3_trust['impersonation'],
'id': v3_trust['id'],
'trustee_user_id': v3_trust['trustee_user']['id'],
'trustor_user_id': v3_trust['trustor_user']['id']
}
if 'OS-OAUTH1' in v3_token:
msg = ('Unable to validate Oauth tokens using the version v2.0 '
@ -751,12 +782,6 @@ class BaseProvider(provider.Provider):
token_data = self.v2_token_data_helper.v3_to_v2_token(
token_data)
trust_id = token_data['access'].get('trust', {}).get('id')
if trust_id:
msg = ('Unable to validate trust-scoped tokens using version '
'v2.0 API.')
raise exception.Unauthorized(msg)
return token_data
except exception.ValidationError:
LOG.exception(_LE('Failed to validate token'))