From fb8bd1dc77a177ef760ece50ae6558f33a9fe464 Mon Sep 17 00:00:00 2001 From: Dolph Mathews Date: Wed, 20 Feb 2013 17:39:52 -0600 Subject: [PATCH] Tests for domain-scoped tokens - Fixes bug 1131292: catalog returned with unscoped tokens - Fixes bug 1131294: X-Subject-Token not returned on token validation Change-Id: I1808613f276354e2a37cf8c154b55509a2888d89 --- keystone/auth/controllers.py | 12 +- keystone/auth/token_factory.py | 10 +- tests/test_v3_auth.py | 284 ++++++++++++++++++++++++--------- 3 files changed, 221 insertions(+), 85 deletions(-) diff --git a/keystone/auth/controllers.py b/keystone/auth/controllers.py index 2ef4f8d16e..d2eaa234f0 100644 --- a/keystone/auth/controllers.py +++ b/keystone/auth/controllers.py @@ -377,11 +377,13 @@ class Auth(controller.V3Controller): token_id = context.get('subject_token_id') self.check_token(context) token_ref = self.token_api.get_token(context, token_id) - return token_factory.recreate_token_data(context, - token_ref.get('token_data'), - token_ref['expires'], - token_ref.get('user'), - token_ref.get('tenant')) + token_data = token_factory.recreate_token_data( + context, + token_ref.get('token_data'), + token_ref['expires'], + token_ref.get('user'), + token_ref.get('tenant')) + return token_factory.render_token_data_response(token_id, token_data) @controller.protected def revocation_list(self, context, auth=None): diff --git a/keystone/auth/token_factory.py b/keystone/auth/token_factory.py index fdd33d12fe..03d4ed7413 100644 --- a/keystone/auth/token_factory.py +++ b/keystone/auth/token_factory.py @@ -120,11 +120,11 @@ class TokenDataHelper(object): def _populate_service_catalog(self, token_data, user_id, domain_id, project_id): - service_catalog = self.catalog_api.get_v3_catalog(self.context, - user_id, - project_id) - # TODO(gyee): v3 service catalog is not quite completed yet - token_data['catalog'] = service_catalog + if project_id or domain_id: + service_catalog = self.catalog_api.get_v3_catalog( + self.context, user_id, project_id) + # TODO(gyee): v3 service catalog is not quite completed yet + token_data['catalog'] = service_catalog def _populate_token(self, token_data, expires=None): if not expires: diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py index 5831e3925d..c7f78adf59 100644 --- a/tests/test_v3_auth.py +++ b/tests/test_v3_auth.py @@ -105,29 +105,42 @@ def _build_authentication_request(token=None, user_id=None, username=None, class AuthTest(test_v3.RestfulTestCase): - def assertValidToken(self, token): - self.assertNotIn('roles', token) - self.assertEqual(self.user['id'], token['user']['id']) - self.assertIn('expires', token) + def assertValidTokenResponse(self, r): + self.assertTrue(r.getheader('X-Subject-Token')) + token = r.body - def assertValidScopedToken(self, token): - self.assertIn('roles', token) self.assertIn('expires', token) - self.assertIn('catalog', token) self.assertIn('user', token) + self.assertEqual(self.user['id'], token['user']['id']) + self.assertEqual(self.user['name'], token['user']['name']) + self.assertEqual(self.user['domain_id'], token['user']['domain']['id']) + return token + + def assertValidUnscopedTokenResponse(self, r): + token = self.assertValidTokenResponse(r) + + self.assertNotIn('roles', token) + self.assertNotIn('catalog', token) + self.assertNotIn('project', token) + self.assertNotIn('domain', token) + + return token + + def assertValidScopedTokenResponse(self, r): + token = self.assertValidTokenResponse(r) + + self.assertIn('catalog', token) + self.assertIn('roles', token) self.assertTrue(token['roles']) for role in token['roles']: self.assertIn('id', role) self.assertIn('name', role) - self.assertEqual(self.user['id'], token['user']['id']) - self.assertEqual(self.user['name'], token['user']['name']) - self.assertEqual(self.user['domain_id'], token['user']['domain']['id']) - self.assertEqual(self.role_id, token['roles'][0]['id']) + return token - def assertValidProjectScopedToken(self, token): - self.assertValidScopedToken(token) + def assertValidProjectScopedTokenResponse(self, r): + token = self.assertValidScopedTokenResponse(r) self.assertIn('project', token) self.assertIn('id', token['project']) @@ -136,13 +149,19 @@ class AuthTest(test_v3.RestfulTestCase): self.assertIn('id', token['project']['domain']) self.assertIn('name', token['project']['domain']) - def assertValidDomainScopedToken(self, token): - self.assertValidScopedToken(token) + self.assertEqual(self.role_id, token['roles'][0]['id']) + + return token + + def assertValidDomainScopedTokenResponse(self, r): + token = self.assertValidScopedTokenResponse(r) self.assertIn('domain', token) self.assertIn('id', token['domain']) self.assertIn('name', token['domain']) + return token + def assertEqualTokens(self, a, b): """Assert that two tokens are equal. @@ -164,9 +183,6 @@ class AuthTest(test_v3.RestfulTestCase): class TestAuthInfo(test.TestCase): - def setUp(self): - super(TestAuthInfo, self).setUp() - def test_missing_auth_methods(self): auth_data = {'authentication': {}} auth_data['authentication']['token'] = {'id': uuid.uuid4().hex} @@ -332,64 +348,164 @@ class TestTokenAPIs(AuthTest): auth_data = _build_authentication_request( token=self.token, project_id=self.project_id) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidProjectScopedToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectScopedTokenResponse(r) # make sure expires stayed the same - self.assertEqual(expires, resp.body['expires']) + self.assertEqual(expires, r.body['expires']) def test_check_token(self): - resp = self.head('/auth/tokens', headers=self.headers) - self.assertEqual(resp.status, 204) + self.head('/auth/tokens', headers=self.headers, expected_status=204) def test_validate_token(self): - resp = self.get('/auth/tokens', headers=self.headers) - self.assertValidToken(resp.body) + r = self.get('/auth/tokens', headers=self.headers) + self.assertValidUnscopedTokenResponse(r) def test_revoke_token(self): - token = self.get_scoped_token() - headers = {'X-Subject-Token': token} - self.delete('/auth/tokens', headers=headers) - - # make sure token no longer valid - resp = self.head('/auth/tokens', headers=headers, - expected_status=401) - self.assertEqual(resp.status, 401) + headers = {'X-Subject-Token': self.get_scoped_token()} + self.delete('/auth/tokens', headers=headers, expected_status=204) + self.head('/auth/tokens', headers=headers, expected_status=401) # make sure we have a CRL - resp = self.get('/auth/tokens/OS-PKI/revoked') - self.assertTrue('signed' in resp.body) + r = self.get('/auth/tokens/OS-PKI/revoked') + self.assertIn('signed', r.body) class TestAuth(AuthTest): - def test_unscope_token_with_name(self): + def test_unscoped_token_with_user_id(self): auth_data = _build_authentication_request( - username=self.user['name'], - user_domain_id=self.domain_id, + user_id=self.user['id'], password=self.user['password']) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidUnscopedTokenResponse(r) - def test_project_scope_token_with_name(self): + def test_unscoped_token_with_user_domain_id(self): auth_data = _build_authentication_request( username=self.user['name'], - user_domain_id=self.domain_id, - password=self.user['password'], - project_id=self.project_id) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidProjectScopedToken(resp.body) + user_domain_id=self.domain['id'], + password=self.user['password']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidUnscopedTokenResponse(r) - def test_domain_scope_token_with_id(self): - # grant the domain role to user + def test_unscoped_token_with_user_domain_name(self): + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_name=self.domain['name'], + password=self.user['password']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidUnscopedTokenResponse(r) + + def test_project_id_scoped_token_with_user_id(self): + auth_data = _build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectScopedTokenResponse(r) + + def test_project_id_scoped_token_with_user_id_401(self): + project_id = uuid.uuid4().hex + project = self.new_project_ref(domain_id=self.domain_id) + self.identity_api.create_project(project_id, project) + + auth_data = _build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=project['id']) + self.post('/auth/tokens', body=auth_data, expected_status=401) + + def test_project_id_scoped_token_with_user_domain_id(self): + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_id=self.domain['id'], + password=self.user['password'], + project_id=self.project['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectScopedTokenResponse(r) + + def test_project_id_scoped_token_with_user_domain_name(self): + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_name=self.domain['name'], + password=self.user['password'], + project_id=self.project['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectScopedTokenResponse(r) + + def test_domain_id_scoped_token_with_user_id(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) - # now get a domain-scoped token + auth_data = _build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidDomainScopedToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) + + def test_domain_id_scoped_token_with_user_domain_id(self): + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_id=self.domain['id'], + password=self.user['password'], + domain_id=self.domain['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) + + def test_domain_id_scoped_token_with_user_domain_name(self): + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_name=self.domain['name'], + password=self.user['password'], + domain_id=self.domain['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) + + def test_domain_name_scoped_token_with_user_id(self): + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + + auth_data = _build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + domain_name=self.domain['name']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) + + def test_domain_name_scoped_token_with_user_domain_id(self): + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_id=self.domain['id'], + password=self.user['password'], + domain_name=self.domain['name']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) + + def test_domain_name_scoped_token_with_user_domain_name(self): + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_name=self.domain['name'], + password=self.user['password'], + domain_name=self.domain['name']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_token_with_group_role(self): group_id = uuid.uuid4().hex @@ -397,19 +513,22 @@ class TestAuth(AuthTest): domain_id=self.domain_id) group['id'] = group_id self.identity_api.create_group(group_id, group) + # add user to group self.identity_api.add_user_to_group(self.user['id'], group['id']) + # grant the domain role to group path = '/domains/%s/groups/%s/roles/%s' % ( self.domain['id'], group['id'], self.role['id']) self.put(path=path) + # now get a domain-scoped token auth_data = _build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidDomainScopedToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_token_with_name(self): # grant the domain role to user @@ -421,48 +540,63 @@ class TestAuth(AuthTest): user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidDomainScopedToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_failed(self): auth_data = _build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - resp = self.post('/auth/tokens', body=auth_data, - expected_status=401) - self.assertEqual(resp.status, 401) + self.post('/auth/tokens', body=auth_data, expected_status=401) def test_auth_with_id(self): auth_data = _build_authentication_request( user_id=self.user['id'], password=self.user['password']) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidUnscopedTokenResponse(r) - token = resp.getheader('X-Subject-Token') - headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')} + token = r.getheader('X-Subject-Token') + headers = {'X-Subject-Token': r.getheader('X-Subject-Token')} # test token auth auth_data = _build_authentication_request(token=token) - resp = self.post('/auth/tokens', body=auth_data) - self.assertValidToken(resp.body) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidUnscopedTokenResponse(r) + + def test_invalid_user_id(self): + auth_data = _build_authentication_request( + user_id=uuid.uuid4().hex, + password=self.user['password']) + self.post('/auth/tokens', body=auth_data, expected_status=401) + + def test_invalid_user_name(self): + auth_data = _build_authentication_request( + username=uuid.uuid4().hex, + user_domain_id=self.domain['id'], + password=self.user['password']) + self.post('/auth/tokens', body=auth_data, expected_status=401) + + def test_invalid_domain_id(self): + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_id=uuid.uuid4().hex, + password=self.user['password']) + self.post('/auth/tokens', body=auth_data, expected_status=401) + + def test_invalid_domain_name(self): + auth_data = _build_authentication_request( + username=self.user['name'], + user_domain_name=uuid.uuid4().hex, + password=self.user['password']) + self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_password(self): auth_data = _build_authentication_request( user_id=self.user['id'], password=uuid.uuid4().hex) - resp = self.post('/auth/tokens', body=auth_data, - expected_status=401) - self.assertEqual(resp.status, 401) - - def test_invalid_username(self): - auth_data = _build_authentication_request( - username=uuid.uuid4().hex, - password=self.user['password']) - resp = self.post('/auth/tokens', body=auth_data, - expected_status=401) - self.assertEqual(resp.status, 401) + self.post('/auth/tokens', body=auth_data, expected_status=401) def test_remote_user(self): auth_data = _build_authentication_request(