Fix issues handling trust tokens via ec2tokens API

Trust scoped tokens are handled incorectly when making requests
via the ec2tokens API, meaning that the restrictions enforced
by trust-scoped tokens are not respected when obtaining a token
via ec2token signature validation.

Storing the trust_id in the blob associated with the ec2 keypair,
and passing that id in the metadata when requesting a v2 token
solves the issue.

Change-Id: I52566384d7813ef0e2f20fb94a5076386457ff02
Closes-Bug: #1242597
This commit is contained in:
Steven Hardy 2013-10-21 19:49:01 +01:00 committed by Jeremy Stanley
parent 6c7f00d459
commit fa16882507
2 changed files with 63 additions and 6 deletions

View File

@ -107,6 +107,11 @@ class Ec2Controller(controller.V2Controller):
self.assignment_api.get_roles_for_user_and_project(
user_ref['id'], tenant_ref['id']))
trust_id = creds_ref.get('trust_id')
if trust_id:
metadata_ref['trust_id'] = trust_id
metadata_ref['trustee_user_id'] = user_ref['id']
# Validate that the auth info is valid and nothing is disabled
token.validate_auth_info(self, user_ref, tenant_ref)
@ -147,8 +152,10 @@ class Ec2Controller(controller.V2Controller):
self._assert_valid_user_id(user_id)
self._assert_valid_project_id(tenant_id)
trust_id = self._context_trust_id(context)
blob = {'access': uuid.uuid4().hex,
'secret': uuid.uuid4().hex}
'secret': uuid.uuid4().hex,
'trust_id': trust_id}
credential_id = utils.hash_access_key(blob['access'])
cred_ref = {'user_id': user_id,
'project_id': tenant_id,
@ -214,7 +221,8 @@ class Ec2Controller(controller.V2Controller):
return {'user_id': credential.get('user_id'),
'tenant_id': credential.get('project_id'),
'access': blob.get('access'),
'secret': blob.get('secret')}
'secret': blob.get('secret'),
'trust_id': blob.get('trust_id')}
def _get_credentials(self, credential_id):
"""Return credentials from an ID.
@ -245,6 +253,13 @@ class Ec2Controller(controller.V2Controller):
if token_ref['user'].get('id') != user_id:
raise exception.Forbidden(_('Token belongs to another user'))
def _context_trust_id(self, context):
try:
token_ref = self.token_api.get_token(context['token_id'])
except exception.TokenNotFound as e:
raise exception.Unauthorized(e)
return token_ref.get('trust_id')
def _is_admin(self, context):
"""Wrap admin assertion error return statement.

View File

@ -89,9 +89,11 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base):
self.assertRaises(client_exceptions.NotFound, client.endpoints.delete,
id=endpoint.id)
def _send_ec2_auth_request(self, credentials):
def _send_ec2_auth_request(self, credentials, client=None):
if not client:
client = self.default_client
url = '%s/ec2tokens' % self.default_client.auth_url
(resp, token) = self.default_client.request(
(resp, token) = client.request(
url=url, method='POST',
body={'credentials': credentials})
return resp, token
@ -100,9 +102,12 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base):
cred = self. default_client.ec2.create(
user_id=self.user_foo['id'],
tenant_id=self.tenant_bar['id'])
signer = ec2_utils.Ec2Signer(cred.secret)
return self._generate_user_ec2_credentials(cred.access, cred.secret)
def _generate_user_ec2_credentials(self, access, secret):
signer = ec2_utils.Ec2Signer(secret)
credentials = {'params': {'SignatureVersion': '2'},
'access': cred.access,
'access': access,
'verb': 'GET',
'host': 'localhost',
'path': '/service/cloud'}
@ -116,6 +121,43 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base):
self.assertEqual(resp.status_code, 200)
self.assertIn('access', token)
def test_ec2_auth_success_trust(self):
# Add "other" role user_foo and create trust delegating it to user_two
self.assignment_api.add_role_to_user_and_project(
self.user_foo['id'],
self.tenant_bar['id'],
self.role_other['id'])
trust_id = 'atrust123'
trust = {'trustor_user_id': self.user_foo['id'],
'trustee_user_id': self.user_two['id'],
'project_id': self.tenant_bar['id'],
'impersonation': True}
roles = [self.role_other]
self.trust_api.create_trust(trust_id, trust, roles)
# Create a client for user_two, scoped to the trust
client = self.get_client(self.user_two)
ret = client.authenticate(trust_id=trust_id,
tenant_id=self.tenant_bar['id'])
self.assertTrue(ret)
self.assertTrue(client.auth_ref.trust_scoped)
self.assertEqual(trust_id, client.auth_ref.trust_id)
# Create an ec2 keypair using the trust client impersonating user_foo
cred = client.ec2.create(user_id=self.user_foo['id'],
tenant_id=self.tenant_bar['id'])
credentials, signature = self._generate_user_ec2_credentials(
cred.access, cred.secret)
credentials['signature'] = signature
resp, token = self._send_ec2_auth_request(credentials)
self.assertEqual(resp.status_code, 200)
self.assertEqual(trust_id, token['access']['trust']['id'])
#TODO(shardy) we really want to check the roles and trustee
# but because of where the stubbing happens we don't seem to
# hit the necessary code in controllers.py _authenticate_token
# so although all is OK via a real request, it incorrect in
# this test..
def test_ec2_auth_failure(self):
from keystoneclient import exceptions as client_exceptions