Use freezegun to increment clock in test_v3_assignment

This commit prepares the tests in keystone/tests/unit/test_v3_assignment.py for
the switch to make Fernet the default token provider. Since Fernet doesn't
support sub-second precision it is possible to get the wrong response when
using the token API within the same second as a revocation event. We can either
introduce a sleep (which slows down our tests) or mock the system clock.

We can use freezegun to mock the system clock. This commit uses freezegun to
increment the clock by one second in cases that fail with the Fernet provider.

This fix was originally a part of https://review.openstack.org/#/c/258650 but
this is an attempt to break 258650 into smaller, more reviewable, pieces.

Co-Authored-By: Raildo Mascena <raildo@lsd.ufcg.edu.br>
Co-Authored-By: Adam Young <ayound@redhat.com>

Change-Id: I2604376f63cd84c2a3d1a640dfcfbc29e5682c73
Partial-Bug: 1561054
This commit is contained in:
Lance Bragstad 2016-07-18 18:52:11 +00:00
parent 72e6196d54
commit 6bcc03ff1e
1 changed files with 185 additions and 153 deletions

View File

@ -10,9 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import random
import uuid
import freezegun
from six.moves import http_client
from six.moves import range
from testtools import matchers
@ -145,26 +147,33 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_crud_user_domain_role_grants(self):
collection_url = (
'/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domain_id,
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
collection_url = (
'/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domain_id,
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.delete(member_url)
# NOTE(lbragstad): Make sure we wait a second before we ask for the
# roles. This ensures the token we use isn't considered revoked
# because it was issued within the same second as a revocation
# event.
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
def test_crud_user_domain_role_grants_no_user(self):
"""Grant role on a domain to a user that doesn't exist.
@ -185,26 +194,33 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_crud_group_project_role_grants(self):
collection_url = (
'/projects/%(project_id)s/groups/%(group_id)s/roles' % {
'project_id': self.project_id,
'group_id': self.group_id})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
collection_url = (
'/projects/%(project_id)s/groups/%(group_id)s/roles' % {
'project_id': self.project_id,
'group_id': self.group_id})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.delete(member_url)
# NOTE(lbragstad): Make sure we wait a second before we ask for the
# roles. This ensures the token we use isn't considered revoked
# because it was issued within the same second as a revocation
# event.
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
def test_crud_group_project_role_grants_no_group(self):
"""Grant role on a project to a group that doesn't exist.
@ -226,26 +242,33 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.put(member_url, expected_status=http_client.NOT_FOUND)
def test_crud_group_domain_role_grants(self):
collection_url = (
'/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
'domain_id': self.domain_id,
'group_id': self.group_id})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
collection_url = (
'/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
'domain_id': self.domain_id,
'group_id': self.group_id})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id}
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=self.role,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
self.delete(member_url)
# NOTE(lbragstad): Make sure we wait a second before we ask for the
# roles. This ensures the token we use isn't considered revoked
# because it was issued within the same second as a revocation
# event.
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
def test_crud_group_domain_role_grants_no_group(self):
"""Grant role on a domain to a group that doesn't exist.
@ -314,37 +337,41 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
The revocation should be independently to the presence
of the revoke API.
"""
# creates grant from group on project.
self.assignment_api.create_grant(role_id=self.role['id'],
project_id=self.project['id'],
group_id=self.group['id'])
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
# creates grant from group on project.
self.assignment_api.create_grant(role_id=self.role['id'],
project_id=self.project['id'],
group_id=self.group['id'])
# adds user to the group.
self.identity_api.add_user_to_group(user_id=self.user['id'],
group_id=self.group['id'])
# adds user to the group.
self.identity_api.add_user_to_group(user_id=self.user['id'],
group_id=self.group['id'])
# creates a token for the user
auth_body = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
token_resp = self.post('/auth/tokens', body=auth_body)
token = token_resp.headers.get('x-subject-token')
# creates a token for the user
auth_body = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
token_resp = self.post('/auth/tokens', body=auth_body)
token = token_resp.headers.get('x-subject-token')
# validates the returned token; it should be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
expected_status=http_client.OK)
# validates the returned token; it should be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
expected_status=http_client.OK)
# revokes the grant from group on project.
self.assignment_api.delete_grant(role_id=self.role['id'],
project_id=self.project['id'],
group_id=self.group['id'])
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
# revokes the grant from group on project.
self.assignment_api.delete_grant(role_id=self.role['id'],
project_id=self.project['id'],
group_id=self.group['id'])
# validates the same token again; it should not longer be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
expected_status=http_client.NOT_FOUND)
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
# validates the same token again; it should not longer be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
expected_status=http_client.NOT_FOUND)
@unit.skip_if_cache_disabled('assignment')
def test_delete_grant_from_user_and_project_invalidate_cache(self):
@ -494,86 +521,91 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
been removed
"""
# Since the default fixtures already assign some roles to the
# user it creates, we also need a new user that will not have any
# existing assignments
user1 = unit.new_user_ref(domain_id=self.domain['id'])
user1 = self.identity_api.create_user(user1)
time = datetime.datetime.utcnow()
with freezegun.freeze_time(time) as frozen_datetime:
# Since the default fixtures already assign some roles to the
# user it creates, we also need a new user that will not have any
# existing assignments
user1 = unit.new_user_ref(domain_id=self.domain['id'])
user1 = self.identity_api.create_user(user1)
collection_url = '/role_assignments'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
existing_assignments = len(r.result.get('role_assignments'))
collection_url = '/role_assignments'
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r, resource_url=collection_url)
self.head(collection_url, expected_status=http_client.OK)
existing_assignments = len(r.result.get('role_assignments'))
# Now add one of each of the four types of assignment, making sure
# that we get them all back.
gd_entity = self.build_role_assignment_entity(domain_id=self.domain_id,
group_id=self.group_id,
role_id=self.role_id)
self.put(gd_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 1,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, gd_entity)
self.head(collection_url, expected_status=http_client.OK)
# Now add one of each of the four types of assignment, making sure
# that we get them all back.
gd_entity = self.build_role_assignment_entity(
domain_id=self.domain_id,
group_id=self.group_id,
role_id=self.role_id)
self.put(gd_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 1,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, gd_entity)
self.head(collection_url, expected_status=http_client.OK)
ud_entity = self.build_role_assignment_entity(domain_id=self.domain_id,
user_id=user1['id'],
role_id=self.role_id)
self.put(ud_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 2,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, ud_entity)
self.head(collection_url, expected_status=http_client.OK)
ud_entity = self.build_role_assignment_entity(
domain_id=self.domain_id,
user_id=user1['id'],
role_id=self.role_id)
self.put(ud_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 2,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, ud_entity)
self.head(collection_url, expected_status=http_client.OK)
gp_entity = self.build_role_assignment_entity(
project_id=self.project_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gp_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 3,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, gp_entity)
self.head(collection_url, expected_status=http_client.OK)
gp_entity = self.build_role_assignment_entity(
project_id=self.project_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gp_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 3,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, gp_entity)
self.head(collection_url, expected_status=http_client.OK)
up_entity = self.build_role_assignment_entity(
project_id=self.project_id, user_id=user1['id'],
role_id=self.role_id)
self.put(up_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 4,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, up_entity)
self.head(collection_url, expected_status=http_client.OK)
up_entity = self.build_role_assignment_entity(
project_id=self.project_id, user_id=user1['id'],
role_id=self.role_id)
self.put(up_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments + 4,
resource_url=collection_url)
self.assertRoleAssignmentInListResponse(r, up_entity)
self.head(collection_url, expected_status=http_client.OK)
# Now delete the four we added and make sure they are removed
# from the collection.
# Now delete the four we added and make sure they are removed
# from the collection.
self.delete(gd_entity['links']['assignment'])
self.delete(ud_entity['links']['assignment'])
self.delete(gp_entity['links']['assignment'])
self.delete(up_entity['links']['assignment'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments,
resource_url=collection_url)
self.assertRoleAssignmentNotInListResponse(r, gd_entity)
self.assertRoleAssignmentNotInListResponse(r, ud_entity)
self.assertRoleAssignmentNotInListResponse(r, gp_entity)
self.assertRoleAssignmentNotInListResponse(r, up_entity)
self.head(collection_url, expected_status=http_client.OK)
self.delete(gd_entity['links']['assignment'])
self.delete(ud_entity['links']['assignment'])
self.delete(gp_entity['links']['assignment'])
self.delete(up_entity['links']['assignment'])
frozen_datetime.tick(delta=datetime.timedelta(seconds=1))
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(
r,
expected_length=existing_assignments,
resource_url=collection_url)
self.assertRoleAssignmentNotInListResponse(r, gd_entity)
self.assertRoleAssignmentNotInListResponse(r, ud_entity)
self.assertRoleAssignmentNotInListResponse(r, gp_entity)
self.assertRoleAssignmentNotInListResponse(r, up_entity)
self.head(collection_url, expected_status=http_client.OK)
def test_get_effective_role_assignments(self):
"""Call ``GET /role_assignments?effective``.