Merge "Fix issues handling trust tokens via ec2tokens API" into stable/havana
This commit is contained in:
commit
03ed8c378b
|
@ -106,6 +106,11 @@ class Ec2Controller(controller.V2Controller):
|
|||
self.identity_api.get_roles_for_user_and_project(
|
||||
user_ref['id'], tenant_ref['id']))
|
||||
|
||||
trust_id = creds_ref.get('trust_id')
|
||||
if trust_id:
|
||||
metadata_ref['trust_id'] = trust_id
|
||||
metadata_ref['trustee_user_id'] = user_ref['id']
|
||||
|
||||
# Validate that the auth info is valid and nothing is disabled
|
||||
token.validate_auth_info(self, user_ref, tenant_ref)
|
||||
|
||||
|
@ -146,8 +151,10 @@ class Ec2Controller(controller.V2Controller):
|
|||
|
||||
self._assert_valid_user_id(user_id)
|
||||
self._assert_valid_project_id(tenant_id)
|
||||
trust_id = self._context_trust_id(context)
|
||||
blob = {'access': uuid.uuid4().hex,
|
||||
'secret': uuid.uuid4().hex}
|
||||
'secret': uuid.uuid4().hex,
|
||||
'trust_id': trust_id}
|
||||
credential_id = utils.hash_access_key(blob['access'])
|
||||
cred_ref = {'user_id': user_id,
|
||||
'project_id': tenant_id,
|
||||
|
@ -213,7 +220,8 @@ class Ec2Controller(controller.V2Controller):
|
|||
return {'user_id': credential.get('user_id'),
|
||||
'tenant_id': credential.get('project_id'),
|
||||
'access': blob.get('access'),
|
||||
'secret': blob.get('secret')}
|
||||
'secret': blob.get('secret'),
|
||||
'trust_id': blob.get('trust_id')}
|
||||
|
||||
def _get_credentials(self, credential_id):
|
||||
"""Return credentials from an ID.
|
||||
|
@ -244,6 +252,13 @@ class Ec2Controller(controller.V2Controller):
|
|||
if token_ref['user'].get('id') != user_id:
|
||||
raise exception.Forbidden(_('Token belongs to another user'))
|
||||
|
||||
def _context_trust_id(self, context):
|
||||
try:
|
||||
token_ref = self.token_api.get_token(context['token_id'])
|
||||
except exception.TokenNotFound as e:
|
||||
raise exception.Unauthorized(e)
|
||||
return token_ref.get('trust_id')
|
||||
|
||||
def _is_admin(self, context):
|
||||
"""Wrap admin assertion error return statement.
|
||||
|
||||
|
|
|
@ -88,9 +88,11 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base):
|
|||
self.assertRaises(client_exceptions.NotFound, client.endpoints.delete,
|
||||
id=endpoint.id)
|
||||
|
||||
def _send_ec2_auth_request(self, credentials):
|
||||
def _send_ec2_auth_request(self, credentials, client=None):
|
||||
if not client:
|
||||
client = self.default_client
|
||||
url = '%s/ec2tokens' % self.default_client.auth_url
|
||||
(resp, token) = self.default_client.request(
|
||||
(resp, token) = client.request(
|
||||
url=url, method='POST',
|
||||
body={'credentials': credentials})
|
||||
return resp, token
|
||||
|
@ -99,9 +101,12 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base):
|
|||
cred = self. default_client.ec2.create(
|
||||
user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
signer = ec2_utils.Ec2Signer(cred.secret)
|
||||
return self._generate_user_ec2_credentials(cred.access, cred.secret)
|
||||
|
||||
def _generate_user_ec2_credentials(self, access, secret):
|
||||
signer = ec2_utils.Ec2Signer(secret)
|
||||
credentials = {'params': {'SignatureVersion': '2'},
|
||||
'access': cred.access,
|
||||
'access': access,
|
||||
'verb': 'GET',
|
||||
'host': 'localhost',
|
||||
'path': '/service/cloud'}
|
||||
|
@ -115,6 +120,43 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base):
|
|||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn('access', token)
|
||||
|
||||
def test_ec2_auth_success_trust(self):
|
||||
# Add "other" role user_foo and create trust delegating it to user_two
|
||||
self.identity_api.add_role_to_user_and_project(
|
||||
self.user_foo['id'],
|
||||
self.tenant_bar['id'],
|
||||
self.role_other['id'])
|
||||
trust_id = 'atrust123'
|
||||
trust = {'trustor_user_id': self.user_foo['id'],
|
||||
'trustee_user_id': self.user_two['id'],
|
||||
'project_id': self.tenant_bar['id'],
|
||||
'impersonation': True}
|
||||
roles = [self.role_other]
|
||||
self.trust_api.create_trust(trust_id, trust, roles)
|
||||
|
||||
# Create a client for user_two, scoped to the trust
|
||||
client = self.get_client(self.user_two)
|
||||
ret = client.authenticate(trust_id=trust_id,
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
self.assertTrue(ret)
|
||||
self.assertTrue(client.auth_ref.trust_scoped)
|
||||
self.assertEqual(trust_id, client.auth_ref.trust_id)
|
||||
|
||||
# Create an ec2 keypair using the trust client impersonating user_foo
|
||||
cred = client.ec2.create(user_id=self.user_foo['id'],
|
||||
tenant_id=self.tenant_bar['id'])
|
||||
credentials, signature = self._generate_user_ec2_credentials(
|
||||
cred.access, cred.secret)
|
||||
credentials['signature'] = signature
|
||||
resp, token = self._send_ec2_auth_request(credentials)
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertEqual(trust_id, token['access']['trust']['id'])
|
||||
#TODO(shardy) we really want to check the roles and trustee
|
||||
# but because of where the stubbing happens we don't seem to
|
||||
# hit the necessary code in controllers.py _authenticate_token
|
||||
# so although all is OK via a real request, it incorrect in
|
||||
# this test..
|
||||
|
||||
def test_ec2_auth_failure(self):
|
||||
from keystoneclient import exceptions as client_exceptions
|
||||
|
||||
|
|
Loading…
Reference in New Issue