Merge "Tests for domain-scoped tokens" into milestone-proposed

This commit is contained in:
Jenkins 2013-02-22 00:06:08 +00:00 committed by Gerrit Code Review
commit 27f0501535
3 changed files with 221 additions and 85 deletions

View File

@ -377,11 +377,13 @@ class Auth(controller.V3Controller):
token_id = context.get('subject_token_id')
self.check_token(context)
token_ref = self.token_api.get_token(context, token_id)
return token_factory.recreate_token_data(context,
token_ref.get('token_data'),
token_ref['expires'],
token_ref.get('user'),
token_ref.get('tenant'))
token_data = token_factory.recreate_token_data(
context,
token_ref.get('token_data'),
token_ref['expires'],
token_ref.get('user'),
token_ref.get('tenant'))
return token_factory.render_token_data_response(token_id, token_data)
@controller.protected
def revocation_list(self, context, auth=None):

View File

@ -120,11 +120,11 @@ class TokenDataHelper(object):
def _populate_service_catalog(self, token_data, user_id,
domain_id, project_id):
service_catalog = self.catalog_api.get_v3_catalog(self.context,
user_id,
project_id)
# TODO(gyee): v3 service catalog is not quite completed yet
token_data['catalog'] = service_catalog
if project_id or domain_id:
service_catalog = self.catalog_api.get_v3_catalog(
self.context, user_id, project_id)
# TODO(gyee): v3 service catalog is not quite completed yet
token_data['catalog'] = service_catalog
def _populate_token(self, token_data, expires=None):
if not expires:

View File

