Drop Fernet token prefixes & add domain-scoped Fernet tokens

- Move the payload version (part of the plaintext token prefix) into the
  integrity verified portion of the token (the payload). This also drops
  the 'F', which doesn't serve a purpose with Fernet tokens as it does
  with token formats that can be validated offline (PKI, PKIZ). This
  requires a bunch of refactoring to move the responsibility of
  decrypting, unpacking, and disassembling the payload contents to the
  caller (the Provider).

- Add a domain-scoped payload format, identical to that for
  project-scoped tokens, just with a different version number. Better
  functional tests revealed that tests intended to exercise
  domain-scoped Fernet tokens, which didn't exist, should not have been
  passing.

- Remove remaining functional tests from the unit test suite
  (test_fernet_provider), and ensure that same coverage exists in the
  actual functional test suite (test_v3_auth). Several of the unit tests
  required heavy refactoring due to the refactoring required to support
  the first item above, so it was easier just to dump those tests in
  favor of better functional test coverage, which are agnostic to the
  implementation details.

Change-Id: I141f2707a391d46d9607710b30155b76de2f88f0
Closes-Bug: 1427485
Closes-Bug: 1428949
This commit is contained in:
Dolph Mathews 2015-03-05 22:01:53 +00:00 committed by Morgan Fainberg
parent c83f8920bf
commit a9fa7e315d
5 changed files with 322 additions and 361 deletions

View File

