Handle 16-char non-uuid user IDs in payload
If a user_id just happens to be of 16 character length, this will
cause the convert_uuid_bytes_to_hex function to improperly return
a UUID value instead of the user_id string unconverted.
This patch modifies the payload to indicate whether the ID was in
fact a UUID and the attempt to convert to bytes was successful.
This change has effect on more than just user IDs. It also resolves
potential issues with project IDs, group IDs, IDP IDs, and scope IDs.
Change-Id: Ia4a4f760d67d8bbc22759c48fc800aef016b84ed
Closes-Bug: #1497461
(cherry picked from commit 794e1510cc
)
This commit is contained in:
parent
2d198d7fe0
commit
9ec4e61940
|
@ -243,8 +243,7 @@ class TestPayloads(unit.TestCase):
|
|||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
self.assertEqual(exp_trust_id, trust_id)
|
||||
|
||||
def test_unscoped_payload_with_non_uuid_user_id(self):
|
||||
exp_user_id = 'someNonUuidUserId'
|
||||
def _test_unscoped_payload_with_user_id(self, exp_user_id):
|
||||
exp_methods = ['password']
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
|
@ -260,48 +259,40 @@ class TestPayloads(unit.TestCase):
|
|||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
|
||||
def test_unscoped_payload_with_non_uuid_user_id(self):
|
||||
self._test_unscoped_payload_with_user_id('someNonUuidUserId')
|
||||
|
||||
def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
|
||||
self._test_unscoped_payload_with_user_id('0123456789abcdef')
|
||||
|
||||
def _test_project_scoped_payload_with_ids(self, exp_user_id,
|
||||
exp_project_id):
|
||||
exp_methods = ['password']
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
|
||||
payload = token_formatters.ProjectScopedPayload.assemble(
|
||||
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
|
||||
exp_audit_ids)
|
||||
|
||||
(user_id, methods, project_id, expires_at, audit_ids) = (
|
||||
token_formatters.ProjectScopedPayload.disassemble(payload))
|
||||
|
||||
self.assertEqual(exp_user_id, user_id)
|
||||
self.assertEqual(exp_methods, methods)
|
||||
self.assertEqual(exp_project_id, project_id)
|
||||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
|
||||
def test_project_scoped_payload_with_non_uuid_user_id(self):
|
||||
exp_user_id = 'someNonUuidUserId'
|
||||
exp_methods = ['password']
|
||||
exp_project_id = uuid.uuid4().hex
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
self._test_project_scoped_payload_with_ids('someNonUuidUserId',
|
||||
'someNonUuidProjectId')
|
||||
|
||||
payload = token_formatters.ProjectScopedPayload.assemble(
|
||||
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
|
||||
exp_audit_ids)
|
||||
def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
|
||||
self._test_project_scoped_payload_with_ids('0123456789abcdef',
|
||||
'0123456789abcdef')
|
||||
|
||||
(user_id, methods, project_id, expires_at, audit_ids) = (
|
||||
token_formatters.ProjectScopedPayload.disassemble(payload))
|
||||
|
||||
self.assertEqual(exp_user_id, user_id)
|
||||
self.assertEqual(exp_methods, methods)
|
||||
self.assertEqual(exp_project_id, project_id)
|
||||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
|
||||
def test_project_scoped_payload_with_non_uuid_project_id(self):
|
||||
exp_user_id = uuid.uuid4().hex
|
||||
exp_methods = ['password']
|
||||
exp_project_id = 'someNonUuidProjectId'
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
|
||||
payload = token_formatters.ProjectScopedPayload.assemble(
|
||||
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
|
||||
exp_audit_ids)
|
||||
|
||||
(user_id, methods, project_id, expires_at, audit_ids) = (
|
||||
token_formatters.ProjectScopedPayload.disassemble(payload))
|
||||
|
||||
self.assertEqual(exp_user_id, user_id)
|
||||
self.assertEqual(exp_methods, methods)
|
||||
self.assertEqual(exp_project_id, project_id)
|
||||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
|
||||
def test_domain_scoped_payload_with_non_uuid_user_id(self):
|
||||
exp_user_id = 'someNonUuidUserId'
|
||||
def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
|
||||
exp_methods = ['password']
|
||||
exp_domain_id = uuid.uuid4().hex
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
|
@ -320,56 +311,45 @@ class TestPayloads(unit.TestCase):
|
|||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
|
||||
def test_domain_scoped_payload_with_non_uuid_user_id(self):
|
||||
self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
|
||||
|
||||
def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
|
||||
self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
|
||||
|
||||
def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
|
||||
exp_methods = ['password']
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
exp_trust_id = uuid.uuid4().hex
|
||||
|
||||
payload = token_formatters.TrustScopedPayload.assemble(
|
||||
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
|
||||
exp_audit_ids, exp_trust_id)
|
||||
|
||||
(user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
|
||||
token_formatters.TrustScopedPayload.disassemble(payload))
|
||||
|
||||
self.assertEqual(exp_user_id, user_id)
|
||||
self.assertEqual(exp_methods, methods)
|
||||
self.assertEqual(exp_project_id, project_id)
|
||||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
self.assertEqual(exp_trust_id, trust_id)
|
||||
|
||||
def test_trust_scoped_payload_with_non_uuid_user_id(self):
|
||||
exp_user_id = 'someNonUuidUserId'
|
||||
exp_methods = ['password']
|
||||
exp_project_id = uuid.uuid4().hex
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
exp_trust_id = uuid.uuid4().hex
|
||||
self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
|
||||
'someNonUuidProjectId')
|
||||
|
||||
payload = token_formatters.TrustScopedPayload.assemble(
|
||||
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
|
||||
exp_audit_ids, exp_trust_id)
|
||||
def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
|
||||
self._test_trust_scoped_payload_with_ids('0123456789abcdef',
|
||||
'0123456789abcdef')
|
||||
|
||||
(user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
|
||||
token_formatters.TrustScopedPayload.disassemble(payload))
|
||||
|
||||
self.assertEqual(exp_user_id, user_id)
|
||||
self.assertEqual(exp_methods, methods)
|
||||
self.assertEqual(exp_project_id, project_id)
|
||||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
self.assertEqual(exp_trust_id, trust_id)
|
||||
|
||||
def test_trust_scoped_payload_with_non_uuid_project_id(self):
|
||||
exp_user_id = uuid.uuid4().hex
|
||||
exp_methods = ['password']
|
||||
exp_project_id = 'someNonUuidProjectId'
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
exp_trust_id = uuid.uuid4().hex
|
||||
|
||||
payload = token_formatters.TrustScopedPayload.assemble(
|
||||
exp_user_id, exp_methods, exp_project_id, exp_expires_at,
|
||||
exp_audit_ids, exp_trust_id)
|
||||
|
||||
(user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
|
||||
token_formatters.TrustScopedPayload.disassemble(payload))
|
||||
|
||||
self.assertEqual(exp_user_id, user_id)
|
||||
self.assertEqual(exp_methods, methods)
|
||||
self.assertEqual(exp_project_id, project_id)
|
||||
self.assertEqual(exp_expires_at, expires_at)
|
||||
self.assertEqual(exp_audit_ids, audit_ids)
|
||||
self.assertEqual(exp_trust_id, trust_id)
|
||||
|
||||
def test_federated_payload_with_non_uuid_ids(self):
|
||||
exp_user_id = 'someNonUuidUserId'
|
||||
def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
|
||||
exp_methods = ['password']
|
||||
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
|
||||
exp_audit_ids = [provider.random_urlsafe_str()]
|
||||
exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
|
||||
exp_federated_info = {'group_ids': [{'id': exp_group_id}],
|
||||
'idp_id': uuid.uuid4().hex,
|
||||
'protocol_id': uuid.uuid4().hex}
|
||||
|
||||
|
@ -391,6 +371,14 @@ class TestPayloads(unit.TestCase):
|
|||
self.assertEqual(exp_federated_info['protocol_id'],
|
||||
federated_info['protocol_id'])
|
||||
|
||||
def test_federated_payload_with_non_uuid_ids(self):
|
||||
self._test_federated_payload_with_ids('someNonUuidUserId',
|
||||
'someNonUuidGroupId')
|
||||
|
||||
def test_federated_payload_with_16_char_non_uuid_ids(self):
|
||||
self._test_federated_payload_with_ids('0123456789abcdef',
|
||||
'0123456789abcdef')
|
||||
|
||||
def test_federated_project_scoped_payload(self):
|
||||
exp_user_id = 'someNonUuidUserId'
|
||||
exp_methods = ['token']
|
||||
|
|
|
@ -350,15 +350,16 @@ class BasePayload(object):
|
|||
"""Attempt to convert value to bytes or return value.
|
||||
|
||||
:param value: value to attempt to convert to bytes
|
||||
:returns: uuid value in bytes or value
|
||||
:returns: tuple containing boolean indicating whether user_id was
|
||||
stored as bytes and uuid value as bytes or the original value
|
||||
|
||||
"""
|
||||
try:
|
||||
return cls.convert_uuid_hex_to_bytes(value)
|
||||
return (True, cls.convert_uuid_hex_to_bytes(value))
|
||||
except ValueError:
|
||||
# this might not be a UUID, depending on the situation (i.e.
|
||||
# federation)
|
||||
return value
|
||||
return (False, value)
|
||||
|
||||
@classmethod
|
||||
def attempt_convert_uuid_bytes_to_hex(cls, value):
|
||||
|
@ -404,7 +405,9 @@ class UnscopedPayload(BasePayload):
|
|||
audit_ids
|
||||
|
||||
"""
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
|
||||
(is_stored_as_bytes, user_id) = payload[0]
|
||||
if is_stored_as_bytes:
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
|
||||
methods = auth_plugins.convert_integer_to_method_list(payload[1])
|
||||
expires_at_str = cls._convert_int_to_time_string(payload[2])
|
||||
audit_ids = list(map(provider.base64_encode, payload[3]))
|
||||
|
@ -450,7 +453,9 @@ class DomainScopedPayload(BasePayload):
|
|||
expires_at_str, and audit_ids
|
||||
|
||||
"""
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
|
||||
(is_stored_as_bytes, user_id) = payload[0]
|
||||
if is_stored_as_bytes:
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
|
||||
methods = auth_plugins.convert_integer_to_method_list(payload[1])
|
||||
try:
|
||||
domain_id = cls.convert_uuid_bytes_to_hex(payload[2])
|
||||
|
@ -498,9 +503,13 @@ class ProjectScopedPayload(BasePayload):
|
|||
expires_at_str, and audit_ids
|
||||
|
||||
"""
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
|
||||
(is_stored_as_bytes, user_id) = payload[0]
|
||||
if is_stored_as_bytes:
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
|
||||
methods = auth_plugins.convert_integer_to_method_list(payload[1])
|
||||
project_id = cls.attempt_convert_uuid_bytes_to_hex(payload[2])
|
||||
(is_stored_as_bytes, project_id) = payload[2]
|
||||
if is_stored_as_bytes:
|
||||
project_id = cls.attempt_convert_uuid_bytes_to_hex(project_id)
|
||||
expires_at_str = cls._convert_int_to_time_string(payload[3])
|
||||
audit_ids = list(map(provider.base64_encode, payload[4]))
|
||||
|
||||
|
@ -544,9 +553,13 @@ class TrustScopedPayload(BasePayload):
|
|||
expires_at_str, audit_ids, and trust_id
|
||||
|
||||
"""
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
|
||||
(is_stored_as_bytes, user_id) = payload[0]
|
||||
if is_stored_as_bytes:
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
|
||||
methods = auth_plugins.convert_integer_to_method_list(payload[1])
|
||||
project_id = cls.attempt_convert_uuid_bytes_to_hex(payload[2])
|
||||
(is_stored_as_bytes, project_id) = payload[2]
|
||||
if is_stored_as_bytes:
|
||||
project_id = cls.attempt_convert_uuid_bytes_to_hex(project_id)
|
||||
expires_at_str = cls._convert_int_to_time_string(payload[3])
|
||||
audit_ids = list(map(provider.base64_encode, payload[4]))
|
||||
trust_id = cls.convert_uuid_bytes_to_hex(payload[5])
|
||||
|
@ -564,7 +577,9 @@ class FederatedUnscopedPayload(BasePayload):
|
|||
|
||||
@classmethod
|
||||
def unpack_group_id(cls, group_id_in_bytes):
|
||||
group_id = cls.attempt_convert_uuid_bytes_to_hex(group_id_in_bytes)
|
||||
(is_stored_as_bytes, group_id) = group_id_in_bytes
|
||||
if is_stored_as_bytes:
|
||||
group_id = cls.attempt_convert_uuid_bytes_to_hex(group_id)
|
||||
return {'id': group_id}
|
||||
|
||||
@classmethod
|
||||
|
@ -608,10 +623,14 @@ class FederatedUnscopedPayload(BasePayload):
|
|||
|
||||
"""
|
||||
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
|
||||
(is_stored_as_bytes, user_id) = payload[0]
|
||||
if is_stored_as_bytes:
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
|
||||
methods = auth_plugins.convert_integer_to_method_list(payload[1])
|
||||
group_ids = list(map(cls.unpack_group_id, payload[2]))
|
||||
idp_id = cls.attempt_convert_uuid_bytes_to_hex(payload[3])
|
||||
(is_stored_as_bytes, idp_id) = payload[3]
|
||||
if is_stored_as_bytes:
|
||||
idp_id = cls.attempt_convert_uuid_bytes_to_hex(idp_id)
|
||||
protocol_id = payload[4]
|
||||
expires_at_str = cls._convert_int_to_time_string(payload[5])
|
||||
audit_ids = list(map(provider.base64_encode, payload[6]))
|
||||
|
@ -665,11 +684,17 @@ class FederatedScopedPayload(FederatedUnscopedPayload):
|
|||
group IDs
|
||||
|
||||
"""
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(payload[0])
|
||||
(is_stored_as_bytes, user_id) = payload[0]
|
||||
if is_stored_as_bytes:
|
||||
user_id = cls.attempt_convert_uuid_bytes_to_hex(user_id)
|
||||
methods = auth_plugins.convert_integer_to_method_list(payload[1])
|
||||
scope_id = cls.attempt_convert_uuid_bytes_to_hex(payload[2])
|
||||
(is_stored_as_bytes, scope_id) = payload[2]
|
||||
if is_stored_as_bytes:
|
||||
scope_id = cls.attempt_convert_uuid_bytes_to_hex(scope_id)
|
||||
group_ids = list(map(cls.unpack_group_id, payload[3]))
|
||||
idp_id = cls.attempt_convert_uuid_bytes_to_hex(payload[4])
|
||||
(is_stored_as_bytes, idp_id) = payload[4]
|
||||
if is_stored_as_bytes:
|
||||
idp_id = cls.attempt_convert_uuid_bytes_to_hex(idp_id)
|
||||
protocol_id = payload[5]
|
||||
expires_at_str = cls._convert_int_to_time_string(payload[6])
|
||||
audit_ids = list(map(provider.base64_encode, payload[7]))
|
||||
|
|
Loading…
Reference in New Issue