@ -105,29 +105,42 @@ def _build_authentication_request(token=None, user_id=None, username=None,
class AuthTest(test_v3.RestfulTestCase):
def assertValidToken(self, token):
self.assertNotIn('roles', token)
self.assertEqual(self.user['id'], token['user']['id'])
self.assertIn('expires', token)
def assertValidTokenResponse(self, r):
self.assertTrue(r.getheader('X-Subject-Token'))
token = r.body
def assertValidScopedToken(self, token):
self.assertIn('roles', token)
self.assertIn('expires', token)
self.assertIn('catalog', token)
self.assertIn('user', token)
self.assertEqual(self.user['id'], token['user']['id'])
self.assertEqual(self.user['name'], token['user']['name'])
self.assertEqual(self.user['domain_id'], token['user']['domain']['id'])
return token
def assertValidUnscopedTokenResponse(self, r):
token = self.assertValidTokenResponse(r)
self.assertNotIn('roles', token)
self.assertNotIn('catalog', token)
self.assertNotIn('project', token)
self.assertNotIn('domain', token)
return token
def assertValidScopedTokenResponse(self, r):
token = self.assertValidTokenResponse(r)
self.assertIn('catalog', token)
self.assertIn('roles', token)
self.assertTrue(token['roles'])
for role in token['roles']:
self.assertIn('id', role)
self.assertIn('name', role)
self.assertEqual(self.user['id'], token['user']['id'])
self.assertEqual(self.user['name'], token['user']['name'])
self.assertEqual(self.user['domain_id'], token['user']['domain']['id'])
self.assertEqual(self.role_id, token['roles'][0]['id'])
return token
def assertValidProjectScopedToken(self, token):
self.assertValidScopedToken(token)
def assertValidProjectScopedTokenResponse(self, r):
token = self.assertValidScopedTokenResponse(r)
self.assertIn('project', token)
self.assertIn('id', token['project'])
@ -136,13 +149,19 @@ class AuthTest(test_v3.RestfulTestCase):
self.assertIn('id', token['project']['domain'])
self.assertIn('name', token['project']['domain'])
def assertValidDomainScopedToken(self, token):
self.assertValidScopedToken(token)
self.assertEqual(self.role_id, token['roles'][0]['id'])
return token
def assertValidDomainScopedTokenResponse(self, r):
token = self.assertValidScopedTokenResponse(r)
self.assertIn('domain', token)
self.assertIn('id', token['domain'])
self.assertIn('name', token['domain'])
return token
def assertEqualTokens(self, a, b):
"""Assert that two tokens are equal.
@ -164,9 +183,6 @@ class AuthTest(test_v3.RestfulTestCase):
class TestAuthInfo(test.TestCase):
def setUp(self):
super(TestAuthInfo, self).setUp()
def test_missing_auth_methods(self):
auth_data = {'authentication': {}}
auth_data['authentication']['token'] = {'id': uuid.uuid4().hex}
@ -332,64 +348,164 @@ class TestTokenAPIs(AuthTest):
auth_data = _build_authentication_request(
token=self.token,
project_id=self.project_id)
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
# make sure expires stayed the same
self.assertEqual(expires, resp.body['expires'])
self.assertEqual(expires, r.body['expires'])
def test_check_token(self):
resp = self.head('/auth/tokens', headers=self.headers)
self.assertEqual(resp.status, 204)
self.head('/auth/tokens', headers=self.headers, expected_status=204)
def test_validate_token(self):
resp = self.get('/auth/tokens', headers=self.headers)
self.assertValidToken(resp.body)
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidUnscopedTokenResponse(r)
def test_revoke_token(self):
token = self.get_scoped_token()
headers = {'X-Subject-Token': token}
self.delete('/auth/tokens', headers=headers)
# make sure token no longer valid
resp = self.head('/auth/tokens', headers=headers,
expected_status=401)
self.assertEqual(resp.status, 401)
headers = {'X-Subject-Token': self.get_scoped_token()}
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.head('/auth/tokens', headers=headers, expected_status=401)
# make sure we have a CRL
resp = self.get('/auth/tokens/OS-PKI/revoked')
self.assertTrue('signed' in resp.body)
r = self.get('/auth/tokens/OS-PKI/revoked')
self.assertIn('signed', r.body)
class TestAuth(AuthTest):
def test_unscope_token_with_name(self):
def test_unscoped_token_with_user_id(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_project_scope_token_with_name(self):
def test_unscoped_token_with_user_domain_id(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password'],
project_id=self.project_id)
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedToken(resp.body)
user_domain_id=self.domain['id'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_domain_scope_token_with_id(self):
# grant the domain role to user
def test_unscoped_token_with_user_domain_name(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_project_id_scoped_token_with_user_id(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_project_id_scoped_token_with_user_id_401(self):
project_id = uuid.uuid4().hex
project = self.new_project_ref(domain_id=self.domain_id)
self.identity_api.create_project(project_id, project)
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_project_id_scoped_token_with_user_domain_id(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_project_id_scoped_token_with_user_domain_name(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
project_id=self.project['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_domain_id_scoped_token_with_user_id(self):
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
# now get a domain-scoped token
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_id_scoped_token_with_user_domain_id(self):
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_id_scoped_token_with_user_domain_name(self):
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_id=self.domain['id'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_name_scoped_token_with_user_id(self):
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_name_scoped_token_with_user_domain_id(self):
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_name_scoped_token_with_user_domain_name(self):
path = '/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id'])
self.put(path=path)
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_name=self.domain['name'])
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_token_with_group_role(self):
group_id = uuid.uuid4().hex
@ -397,19 +513,22 @@ class TestAuth(AuthTest):
domain_id=self.domain_id)
group['id'] = group_id
self.identity_api.create_group(group_id, group)
# add user to group
self.identity_api.add_user_to_group(self.user['id'], group['id'])
# grant the domain role to group
path = '/domains/%s/groups/%s/roles/%s' % (
self.domain['id'], group['id'], self.role['id'])
self.put(path=path)
# now get a domain-scoped token
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_token_with_name(self):
# grant the domain role to user
@ -421,48 +540,63 @@ class TestAuth(AuthTest):
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_failed(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
self.assertEqual(resp.status, 401)
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_auth_with_id(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
token = resp.getheader('X-Subject-Token')
headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')}
token = r.getheader('X-Subject-Token')
headers = {'X-Subject-Token': r.getheader('X-Subject-Token')}
# test token auth
auth_data = _build_authentication_request(token=token)
resp = self.post('/auth/tokens', body=auth_data)
self.assertValidToken(resp.body)
r = self.post('/auth/tokens', body=auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_invalid_user_id(self):
auth_data = _build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_user_name(self):
auth_data = _build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_domain_id(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_domain_name(self):
auth_data = _build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_invalid_password(self):
auth_data = _build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
self.assertEqual(resp.status, 401)
def test_invalid_username(self):
auth_data = _build_authentication_request(
username=uuid.uuid4().hex,
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
self.assertEqual(resp.status, 401)
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_remote_user(self):
auth_data = _build_authentication_request(