diff --git a/keystone/common/controller.py b/keystone/common/controller.py index faadc09d5d..d6c33dfae4 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -44,7 +44,7 @@ def _build_policy_check_credentials(self, action, context, kwargs): # it would otherwise need to reload the token_ref from backing store. wsgi.validate_token_bind(context, token_ref) - creds = {} + creds = {'is_delegated_auth': False} if 'token_data' in token_ref and 'token' in token_ref['token_data']: #V3 Tokens token_data = token_ref['token_data']['token'] @@ -66,9 +66,32 @@ def _build_policy_check_credentials(self, action, context, kwargs): creds['roles'] = [] for role in token_data['roles']: creds['roles'].append(role['name']) + + trust = token_data.get('OS-TRUST:trust') + if trust is None: + creds['trust_id'] = None + creds['trustor_id'] = None + creds['trustee_id'] = None + else: + creds['trust_id'] = trust['id'] + creds['trustor_id'] = trust['trustor_user']['id'] + creds['trustee_id'] = trust['trustee_user']['id'] + creds['is_delegated_auth'] = True + + oauth1 = token_data.get('OS-OAUTH1') + if oauth1 is None: + creds['consumer_id'] = None + creds['access_token_id'] = None + else: + creds['consumer_id'] = oauth1['consumer_id'] + creds['access_token_id'] = oauth1['access_token_id'] + creds['is_delegated_auth'] = True + else: #v2 Tokens creds = token_ref.get('metadata', {}).copy() + creds['is_delegated_auth'] = False + try: creds['user_id'] = token_ref['user'].get('id') except AttributeError: @@ -81,6 +104,16 @@ def _build_policy_check_credentials(self, action, context, kwargs): # NOTE(vish): this is pretty inefficient creds['roles'] = [self.identity_api.get_role(role)['name'] for role in creds.get('roles', [])] + trust = token_ref.get('trust') + if trust is None: + creds['trust_id'] = None + creds['trustor_id'] = None + creds['trustee_id'] = None + else: + creds['trust_id'] = trust.get('id') + creds['trustor_id'] = trust.get('trustor_id') + creds['trustee_id'] = trust.get('trustee_id') + creds['is_delegated_auth'] = True return creds @@ -155,6 +188,7 @@ def protected(callback=None): policy_dict.update(kwargs) self.policy_api.enforce(creds, action, flatten(policy_dict)) LOG.debug(_('RBAC: Authorization granted')) + context['environment'] = {'KEYSTONE_AUTH_CONTEXT': creds} return f(self, context, *args, **kwargs) return inner return wrapper diff --git a/keystone/contrib/oauth1/controllers.py b/keystone/contrib/oauth1/controllers.py index b8c2441928..d4024dffa3 100644 --- a/keystone/contrib/oauth1/controllers.py +++ b/keystone/contrib/oauth1/controllers.py @@ -86,6 +86,12 @@ class AccessTokenCrudV3(controller.V3Controller): @controller.protected() def list_access_tokens(self, context, user_id): + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot list request tokens' + ' with a token issued via delegation.')) refs = self.oauth_api.list_access_tokens(user_id) formatted_refs = ([self._format_token_entity(x) for x in refs]) return AccessTokenCrudV3.wrap_collection(context, formatted_refs) @@ -314,6 +320,12 @@ class OAuthControllerV3(controller.V3Controller): there is not another easy way to make sure the user knows which roles are being requested before authorizing. """ + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot authorize a request token' + ' with a token issued via delegation.')) req_token = self.oauth_api.get_request_token(request_token_id) diff --git a/keystone/tests/test_v3_auth.py b/keystone/tests/test_v3_auth.py index e89e29f327..f3e3ace301 100644 --- a/keystone/tests/test_v3_auth.py +++ b/keystone/tests/test_v3_auth.py @@ -2150,6 +2150,68 @@ class TestTrustAuth(TestAuthInfo): self.assertEqual(r.result['token']['project']['name'], self.project['name']) + def test_impersonation_token_cannot_create_new_trust(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + + trust_token = r.headers['X-Subject-Token'] + + # Build second trust + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + self.post('/OS-TRUST/trusts', + body={'trust': ref}, + token=trust_token, + expected_status=403) + + def test_delete_trust_revokes_tokens(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + trust_id = trust['id'] + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust_id) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse( + r, self.trustee_user) + trust_token = r.headers['X-Subject-Token'] + self.delete('/OS-TRUST/trusts/%(trust_id)s' % { + 'trust_id': trust_id}, + expected_status=204) + headers = {'X-Subject-Token': trust_token} + self.head('/auth/tokens', headers=headers, expected_status=404) + def test_delete_trust(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, diff --git a/keystone/tests/test_v3_oauth1.py b/keystone/tests/test_v3_oauth1.py index 73a34d7221..a83c86e49a 100644 --- a/keystone/tests/test_v3_oauth1.py +++ b/keystone/tests/test_v3_oauth1.py @@ -16,6 +16,7 @@ import copy import os +import tempfile import urlparse import uuid @@ -26,6 +27,8 @@ from keystone import contrib from keystone.contrib import oauth1 from keystone.contrib.oauth1 import controllers from keystone.openstack.common import importutils +from keystone.openstack.common import jsonutils +from keystone.policy.backends import rules from keystone.tests import test_v3 @@ -447,6 +450,101 @@ class AuthTokenTests(OAuthFlowTests): self.assertTrue(len(tokens) > 0) self.assertTrue(keystone_token_uuid in tokens) + def _create_trust_get_token(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + + trust_token = r.headers['X-Subject-Token'] + return trust_token + + def _approve_request_token_url(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + + return url + + def test_oauth_token_cannot_create_new_trust(self): + self.test_oauth_flow() + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.user_id, + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + self.post('/OS-TRUST/trusts', + body={'trust': ref}, + token=self.keystone_token_id, + expected_status=403) + + def test_oauth_token_cannot_authorize_request_token(self): + self.test_oauth_flow() + url = self._approve_request_token_url() + body = {'roles': [{'id': self.role_id}]} + self.put(url, body=body, token=self.keystone_token_id, + expected_status=403) + + def test_oauth_token_cannot_list_request_tokens(self): + self._set_policy({"identity:list_access_tokens": [], + "identity:create_consumer": [], + "identity:authorize_request_token": []}) + self.test_oauth_flow() + url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id + self.get(url, token=self.keystone_token_id, + expected_status=403) + + def _set_policy(self, new_policy): + _unused, self.tmpfilename = tempfile.mkstemp() + rules.reset() + self.opt(policy_file=self.tmpfilename) + with open(self.tmpfilename, "w") as policyfile: + policyfile.write(jsonutils.dumps(new_policy)) + self.addCleanup(os.remove, self.tmpfilename) + + def test_trust_token_cannot_authorize_request_token(self): + trust_token = self._create_trust_get_token() + url = self._approve_request_token_url() + body = {'roles': [{'id': self.role_id}]} + self.put(url, body=body, token=trust_token, expected_status=403) + + def test_trust_token_cannot_list_request_tokens(self): + self._set_policy({"identity:list_access_tokens": [], + "identity:create_trust": []}) + trust_token = self._create_trust_get_token() + url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id + self.get(url, token=trust_token, expected_status=403) + class MaliciousOAuth1Tests(OAuth1Tests): diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py index 1d54f51a70..7fdc8c29dc 100644 --- a/keystone/trust/controllers.py +++ b/keystone/trust/controllers.py @@ -144,6 +144,15 @@ class TrustV3(controller.V3Controller): # TODO(ayoung): instead of raising ValidationError on the first # problem, return a collection of all the problems. + + # Explicitly prevent a trust token from creating a new trust. + auth_context = context.get('environment', + {}).get('KEYSTONE_AUTH_CONTEXT', {}) + if auth_context.get('is_delegated_auth'): + raise exception.Forbidden( + _('Cannot create a trust' + ' with a token issued via delegation.')) + if not trust: raise exception.ValidationError(attribute='trust', target='request')