adding additional token commands
The ability to reissue tokens for a registration, and the ability to delete expired tokens. There is a matching client patch. Test to check these work. Change-Id: I8c68ae861aa126172435ad30580e9fbb3028d12d
This commit is contained in:
parent
7d3a6e8116
commit
b86c3ae8e7
|
@ -54,6 +54,10 @@ class Registration(models.Model):
|
|||
def actions(self):
|
||||
return self.action_set.order_by('order')
|
||||
|
||||
@property
|
||||
def tokens(self):
|
||||
return self.token_set.all()
|
||||
|
||||
def to_dict(self):
|
||||
actions = []
|
||||
for action in self.actions:
|
||||
|
|
115
api_v1/tests.py
115
api_v1/tests.py
|
@ -840,3 +840,118 @@ class APITests(APITestCase):
|
|||
url = "/api_v1/notification"
|
||||
response = self.client.get(url, headers=headers)
|
||||
self.assertEqual(response.data, [])
|
||||
|
||||
@mock.patch('base.models.IdentityManager', FakeManager)
|
||||
def test_token_expired_delete(self):
|
||||
"""
|
||||
test deleting of expired tokens.
|
||||
"""
|
||||
global temp_cache
|
||||
|
||||
user = mock.Mock()
|
||||
user.id = 'user_id'
|
||||
user.name = "test@example.com"
|
||||
user.email = "test@example.com"
|
||||
user.password = "test_password"
|
||||
|
||||
user2 = mock.Mock()
|
||||
user2.id = 'user_id2'
|
||||
user2.name = "test2@example.com"
|
||||
user2.email = "test2@example.com"
|
||||
user2.password = "test_password"
|
||||
|
||||
temp_cache = {
|
||||
'i': 0,
|
||||
'users': {user.name: user, user2.name: user2},
|
||||
'projects': {},
|
||||
'roles': {
|
||||
'Member': 'Member', 'admin': 'admin',
|
||||
'project_owner': 'project_owner'
|
||||
}
|
||||
}
|
||||
url = "/api_v1/reset"
|
||||
data = {'email': "test@example.com"}
|
||||
response = self.client.post(url, data, format='json')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data, {'notes': ['created token']})
|
||||
|
||||
url = "/api_v1/reset"
|
||||
data = {'email': "test2@example.com"}
|
||||
response = self.client.post(url, data, format='json')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data, {'notes': ['created token']})
|
||||
|
||||
tokens = Token.objects.all()
|
||||
|
||||
self.assertEqual(len(tokens), 2)
|
||||
|
||||
new_token = tokens[0]
|
||||
new_token.expires = timezone.now() - timedelta(hours=24)
|
||||
new_token.save()
|
||||
|
||||
headers = {
|
||||
'project_name': "test_project",
|
||||
'project_id': "test_project_id",
|
||||
'roles': "admin,Member",
|
||||
'username': "test@example.com",
|
||||
'user_id': "test_user_id",
|
||||
'authenticated': True
|
||||
}
|
||||
url = "/api_v1/token/"
|
||||
response = self.client.delete(url, format='json', headers=headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data,
|
||||
{'notes': ['Deleted all expired tokens.']})
|
||||
self.assertEqual(Token.objects.count(), 1)
|
||||
|
||||
@mock.patch('base.models.IdentityManager', FakeManager)
|
||||
def test_token_reissue(self):
|
||||
"""
|
||||
test for reissue of tokens
|
||||
"""
|
||||
global temp_cache
|
||||
|
||||
user = mock.Mock()
|
||||
user.id = 'user_id'
|
||||
user.name = "test@example.com"
|
||||
user.email = "test@example.com"
|
||||
user.password = "test_password"
|
||||
|
||||
temp_cache = {
|
||||
'i': 0,
|
||||
'users': {user.name: user},
|
||||
'projects': {},
|
||||
'roles': {
|
||||
'Member': 'Member', 'admin': 'admin',
|
||||
'project_owner': 'project_owner'
|
||||
}
|
||||
}
|
||||
url = "/api_v1/reset"
|
||||
data = {'email': "test@example.com"}
|
||||
response = self.client.post(url, data, format='json')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data, {'notes': ['created token']})
|
||||
|
||||
registration = Registration.objects.all()[0]
|
||||
new_token = Token.objects.all()[0]
|
||||
|
||||
uuid = new_token.token
|
||||
|
||||
headers = {
|
||||
'project_name': "test_project",
|
||||
'project_id': "test_project_id",
|
||||
'roles': "admin,Member",
|
||||
'username': "test@example.com",
|
||||
'user_id': "test_user_id",
|
||||
'authenticated': True
|
||||
}
|
||||
url = "/api_v1/token/"
|
||||
data = {"registration": registration.uuid}
|
||||
response = self.client.post(url, data, format='json',
|
||||
headers=headers)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data,
|
||||
{'notes': ['Token reissued.']})
|
||||
self.assertEqual(Token.objects.count(), 1)
|
||||
new_token = Token.objects.all()[0]
|
||||
self.assertNotEquals(new_token.token, uuid)
|
||||
|
|
|
@ -95,6 +95,7 @@ def email_token(registration, token):
|
|||
}
|
||||
create_notification(registration, notes)
|
||||
# TODO(adriant): raise some error?
|
||||
# and surround calls to this function with try/except
|
||||
|
||||
try:
|
||||
message = "your token is: %s" % token.token
|
||||
|
@ -108,6 +109,8 @@ def email_token(registration, token):
|
|||
(e, registration.uuid))
|
||||
}
|
||||
create_notification(registration, notes)
|
||||
# TODO(adriant): raise some error?
|
||||
# and surround calls to this function with try/except
|
||||
|
||||
|
||||
def create_notification(registration, notes):
|
||||
|
@ -421,6 +424,47 @@ class TokenList(APIViewWithLogger):
|
|||
token_list.append(token.to_dict())
|
||||
return Response(token_list)
|
||||
|
||||
@admin
|
||||
def post(self, request, format=None):
|
||||
"""
|
||||
Reissue a token for an approved registration.
|
||||
|
||||
Clears other tokens for it.
|
||||
"""
|
||||
uuid = request.data.get('registration', None)
|
||||
if uuid is None:
|
||||
return Response(
|
||||
{'registration': ["This field is required.", ]},
|
||||
status=400)
|
||||
try:
|
||||
registration = Registration.objects.get(uuid=uuid)
|
||||
except Registration.DoesNotExist:
|
||||
return Response(
|
||||
{'notes': ['No registration with this id.']},
|
||||
status=404)
|
||||
if not registration.approved:
|
||||
return Response(
|
||||
{'notes': ['This registration has not been approved.']},
|
||||
status=400)
|
||||
|
||||
for token in registration.tokens:
|
||||
token.delete()
|
||||
|
||||
token = create_token(registration)
|
||||
email_token(registration, token)
|
||||
return Response(
|
||||
{'notes': ['Token reissued.']}, status=200)
|
||||
|
||||
@admin
|
||||
def delete(self, request, format=None):
|
||||
"""
|
||||
Delete all expired tokens.
|
||||
"""
|
||||
now = timezone.now()
|
||||
Token.objects.filter(expires__lt=now).delete()
|
||||
return Response(
|
||||
{'notes': ['Deleted all expired tokens.']}, status=200)
|
||||
|
||||
|
||||
class TokenDetail(APIViewWithLogger):
|
||||
|
||||
|
|
Loading…
Reference in New Issue