Remove unused statements in matches

With recent changes [1], in list_events in the sql backend there is no
longer a need to use repetitive statements. list_events already prunes
out rows based off user_id, project_id, audit_id and issued_at so we
can remove these subsequent statements in matches.
In order to test this, the _assertToken methods needed to call the
list_events method directly. The old implementation ignores
list_events and instead makes a list where it can "add_event" and
"remove_event" which bypasses testing the actual changes made in
list_events since the tests assume list_events would have just sent
back the entire list anyway rather than what it does now which is
filter events based off the token

[1] 9e84371461
Related-Bug: 1524030
Change-Id: I8cb111df733f826df7aabf70359cc849a70f914b
This commit is contained in:
Richard Avelar 2016-11-03 16:32:36 +00:00 committed by “Richard
parent a93d03e04f
commit 852a5186b8
2 changed files with 151 additions and 173 deletions

View File

@ -169,13 +169,6 @@ def matches(event, token_values):
# that the token is still valid and short-circuits the
# rest of the logic.
# The token has three attributes that can match the user_id.
if event.user_id is not None and event.user_id not in (
token_values['user_id'],
token_values['trustor_id'],
token_values['trustee_id'],):
return False
# The token has two attributes that can match the domain_id.
if event.domain_id is not None and event.domain_id not in(
token_values['identity_domain_id'],
@ -188,10 +181,6 @@ def matches(event, token_values):
# If an event specifies an attribute name, but it does not match, the token
# is not revoked.
if event.project_id is not None and event.project_id not in (
token_values['project_id'],):
return False
if event.expires_at is not None and event.expires_at not in (
token_values['expires_at'],):
return False
@ -208,10 +197,6 @@ def matches(event, token_values):
token_values['access_token_id'],):
return False
if event.audit_id is not None and event.audit_id not in (
token_values['audit_id'],):
return False
if event.audit_chain_id is not None and event.audit_chain_id not in (
token_values['audit_chain_id'],):
return False
@ -220,9 +205,6 @@ def matches(event, token_values):
token_values['roles']):
return False
if token_values['issued_at'] > event.issued_before:
return False
return True

View File

