Handle 16-char non-uuid user IDs in payload

If a user_id just happens to be of 16 character length, this will
cause the convert_uuid_bytes_to_hex function to improperly return
a UUID value instead of the user_id string unconverted.

This patch modifies the payload to indicate whether the ID was in
fact a UUID and the attempt to convert to bytes was successful.

This change has effect on more than just user IDs.  It also resolves
potential issues with project IDs, group IDs, IDP IDs, and scope IDs.

Change-Id: Ia4a4f760d67d8bbc22759c48fc800aef016b84ed
Closes-Bug: #1497461
(cherry picked from commit 794e1510cc)
This commit is contained in:
Eric Brown 2015-09-21 18:24:11 -07:00
parent 2d198d7fe0
commit 9ec4e61940
2 changed files with 113 additions and 100 deletions

View File

@ -243,8 +243,7 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
def test_unscoped_payload_with_non_uuid_user_id(self):
exp_user_id = 'someNonUuidUserId'
def _test_unscoped_payload_with_user_id(self, exp_user_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
@ -260,48 +259,40 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_unscoped_payload_with_non_uuid_user_id(self):
self._test_unscoped_payload_with_user_id('someNonUuidUserId')
def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
self._test_unscoped_payload_with_user_id('0123456789abcdef')
def _test_project_scoped_payload_with_ids(self, exp_user_id,
exp_project_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
payload = token_formatters.ProjectScopedPayload.assemble(
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
exp_audit_ids)
(user_id, methods, project_id, expires_at, audit_ids) = (
token_formatters.ProjectScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_project_scoped_payload_with_non_uuid_user_id(self):
exp_user_id = 'someNonUuidUserId'
exp_methods = ['password']
exp_project_id = uuid.uuid4().hex
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
self._test_project_scoped_payload_with_ids('someNonUuidUserId',
'someNonUuidProjectId')
payload = token_formatters.ProjectScopedPayload.assemble(
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
exp_audit_ids)
def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
self._test_project_scoped_payload_with_ids('0123456789abcdef',
'0123456789abcdef')
(user_id, methods, project_id, expires_at, audit_ids) = (
token_formatters.ProjectScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_project_scoped_payload_with_non_uuid_project_id(self):
exp_user_id = uuid.uuid4().hex
exp_methods = ['password']
exp_project_id = 'someNonUuidProjectId'
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
payload = token_formatters.ProjectScopedPayload.assemble(
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
exp_audit_ids)
(user_id, methods, project_id, expires_at, audit_ids) = (
token_formatters.ProjectScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_domain_scoped_payload_with_non_uuid_user_id(self):
exp_user_id = 'someNonUuidUserId'
def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
exp_methods = ['password']
exp_domain_id = uuid.uuid4().hex
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
@ -320,56 +311,45 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
def test_domain_scoped_payload_with_non_uuid_user_id(self):
self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
exp_trust_id = uuid.uuid4().hex
payload = token_formatters.TrustScopedPayload.assemble(
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
exp_audit_ids, exp_trust_id)
(user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
token_formatters.TrustScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
def test_trust_scoped_payload_with_non_uuid_user_id(self):
exp_user_id = 'someNonUuidUserId'
exp_methods = ['password']
exp_project_id = uuid.uuid4().hex
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
exp_trust_id = uuid.uuid4().hex
self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
'someNonUuidProjectId')
payload = token_formatters.TrustScopedPayload.assemble(
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
exp_audit_ids, exp_trust_id)
def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
self._test_trust_scoped_payload_with_ids('0123456789abcdef',
'0123456789abcdef')
(user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
token_formatters.TrustScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
def test_trust_scoped_payload_with_non_uuid_project_id(self):
exp_user_id = uuid.uuid4().hex
exp_methods = ['password']
exp_project_id = 'someNonUuidProjectId'
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
exp_trust_id = uuid.uuid4().hex
payload = token_formatters.TrustScopedPayload.assemble(
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
exp_audit_ids, exp_trust_id)
(user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
token_formatters.TrustScopedPayload.disassemble(payload))
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
def test_federated_payload_with_non_uuid_ids(self):
exp_user_id = 'someNonUuidUserId'
def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
exp_federated_info = {'group_ids': [{'id': exp_group_id}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
@ -391,6 +371,14 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_federated_info['protocol_id'],
federated_info['protocol_id'])
def test_federated_payload_with_non_uuid_ids(self):
self._test_federated_payload_with_ids('someNonUuidUserId',
'someNonUuidGroupId')
def test_federated_payload_with_16_char_non_uuid_ids(self):
self._test_federated_payload_with_ids('0123456789abcdef',
'0123456789abcdef')
def test_federated_project_scoped_payload(self):
exp_user_id = 'someNonUuidUserId'
exp_methods = ['token']

View File

@ -350,15 +350,16 @@ class BasePayload(object):
"""Attempt to convert value to bytes or return value.
:param value: value to attempt to convert to bytes
:returns: uuid value in bytes or value
:returns: tuple containing boolean indicating whether user_id was
stored as bytes and uuid value as bytes or the original value
"""
try:
return cls.convert_uuid_hex_to_bytes(value)
return (True, cls.convert_uuid_hex_to_bytes(value))
except ValueError:
# this might not be a UUID, depending on the situation (i.e.
# federation)
return value
return (False, value)
@classmethod
def attempt_convert_uuid_bytes_to_hex(cls, value):
@ -404,7 +405,9 @@ class UnscopedPayload(BasePayload):
audit_ids
"""
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
(is_stored_as_bytes, user_id) = payload[0]
if is_stored_as_bytes:
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
methods = auth_plugins.convert_integer_to_method_list(payload[1])
expires_at_str = cls._convert_int_to_time_string(payload[2])
audit_ids = list(map(provider.base64_encode, payload[3]))
@ -450,7 +453,9 @@ class DomainScopedPayload(BasePayload):
expires_at_str, and audit_ids
"""
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
(is_stored_as_bytes, user_id) = payload[0]
if is_stored_as_bytes:
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
methods = auth_plugins.convert_integer_to_method_list(payload[1])
try:
domain_id = cls.convert_uuid_bytes_to_hex(payload[2])
@ -498,9 +503,13 @@ class ProjectScopedPayload(BasePayload):
expires_at_str, and audit_ids
"""
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
(is_stored_as_bytes, user_id) = payload[0]
if is_stored_as_bytes:
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
methods = auth_plugins.convert_integer_to_method_list(payload[1])
project_id = cls.attempt_convert_uuid_bytes_to_hex(payload[2])
(is_stored_as_bytes, project_id) = payload[2]
if is_stored_as_bytes:
project_id = cls.attempt_convert_uuid_bytes_to_hex(project_id)
expires_at_str = cls._convert_int_to_time_string(payload[3])
audit_ids = list(map(provider.base64_encode, payload[4]))
@ -544,9 +553,13 @@ class TrustScopedPayload(BasePayload):
expires_at_str, audit_ids, and trust_id
"""
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
(is_stored_as_bytes, user_id) = payload[0]
if is_stored_as_bytes:
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
methods = auth_plugins.convert_integer_to_method_list(payload[1])
project_id = cls.attempt_convert_uuid_bytes_to_hex(payload[2])
(is_stored_as_bytes, project_id) = payload[2]
if is_stored_as_bytes:
project_id = cls.attempt_convert_uuid_bytes_to_hex(project_id)
expires_at_str = cls._convert_int_to_time_string(payload[3])
audit_ids = list(map(provider.base64_encode, payload[4]))
trust_id = cls.convert_uuid_bytes_to_hex(payload[5])
@ -564,7 +577,9 @@ class FederatedUnscopedPayload(BasePayload):
@classmethod
def unpack_group_id(cls, group_id_in_bytes):
group_id = cls.attempt_convert_uuid_bytes_to_hex(group_id_in_bytes)
(is_stored_as_bytes, group_id) = group_id_in_bytes
if is_stored_as_bytes:
group_id = cls.attempt_convert_uuid_bytes_to_hex(group_id)
return {'id': group_id}
@classmethod
@ -608,10 +623,14 @@ class FederatedUnscopedPayload(BasePayload):
"""
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
(is_stored_as_bytes, user_id) = payload[0]
if is_stored_as_bytes:
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
methods = auth_plugins.convert_integer_to_method_list(payload[1])
group_ids = list(map(cls.unpack_group_id, payload[2]))
idp_id = cls.attempt_convert_uuid_bytes_to_hex(payload[3])
(is_stored_as_bytes, idp_id) = payload[3]
if is_stored_as_bytes:
idp_id = cls.attempt_convert_uuid_bytes_to_hex(idp_id)
protocol_id = payload[4]
expires_at_str = cls._convert_int_to_time_string(payload[5])
audit_ids = list(map(provider.base64_encode, payload[6]))
@ -665,11 +684,17 @@ class FederatedScopedPayload(FederatedUnscopedPayload):
group IDs
"""
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
(is_stored_as_bytes, user_id) = payload[0]
if is_stored_as_bytes:
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
methods = auth_plugins.convert_integer_to_method_list(payload[1])
scope_id = cls.attempt_convert_uuid_bytes_to_hex(payload[2])
(is_stored_as_bytes, scope_id) = payload[2]
if is_stored_as_bytes:
scope_id = cls.attempt_convert_uuid_bytes_to_hex(scope_id)
group_ids = list(map(cls.unpack_group_id, payload[3]))
idp_id = cls.attempt_convert_uuid_bytes_to_hex(payload[4])
(is_stored_as_bytes, idp_id) = payload[4]
if is_stored_as_bytes:
idp_id = cls.attempt_convert_uuid_bytes_to_hex(idp_id)
protocol_id = payload[5]
expires_at_str = cls._convert_int_to_time_string(payload[6])
audit_ids = list(map(provider.base64_encode, payload[7]))