Merge "Add validate token for v3"

This commit is contained in:
Jenkins 2015-01-30 21:55:18 +00:00 committed by Gerrit Code Review
commit bf99f4c1ef
2 changed files with 106 additions and 7 deletions

View File

@ -12,12 +12,17 @@
import uuid
import testresources
from keystoneclient import access
from keystoneclient import exceptions
from keystoneclient.tests import client_fixtures
from keystoneclient.tests.v3 import utils
class TokenTests(utils.TestCase):
class TokenTests(utils.TestCase, testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def test_revoke_token_with_token_id(self):
token_id = uuid.uuid4().hex
@ -27,8 +32,8 @@ class TokenTests(utils.TestCase):
def test_revoke_token_with_access_info_instance(self):
token_id = uuid.uuid4().hex
examples = self.useFixture(client_fixtures.Examples())
token_ref = examples.TOKEN_RESPONSES[examples.v3_UUID_TOKEN_DEFAULT]
token_ref = self.examples.TOKEN_RESPONSES[
self.examples.v3_UUID_TOKEN_DEFAULT]
token = access.AccessInfoV3(token_id, token_ref['token'])
self.stub_url('DELETE', ['/auth/tokens'], status_code=204)
self.client.tokens.revoke_token(token)
@ -40,3 +45,66 @@ class TokenTests(utils.TestCase):
json=sample_revoked_response)
resp = self.client.tokens.get_revoked()
self.assertEqual(sample_revoked_response, resp)
def test_validate_token_with_token_id(self):
# Can validate a token passing a string token ID.
token_id = uuid.uuid4().hex
token_ref = self.examples.TOKEN_RESPONSES[
self.examples.v3_UUID_TOKEN_DEFAULT]
self.stub_url('GET', ['auth', 'tokens'],
headers={'X-Subject-Token': token_id, }, json=token_ref)
access_info = self.client.tokens.validate(token_id)
self.assertRequestHeaderEqual('X-Subject-Token', token_id)
self.assertIsInstance(access_info, access.AccessInfoV3)
self.assertEqual(token_id, access_info.auth_token)
def test_validate_token_with_access_info(self):
# Can validate a token passing an access info.
token_id = uuid.uuid4().hex
token_ref = self.examples.TOKEN_RESPONSES[
self.examples.v3_UUID_TOKEN_DEFAULT]
token = access.AccessInfoV3(token_id, token_ref['token'])
self.stub_url('GET', ['auth', 'tokens'],
headers={'X-Subject-Token': token_id, }, json=token_ref)
access_info = self.client.tokens.validate(token)
self.assertRequestHeaderEqual('X-Subject-Token', token_id)
self.assertIsInstance(access_info, access.AccessInfoV3)
self.assertEqual(token_id, access_info.auth_token)
def test_validate_token_invalid(self):
# When the token is invalid the server typically returns a 404.
token_id = uuid.uuid4().hex
self.stub_url('GET', ['auth', 'tokens'], status_code=404)
self.assertRaises(exceptions.NotFound,
self.client.tokens.validate, token_id)
def test_validate_token_catalog(self):
# Can validate a token and a catalog is requested by default.
token_id = uuid.uuid4().hex
token_ref = self.examples.TOKEN_RESPONSES[
self.examples.v3_UUID_TOKEN_DEFAULT]
self.stub_url('GET', ['auth', 'tokens'],
headers={'X-Subject-Token': token_id, }, json=token_ref)
access_info = self.client.tokens.validate(token_id)
self.assertQueryStringIs()
self.assertTrue(access_info.has_service_catalog())
def test_validate_token_nocatalog(self):
# Can validate a token and request no catalog.
token_id = uuid.uuid4().hex
token_ref = self.examples.TOKEN_RESPONSES[
self.examples.v3_UUID_TOKEN_UNSCOPED]
self.stub_url('GET', ['auth', 'tokens'],
headers={'X-Subject-Token': token_id, }, json=token_ref)
access_info = self.client.tokens.validate(token_id,
include_catalog=False)
self.assertQueryStringIs('nocatalog')
self.assertFalse(access_info.has_service_catalog())
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)

View File

@ -12,6 +12,14 @@
from keystoneclient import access
from keystoneclient import base
from keystoneclient import utils
def _calc_id(token):
if isinstance(token, access.AccessInfo):
return token.auth_token
return base.getid(token)
class TokenManager(object):
@ -28,10 +36,7 @@ class TokenManager(object):
token_id.
"""
if isinstance(token, access.AccessInfo):
token_id = token.auth_token
else:
token_id = base.getid(token)
token_id = _calc_id(token)
headers = {'X-Subject-Token': token_id}
return self._client.delete('/auth/tokens', headers=headers)
@ -45,3 +50,29 @@ class TokenManager(object):
resp, body = self._client.get('/auth/tokens/OS-PKI/revoked')
return body
@utils.positional.method(1)
def validate(self, token, include_catalog=True):
"""Validate a token.
:param token: Token to be validated. This can be an instance of
:py:class:`keystoneclient.access.AccessInfo` or a string
token_id.
:param include_catalog: If False, the response is requested to not
include the catalog.
:rtype: :py:class:`keystoneclient.access.AccessInfoV3`
"""
token_id = _calc_id(token)
headers = {'X-Subject-Token': token_id}
url = '/auth/tokens'
if not include_catalog:
url += '?nocatalog'
resp, body = self._client.get(url, headers=headers)
access_info = access.AccessInfo.factory(resp=resp, body=body)
return access_info