@ -51,6 +51,43 @@ def _sample_blank_token():
return token_data
def _sample_data():
user_ids = []
project_ids = []
role_ids = []
for i in range(0, 3):
user_ids.append(uuid.uuid4().hex)
project_ids.append(uuid.uuid4().hex)
role_ids.append(uuid.uuid4().hex)
# For testing purposes, create 3 project tokens with a different user_id,
# role_id, and project_id which will be used to verify that revoking by
# grant on certain user_id, project_id, and role_id pairs leaves these
# project_tokens unrevoked if only one of the revoked columns are matched
# but not all of them as the expected behavior dictates
project_tokens = []
i = len(project_tokens)
project_tokens.append(_sample_blank_token())
project_tokens[i]['user_id'] = user_ids[1]
project_tokens[i]['project_id'] = project_ids[0]
project_tokens[i]['roles'] = [role_ids[0]]
i = len(project_tokens)
project_tokens.append(_sample_blank_token())
project_tokens[i]['user_id'] = user_ids[0]
project_tokens[i]['project_id'] = project_ids[1]
project_tokens[i]['roles'] = [role_ids[0]]
i = len(project_tokens)
project_tokens.append(_sample_blank_token())
project_tokens[i]['user_id'] = user_ids[0]
project_tokens[i]['project_id'] = project_ids[0]
project_tokens[i]['roles'] = [role_ids[1]]
return user_ids, project_ids, role_ids, project_tokens
def _matches(event, token_values):
"""See if the token matches the revocation event.
@ -116,14 +153,22 @@ def _matches(event, token_values):
class RevokeTests(object):
def _assertTokenRevoked(self, events, token_data):
backend = sql.Revoke()
if events:
self.assertTrue(revoke_model.is_revoked(events, token_data),
'Token should be revoked')
return self.assertTrue(
revoke_model.is_revoked(events, token_data),
'Token should be revoked')
revoke_model.is_revoked(backend.list_events(token=token_data),
token_data), 'Token should be revoked')
def _assertTokenNotRevoked(self, events, token_data):
backend = sql.Revoke()
if events:
self.assertTrue(revoke_model.is_revoked(events, token_data),
'Token should be revoked')
return self.assertFalse(
revoke_model.is_revoked(events, token_data),
'Token should not be revoked')
revoke_model.is_revoked(backend.list_events(token=token_data),
token_data), 'Token should not be revoked')
def test_list(self):
self.revoke_api.revoke_by_user(user_id=1)
@ -339,6 +384,108 @@ class RevokeTests(object):
self.assertEqual(
1, len(revocation_backend.list_events(token=fourth_token)))
def _user_field_test(self, field_name):
token = _sample_blank_token()
token[field_name] = uuid.uuid4().hex
self.revoke_api.revoke_by_user(user_id=token[field_name])
self._assertTokenRevoked(None, token)
token2 = _sample_blank_token()
token2[field_name] = uuid.uuid4().hex
self._assertTokenNotRevoked(None, token2)
def test_revoke_by_user(self):
self._user_field_test('user_id')
def test_revoke_by_user_matches_trustee(self):
self._user_field_test('trustee_id')
def test_revoke_by_user_matches_trustor(self):
self._user_field_test('trustor_id')
def test_revoke_by_audit_id(self):
token = _sample_blank_token()
# Audit ID and Audit Chain ID are populated with the same value
# if the token is an original token
token['audit_id'] = uuid.uuid4().hex
token['audit_chain_id'] = token['audit_id']
self.revoke_api.revoke_by_audit_id(audit_id=token['audit_id'])
self._assertTokenRevoked(None, token)
token2 = _sample_blank_token()
token2['audit_id'] = uuid.uuid4().hex
token2['audit_chain_id'] = token2['audit_id']
self._assertTokenNotRevoked(None, token2)
def test_by_project_grant(self):
user_ids, project_ids, role_ids, project_tokens = _sample_data()
token1 = _sample_blank_token()
token1['roles'] = role_ids[0]
token1['user_id'] = user_ids[0]
token1['project_id'] = project_ids[0]
token2 = _sample_blank_token()
token2['roles'] = role_ids[1]
token2['user_id'] = user_ids[1]
token2['project_id'] = project_ids[1]
token3 = _sample_blank_token()
token3['roles'] = [role_ids[0],
role_ids[1],
role_ids[2]]
token3['user_id'] = user_ids[2]
token3['project_id'] = project_ids[2]
# Check that all tokens are revoked at the start
self._assertTokenNotRevoked(None, token1)
self._assertTokenNotRevoked(None, token2)
self._assertTokenNotRevoked(None, token3)
for token in project_tokens:
self._assertTokenNotRevoked(None, token)
self.revoke_api.revoke_by_grant(role_id=role_ids[0],
user_id=user_ids[0],
project_id=project_ids[0])
# Only the first token should be revoked
self._assertTokenRevoked(None, token1)
self._assertTokenNotRevoked(None, token2)
self._assertTokenNotRevoked(None, token3)
for token in project_tokens:
self._assertTokenNotRevoked(None, token)
self.revoke_api.revoke_by_grant(role_id=role_ids[1],
user_id=user_ids[1],
project_id=project_ids[1])
# Tokens 1 and 2 should be revoked now
self._assertTokenRevoked(None, token1)
self._assertTokenRevoked(None, token2)
self._assertTokenNotRevoked(None, token3)
for token in project_tokens:
self._assertTokenNotRevoked(None, token)
# test that multiple roles with a single user and project get revoked
# and invalidate token3
self.revoke_api.revoke_by_grant(role_id=role_ids[0],
user_id=user_ids[2],
project_id=project_ids[2])
self.revoke_api.revoke_by_grant(role_id=role_ids[1],
user_id=user_ids[2],
project_id=project_ids[2])
self.revoke_api.revoke_by_grant(role_id=role_ids[2],
user_id=user_ids[2],
project_id=project_ids[2])
# Tokens 1, 2, and 3 should now be revoked leaving project_tokens
# unrevoked.
self._assertTokenRevoked(None, token1)
self._assertTokenRevoked(None, token2)
self._assertTokenRevoked(None, token3)
for token in project_tokens:
self._assertTokenNotRevoked(None, token)
@mock.patch.object(timeutils, 'utcnow')
def test_expired_events_are_removed(self, mock_utcnow):
def _sample_token_values():
@ -413,46 +560,6 @@ class RevokeListTests(unit.TestCase):
super(RevokeListTests, self).setUp()
self.events = []
self.revoke_events = list()
self._sample_data()
def _sample_data(self):
user_ids = []
project_ids = []
role_ids = []
for i in range(0, 3):
user_ids.append(uuid.uuid4().hex)
project_ids.append(uuid.uuid4().hex)
role_ids.append(uuid.uuid4().hex)
project_tokens = []
i = len(project_tokens)
project_tokens.append(_sample_blank_token())
project_tokens[i]['user_id'] = user_ids[0]
project_tokens[i]['project_id'] = project_ids[0]
project_tokens[i]['roles'] = [role_ids[1]]
i = len(project_tokens)
project_tokens.append(_sample_blank_token())
project_tokens[i]['user_id'] = user_ids[1]
project_tokens[i]['project_id'] = project_ids[0]
project_tokens[i]['roles'] = [role_ids[0]]
i = len(project_tokens)
project_tokens.append(_sample_blank_token())
project_tokens[i]['user_id'] = user_ids[0]
project_tokens[i]['project_id'] = project_ids[1]
project_tokens[i]['roles'] = [role_ids[0]]
token_to_revoke = _sample_blank_token()
token_to_revoke['user_id'] = user_ids[0]
token_to_revoke['project_id'] = project_ids[0]
token_to_revoke['roles'] = [role_ids[0]]
self.project_tokens = project_tokens
self.user_ids = user_ids
self.project_ids = project_ids
self.role_ids = role_ids
self.token_to_revoke = token_to_revoke
def _assertTokenRevoked(self, token_data):
self.assertTrue(any([_matches(e, token_data) for e in self.events]))
@ -471,13 +578,6 @@ class RevokeListTests(unit.TestCase):
self.revoke_events,
revoke_model.RevokeEvent(user_id=user_id))
def _revoke_by_audit_id(self, audit_id):
event = add_event(
self.revoke_events,
revoke_model.RevokeEvent(audit_id=audit_id))
self.events.append(event)
return event
def _revoke_by_audit_chain_id(self, audit_chain_id, project_id=None,
domain_id=None):
event = add_event(
@ -500,17 +600,6 @@ class RevokeListTests(unit.TestCase):
self.events.append(event)
return event
def _revoke_by_grant(self, role_id, user_id=None,
domain_id=None, project_id=None):
event = add_event(
self.revoke_events,
revoke_model.RevokeEvent(user_id=user_id,
role_id=role_id,
domain_id=domain_id,
project_id=project_id))
self.events.append(event)
return event
def _revoke_by_user_and_project(self, user_id, project_id):
event = add_event(self.revoke_events,
revoke_model.RevokeEvent(project_id=project_id,
@ -537,48 +626,6 @@ class RevokeListTests(unit.TestCase):
revoke_model.RevokeEvent(domain_id=domain_id))
self.events.append(event)
def _user_field_test(self, field_name):
user_id = uuid.uuid4().hex
event = self._revoke_by_user(user_id)
self.events.append(event)
token_data_u1 = _sample_blank_token()
token_data_u1[field_name] = user_id
self._assertTokenRevoked(token_data_u1)
token_data_u2 = _sample_blank_token()
token_data_u2[field_name] = uuid.uuid4().hex
self._assertTokenNotRevoked(token_data_u2)
remove_event(self.revoke_events, event)
self.events.remove(event)
self._assertTokenNotRevoked(token_data_u1)
def test_revoke_by_user(self):
self._user_field_test('user_id')
def test_revoke_by_user_matches_trustee(self):
self._user_field_test('trustee_id')
def test_revoke_by_user_matches_trustor(self):
self._user_field_test('trustor_id')
def test_revoke_by_audit_id(self):
audit_id = common.build_audit_info(parent_audit_id=None)[0]
token_data_1 = _sample_blank_token()
# Audit ID and Audit Chain ID are populated with the same value
# if the token is an original token
token_data_1['audit_id'] = audit_id
token_data_1['audit_chain_id'] = audit_id
event = self._revoke_by_audit_id(audit_id)
self._assertTokenRevoked(token_data_1)
audit_id_2 = common.build_audit_info(parent_audit_id=audit_id)[0]
token_data_2 = _sample_blank_token()
token_data_2['audit_id'] = audit_id_2
token_data_2['audit_chain_id'] = audit_id
self._assertTokenNotRevoked(token_data_2)
self.remove_event(event)
self._assertTokenNotRevoked(token_data_1)
def test_revoke_by_audit_chain_id(self):
audit_id = common.build_audit_info(parent_audit_id=None)[0]
token_data_1 = _sample_blank_token()
@ -603,57 +650,6 @@ class RevokeListTests(unit.TestCase):
self.events.remove(event)
remove_event(self.revoke_events, event)
def test_by_project_grant(self):
token_to_revoke = self.token_to_revoke
tokens = self.project_tokens
self._assertTokenNotRevoked(token_to_revoke)
for token in tokens:
self._assertTokenNotRevoked(token)
event = self._revoke_by_grant(role_id=self.role_ids[0],
user_id=self.user_ids[0],
project_id=self.project_ids[0])
self._assertTokenRevoked(token_to_revoke)
for token in tokens:
self._assertTokenNotRevoked(token)
self.remove_event(event)
self._assertTokenNotRevoked(token_to_revoke)
for token in tokens:
self._assertTokenNotRevoked(token)
token_to_revoke['roles'] = [self.role_ids[0],
self.role_ids[1],
self.role_ids[2]]
event = self._revoke_by_grant(role_id=self.role_ids[0],
user_id=self.user_ids[0],
project_id=self.project_ids[0])
self._assertTokenRevoked(token_to_revoke)
self.remove_event(event)
self._assertTokenNotRevoked(token_to_revoke)
event = self._revoke_by_grant(role_id=self.role_ids[1],
user_id=self.user_ids[0],
project_id=self.project_ids[0])
self._assertTokenRevoked(token_to_revoke)
self.remove_event(event)
self._assertTokenNotRevoked(token_to_revoke)
self._revoke_by_grant(role_id=self.role_ids[0],
user_id=self.user_ids[0],
project_id=self.project_ids[0])
self._revoke_by_grant(role_id=self.role_ids[1],
user_id=self.user_ids[0],
project_id=self.project_ids[0])
self._revoke_by_grant(role_id=self.role_ids[2],
user_id=self.user_ids[0],
project_id=self.project_ids[0])
self._assertTokenRevoked(token_to_revoke)
def test_by_project_and_user_and_role(self):
user_id1 = uuid.uuid4().hex
user_id2 = uuid.uuid4().hex