From fa16882507d05f3bf2bbfbd62bb1b5bc0a6fb2b4 Mon Sep 17 00:00:00 2001 From: Steven Hardy Date: Mon, 21 Oct 2013 19:49:01 +0100 Subject: [PATCH] Fix issues handling trust tokens via ec2tokens API Trust scoped tokens are handled incorectly when making requests via the ec2tokens API, meaning that the restrictions enforced by trust-scoped tokens are not respected when obtaining a token via ec2token signature validation. Storing the trust_id in the blob associated with the ec2 keypair, and passing that id in the metadata when requesting a v2 token solves the issue. Change-Id: I52566384d7813ef0e2f20fb94a5076386457ff02 Closes-Bug: #1242597 --- keystone/contrib/ec2/controllers.py | 19 ++++++++- keystone/tests/test_keystoneclient_sql.py | 50 +++++++++++++++++++++-- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/keystone/contrib/ec2/controllers.py b/keystone/contrib/ec2/controllers.py index be060ecbe1..c51c84c4f4 100644 --- a/keystone/contrib/ec2/controllers.py +++ b/keystone/contrib/ec2/controllers.py @@ -107,6 +107,11 @@ class Ec2Controller(controller.V2Controller): self.assignment_api.get_roles_for_user_and_project( user_ref['id'], tenant_ref['id'])) + trust_id = creds_ref.get('trust_id') + if trust_id: + metadata_ref['trust_id'] = trust_id + metadata_ref['trustee_user_id'] = user_ref['id'] + # Validate that the auth info is valid and nothing is disabled token.validate_auth_info(self, user_ref, tenant_ref) @@ -147,8 +152,10 @@ class Ec2Controller(controller.V2Controller): self._assert_valid_user_id(user_id) self._assert_valid_project_id(tenant_id) + trust_id = self._context_trust_id(context) blob = {'access': uuid.uuid4().hex, - 'secret': uuid.uuid4().hex} + 'secret': uuid.uuid4().hex, + 'trust_id': trust_id} credential_id = utils.hash_access_key(blob['access']) cred_ref = {'user_id': user_id, 'project_id': tenant_id, @@ -214,7 +221,8 @@ class Ec2Controller(controller.V2Controller): return {'user_id': credential.get('user_id'), 'tenant_id': credential.get('project_id'), 'access': blob.get('access'), - 'secret': blob.get('secret')} + 'secret': blob.get('secret'), + 'trust_id': blob.get('trust_id')} def _get_credentials(self, credential_id): """Return credentials from an ID. @@ -245,6 +253,13 @@ class Ec2Controller(controller.V2Controller): if token_ref['user'].get('id') != user_id: raise exception.Forbidden(_('Token belongs to another user')) + def _context_trust_id(self, context): + try: + token_ref = self.token_api.get_token(context['token_id']) + except exception.TokenNotFound as e: + raise exception.Unauthorized(e) + return token_ref.get('trust_id') + def _is_admin(self, context): """Wrap admin assertion error return statement. diff --git a/keystone/tests/test_keystoneclient_sql.py b/keystone/tests/test_keystoneclient_sql.py index c50c680b35..a058b72e7c 100644 --- a/keystone/tests/test_keystoneclient_sql.py +++ b/keystone/tests/test_keystoneclient_sql.py @@ -89,9 +89,11 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base): self.assertRaises(client_exceptions.NotFound, client.endpoints.delete, id=endpoint.id) - def _send_ec2_auth_request(self, credentials): + def _send_ec2_auth_request(self, credentials, client=None): + if not client: + client = self.default_client url = '%s/ec2tokens' % self.default_client.auth_url - (resp, token) = self.default_client.request( + (resp, token) = client.request( url=url, method='POST', body={'credentials': credentials}) return resp, token @@ -100,9 +102,12 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base): cred = self. default_client.ec2.create( user_id=self.user_foo['id'], tenant_id=self.tenant_bar['id']) - signer = ec2_utils.Ec2Signer(cred.secret) + return self._generate_user_ec2_credentials(cred.access, cred.secret) + + def _generate_user_ec2_credentials(self, access, secret): + signer = ec2_utils.Ec2Signer(secret) credentials = {'params': {'SignatureVersion': '2'}, - 'access': cred.access, + 'access': access, 'verb': 'GET', 'host': 'localhost', 'path': '/service/cloud'} @@ -116,6 +121,43 @@ class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase, sql.Base): self.assertEqual(resp.status_code, 200) self.assertIn('access', token) + def test_ec2_auth_success_trust(self): + # Add "other" role user_foo and create trust delegating it to user_two + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], + self.tenant_bar['id'], + self.role_other['id']) + trust_id = 'atrust123' + trust = {'trustor_user_id': self.user_foo['id'], + 'trustee_user_id': self.user_two['id'], + 'project_id': self.tenant_bar['id'], + 'impersonation': True} + roles = [self.role_other] + self.trust_api.create_trust(trust_id, trust, roles) + + # Create a client for user_two, scoped to the trust + client = self.get_client(self.user_two) + ret = client.authenticate(trust_id=trust_id, + tenant_id=self.tenant_bar['id']) + self.assertTrue(ret) + self.assertTrue(client.auth_ref.trust_scoped) + self.assertEqual(trust_id, client.auth_ref.trust_id) + + # Create an ec2 keypair using the trust client impersonating user_foo + cred = client.ec2.create(user_id=self.user_foo['id'], + tenant_id=self.tenant_bar['id']) + credentials, signature = self._generate_user_ec2_credentials( + cred.access, cred.secret) + credentials['signature'] = signature + resp, token = self._send_ec2_auth_request(credentials) + self.assertEqual(resp.status_code, 200) + self.assertEqual(trust_id, token['access']['trust']['id']) + #TODO(shardy) we really want to check the roles and trustee + # but because of where the stubbing happens we don't seem to + # hit the necessary code in controllers.py _authenticate_token + # so although all is OK via a real request, it incorrect in + # this test.. + def test_ec2_auth_failure(self): from keystoneclient import exceptions as client_exceptions