# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import os
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.i18n import _
_LOG = logging.getLogger(__name__)
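class Revocations(object):
    """Cached copy of the identity server's token revocation list.

    The list is obtained through ``identity_server.fetch_revocation_list()``,
    passed through ``cms_verify`` once, and then persisted as plain JSON in
    ``signing_directory`` under ``revoked.pem``.  Subsequent loads of that
    file (see ``_list``) use it as-is without re-verifying it, so the
    contents of the signing directory are trusted: it should be writable
    only by the process running this middleware.

    :param timeout: how long a fetched list stays current.  It is added to
        the fetch time, so it must support ``datetime + timeout`` (e.g. a
        ``datetime.timedelta``).
    :param signing_directory: object exposing ``calc_path``, ``read_file``
        and ``write_file`` for the revocation file.
    :param identity_server: object whose ``fetch_revocation_list()`` returns
        the signed list payload.
    :param cms_verify: callable that verifies the signed payload and returns
        the JSON-encoded list.
    :param log: logger used for debug messages.
    """

    _FILE_NAME = 'revoked.pem'

    def __init__(self, timeout, signing_directory, identity_server,
                 cms_verify, log=_LOG):
        self._cache_timeout = timeout
        self._signing_directory = signing_directory
        self._identity_server = identity_server
        self._cms_verify = cms_verify
        self._log = log

        # Lazily populated backing fields for the ``_fetched_time`` and
        # ``_list`` properties; ``None`` means "not yet determined".
        self._fetched_time_prop = None
        self._list_prop = None

    @property
    def _fetched_time(self):
        """Time the cached list was last fetched, as a naive UTC datetime.

        When nothing has been fetched in this process, the on-disk file's
        modification time is used so a list written earlier is not fetched
        again needlessly.  With no file at all, ``datetime.min`` is returned,
        which makes the cache look expired and forces a fetch.
        """
        if self._fetched_time_prop is None:
            file_path = self._signing_directory.calc_path(self._FILE_NAME)
            if os.path.exists(file_path):
                mtime = os.path.getmtime(file_path)
                # Naive UTC, to compare against timeutils.utcnow() in _list.
                fetched_time = datetime.datetime.utcfromtimestamp(mtime)
            else:
                fetched_time = datetime.datetime.min
            self._fetched_time_prop = fetched_time
        return self._fetched_time_prop

    @_fetched_time.setter
    def _fetched_time(self, value):
        self._fetched_time_prop = value

    def _fetch(self):
        """Fetch the signed revocation list and verify it.

        :returns: the verified, JSON-encoded revocation list.
        :raises: whatever ``fetch_revocation_list`` or ``cms_verify`` raise;
            such failures propagate out of ``check``.
        """
        revocation_list_data = self._identity_server.fetch_revocation_list()
        return self._cms_verify(revocation_list_data)

    @property
    def _list(self):
        """The decoded revocation list, refreshed once the cache expires.

        While the cached copy is still current the list is loaded from the
        signing directory on first use (no re-verification happens here).
        Once ``_cache_timeout`` has elapsed since ``_fetched_time`` it is
        fetched and verified again, and the setter persists it.
        """
        expires_at = self._fetched_time + self._cache_timeout
        list_is_current = timeutils.utcnow() < expires_at

        if list_is_current:
            # Compare with ``is None`` rather than by truthiness: a valid but
            # empty decoded list must not cause a disk read on every access.
            if self._list_prop is None:
                self._list_prop = jsonutils.loads(
                    self._signing_directory.read_file(self._FILE_NAME))
        else:
            self._list = self._fetch()
        return self._list_prop

    @_list.setter
    def _list(self, value):
        """Save a revocation list to memory and to disk.

        :param value: A json-encoded revocation list

        """
        self._list_prop = jsonutils.loads(value)
        self._fetched_time = timeutils.utcnow()
        self._signing_directory.write_file(self._FILE_NAME, value)

    def _is_revoked(self, token_id):
        """Indicate whether the token_id appears in the revocation list."""
        return self._any_revoked((token_id,))

    def _any_revoked(self, token_ids):
        """Indicate whether any of token_ids appears in the revocation list.

        The list is read once for the whole batch, so a cache expiry cannot
        trigger a separate fetch (and disk write) for every token id.
        """
        revoked_tokens = self._list.get('revoked', None)
        if not revoked_tokens:
            return False

        revoked_ids = set(x['id'] for x in revoked_tokens)
        return any(token_id in revoked_ids for token_id in token_ids)

    def check(self, token_ids):
        """Raise if any of token_ids appears in the revocation list.

        :param token_ids: iterable of token ids to test against the list.
        :raises keystonemiddleware.auth_token._exceptions.InvalidToken:
            if any of the ids has been revoked.  Errors raised while
            fetching or verifying the list also propagate from here.
        """
        if self._any_revoked(token_ids):
            self._log.debug('Token is marked as having been revoked')
            raise exc.InvalidToken(_('Token has been revoked'))

    def check_by_audit_id(self, audit_ids):
        """Check whether the audit_id appears in the revocation list.

        :param audit_ids: iterable of audit ids to test against the list.
        :raises keystonemiddleware.auth_token._exceptions.InvalidToken:
            if the audit ID(s) appear in the revocation list.

        """
        revoked_tokens = self._list.get('revoked', None)
        if not revoked_tokens:
            # There's no revoked tokens, so nothing to do.
            return

        # The audit_id may not be present in the revocation events because
        # earlier versions of the identity server didn't provide them.
        revoked_ids = set(
            x['audit_id'] for x in revoked_tokens if 'audit_id' in x)
        for audit_id in audit_ids:
            if audit_id in revoked_ids:
                self._log.debug(
                    'Token is marked as having been revoked by audit id')
                raise exc.InvalidToken(_('Token has been revoked'))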