Merge "Tests for domain-scoped tokens" into milestone-proposed
This commit is contained in:
commit
27f0501535
|
@ -377,11 +377,13 @@ class Auth(controller.V3Controller):
|
|||
token_id = context.get('subject_token_id')
|
||||
self.check_token(context)
|
||||
token_ref = self.token_api.get_token(context, token_id)
|
||||
return token_factory.recreate_token_data(context,
|
||||
token_ref.get('token_data'),
|
||||
token_ref['expires'],
|
||||
token_ref.get('user'),
|
||||
token_ref.get('tenant'))
|
||||
token_data = token_factory.recreate_token_data(
|
||||
context,
|
||||
token_ref.get('token_data'),
|
||||
token_ref['expires'],
|
||||
token_ref.get('user'),
|
||||
token_ref.get('tenant'))
|
||||
return token_factory.render_token_data_response(token_id, token_data)
|
||||
|
||||
@controller.protected
|
||||
def revocation_list(self, context, auth=None):
|
||||
|
|
|
@ -120,11 +120,11 @@ class TokenDataHelper(object):
|
|||
|
||||
def _populate_service_catalog(self, token_data, user_id,
|
||||
domain_id, project_id):
|
||||
service_catalog = self.catalog_api.get_v3_catalog(self.context,
|
||||
user_id,
|
||||
project_id)
|
||||
# TODO(gyee): v3 service catalog is not quite completed yet
|
||||
token_data['catalog'] = service_catalog
|
||||
if project_id or domain_id:
|
||||
service_catalog = self.catalog_api.get_v3_catalog(
|
||||
self.context, user_id, project_id)
|
||||
# TODO(gyee): v3 service catalog is not quite completed yet
|
||||
token_data['catalog'] = service_catalog
|
||||
|
||||
def _populate_token(self, token_data, expires=None):
|
||||
if not expires:
|
||||
|
|
|
@ -105,29 +105,42 @@ def _build_authentication_request(token=None, user_id=None, username=None,
|
|||
|
||||
|
||||
class AuthTest(test_v3.RestfulTestCase):
|
||||
def assertValidToken(self, token):
|
||||
self.assertNotIn('roles', token)
|
||||
self.assertEqual(self.user['id'], token['user']['id'])
|
||||
self.assertIn('expires', token)
|
||||
def assertValidTokenResponse(self, r):
|
||||
self.assertTrue(r.getheader('X-Subject-Token'))
|
||||
token = r.body
|
||||
|
||||
def assertValidScopedToken(self, token):
|
||||
self.assertIn('roles', token)
|
||||
self.assertIn('expires', token)
|
||||
self.assertIn('catalog', token)
|
||||
self.assertIn('user', token)
|
||||
self.assertEqual(self.user['id'], token['user']['id'])
|
||||
self.assertEqual(self.user['name'], token['user']['name'])
|
||||
self.assertEqual(self.user['domain_id'], token['user']['domain']['id'])
|
||||
|
||||
return token
|
||||
|
||||
def assertValidUnscopedTokenResponse(self, r):
|
||||
token = self.assertValidTokenResponse(r)
|
||||
|
||||
self.assertNotIn('roles', token)
|
||||
self.assertNotIn('catalog', token)
|
||||
self.assertNotIn('project', token)
|
||||
self.assertNotIn('domain', token)
|
||||
|
||||
return token
|
||||
|
||||
def assertValidScopedTokenResponse(self, r):
|
||||
token = self.assertValidTokenResponse(r)
|
||||
|
||||
self.assertIn('catalog', token)
|
||||
self.assertIn('roles', token)
|
||||
self.assertTrue(token['roles'])
|
||||
for role in token['roles']:
|
||||
self.assertIn('id', role)
|
||||
self.assertIn('name', role)
|
||||
|
||||
self.assertEqual(self.user['id'], token['user']['id'])
|
||||
self.assertEqual(self.user['name'], token['user']['name'])
|
||||
self.assertEqual(self.user['domain_id'], token['user']['domain']['id'])
|
||||
self.assertEqual(self.role_id, token['roles'][0]['id'])
|
||||
return token
|
||||
|
||||
def assertValidProjectScopedToken(self, token):
|
||||
self.assertValidScopedToken(token)
|
||||
def assertValidProjectScopedTokenResponse(self, r):
|
||||
token = self.assertValidScopedTokenResponse(r)
|
||||
|
||||
self.assertIn('project', token)
|
||||
self.assertIn('id', token['project'])
|
||||
|
@ -136,13 +149,19 @@ class AuthTest(test_v3.RestfulTestCase):
|
|||
self.assertIn('id', token['project']['domain'])
|
||||
self.assertIn('name', token['project']['domain'])
|
||||
|
||||
def assertValidDomainScopedToken(self, token):
|
||||
self.assertValidScopedToken(token)
|
||||
self.assertEqual(self.role_id, token['roles'][0]['id'])
|
||||
|
||||
return token
|
||||
|
||||
def assertValidDomainScopedTokenResponse(self, r):
|
||||
token = self.assertValidScopedTokenResponse(r)
|
||||
|
||||
self.assertIn('domain', token)
|
||||
self.assertIn('id', token['domain'])
|
||||
self.assertIn('name', token['domain'])
|
||||
|
||||
return token
|
||||
|
||||
def assertEqualTokens(self, a, b):
|
||||
"""Assert that two tokens are equal.
|
||||
|
||||
|
@ -164,9 +183,6 @@ class AuthTest(test_v3.RestfulTestCase):
|
|||
|
||||
|
||||
class TestAuthInfo(test.TestCase):
|
||||
def setUp(self):
|
||||
super(TestAuthInfo, self).setUp()
|
||||
|
||||
def test_missing_auth_methods(self):
|
||||
auth_data = {'authentication': {}}
|
||||
auth_data['authentication']['token'] = {'id': uuid.uuid4().hex}
|
||||
|
@ -332,64 +348,164 @@ class TestTokenAPIs(AuthTest):
|
|||
auth_data = _build_authentication_request(
|
||||
token=self.token,
|
||||
project_id=self.project_id)
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
# make sure expires stayed the same
|
||||
self.assertEqual(expires, resp.body['expires'])
|
||||
self.assertEqual(expires, r.body['expires'])
|
||||
|
||||
def test_check_token(self):
|
||||
resp = self.head('/auth/tokens', headers=self.headers)
|
||||
self.assertEqual(resp.status, 204)
|
||||
self.head('/auth/tokens', headers=self.headers, expected_status=204)
|
||||
|
||||
def test_validate_token(self):
|
||||
resp = self.get('/auth/tokens', headers=self.headers)
|
||||
self.assertValidToken(resp.body)
|
||||
r = self.get('/auth/tokens', headers=self.headers)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
def test_revoke_token(self):
|
||||
token = self.get_scoped_token()
|
||||
headers = {'X-Subject-Token': token}
|
||||
self.delete('/auth/tokens', headers=headers)
|
||||
|
||||
# make sure token no longer valid
|
||||
resp = self.head('/auth/tokens', headers=headers,
|
||||
expected_status=401)
|
||||
self.assertEqual(resp.status, 401)
|
||||
headers = {'X-Subject-Token': self.get_scoped_token()}
|
||||
self.delete('/auth/tokens', headers=headers, expected_status=204)
|
||||
self.head('/auth/tokens', headers=headers, expected_status=401)
|
||||
|
||||
# make sure we have a CRL
|
||||
resp = self.get('/auth/tokens/OS-PKI/revoked')
|
||||
self.assertTrue('signed' in resp.body)
|
||||
r = self.get('/auth/tokens/OS-PKI/revoked')
|
||||
self.assertIn('signed', r.body)
|
||||
|
||||
|
||||
class TestAuth(AuthTest):
|
||||
def test_unscope_token_with_name(self):
|
||||
def test_unscoped_token_with_user_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_id=self.domain_id,
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
def test_project_scope_token_with_name(self):
|
||||
def test_unscoped_token_with_user_domain_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_id=self.domain_id,
|
||||
password=self.user['password'],
|
||||
project_id=self.project_id)
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedToken(resp.body)
|
||||
user_domain_id=self.domain['id'],
|
||||
password=self.user['password'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
def test_domain_scope_token_with_id(self):
|
||||
# grant the domain role to user
|
||||
def test_unscoped_token_with_user_domain_name(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_name=self.domain['name'],
|
||||
password=self.user['password'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
def test_project_id_scoped_token_with_user_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
project_id=self.project['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
|
||||
def test_project_id_scoped_token_with_user_id_401(self):
|
||||
project_id = uuid.uuid4().hex
|
||||
project = self.new_project_ref(domain_id=self.domain_id)
|
||||
self.identity_api.create_project(project_id, project)
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
project_id=project['id'])
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_project_id_scoped_token_with_user_domain_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_id=self.domain['id'],
|
||||
password=self.user['password'],
|
||||
project_id=self.project['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
|
||||
def test_project_id_scoped_token_with_user_domain_name(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_name=self.domain['name'],
|
||||
password=self.user['password'],
|
||||
project_id=self.project['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidProjectScopedTokenResponse(r)
|
||||
|
||||
def test_domain_id_scoped_token_with_user_id(self):
|
||||
path = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
# now get a domain-scoped token
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_id_scoped_token_with_user_domain_id(self):
|
||||
path = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_id=self.domain['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_id_scoped_token_with_user_domain_name(self):
|
||||
path = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_name=self.domain['name'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_name_scoped_token_with_user_id(self):
|
||||
path = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_name=self.domain['name'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_name_scoped_token_with_user_domain_id(self):
|
||||
path = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_id=self.domain['id'],
|
||||
password=self.user['password'],
|
||||
domain_name=self.domain['name'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_name_scoped_token_with_user_domain_name(self):
|
||||
path = '/domains/%s/users/%s/roles/%s' % (
|
||||
self.domain['id'], self.user['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_name=self.domain['name'],
|
||||
password=self.user['password'],
|
||||
domain_name=self.domain['name'])
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_scope_token_with_group_role(self):
|
||||
group_id = uuid.uuid4().hex
|
||||
|
@ -397,19 +513,22 @@ class TestAuth(AuthTest):
|
|||
domain_id=self.domain_id)
|
||||
group['id'] = group_id
|
||||
self.identity_api.create_group(group_id, group)
|
||||
|
||||
# add user to group
|
||||
self.identity_api.add_user_to_group(self.user['id'], group['id'])
|
||||
|
||||
# grant the domain role to group
|
||||
path = '/domains/%s/groups/%s/roles/%s' % (
|
||||
self.domain['id'], group['id'], self.role['id'])
|
||||
self.put(path=path)
|
||||
|
||||
# now get a domain-scoped token
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_scope_token_with_name(self):
|
||||
# grant the domain role to user
|
||||
|
@ -421,48 +540,63 @@ class TestAuth(AuthTest):
|
|||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_name=self.domain['name'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidDomainScopedTokenResponse(r)
|
||||
|
||||
def test_domain_scope_failed(self):
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'],
|
||||
domain_id=self.domain['id'])
|
||||
resp = self.post('/auth/tokens', body=auth_data,
|
||||
expected_status=401)
|
||||
self.assertEqual(resp.status, 401)
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_auth_with_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=self.user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
token = resp.getheader('X-Subject-Token')
|
||||
headers = {'X-Subject-Token': resp.getheader('X-Subject-Token')}
|
||||
token = r.getheader('X-Subject-Token')
|
||||
headers = {'X-Subject-Token': r.getheader('X-Subject-Token')}
|
||||
|
||||
# test token auth
|
||||
auth_data = _build_authentication_request(token=token)
|
||||
resp = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidToken(resp.body)
|
||||
r = self.post('/auth/tokens', body=auth_data)
|
||||
self.assertValidUnscopedTokenResponse(r)
|
||||
|
||||
def test_invalid_user_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=uuid.uuid4().hex,
|
||||
password=self.user['password'])
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_invalid_user_name(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=uuid.uuid4().hex,
|
||||
user_domain_id=self.domain['id'],
|
||||
password=self.user['password'])
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_invalid_domain_id(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_id=uuid.uuid4().hex,
|
||||
password=self.user['password'])
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_invalid_domain_name(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=self.user['name'],
|
||||
user_domain_name=uuid.uuid4().hex,
|
||||
password=self.user['password'])
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_invalid_password(self):
|
||||
auth_data = _build_authentication_request(
|
||||
user_id=self.user['id'],
|
||||
password=uuid.uuid4().hex)
|
||||
resp = self.post('/auth/tokens', body=auth_data,
|
||||
expected_status=401)
|
||||
self.assertEqual(resp.status, 401)
|
||||
|
||||
def test_invalid_username(self):
|
||||
auth_data = _build_authentication_request(
|
||||
username=uuid.uuid4().hex,
|
||||
password=self.user['password'])
|
||||
resp = self.post('/auth/tokens', body=auth_data,
|
||||
expected_status=401)
|
||||
self.assertEqual(resp.status, 401)
|
||||
self.post('/auth/tokens', body=auth_data, expected_status=401)
|
||||
|
||||
def test_remote_user(self):
|
||||
auth_data = _build_authentication_request(
|
||||
|
|
Loading…
Reference in New Issue