@ -542,6 +542,7 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
def load_sample_data(self):
"""Load Sample Data for Test Cases.
Two domains, domainA and domainB
Two users in domainA, userNormalA and userAdminA
One user in domainB, userAdminB
@ -4055,23 +4056,17 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
super(TestFernetTokenProvider, self).setUp()
self.setUpKeyRepository()
def _make_auth_request(self, auth_data, trust_token=False,
unscoped_token=False):
def _make_auth_request(self, auth_data):
resp = self.post('/auth/tokens', body=auth_data, expected_status=201)
token = resp.headers.get('X-Subject-Token')
if trust_token:
self.assertThat(token, matchers.StartsWith('F02'))
elif unscoped_token:
self.assertThat(token, matchers.StartsWith('F00'))
else:
self.assertThat(token, matchers.StartsWith('F01'))
self.assertLess(len(token), 255)
return token
def _get_unscoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
return self._make_auth_request(auth_data, unscoped_token=True)
return self._make_auth_request(auth_data)
def _get_project_scoped_token(self):
auth_data = self.build_authentication_request(
@ -4092,7 +4087,19 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
user_id=trustee_user['id'],
password=trustee_user['password'],
trust_id=trust['id'])
return self._make_auth_request(auth_data, trust_token=True)
return self._make_auth_request(auth_data)
def _validate_token(self, token, expected_status=200):
return self.get(
'/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=expected_status)
def _revoke_token(self, token, expected_status=204):
return self.delete(
'/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=expected_status)
def _set_user_enabled(self, user, enabled=True):
user['enabled'] = enabled
@ -4121,45 +4128,26 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
group='token',
provider='keystone.token.providers.fernet.Provider')
def test_authenticate_for_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
self.assertThat(unscoped_token, matchers.StartsWith('F00'))
self.assertLess(len(unscoped_token), 255)
def test_validate_unscoped_token(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
unscoped_token = resp.headers.get('X-Subject-Token')
self.assertThat(unscoped_token, matchers.StartsWith('F00'))
headers = {'X-Subject-Token': unscoped_token}
self.get('/auth/tokens', headers=headers, expected_status=200)
unscoped_token = self._get_unscoped_token()
self._validate_token(unscoped_token)
def test_validate_tampered_unscoped_token_fails(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
resp = self.post('/auth/tokens', body=auth_data)
unscoped_token = resp.headers.get('X-Subject-Token')
self.assertThat(unscoped_token, matchers.StartsWith('F00'))
unscoped_token = self._get_unscoped_token()
tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
unscoped_token[50 + 32:])
headers = {'X-Subject-Token': tampered_token}
self.get('/auth/tokens', headers=headers, expected_status=401)
self._validate_token(tampered_token, expected_status=401)
def test_revoke_unscoped_token(self):
unscoped_token = self._get_unscoped_token()
headers = {'X-Subject-Token': unscoped_token}
self.get('/auth/tokens', headers=headers, expected_status=200)
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.get('/auth/tokens', headers=headers, expected_status=404)
self._validate_token(unscoped_token)
self._revoke_token(unscoped_token)
self._validate_token(unscoped_token, expected_status=404)
def test_unscoped_token_is_invalid_after_disabling_user(self):
unscoped_token = self._get_unscoped_token()
headers = {'X-Subject-Token': unscoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(unscoped_token)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
@ -4169,9 +4157,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
unscoped_token = self._get_unscoped_token()
headers = {'X-Subject-Token': unscoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(unscoped_token)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
@ -4187,9 +4174,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
unscoped_token = self._get_unscoped_token()
headers = {'X-Subject-Token': unscoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(unscoped_token)
# Disable the user's domain
self.domain['enabled'] = False
self.resource_api.update_domain(self.domain['id'], self.domain)
@ -4200,9 +4186,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_unscoped_token_is_invalid_after_changing_user_password(self):
unscoped_token = self._get_unscoped_token()
headers = {'X-Subject-Token': unscoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(unscoped_token)
# Change user's password
self.user['password'] = 'Password1'
self.identity_api.update_user(self.user['id'], self.user)
@ -4211,37 +4196,26 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
self.token_provider_api.validate_token,
unscoped_token)
def test_authenticate_for_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
self.assertThat(project_scoped_token, matchers.StartsWith('F01'))
self.assertLess(len(project_scoped_token), 255)
def test_validate_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
self.assertThat(project_scoped_token, matchers.StartsWith('F01'))
headers = {'X-Subject-Token': project_scoped_token}
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(project_scoped_token)
def test_validate_tampered_project_scoped_token_fails(self):
project_scoped_token = self._get_project_scoped_token()
self.assertThat(project_scoped_token, matchers.StartsWith('F01'))
tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
project_scoped_token[50 + 32:])
headers = {'X-Subject-Token': tampered_token}
self.get('/auth/tokens', headers=headers, expected_status=401)
self._validate_token(tampered_token, expected_status=401)
def test_revoke_project_scoped_token(self):
project_scoped_token = self._get_project_scoped_token()
headers = {'X-Subject-Token': project_scoped_token}
self.get('/auth/tokens', headers=headers, expected_status=200)
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.get('/auth/tokens', headers=headers, expected_status=404)
self._validate_token(project_scoped_token)
self._revoke_token(project_scoped_token)
self._validate_token(project_scoped_token, expected_status=404)
def test_project_scoped_token_is_invalid_after_disabling_user(self):
project_scoped_token = self._get_project_scoped_token()
headers = {'X-Subject-Token': project_scoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(project_scoped_token)
# Disable the user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
@ -4255,9 +4229,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
headers = {'X-Subject-Token': domain_scoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(domain_scoped_token)
# Disable user
self._set_user_enabled(self.user, enabled=False)
# Ensure validating a token for a disabled user fails
@ -4271,9 +4244,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
headers = {'X-Subject-Token': domain_scoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(domain_scoped_token)
# Delete access to domain
self.assignment_api.delete_grant(self.role['id'],
user_id=self.user['id'],
@ -4285,9 +4257,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_project_scoped_token_invalid_after_changing_user_password(self):
project_scoped_token = self._get_project_scoped_token()
headers = {'X-Subject-Token': project_scoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(project_scoped_token)
# Update user's password
self.user['password'] = 'Password1'
self.identity_api.update_user(self.user['id'], self.user)
@ -4298,9 +4269,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_project_scoped_token_invalid_after_disabling_project(self):
project_scoped_token = self._get_project_scoped_token()
headers = {'X-Subject-Token': project_scoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(project_scoped_token)
# Disable project
self.project['enabled'] = False
self.resource_api.update_project(self.project['id'], self.project)
@ -4315,9 +4285,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
user_id=self.user['id'],
domain_id=self.domain['id'])
domain_scoped_token = self._get_domain_scoped_token()
headers = {'X-Subject-Token': domain_scoped_token}
# Make sure the token is valid
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(domain_scoped_token)
# Disable domain
self.domain['enabled'] = False
self.resource_api.update_domain(self.domain['id'], self.domain)
@ -4334,41 +4303,30 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_validate_a_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
self.assertThat(trust_scoped_token, matchers.StartsWith('F02'))
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(trust_scoped_token)
def test_validate_tampered_trust_scoped_token_fails(self):
trustee_user, trust = self._create_trust()
auth_data = self.build_authentication_request(
user_id=trustee_user['id'],
password=trustee_user['password'],
trust_id=trust['id'])
resp = self.post('/auth/tokens', body=auth_data, expected_status=201)
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
# Get a trust scoped token
trust_scoped_token = resp.headers.get('X-Subject-Token')
self.assertThat(trust_scoped_token, matchers.StartsWith('F02'))
tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
trust_scoped_token[50 + 32:])
headers = {'X-Subject-Token': tampered_token}
self.get('/auth/tokens', headers=headers, expected_status=401)
self._validate_token(tampered_token, expected_status=401)
def test_revoke_trust_scoped_token(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self.delete('/auth/tokens', headers=headers, expected_status=204)
self.get('/auth/tokens', headers=headers, expected_status=404)
self._validate_token(trust_scoped_token)
self._revoke_token(trust_scoped_token)
self._validate_token(trust_scoped_token, expected_status=404)
def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(trust_scoped_token)
# Disable trustee
trustee_update_ref = dict(enabled=False)
@ -4381,9 +4339,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(trust_scoped_token)
# Change trustee's password
trustee_update_ref = dict(password='Password1')
self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
@ -4395,10 +4352,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
self.assertThat(trust_scoped_token, matchers.StartsWith('F02'))
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(trust_scoped_token)
# Disable the trustor
trustor_update_ref = dict(enabled=False)
@ -4411,9 +4366,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(trust_scoped_token)
# Change trustor's password
trustor_update_ref = dict(password='Password1')
@ -4426,9 +4380,8 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase,
def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
trustee_user, trust = self._create_trust()
trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
headers = {'X-Subject-Token': trust_scoped_token}
# Validate a trust scoped token
self.get('/auth/tokens', headers=headers, expected_status=200)
self._validate_token(trust_scoped_token)
# Disable trustor's domain
self.domain['enabled'] = False

View File

@ -12,13 +12,13 @@
import base64
import datetime
import hashlib
import shutil
import tempfile
import uuid
from oslo_utils import timeutils
from keystone.common import config
from keystone import exception
from keystone.tests import unit as tests
from keystone.token.providers import fernet
@ -26,6 +26,9 @@ from keystone.token.providers.fernet import token_formatters
from keystone.token.providers.fernet import utils
CONF = config.CONF
class KeyRepositoryTestMixin(object):
def setUpKeyRepository(self):
directory = tempfile.mkdtemp()
@ -70,124 +73,119 @@ class TestFernetTokenProvider(tests.TestCase, KeyRepositoryTestMixin):
uuid.uuid4().hex)
class TestBaseTokenFormatter(tests.TestCase, KeyRepositoryTestMixin):
def setUp(self):
super(TestBaseTokenFormatter, self).setUp()
self.setUpKeyRepository()
self.formatter = token_formatters.BaseTokenFormatter()
class TestPayloads(tests.TestCase, KeyRepositoryTestMixin):
def test_uuid_hex_to_byte_conversions(self):
payload_cls = token_formatters.BasePayload
expected_hex_uuid = uuid.uuid4().hex
uuid_obj = uuid.UUID(expected_hex_uuid)
expected_uuid_in_bytes = uuid_obj.bytes
actual_uuid_in_bytes = self.formatter._convert_uuid_hex_to_bytes(
actual_uuid_in_bytes = payload_cls.convert_uuid_hex_to_bytes(
expected_hex_uuid)
self.assertEqual(expected_uuid_in_bytes, actual_uuid_in_bytes)
actual_hex_uuid = self.formatter._convert_uuid_bytes_to_hex(
actual_hex_uuid = payload_cls.convert_uuid_bytes_to_hex(
expected_uuid_in_bytes)
self.assertEqual(expected_hex_uuid, actual_hex_uuid)
def test_time_string_to_int_conversions(self):
payload_cls = token_formatters.BasePayload
expected_time_str = timeutils.isotime()
time_obj = timeutils.parse_isotime(expected_time_str)
expected_time_int = (
(timeutils.normalize_time(time_obj) -
datetime.datetime.utcfromtimestamp(0)).total_seconds())
actual_time_int = self.formatter._convert_time_string_to_int(
actual_time_int = payload_cls._convert_time_string_to_int(
expected_time_str)
self.assertEqual(expected_time_int, actual_time_int)
actual_time_str = self.formatter._convert_int_to_time_string(
actual_time_str = payload_cls._convert_int_to_time_string(
actual_time_int)
self.assertEqual(expected_time_str, actual_time_str)
class TestScopedTokenFormatter(tests.TestCase, KeyRepositoryTestMixin):
def setUp(self):
super(TestScopedTokenFormatter, self).setUp()
self.setUpKeyRepository()
self.formatter = token_formatters.ScopedTokenFormatter()
def test_token_encryption(self):
def test_unscoped_payload(self):
exp_user_id = uuid.uuid4().hex
exp_project_id = uuid.uuid4().hex
# All we are validating here is that the token is encrypted and
# decrypted properly, not the actual validity of token data.
exp_expires_at = timeutils.isotime(timeutils.utcnow())
exp_audit_ids = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
token = self.formatter.create_token(
payload = token_formatters.UnscopedPayload.assemble(
exp_user_id, exp_expires_at, exp_audit_ids)
(user_id, expires_at, audit_ids) = (
token_formatters.UnscopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_project_scoped_payload(self):
exp_user_id = uuid.uuid4().hex
exp_project_id = uuid.uuid4().hex
exp_expires_at = timeutils.isotime(timeutils.utcnow())
exp_audit_ids = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
payload = token_formatters.ProjectScopedPayload.assemble(
exp_user_id, exp_project_id, exp_expires_at, exp_audit_ids)
(user_id, project_id, expires_at, audit_ids) = (
self.formatter.validate_token(token[len('F00'):]))
token_formatters.ProjectScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_encrypted_token_is_under_255_characters(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
# All we are validating here is that the token is encrypted and
# decrypted properly, not the actual validity of token data.
encrypted_token = self.formatter.create_token(
user_id,
project_id,
timeutils.isotime(timeutils.utcnow()),
base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2])
self.assertLess(len(encrypted_token), 255)
def test_domain_scoped_payload(self):
exp_user_id = uuid.uuid4().hex
exp_domain_id = uuid.uuid4().hex
exp_expires_at = timeutils.isotime(timeutils.utcnow())
exp_audit_ids = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
payload = token_formatters.DomainScopedPayload.assemble(
exp_user_id, exp_domain_id, exp_expires_at, exp_audit_ids)
class TestCustomTokenFormatter(TestScopedTokenFormatter):
def setUp(self):
# bypassing the parent setUp because we want to set up our own custom
# token formatter
super(TestScopedTokenFormatter, self).setUp()
(user_id, domain_id, expires_at, audit_ids) = (
token_formatters.DomainScopedPayload.disassemble(payload))
class HandRolledCrypto(object):
"""Hold my beer and watch this."""
def encrypt(self, plaintext):
"""Adds security by obscurity."""
checksum = hashlib.md5(plaintext).hexdigest()
return '%s-%s' % (plaintext[::-1], checksum)
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_domain_id, domain_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def decrypt(self, ciphertext):
"""Removes obscurity to validate security."""
try:
ciphertext, checksum = ciphertext.rsplit('-', 1)
except ValueError:
raise exception.Unauthorized()
plaintext = ciphertext[::-1]
if checksum != hashlib.md5(plaintext).hexdigest():
raise exception.Unauthorized()
return plaintext
def test_domain_scoped_payload_with_default_domain(self):
exp_user_id = uuid.uuid4().hex
exp_domain_id = CONF.identity.default_domain_id
exp_expires_at = timeutils.isotime(timeutils.utcnow())
exp_audit_ids = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
class CustomTokenFormatter(token_formatters.ScopedTokenFormatter):
@property
def crypto(self):
"""Customize the cryptography implementation."""
return HandRolledCrypto()
payload = token_formatters.DomainScopedPayload.assemble(
exp_user_id, exp_domain_id, exp_expires_at, exp_audit_ids)
self.formatter = CustomTokenFormatter()
(user_id, domain_id, expires_at, audit_ids) = (
token_formatters.DomainScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_domain_id, domain_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
class TestTrustTokenFormatter(tests.TestCase, KeyRepositoryTestMixin):
def setUp(self):
super(TestTrustTokenFormatter, self).setUp()
self.setUpKeyRepository()
self.formatter = token_formatters.TrustTokenFormatter()
def test_trust_scoped_payload(self):
exp_user_id = uuid.uuid4().hex
exp_project_id = uuid.uuid4().hex
exp_expires_at = timeutils.isotime(timeutils.utcnow())
exp_audit_ids = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
exp_trust_id = uuid.uuid4().hex
def test_encrypted_trust_token_is_under_255_characters(self):
user_id = uuid.uuid4().hex
project_id = uuid.uuid4().hex
payload = token_formatters.TrustScopedPayload.assemble(
exp_user_id, exp_project_id, exp_expires_at, exp_audit_ids,
exp_trust_id)
encrypted_token = self.formatter.create_token(
user_id,
project_id,
timeutils.isotime(timeutils.utcnow()),
base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2],
uuid.uuid4().hex)
self.assertLess(len(encrypted_token), 255)
(user_id, project_id, expires_at, audit_ids, trust_id) = (
token_formatters.TrustScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)

View File

@ -14,6 +14,7 @@ import base64
import datetime
import struct
import msgpack
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
@ -22,7 +23,6 @@ from keystone.common import dependency
from keystone import exception
from keystone.i18n import _
from keystone.token.providers import common
from keystone.token.providers.fernet import format_map as fm
from keystone.token.providers.fernet import token_formatters as tf
@ -41,10 +41,7 @@ class Provider(common.BaseProvider):
def __init__(self, *args, **kwargs):
super(Provider, self).__init__(*args, **kwargs)
self.token_format_map = {
fm.UNSCOPED_TOKEN_PREFIX: tf.UnscopedTokenFormatter(),
fm.SCOPED_TOKEN_PREFIX: tf.ScopedTokenFormatter(),
fm.TRUST_TOKEN_PREFIX: tf.TrustTokenFormatter()}
self.token_formatter = tf.TokenFormatter()
def needs_persistence(self):
"""Should the token be written to a backend."""
@ -104,31 +101,40 @@ class Provider(common.BaseProvider):
include_catalog=include_catalog,
audit_info=parent_audit_id)
token_format = None
if trust:
token_format = self.token_format_map[fm.TRUST_TOKEN_PREFIX]
token_id = token_format.create_token(
version = tf.TrustScopedPayload.version
payload = tf.TrustScopedPayload.assemble(
user_id,
project_id,
token_data['token']['expires_at'],
token_data['token']['audit_ids'],
token_data['token']['OS-TRUST:trust']['id'])
elif domain_id is None and project_id is None:
token_format = self.token_format_map[fm.UNSCOPED_TOKEN_PREFIX]
token_id = token_format.create_token(
user_id,
token_data['token']['expires_at'],
token_data['token']['audit_ids'])
else:
token_format = self.token_format_map[fm.SCOPED_TOKEN_PREFIX]
token_id = token_format.create_token(
elif project_id:
version = tf.ProjectScopedPayload.version
payload = tf.ProjectScopedPayload.assemble(
user_id,
project_id,
token_data['token']['expires_at'],
token_data['token']['audit_ids'])
elif domain_id:
version = tf.DomainScopedPayload.version
payload = tf.DomainScopedPayload.assemble(
user_id,
domain_id,
token_data['token']['expires_at'],
token_data['token']['audit_ids'])
else:
version = tf.UnscopedPayload.version
payload = tf.UnscopedPayload.assemble(
user_id,
token_data['token']['expires_at'],
token_data['token']['audit_ids'])
return token_id, token_data
versioned_payload = (version,) + payload
serialized_payload = msgpack.packb(versioned_payload)
token = self.token_formatter.pack(serialized_payload)
return token, token_data
def validate_v2_token(self, token_ref):
"""Validate a V2 formatted token.
@ -158,47 +164,46 @@ class Provider(common.BaseProvider):
return created_at
def validate_v3_token(self, token_ref):
def validate_v3_token(self, token):
"""Validate a V3 formatted token.
:param token_ref: a string describing the token to validate
:param token: a string describing the token to validate
:returns: the token data
:raises: keystone.exception.Unauthorized
"""
# Determine and look up the token formatter.
token_prefix_length = len(fm.SCOPED_TOKEN_PREFIX)
token_format = token_ref[:token_prefix_length]
token_formatter = self.token_format_map.get(token_format)
if not token_formatter:
# If the token_format is not recognized, raise Unauthorized.
raise exception.Unauthorized(_(
'This is not a recognized Fernet formatted token: %s') %
token_format)
# If we recognize the token format pass the rest of the token
# string to the correct token_formatter.
token_str = token_ref[token_prefix_length:]
serialized_payload = self.token_formatter.unpack(token)
versioned_payload = msgpack.unpackb(serialized_payload)
version, payload = versioned_payload[0], versioned_payload[1:]
# depending on the formatter, these may or may not be defined
domain_id = None
project_id = None
trust_ref = None
if token_format == fm.UNSCOPED_TOKEN_PREFIX:
if version == tf.UnscopedPayload.version:
(user_id, expires_at, audit_ids) = (
token_formatter.validate_token(token_str))
elif token_format == fm.SCOPED_TOKEN_PREFIX:
tf.UnscopedPayload.disassemble(payload))
elif version == tf.DomainScopedPayload.version:
(user_id, domain_id, expires_at, audit_ids) = (
tf.DomainScopedPayload.disassemble(payload))
elif version == tf.ProjectScopedPayload.version:
(user_id, project_id, expires_at, audit_ids) = (
token_formatter.validate_token(token_str))
elif token_format == fm.TRUST_TOKEN_PREFIX:
tf.ProjectScopedPayload.disassemble(payload))
elif version == tf.TrustScopedPayload.version:
(user_id, project_id, expires_at, audit_ids, trust_id) = (
token_formatter.validate_token(token_str))
tf.TrustScopedPayload.disassemble(payload))
trust_ref = self.trust_api.get_trust(trust_id)
else:
# If the token_format is not recognized, raise Unauthorized.
raise exception.Unauthorized(_(
'This is not a recognized Fernet payload version: %s') %
version)
# rather than appearing in the payload, the creation time is encoded
# into the token format itself
created_at = Provider._creation_time(token_str)
created_at = Provider._creation_time(token)
return self.v3_token_data_helper.get_token_data(
user_id,

View File

@ -1,15 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
UNSCOPED_TOKEN_PREFIX = 'F00'
SCOPED_TOKEN_PREFIX = 'F01'
TRUST_TOKEN_PREFIX = 'F02'

View File

@ -14,14 +14,12 @@ import datetime
import uuid
from cryptography import fernet
import msgpack
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import six
from keystone import exception
from keystone.token.providers.fernet import format_map as fm
from keystone.token.providers.fernet import utils
@ -29,14 +27,8 @@ CONF = cfg.CONF
LOG = log.getLogger(__name__)
class BaseTokenFormatter(object):
"""Base object for token formatters to inherit."""
# NOTE(lbragstad): Each class the inherits BaseTokenFormatter should define
# the `token_format` and `token_version`. The combination of the two should
# be unique.
token_format = None
token_version = None
class TokenFormatter(object):
"""Packs and unpacks payloads into tokens for transport."""
@property
def crypto(self):
@ -59,7 +51,44 @@ class BaseTokenFormatter(object):
fernet_instances = [fernet.Fernet(key) for key in utils.load_keys()]
return fernet.MultiFernet(fernet_instances)
def _convert_uuid_hex_to_bytes(self, uuid_string):
def pack(self, payload):
"""Pack a payload for transport as a token."""
return self.crypto.encrypt(payload)
def unpack(self, token):
"""Unpack a token, and validate the payload."""
try:
return self.crypto.decrypt(token, ttl=CONF.token.expiration)
except fernet.InvalidToken as e:
raise exception.Unauthorized(six.text_type(e))
class BasePayload(object):
# each payload variant should have a unique version
version = None
@classmethod
def assemble(cls, *args):
"""Assemble the payload of a token.
:param args: whatever data should go into the payload
:returns: the payload of a token
"""
raise NotImplementedError()
@classmethod
def disassemble(cls, payload):
"""Disassemble an unscoped payload into the component data.
:param payload: this variant of payload
:returns: a tuple of the payloads component data
"""
raise NotImplementedError()
@classmethod
def convert_uuid_hex_to_bytes(cls, uuid_string):
"""Compress UUID formatted strings to bytes.
:param uuid_string: uuid string to compress to bytes
@ -72,7 +101,8 @@ class BaseTokenFormatter(object):
uuid_obj = uuid.UUID(uuid_string)
return uuid_obj.bytes
def _convert_uuid_bytes_to_hex(self, uuid_byte_string):
@classmethod
def convert_uuid_bytes_to_hex(cls, uuid_byte_string):
"""Generate uuid.hex format based on byte string.
:param uuid_byte_string: uuid string to generate from
@ -85,7 +115,8 @@ class BaseTokenFormatter(object):
uuid_obj = uuid.UUID(bytes=uuid_byte_string)
return uuid_obj.hex
def _convert_time_string_to_int(self, time_string):
@classmethod
def _convert_time_string_to_int(cls, time_string):
"""Convert a time formatted string to a timestamp integer.
:param time_string: time formatted string
@ -96,7 +127,8 @@ class BaseTokenFormatter(object):
return (timeutils.normalize_time(time_object) -
datetime.datetime.utcfromtimestamp(0)).total_seconds()
def _convert_int_to_time_string(self, time_int):
@classmethod
def _convert_int_to_time_string(cls, time_int):
"""Convert a timestamp integer to a string.
:param time_int: integer representing timestamp
@ -106,172 +138,160 @@ class BaseTokenFormatter(object):
time_object = datetime.datetime.utcfromtimestamp(int(time_int))
return timeutils.isotime(time_object)
def pack(self, payload):
"""Pack a payload for transport."""
msgpacked = msgpack.packb(payload)
encrypted = self.crypto.encrypt(msgpacked)
# Tack the token format on to the encrypted_token
return self.token_format + encrypted
class UnscopedPayload(BasePayload):
version = 0
def unpack(self, token_string):
"""Unpack and validate a payload."""
try:
decrypted_token = self.crypto.decrypt(token_string)
except fernet.InvalidToken as e:
raise exception.Unauthorized(six.text_type(e))
# TODO(lbragstad): catch msgpack errors here
payload = msgpack.unpackb(decrypted_token)
return payload
class UnscopedTokenFormatter(BaseTokenFormatter):
token_format = fm.UNSCOPED_TOKEN_PREFIX
def create_token(self, user_id, expires_at, audit_ids):
"""Create a unscoped token.
@classmethod
def assemble(cls, user_id, expires_at, audit_ids):
"""Assemble the payload of an unscoped token.
:param user_id: identifier of the user in the token request
:param expires_at: datetime of the token's expiration
:param audit_ids: list of the token's audit IDs
:returns: a string representing the token
:returns: the payload of an unscoped token
"""
b_user_id = self._convert_uuid_hex_to_bytes(user_id)
expires_at_int = self._convert_time_string_to_int(expires_at)
payload = (b_user_id, expires_at_int, audit_ids)
b_user_id = cls.convert_uuid_hex_to_bytes(user_id)
expires_at_int = cls._convert_time_string_to_int(expires_at)
return (b_user_id, expires_at_int, audit_ids)
return self.pack(payload)
@classmethod
def disassemble(cls, payload):
"""Disassemble an unscoped payload into the component data.
def validate_token(self, token_string):
"""Validate an unscoped token.
:param token_string: a string representing the token
:returns: a tuple containing the user_id, issued_at_str,
expires_at_str, audit_ids
:param payload: the payload of an unscoped token
:return: a tuple containing the user_id, expires_at, and audit_ids
"""
payload = self.unpack(token_string)
# Rebuild and retrieve token information from the token string
b_user_id = payload[0]
expires_at_ts = payload[1]
user_id = cls.convert_uuid_bytes_to_hex(payload[0])
expires_at_str = cls._convert_int_to_time_string(payload[1])
audit_ids = payload[2]
user_id = self._convert_uuid_bytes_to_hex(b_user_id)
expires_at_str = self._convert_int_to_time_string(expires_at_ts)
return (user_id, expires_at_str, audit_ids)
class ScopedTokenFormatter(BaseTokenFormatter):
class DomainScopedPayload(BasePayload):
version = 1
token_format = fm.SCOPED_TOKEN_PREFIX
@classmethod
def assemble(cls, user_id, domain_id, expires_at, audit_ids):
"""Assemble the payload of a domain-scoped token.
def create_token(self, user_id, project_id, expires_at, audit_ids):
"""Create a standard formatted token.
:param user_id: ID of the user in the token request
:param domain_id: ID of the domain to scope to
:param expires_at: datetime of the token's expiration
:param audit_ids: list of the token's audit IDs
:returns: the payload of a domain-scoped token
"""
b_user_id = cls.convert_uuid_hex_to_bytes(user_id)
try:
b_domain_id = cls.convert_uuid_hex_to_bytes(domain_id)
except ValueError:
# the default domain ID is configurable, and probably isn't a UUID
if domain_id == CONF.identity.default_domain_id:
b_domain_id = domain_id
else:
raise
expires_at_int = cls._convert_time_string_to_int(expires_at)
return (b_user_id, b_domain_id, expires_at_int, audit_ids)
@classmethod
def disassemble(cls, payload):
"""Disassemble a payload into the component data.
:param payload: the payload of a token
:return: a tuple containing the user_id, domain_id, expires_at_str, and
audit_ids
"""
user_id = cls.convert_uuid_bytes_to_hex(payload[0])
try:
domain_id = cls.convert_uuid_bytes_to_hex(payload[1])
except ValueError:
# the default domain ID is configurable, and probably isn't a UUID
if payload[1] == CONF.identity.default_domain_id:
domain_id = payload[1]
else:
raise
expires_at_str = cls._convert_int_to_time_string(payload[2])
audit_ids = payload[3]
return (user_id, domain_id, expires_at_str, audit_ids)
class ProjectScopedPayload(BasePayload):
version = 2
@classmethod
def assemble(cls, user_id, project_id, expires_at, audit_ids):
"""Assemble the payload of a project-scoped token.
:param user_id: ID of the user in the token request
:param project_id: ID of the project to scope to
:param expires_at: datetime of the token's expiration
:param audit_ids: list of the token's audit IDs
:returns: a string representing the token
:returns: the payload of a project-scoped token
"""
expires_at_int = self._convert_time_string_to_int(expires_at)
b_user_id = self._convert_uuid_hex_to_bytes(user_id)
if project_id:
b_scope_id = self._convert_uuid_hex_to_bytes(project_id)
payload = (b_user_id, b_scope_id, expires_at_int, audit_ids)
else:
payload = (b_user_id, expires_at_int, audit_ids)
b_user_id = cls.convert_uuid_hex_to_bytes(user_id)
b_scope_id = cls.convert_uuid_hex_to_bytes(project_id)
expires_at_int = cls._convert_time_string_to_int(expires_at)
return (b_user_id, b_scope_id, expires_at_int, audit_ids)
return self.pack(payload)
@classmethod
def disassemble(cls, payload):
"""Disassemble a payload into the component data.
def validate_token(self, token_string):
"""Validate a F00 formatted token.
:param token_string: a string representing the token
:returns: a tuple containing the user_id, project_id, issued_at_str,
expires_at_str, and audit_ids
:param payload: the payload of a token
:return: a tuple containing the user_id, project_id, expires_at_str,
and audit_ids
"""
payload = self.unpack(token_string)
# Rebuild and retrieve token information from the token string
b_user_id = payload[0]
b_project_id = None
if isinstance(payload[1], str):
b_project_id = payload[1]
expires_at_ts = payload[2]
audit_ids = payload[3]
else:
expires_at_ts = payload[1]
audit_ids = payload[2]
# Uncompress the IDs
user_id = self._convert_uuid_bytes_to_hex(b_user_id)
project_id = None
if b_project_id:
project_id = self._convert_uuid_bytes_to_hex(b_project_id)
# Generate created at and expires at times
expires_at_str = self._convert_int_to_time_string(expires_at_ts)
user_id = cls.convert_uuid_bytes_to_hex(payload[0])
project_id = cls.convert_uuid_bytes_to_hex(payload[1])
expires_at_str = cls._convert_int_to_time_string(payload[2])
audit_ids = payload[3]
return (user_id, project_id, expires_at_str, audit_ids)
class TrustTokenFormatter(BaseTokenFormatter):
class TrustScopedPayload(BasePayload):
version = 3
token_format = fm.TRUST_TOKEN_PREFIX
def create_token(self, user_id, project_id, expires_at, audit_ids,
trust_id):
"""Create a trust formatted token.
@classmethod
def assemble(cls, user_id, project_id, expires_at, audit_ids, trust_id):
"""Assemble the payload of a trust-scoped token.
:param user_id: ID of the user in the token request
:param project_id: ID of the project to scope to
:param expires_at: datetime of the token's expiration
:param audit_ids: list of the token's audit IDs
:param trust_id: ID of the trust in effect
:returns: a string representing the token
:returns: the payload of a trust-scoped token
"""
expires_at_int = self._convert_time_string_to_int(expires_at)
b_user_id = self._convert_uuid_hex_to_bytes(user_id)
b_project_id = self._convert_uuid_hex_to_bytes(project_id)
b_trust_id = self._convert_uuid_hex_to_bytes(trust_id)
payload = (b_user_id, b_project_id, b_trust_id, expires_at_int,
audit_ids)
b_user_id = cls.convert_uuid_hex_to_bytes(user_id)
b_project_id = cls.convert_uuid_hex_to_bytes(project_id)
b_trust_id = cls.convert_uuid_hex_to_bytes(trust_id)
expires_at_int = cls._convert_time_string_to_int(expires_at)
return self.pack(payload)
return (b_user_id, b_project_id, expires_at_int, b_trust_id, audit_ids)
def validate_token(self, token_string):
"""Validate a trust formatted token.
@classmethod
def disassemble(cls, payload):
"""Validate a trust-based payload.
:param token_string: a string representing the token
:returns: a tuple containing the user_id, project_id, issued_at_str,
expires_at_str, audit_ids, and trust_id
:returns: a tuple containing the user_id, project_id, expires_at_str,
audit_ids, and trust_id
"""
payload = self.unpack(token_string)
# Rebuild and retrieve token information from the token string
b_user_id = payload[0]
b_project_id = payload[1]
b_trust_id = payload[2]
expires_at_ts = payload[3]
user_id = cls.convert_uuid_bytes_to_hex(payload[0])
project_id = cls.convert_uuid_bytes_to_hex(payload[1])
expires_at_str = cls._convert_int_to_time_string(payload[2])
trust_id = cls.convert_uuid_bytes_to_hex(payload[3])
audit_ids = payload[4]
# Uncompress the IDs
user_id = self._convert_uuid_bytes_to_hex(b_user_id)
project_id = self._convert_uuid_bytes_to_hex(b_project_id)
trust_id = self._convert_uuid_bytes_to_hex(b_trust_id)
# Generate created at and expires at times
expires_at_str = self._convert_int_to_time_string(expires_at_ts)
return (user_id, project_id, expires_at_str, audit_ids, trust_id)