Block delegation escalation of privilege
Forbids doing the following with either a trust or oauth based token: creating a trust approving a request_token listing request tokens Change-Id: I1528f9dd003f5e03cbc50b78e1b32dbbf85ffcc2 Closes-Bug: 1324592
This commit is contained in:
parent
98cb9ae1cd
commit
9162837329
|
@ -44,7 +44,7 @@ def _build_policy_check_credentials(self, action, context, kwargs):
|
|||
# it would otherwise need to reload the token_ref from backing store.
|
||||
wsgi.validate_token_bind(context, token_ref)
|
||||
|
||||
creds = {}
|
||||
creds = {'is_delegated_auth': False}
|
||||
if 'token_data' in token_ref and 'token' in token_ref['token_data']:
|
||||
#V3 Tokens
|
||||
token_data = token_ref['token_data']['token']
|
||||
|
@ -66,9 +66,32 @@ def _build_policy_check_credentials(self, action, context, kwargs):
|
|||
creds['roles'] = []
|
||||
for role in token_data['roles']:
|
||||
creds['roles'].append(role['name'])
|
||||
|
||||
trust = token_data.get('OS-TRUST:trust')
|
||||
if trust is None:
|
||||
creds['trust_id'] = None
|
||||
creds['trustor_id'] = None
|
||||
creds['trustee_id'] = None
|
||||
else:
|
||||
creds['trust_id'] = trust['id']
|
||||
creds['trustor_id'] = trust['trustor_user']['id']
|
||||
creds['trustee_id'] = trust['trustee_user']['id']
|
||||
creds['is_delegated_auth'] = True
|
||||
|
||||
oauth1 = token_data.get('OS-OAUTH1')
|
||||
if oauth1 is None:
|
||||
creds['consumer_id'] = None
|
||||
creds['access_token_id'] = None
|
||||
else:
|
||||
creds['consumer_id'] = oauth1['consumer_id']
|
||||
creds['access_token_id'] = oauth1['access_token_id']
|
||||
creds['is_delegated_auth'] = True
|
||||
|
||||
else:
|
||||
#v2 Tokens
|
||||
creds = token_ref.get('metadata', {}).copy()
|
||||
creds['is_delegated_auth'] = False
|
||||
|
||||
try:
|
||||
creds['user_id'] = token_ref['user'].get('id')
|
||||
except AttributeError:
|
||||
|
@ -81,6 +104,16 @@ def _build_policy_check_credentials(self, action, context, kwargs):
|
|||
# NOTE(vish): this is pretty inefficient
|
||||
creds['roles'] = [self.identity_api.get_role(role)['name']
|
||||
for role in creds.get('roles', [])]
|
||||
trust = token_ref.get('trust')
|
||||
if trust is None:
|
||||
creds['trust_id'] = None
|
||||
creds['trustor_id'] = None
|
||||
creds['trustee_id'] = None
|
||||
else:
|
||||
creds['trust_id'] = trust.get('id')
|
||||
creds['trustor_id'] = trust.get('trustor_id')
|
||||
creds['trustee_id'] = trust.get('trustee_id')
|
||||
creds['is_delegated_auth'] = True
|
||||
|
||||
return creds
|
||||
|
||||
|
@ -155,6 +188,7 @@ def protected(callback=None):
|
|||
policy_dict.update(kwargs)
|
||||
self.policy_api.enforce(creds, action, flatten(policy_dict))
|
||||
LOG.debug(_('RBAC: Authorization granted'))
|
||||
context['environment'] = {'KEYSTONE_AUTH_CONTEXT': creds}
|
||||
return f(self, context, *args, **kwargs)
|
||||
return inner
|
||||
return wrapper
|
||||
|
|
|
@ -86,6 +86,12 @@ class AccessTokenCrudV3(controller.V3Controller):
|
|||
|
||||
@controller.protected()
|
||||
def list_access_tokens(self, context, user_id):
|
||||
auth_context = context.get('environment',
|
||||
{}).get('KEYSTONE_AUTH_CONTEXT', {})
|
||||
if auth_context.get('is_delegated_auth'):
|
||||
raise exception.Forbidden(
|
||||
_('Cannot list request tokens'
|
||||
' with a token issued via delegation.'))
|
||||
refs = self.oauth_api.list_access_tokens(user_id)
|
||||
formatted_refs = ([self._format_token_entity(x) for x in refs])
|
||||
return AccessTokenCrudV3.wrap_collection(context, formatted_refs)
|
||||
|
@ -314,6 +320,12 @@ class OAuthControllerV3(controller.V3Controller):
|
|||
there is not another easy way to make sure the user knows which roles
|
||||
are being requested before authorizing.
|
||||
"""
|
||||
auth_context = context.get('environment',
|
||||
{}).get('KEYSTONE_AUTH_CONTEXT', {})
|
||||
if auth_context.get('is_delegated_auth'):
|
||||
raise exception.Forbidden(
|
||||
_('Cannot authorize a request token'
|
||||
' with a token issued via delegation.'))
|
||||
|
||||
req_token = self.oauth_api.get_request_token(request_token_id)
|
||||
|
||||
|
|
|
@ -2150,6 +2150,68 @@ class TestTrustAuth(TestAuthInfo):
|
|||
self.assertEqual(r.result['token']['project']['name'],
|
||||
self.project['name'])
|
||||
|
||||
def test_impersonation_token_cannot_create_new_trust(self):
|
||||
ref = self.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
trustee_user_id=self.trustee_user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=True,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
del ref['id']
|
||||
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.trustee_user['id'],
|
||||
password=self.trustee_user['password'],
|
||||
trust_id=trust['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
|
||||
trust_token = r.headers['X-Subject-Token']
|
||||
|
||||
# Build second trust
|
||||
ref = self.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
trustee_user_id=self.trustee_user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=True,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
del ref['id']
|
||||
|
||||
self.post('/OS-TRUST/trusts',
|
||||
body={'trust': ref},
|
||||
token=trust_token,
|
||||
expected_status=403)
|
||||
|
||||
def test_delete_trust_revokes_tokens(self):
|
||||
ref = self.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
trustee_user_id=self.trustee_user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=False,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
del ref['id']
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
trust_id = trust['id']
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.trustee_user['id'],
|
||||
password=self.trustee_user['password'],
|
||||
trust_id=trust_id)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectTrustScopedTokenResponse(
|
||||
r, self.trustee_user)
|
||||
trust_token = r.headers['X-Subject-Token']
|
||||
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
|
||||
'trust_id': trust_id},
|
||||
expected_status=204)
|
||||
headers = {'X-Subject-Token': trust_token}
|
||||
self.head('/auth/tokens', headers=headers, expected_status=404)
|
||||
|
||||
def test_delete_trust(self):
|
||||
ref = self.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
import copy
|
||||
import os
|
||||
import tempfile
|
||||
import urlparse
|
||||
import uuid
|
||||
|
||||
|
@ -26,6 +27,8 @@ from keystone import contrib
|
|||
from keystone.contrib import oauth1
|
||||
from keystone.contrib.oauth1 import controllers
|
||||
from keystone.openstack.common import importutils
|
||||
from keystone.openstack.common import jsonutils
|
||||
from keystone.policy.backends import rules
|
||||
from keystone.tests import test_v3
|
||||
|
||||
|
||||
|
@ -447,6 +450,101 @@ class AuthTokenTests(OAuthFlowTests):
|
|||
self.assertTrue(len(tokens) > 0)
|
||||
self.assertTrue(keystone_token_uuid in tokens)
|
||||
|
||||
def _create_trust_get_token(self):
|
||||
ref = self.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
trustee_user_id=self.user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=True,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
del ref['id']
|
||||
|
||||
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
||||
trust = self.assertValidTrustResponse(r)
|
||||
|
||||
auth_data = self.build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
trust_id=trust['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
|
||||
trust_token = r.headers['X-Subject-Token']
|
||||
return trust_token
|
||||
|
||||
def _approve_request_token_url(self):
|
||||
consumer = self._create_single_consumer()
|
||||
consumer_id = consumer.get('id')
|
||||
consumer_secret = consumer.get('secret')
|
||||
self.consumer = oauth1.Consumer(consumer_id, consumer_secret)
|
||||
self.assertIsNotNone(self.consumer.key)
|
||||
|
||||
url, headers = self._create_request_token(self.consumer,
|
||||
self.project_id)
|
||||
content = self.post(url, headers=headers)
|
||||
credentials = urlparse.parse_qs(content.result)
|
||||
request_key = credentials.get('oauth_token')[0]
|
||||
request_secret = credentials.get('oauth_token_secret')[0]
|
||||
self.request_token = oauth1.Token(request_key, request_secret)
|
||||
self.assertIsNotNone(self.request_token.key)
|
||||
|
||||
url = self._authorize_request_token(request_key)
|
||||
|
||||
return url
|
||||
|
||||
def test_oauth_token_cannot_create_new_trust(self):
|
||||
self.test_oauth_flow()
|
||||
ref = self.new_trust_ref(
|
||||
trustor_user_id=self.user_id,
|
||||
trustee_user_id=self.user_id,
|
||||
project_id=self.project_id,
|
||||
impersonation=True,
|
||||
expires=dict(minutes=1),
|
||||
role_ids=[self.role_id])
|
||||
del ref['id']
|
||||
|
||||
self.post('/OS-TRUST/trusts',
|
||||
body={'trust': ref},
|
||||
token=self.keystone_token_id,
|
||||
expected_status=403)
|
||||
|
||||
def test_oauth_token_cannot_authorize_request_token(self):
|
||||
self.test_oauth_flow()
|
||||
url = self._approve_request_token_url()
|
||||
body = {'roles': [{'id': self.role_id}]}
|
||||
self.put(url, body=body, token=self.keystone_token_id,
|
||||
expected_status=403)
|
||||
|
||||
def test_oauth_token_cannot_list_request_tokens(self):
|
||||
self._set_policy({"identity:list_access_tokens": [],
|
||||
"identity:create_consumer": [],
|
||||
"identity:authorize_request_token": []})
|
||||
self.test_oauth_flow()
|
||||
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
|
||||
self.get(url, token=self.keystone_token_id,
|
||||
expected_status=403)
|
||||
|
||||
def _set_policy(self, new_policy):
|
||||
_unused, self.tmpfilename = tempfile.mkstemp()
|
||||
rules.reset()
|
||||
self.opt(policy_file=self.tmpfilename)
|
||||
with open(self.tmpfilename, "w") as policyfile:
|
||||
policyfile.write(jsonutils.dumps(new_policy))
|
||||
self.addCleanup(os.remove, self.tmpfilename)
|
||||
|
||||
def test_trust_token_cannot_authorize_request_token(self):
|
||||
trust_token = self._create_trust_get_token()
|
||||
url = self._approve_request_token_url()
|
||||
body = {'roles': [{'id': self.role_id}]}
|
||||
self.put(url, body=body, token=trust_token, expected_status=403)
|
||||
|
||||
def test_trust_token_cannot_list_request_tokens(self):
|
||||
self._set_policy({"identity:list_access_tokens": [],
|
||||
"identity:create_trust": []})
|
||||
trust_token = self._create_trust_get_token()
|
||||
url = '/users/%s/OS-OAUTH1/access_tokens' % self.user_id
|
||||
self.get(url, token=trust_token, expected_status=403)
|
||||
|
||||
|
||||
class MaliciousOAuth1Tests(OAuth1Tests):
|
||||
|
||||
|
|
|
@ -144,6 +144,15 @@ class TrustV3(controller.V3Controller):
|
|||
|
||||
# TODO(ayoung): instead of raising ValidationError on the first
|
||||
# problem, return a collection of all the problems.
|
||||
|
||||
# Explicitly prevent a trust token from creating a new trust.
|
||||
auth_context = context.get('environment',
|
||||
{}).get('KEYSTONE_AUTH_CONTEXT', {})
|
||||
if auth_context.get('is_delegated_auth'):
|
||||
raise exception.Forbidden(
|
||||
_('Cannot create a trust'
|
||||
' with a token issued via delegation.'))
|
||||
|
||||
if not trust:
|
||||
raise exception.ValidationError(attribute='trust',
|
||||
target='request')
|
||||
|
|
Loading…
Reference in New Issue