Block delegation escalation of privilege

Forbids doing the following with either a trust
  or oauth based token:
  creating a trust
  approving a request_token
  listing request tokens

Change-Id: I1528f9dd003f5e03cbc50b78e1b32dbbf85ffcc2
Closes-Bug:  1324592
This commit is contained in:
Adam Young 2014-05-29 13:56:17 -04:00
parent 2b2c9fbd48
commit 73785122ee
5 changed files with 188 additions and 2 deletions

View File

@ -67,7 +67,7 @@ def is_v3_token(token):
def v3_token_to_auth_context(token):
creds = {}
creds = {'is_delegated_auth': False}
token_data = token['token']
try:
creds['user_id'] = token_data['user']['id']
@ -87,11 +87,31 @@ def v3_token_to_auth_context(token):
creds['group_ids'] = [
g['id'] for g in token_data['user'].get(federation.FEDERATION, {}).get(
'groups', [])]
trust = token_data.get('OS-TRUST:trust')
if trust is None:
creds['trust_id'] = None
creds['trustor_id'] = None
creds['trustee_id'] = None
else:
creds['trust_id'] = trust['id']
creds['trustor_id'] = trust['trustor_user']['id']
creds['trustee_id'] = trust['trustee_user']['id']
creds['is_delegated_auth'] = True
oauth1 = token_data.get('OS-OAUTH1')
if oauth1 is None:
creds['consumer_id'] = None
creds['access_token_id'] = None
else:
creds['consumer_id'] = oauth1['consumer_id']
creds['access_token_id'] = oauth1['access_token_id']
creds['is_delegated_auth'] = True
return creds
def v2_token_to_auth_context(token):
creds = {}
creds = {'is_delegated_auth': False}
token_data = token['access']
try:
creds['user_id'] = token_data['user']['id']
@ -105,6 +125,18 @@ def v2_token_to_auth_context(token):
if 'roles' in token_data['user']:
creds['roles'] = [role['name'] for
role in token_data['user']['roles']]
trust = token_data.get('trust')
if trust is None:
creds['trust_id'] = None
creds['trustor_id'] = None
creds['trustee_id'] = None
else:
creds['trust_id'] = trust.get('id')
creds['trustor_id'] = trust.get('trustor_id')
creds['trustee_id'] = trust.get('trustee_id')
creds['is_delegated_auth'] = True
return creds

View File

@ -95,6 +95,12 @@ class AccessTokenCrudV3(controller.V3Controller):
@controller.protected()
def list_access_tokens(self, context, user_id):
auth_context = context.get('environment',
{}).get('KEYSTONE_AUTH_CONTEXT', {})
if auth_context.get('is_delegated_auth'):
raise exception.Forbidden(
_('Cannot list request tokens'
' with a token issued via delegation.'))
refs = self.oauth_api.list_access_tokens(user_id)
formatted_refs = ([self._format_token_entity(context, x)
for x in refs])
@ -310,6 +316,12 @@ class OAuthControllerV3(controller.V3Controller):
there is not another easy way to make sure the user knows which roles
are being requested before authorizing.
"""
auth_context = context.get('environment',
{}).get('KEYSTONE_AUTH_CONTEXT', {})
if auth_context.get('is_delegated_auth'):
raise exception.Forbidden(
_('Cannot authorize a request token'
' with a token issued via delegation.'))
req_token = self.oauth_api.get_request_token(request_token_id)

View File

@ -2777,6 +2777,42 @@ class TestTrustAuth(TestAuthInfo):
self.assertEqual(r.result['token']['project']['name'],
self.project['name'])
def test_impersonation_token_cannot_create_new_trust(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
trust_token = r.headers['X-Subject-Token']
# Build second trust
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=trust_token,
expected_status=403)
def assertTrustTokensRevoked(self, trust_id):
revocation_response = self.get('/OS-REVOKE/events',
expected_status=200)

View File

@ -13,6 +13,8 @@
# under the License.
import copy
import os
import tempfile
import uuid
from six.moves import urllib
@ -26,6 +28,7 @@ from keystone.contrib.oauth1 import controllers
from keystone import exception
from keystone.openstack.common.db.sqlalchemy import migration
from keystone.openstack.common import importutils
from keystone.openstack.common import jsonutils
from keystone.tests import test_v3
@ -486,6 +489,100 @@ class AuthTokenTests(OAuthFlowTests):
self.assertRaises(exception.TokenNotFound, self.token_api.get_token,
self.keystone_token_id)
def _create_trust_get_token(self):
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
trust = self.assertValidTrustResponse(r)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
trust_id=trust['id'])
r = self.post('/auth/tokens', body=auth_data)
trust_token = r.headers['X-Subject-Token']
return trust_token
def _approve_request_token_url(self):
consumer = self._create_single_consumer()
consumer_id = consumer['id']
consumer_secret = consumer['secret']
self.consumer = {'key': consumer_id, 'secret': consumer_secret}
self.assertIsNotNone(self.consumer['secret'])
url, headers = self._create_request_token(self.consumer,
self.project_id)
content = self.post(url, headers=headers)
credentials = urllib.parse.parse_qs(content.result)
request_key = credentials['oauth_token'][0]
request_secret = credentials['oauth_token_secret'][0]
self.request_token = oauth1.Token(request_key, request_secret)
self.assertIsNotNone(self.request_token.key)
url = self._authorize_request_token(request_key)
return url
def test_oauth_token_cannot_create_new_trust(self):
self.test_oauth_flow()
ref = self.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.user_id,
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
role_ids=[self.role_id])
del ref['id']
self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=self.keystone_token_id,
expected_status=403)
def test_oauth_token_cannot_authorize_request_token(self):
self.test_oauth_flow()
url = self._approve_request_token_url()
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, token=self.keystone_token_id,
expected_status=403)
def test_oauth_token_cannot_list_request_tokens(self):
self._set_policy({"identity:list_access_tokens": [],
"identity:create_consumer": [],
"identity:authorize_request_token": []})
self.test_oauth_flow()
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
self.get(url, token=self.keystone_token_id,
expected_status=403)
def _set_policy(self, new_policy):
_unused, self.tmpfilename = tempfile.mkstemp()
self.config_fixture.config(policy_file=self.tmpfilename)
with open(self.tmpfilename, "w") as policyfile:
policyfile.write(jsonutils.dumps(new_policy))
self.addCleanup(os.remove, self.tmpfilename)
def test_trust_token_cannot_authorize_request_token(self):
trust_token = self._create_trust_get_token()
url = self._approve_request_token_url()
body = {'roles': [{'id': self.role_id}]}
self.put(url, body=body, token=trust_token, expected_status=403)
def test_trust_token_cannot_list_request_tokens(self):
self._set_policy({"identity:list_access_tokens": [],
"identity:create_trust": []})
trust_token = self._create_trust_get_token()
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
self.get(url, token=trust_token, expected_status=403)
class MaliciousOAuth1Tests(OAuth1Tests):

View File

@ -132,6 +132,15 @@ class TrustV3(controller.V3Controller):
# TODO(ayoung): instead of raising ValidationError on the first
# problem, return a collection of all the problems.
# Explicitly prevent a trust token from creating a new trust.
auth_context = context.get('environment',
{}).get('KEYSTONE_AUTH_CONTEXT', {})
if auth_context.get('is_delegated_auth'):
raise exception.Forbidden(
_('Cannot create a trust'
' with a token issued via delegation.'))
if not trust:
raise exception.ValidationError(attribute='trust',
target='request')