diff --git a/keystone/common/authorization.py b/keystone/common/authorization.py index 6dc74356df..11d0d791ec 100644 --- a/keystone/common/authorization.py +++ b/keystone/common/authorization.py @@ -67,7 +67,7 @@ def is_v3_token(token): def v3_token_to_auth_context(token): - creds = {} + creds = {'is_delegated_auth': False} token_data = token['token'] try: creds['user_id'] = token_data['user']['id'] @@ -87,11 +87,31 @@ def v3_token_to_auth_context(token): creds['group_ids'] = [ g['id'] for g in token_data['user'].get(federation.FEDERATION, {}).get( 'groups', [])] + + trust = token_data.get('OS-TRUST:trust') + if trust is None: + creds['trust_id'] = None + creds['trustor_id'] = None + creds['trustee_id'] = None + else: + creds['trust_id'] = trust['id'] + creds['trustor_id'] = trust['trustor_user']['id'] + creds['trustee_id'] = trust['trustee_user']['id'] + creds['is_delegated_auth'] = True + + oauth1 = token_data.get('OS-OAUTH1') + if oauth1 is None: + creds['consumer_id'] = None + creds['access_token_id'] = None + else: + creds['consumer_id'] = oauth1['consumer_id'] + creds['access_token_id'] = oauth1['access_token_id'] + creds['is_delegated_auth'] = True return creds def v2_token_to_auth_context(token): - creds = {} + creds = {'is_delegated_auth': False} token_data = token['access'] try: creds['user_id'] = token_data['user']['id'] @@ -105,6 +125,18 @@ def v2_token_to_auth_context(token): if 'roles' in token_data['user']: creds['roles'] = [role['name'] for role in token_data['user']['roles']] + + trust = token_data.get('trust') + if trust is None: + creds['trust_id'] = None + creds['trustor_id'] = None + creds['trustee_id'] = None + else: + creds['trust_id'] = trust.get('id') + creds['trustor_id'] = trust.get('trustor_id') + creds['trustee_id'] = trust.get('trustee_id') + creds['is_delegated_auth'] = True + return creds diff --git a/keystone/contrib/oauth1/controllers.py b/keystone/contrib/oauth1/controllers.py index 2c938ba485..a185e4fc70 100644 --- a/keystone/contrib/oauth1/controllers.py +++ b/keystone/contrib/oauth1/controllers.py @@ -95,6 +95,12 @@ class AccessTokenCrudV3(controller.V3Controller): @controller.protected() def list_access_tokens(self, context, user_id): + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot list request tokens' + ' with a token issued via delegation.')) refs = self.oauth_api.list_access_tokens(user_id) formatted_refs = ([self._format_token_entity(context, x) for x in refs]) @@ -310,6 +316,12 @@ class OAuthControllerV3(controller.V3Controller): there is not another easy way to make sure the user knows which roles are being requested before authorizing. """ + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot authorize a request token' + ' with a token issued via delegation.')) req_token = self.oauth_api.get_request_token(request_token_id) diff --git a/keystone/tests/test_v3_auth.py b/keystone/tests/test_v3_auth.py index 5de7e02f56..8a27a38b2c 100644 --- a/keystone/tests/test_v3_auth.py +++ b/keystone/tests/test_v3_auth.py @@ -2777,6 +2777,42 @@ class TestTrustAuth(TestAuthInfo): self.assertEqual(r.result['token']['project']['name'], self.project['name']) + def test_impersonation_token_cannot_create_new_trust(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + + trust_token = r.headers['X-Subject-Token'] + + # Build second trust + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + self.post('/OS-TRUST/trusts', + body={'trust': ref}, + token=trust_token, + expected_status=403) + def assertTrustTokensRevoked(self, trust_id): revocation_response = self.get('/OS-REVOKE/events', expected_status=200) diff --git a/keystone/tests/test_v3_oauth1.py b/keystone/tests/test_v3_oauth1.py index b653855d9a..d993889afa 100644 --- a/keystone/tests/test_v3_oauth1.py +++ b/keystone/tests/test_v3_oauth1.py @@ -13,6 +13,8 @@ # under the License. import copy +import os +import tempfile import uuid from six.moves import urllib @@ -26,6 +28,7 @@ from keystone.contrib.oauth1 import controllers from keystone import exception from keystone.openstack.common.db.sqlalchemy import migration from keystone.openstack.common import importutils +from keystone.openstack.common import jsonutils from keystone.tests import test_v3 @@ -486,6 +489,100 @@ class AuthTokenTests(OAuthFlowTests): self.assertRaises(exception.TokenNotFound, self.token_api.get_token, self.keystone_token_id) + def _create_trust_get_token(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + + trust_token = r.headers['X-Subject-Token'] + return trust_token + + def _approve_request_token_url(self): + consumer = self._create_single_consumer() + consumer_id = consumer['id'] + consumer_secret = consumer['secret'] + self.consumer = {'key': consumer_id, 'secret': consumer_secret} + self.assertIsNotNone(self.consumer['secret']) + + url, headers = self._create_request_token(self.consumer, + self.project_id) + content = self.post(url, headers=headers) + credentials = urllib.parse.parse_qs(content.result) + request_key = credentials['oauth_token'][0] + request_secret = credentials['oauth_token_secret'][0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + + return url + + def test_oauth_token_cannot_create_new_trust(self): + self.test_oauth_flow() + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + self.post('/OS-TRUST/trusts', + body={'trust': ref}, + token=self.keystone_token_id, + expected_status=403) + + def test_oauth_token_cannot_authorize_request_token(self): + self.test_oauth_flow() + url = self._approve_request_token_url() + body = {'roles': [{'id': self.role_id}]} + self.put(url, body=body, token=self.keystone_token_id, + expected_status=403) + + def test_oauth_token_cannot_list_request_tokens(self): + self._set_policy({"identity:list_access_tokens": [], + "identity:create_consumer": [], + "identity:authorize_request_token": []}) + self.test_oauth_flow() + url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id + self.get(url, token=self.keystone_token_id, + expected_status=403) + + def _set_policy(self, new_policy): + _unused, self.tmpfilename = tempfile.mkstemp() + self.config_fixture.config(policy_file=self.tmpfilename) + with open(self.tmpfilename, "w") as policyfile: + policyfile.write(jsonutils.dumps(new_policy)) + self.addCleanup(os.remove, self.tmpfilename) + + def test_trust_token_cannot_authorize_request_token(self): + trust_token = self._create_trust_get_token() + url = self._approve_request_token_url() + body = {'roles': [{'id': self.role_id}]} + self.put(url, body=body, token=trust_token, expected_status=403) + + def test_trust_token_cannot_list_request_tokens(self): + self._set_policy({"identity:list_access_tokens": [], + "identity:create_trust": []}) + trust_token = self._create_trust_get_token() + url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id + self.get(url, token=trust_token, expected_status=403) + class MaliciousOAuth1Tests(OAuth1Tests): diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py index cc3cc1f227..552db441b5 100644 --- a/keystone/trust/controllers.py +++ b/keystone/trust/controllers.py @@ -132,6 +132,15 @@ class TrustV3(controller.V3Controller): # TODO(ayoung): instead of raising ValidationError on the first # problem, return a collection of all the problems. + + # Explicitly prevent a trust token from creating a new trust. + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot create a trust' + ' with a token issued via delegation.')) + if not trust: raise exception.ValidationError(attribute='trust', target='request')