External OAuth2.0 Authorization Server Support

Added the ability to authenticate using a system-scoped token and the
ability to authenticate using a cached token to the
external_oauth2_token filter.

Implements: blueprint enhance-oauth2-interoperability
Change-Id: I1fb4921faaafd5288d5909762ff5553e5e2475dc
This commit is contained in:
Yusuke Niimi 2023-07-14 07:06:27 +00:00
parent de15a610e1
commit 53b4cb21ad
4 changed files with 758 additions and 24 deletions

View File

@ -196,6 +196,181 @@ is not able to discover it.
oslo_config_project = nova
# oslo_config_file = /not_discoverable_location/nova.conf
Configuration for external authorization
----------------------------------------
If an external authorization server is used from Keystonemiddleware, the
configuration file settings for the main application must be changed. The
system supports 5 authentication methods, tls_client_auth, client_secret_basic,
client_secret_post, client_secret_jwt, and private_key_jwt, which are specified
in auth_method. The required config depends on the authentication method.
The two config file modifications required when using an external authorization
server are described below.
.. NOTE::
Settings for accepting https requests and mTLS connections depend on each
OpenStack service that uses Keystonemiddleware.
Change to use the ext_oauth2_token filter instead of authtoken:
.. code-block:: ini
[pipeline:main]
pipeline = ext_oauth2_token myService
[filter:ext_oauth2_token]
paste.filter_factory = keystonemiddleware.external_oauth2_token:filter_factory
Add the config group for external authentication server:
.. code-block:: ini
[ext_oauth2_auth]
# Required if identity server requires client certificate.
#certfile = <None>
# Required if identity server requires client private key.
#keyfile = <None>
# A PEM encoded Certificate Authority to use when verifying HTTPs
# connections. Defaults to system CAs.
#cafile = <None>
# Verify HTTPS connections.
#insecure = False
# Request timeout value for communicating with Identity API server.
#http_connect_timeout = <None>
# The endpoint for introspect API, it is used to verify that the OAuth 2.0
# access token is valid.
#introspect_endpoint = <None>
# The Audience should be the URL of the Authorization Server's Token
# Endpoint. The Authorization Server will verify that it is an intended
# audience for the token.
#audience = <None>
# The auth_method must use the authentication method specified by the
# Authorization Server. The system supports 5 authentication methods such
# as tls_client_auth, client_secret_basic, client_secret_post,
# client_secret_jwt, private_key_jwt.
#auth_method = client_secret_basic
# The OAuth 2.0 Client Identifier valid at the Authorization Server.
#client_id = <None>
# The OAuth 2.0 client secret. When the auth_method is client_secret_basic,
# client_secret_post, or client_secret_jwt, the value is used, and
# otherwise the value is ignored.
#client_secret = <None>
# If the access token generated by the Authorization Server is bound to the
# OAuth 2.0 certificate thumbprint, the value can be set to true, and then
# the keystone middleware will verify the thumbprint.
#thumbprint_verify = False
# The jwt_key_file must use the certificate key file which has been
# registered with the Authorization Server. When the auth_method is
# private_key_jwt, the value is used, and otherwise the value is ignored.
#jwt_key_file = <None>
# The jwt_algorithm must use the algorithm specified by the Authorization
# Server. When the auth_method is client_secret_jwt, this value is often
# set to HS256, when the auth_method is private_key_jwt, the value is often
# set to RS256, and otherwise the value is ignored.
#jwt_algorithm = <None>
# This value is used to calculate the expiration time. If after the
# expiration time, the access token can not be accepted. When the
# auth_method is client_secret_jwt or private_key_jwt, the value is used,
# and otherwise the value is ignored.
#jwt_bearer_time_out = 3600
# Specifies the method for obtaining the project ID that currently needs
# to be accessed.
#mapping_project_id = <None>
# Specifies the method for obtaining the project name that currently needs
# to be accessed.
#mapping_project_name = <None>
# Specifies the method for obtaining the project domain ID that currently
# needs to be accessed.
#mapping_project_domain_id = <None>
# Specifies the method for obtaining the project domain name that currently
# needs to be accessed.
#mapping_project_domain_name = <None>
# Specifies the method for obtaining the user ID.
#mapping_user_id = client_id
# Specifies the method for obtaining the user name.
#mapping_user_name = username
# Specifies the method for obtaining the domain ID which the user belongs.
#mapping_user_domain_id = <None>
# Specifies the method for obtaining the domain name which the user
# belongs.
#mapping_user_domain_name = <None>
# Specifies the method for obtaining the list of roles in a project or
# domain owned by the user.
#mapping_roles = <None>
# Specifies the method for obtaining the scope information indicating
# whether a token is system-scoped.
#mapping_system_scope = <None>
# Specifies the method for obtaining the token expiration time.
#mapping_expires_at = <None>
# Optionally specify a list of memcached server(s) to use for caching.
# If left undefined, tokens will instead be cached in-process.
#memcached_servers = <None>
# In order to prevent excessive effort spent validating tokens, the
# middleware caches previously-seen tokens for a configurable duration
# (in seconds). Set to -1 to disable caching completely.
#token_cache_time = 300
# (Optional) If defined, indicate whether token data should be
# authenticated or authenticated and encrypted. If MAC, token data is
# authenticated (with HMAC) in the cache. If ENCRYPT, token data is
# encrypted and authenticated in the cache. If the value is not one of
# these options or empty, auth_token will raise an exception on
# initialization.
#memcache_security_strategy = <None>
# (Optional, mandatory if memcache_security_strategy is defined)
# This string is used for key derivation.
#memcache_secret_key = <None>
# (Optional) Number of seconds memcached server is considered dead before
# it is tried again.
#memcache_pool_dead_retry = 5 * 60
# (Optional) Maximum total number of open connections to every memcached
# server.
#memcache_pool_maxsize = 10
# (Optional) Socket timeout in seconds for communicating with a memcached
# server.
#memcache_pool_socket_timeout = 3
# (Optional) Number of seconds a connection to memcached is held unused in
# the pool before it is closed.
#memcache_pool_unused_timeout = 60
# (Optional) Number of seconds that an operation will wait to get a
# memcached client connection from the pool.
#memcache_pool_conn_get_timeout = 10
# (Optional) Use the advanced (eventlet safe) memcached client pool.
#memcache_use_advanced_pool = True
Improving response time
-----------------------

View File

@ -32,6 +32,7 @@ from keystoneauth1 import loading
from keystoneauth1.loading import session as session_loading
from keystonemiddleware._common import config
from keystonemiddleware.auth_token import _cache
from keystonemiddleware.exceptions import ConfigurationError
from keystonemiddleware.exceptions import KeystoneMiddlewareException
from keystonemiddleware.i18n import _
@ -124,6 +125,62 @@ _EXTERNAL_AUTH2_OPTS = [
cfg.StrOpt('mapping_roles',
help='Specifies the method for obtaining the list of roles in '
'a project or domain owned by the user.'),
cfg.StrOpt('mapping_system_scope',
help='Specifies the method for obtaining the scope information '
'indicating whether a token is system-scoped.'),
cfg.StrOpt('mapping_expires_at',
help='Specifies the method for obtaining the token expiration '
'time.'),
cfg.ListOpt('memcached_servers',
deprecated_name='memcache_servers',
help='Optionally specify a list of memcached server(s) to '
'use for caching. If left undefined, tokens will '
'instead be cached in-process.'),
cfg.IntOpt('token_cache_time',
default=300,
help='In order to prevent excessive effort spent validating '
'tokens, the middleware caches previously-seen tokens '
'for a configurable duration (in seconds). Set to -1 to '
'disable caching completely.'),
cfg.StrOpt('memcache_security_strategy',
default='None',
choices=('None', 'MAC', 'ENCRYPT'),
ignore_case=True,
help='(Optional) If defined, indicate whether token data '
'should be authenticated or authenticated and encrypted. '
'If MAC, token data is authenticated (with HMAC) in the '
'cache. If ENCRYPT, token data is encrypted and '
'authenticated in the cache. If the value is not one of '
'these options or empty, auth_token will raise an '
'exception on initialization.'),
cfg.StrOpt('memcache_secret_key',
secret=True,
help='(Optional, mandatory if memcache_security_strategy is '
'defined) This string is used for key derivation.'),
cfg.IntOpt('memcache_pool_dead_retry',
default=5 * 60,
help='(Optional) Number of seconds memcached server is '
'considered dead before it is tried again.'),
cfg.IntOpt('memcache_pool_maxsize',
default=10,
help='(Optional) Maximum total number of open connections to '
'every memcached server.'),
cfg.IntOpt('memcache_pool_socket_timeout',
default=3,
help='(Optional) Socket timeout in seconds for communicating '
'with a memcached server.'),
cfg.IntOpt('memcache_pool_unused_timeout',
default=60,
help='(Optional) Number of seconds a connection to memcached '
'is held unused in the pool before it is closed.'),
cfg.IntOpt('memcache_pool_conn_get_timeout',
default=10,
help='(Optional) Number of seconds that an operation will wait '
'to get a memcached client connection from the pool.'),
cfg.BoolOpt('memcache_use_advanced_pool',
default=True,
help='(Optional) Use the advanced (eventlet safe) memcached '
'client pool.')
]
cfg.CONF.register_opts(_EXTERNAL_AUTH2_OPTS,
@ -296,13 +353,13 @@ class PrivateKeyJwtAuthClient(AbstractAuthClient):
raise ConfigurationError(_('Configuration error. The JWT key file '
'content is empty.'))
ita = round(time.time())
iat = round(time.time())
try:
client_assertion = jwt.encode(
payload={
'jti': str(uuid.uuid4()),
'iat': str(ita),
'exp': str(ita + self.jwt_bearer_time_out),
'iat': str(iat),
'exp': str(iat + self.jwt_bearer_time_out),
'iss': self.client_id,
'sub': self.client_id,
'aud': self.audience},
@ -438,6 +495,7 @@ class ExternalAuth2Protocol(object):
_EXT_AUTH_CONFIG_GROUP_NAME,
all_opts,
conf)
self._token_cache = self._token_cache_factory()
self._session = self._create_session()
self._audience = self._get_config_option('audience', is_required=True)
@ -452,6 +510,30 @@ class ExternalAuth2Protocol(object):
self._audience, self._client_id,
self._get_config_option, self._log)
def _token_cache_factory(self):
security_strategy = self._conf.get('memcache_security_strategy')
cache_kwargs = dict(
cache_time=int(self._conf.get('token_cache_time')),
memcached_servers=self._conf.get('memcached_servers'),
use_advanced_pool=self._conf.get(
'memcache_use_advanced_pool'),
dead_retry=self._conf.get('memcache_pool_dead_retry'),
maxsize=self._conf.get('memcache_pool_maxsize'),
unused_timeout=self._conf.get(
'memcache_pool_unused_timeout'),
conn_get_timeout=self._conf.get(
'memcache_pool_conn_get_timeout'),
socket_timeout=self._conf.get(
'memcache_pool_socket_timeout'),
)
if security_strategy.lower() != 'none':
secret_key = self._conf.get('memcache_secret_key')
return _cache.SecureTokenCache(self._log,
security_strategy,
secret_key,
**cache_kwargs)
return _cache.TokenCache(self._log, **cache_kwargs)
@webob.dec.wsgify()
def __call__(self, req):
"""Handle incoming request."""
@ -475,6 +557,7 @@ class ExternalAuth2Protocol(object):
self._log.info('Unable to obtain the access token.')
raise InvalidToken(_('Unable to obtain the access token.'))
self._token_cache.initialize(request.environ)
token_data = self._fetch_token(access_token)
if (self._get_config_option('thumbprint_verify',
@ -610,6 +693,30 @@ class ExternalAuth2Protocol(object):
authorization server.
"""
try:
cached = self._token_cache.get(access_token)
if cached:
self._log.debug('The cached token: %s' % cached)
if (not isinstance(cached, dict)
or 'origin_token_metadata' not in cached):
self._log.warning('The cached data is invalid. %s' %
cached)
raise InvalidToken(_('The token is invalid.'))
origin_token_metadata = cached.get('origin_token_metadata')
if not origin_token_metadata.get('active'):
self._log.warning('The cached data is invalid. %s' %
cached)
raise InvalidToken(_('The token is invalid.'))
expire_at = self._read_data_from_token(
origin_token_metadata, 'mapping_expires_at',
is_required=False, value_type=int)
if expire_at:
if int(expire_at) < int(time.time()):
cached['origin_token_metadata']['active'] = False
self._token_cache.set(access_token, cached)
self._log.warning(
'The cached data is invalid. %s' % cached)
raise InvalidToken(_('The token is invalid.'))
return cached
http_response = self._http_client.introspect(access_token)
if http_response.status_code != 200:
self._log.critical('The introspect API returns an '
@ -624,11 +731,16 @@ class ExternalAuth2Protocol(object):
self._log.debug('The introspect API response: %s' %
origin_token_metadata)
if not origin_token_metadata.get('active'):
self._token_cache.set(
access_token,
{'origin_token_metadata': origin_token_metadata})
self._log.info('The token is invalid. response: %s' %
origin_token_metadata)
raise InvalidToken(_('The token is invalid.'))
token_data = self._parse_necessary_info(origin_token_metadata)
self._token_cache.set(access_token, token_data)
return token_data
return self._parse_necessary_info(origin_token_metadata)
except (ConfigurationError, ForbiddenToken,
ServiceError, InvalidToken):
raise
@ -644,12 +756,14 @@ class ExternalAuth2Protocol(object):
'verification process.'))
def _read_data_from_token(self, token_metadata, config_key,
is_required=False, value_type=str):
is_required=False, value_type=None):
"""Read value from token metadata.
Read the necessary information from the token metadata with the
config key.
"""
if not value_type:
value_type = str
meta_key = self._get_config_option(config_key, is_required=is_required)
if not meta_key:
return None
@ -718,23 +832,31 @@ class ExternalAuth2Protocol(object):
token_data['roles'] = roles
token_data['is_admin'] = is_admin
project_id = self._read_data_from_token(
token_metadata, 'mapping_project_id', is_required=False)
if project_id:
token_data['project_id'] = project_id
token_data['project_name'] = self._read_data_from_token(
token_metadata, 'mapping_project_name', is_required=True)
token_data['project_domain_id'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_id', is_required=True)
token_data['project_domain_name'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_name',
is_required=True)
system_scope = self._read_data_from_token(
token_metadata, 'mapping_system_scope',
is_required=False, value_type=bool)
if system_scope:
token_data['system_scope'] = 'all'
else:
token_data['domain_id'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_id', is_required=True)
token_data['domain_name'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_name',
is_required=True)
project_id = self._read_data_from_token(
token_metadata, 'mapping_project_id', is_required=False)
if project_id:
token_data['project_id'] = project_id
token_data['project_name'] = self._read_data_from_token(
token_metadata, 'mapping_project_name', is_required=True)
token_data['project_domain_id'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_id',
is_required=True)
token_data['project_domain_name'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_name',
is_required=True)
else:
token_data['domain_id'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_id',
is_required=True)
token_data['domain_name'] = self._read_data_from_token(
token_metadata, 'mapping_project_domain_name',
is_required=True)
token_data['user_id'] = self._read_data_from_token(
token_metadata, 'mapping_user_id', is_required=True)
@ -815,7 +937,11 @@ class ExternalAuth2Protocol(object):
'is_admin')
request.environ['HTTP_X_USER'] = token_data.get('user_name')
if token_data.get('project_id'):
if token_data.get('system_scope'):
request.environ['HTTP_OPENSTACK_SYSTEM_SCOPE'] = token_data.get(
'system_scope'
)
elif token_data.get('project_id'):
request.environ['HTTP_X_PROJECT_ID'] = token_data.get('project_id')
request.environ['HTTP_X_PROJECT_NAME'] = token_data.get(
'project_name')

View File

@ -18,6 +18,7 @@ import hashlib
import jwt.utils
import logging
import ssl
from testtools import matchers
import time
from unittest import mock
import uuid
@ -32,6 +33,7 @@ import testresources
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1 import session
from keystonemiddleware.auth_token import _cache
from keystonemiddleware import external_oauth2_token
from keystonemiddleware.tests.unit.auth_token import base
from keystonemiddleware.tests.unit.auth_token.test_auth_token_middleware \
@ -97,6 +99,8 @@ JWT_KEY_CONTENT = (
'jIgmPTKGR0FedjAeCBByH9vkw8iRg7w=\n'
'-----END PRIVATE KEY-----\n')
MEMCACHED_SERVERS = ['localhost:11211']
def get_authorization_header(token):
return {'Authorization': f'Bearer {token}'}
@ -120,7 +124,18 @@ def get_config(
mapping_user_name=None,
mapping_user_domain_id=None,
mapping_user_domain_name=None,
mapping_roles=None):
mapping_roles=None,
mapping_system_scope=None,
mapping_expires_at=None,
memcached_servers=None,
memcache_use_advanced_pool=None,
memcache_pool_dead_retry=None,
memcache_pool_maxsize=None,
memcache_pool_unused_timeout=None,
memcache_pool_conn_get_timeout=None,
memcache_pool_socket_timeout=None,
memcache_security_strategy=None,
memcache_secret_key=None):
conf = {}
if introspect_endpoint is not None:
conf['introspect_endpoint'] = introspect_endpoint
@ -160,6 +175,28 @@ def get_config(
conf['mapping_user_domain_name'] = mapping_user_domain_name
if mapping_roles is not None:
conf['mapping_roles'] = mapping_roles
if mapping_system_scope is not None:
conf['mapping_system_scope'] = mapping_system_scope
if memcached_servers is not None:
conf['memcached_servers'] = memcached_servers
if memcached_servers is not None:
conf['mapping_expires_at'] = mapping_expires_at
if memcache_use_advanced_pool is not None:
conf['memcache_use_advanced_pool'] = memcache_use_advanced_pool
if memcache_pool_dead_retry is not None:
conf['memcache_pool_dead_retry'] = memcache_pool_dead_retry
if memcache_pool_maxsize is not None:
conf['memcache_pool_maxsize'] = memcache_pool_maxsize
if memcache_pool_unused_timeout is not None:
conf['memcache_pool_unused_timeout'] = memcache_pool_unused_timeout
if memcache_pool_conn_get_timeout is not None:
conf['memcache_pool_conn_get_timeout'] = memcache_pool_conn_get_timeout
if memcache_pool_socket_timeout is not None:
conf['memcache_pool_socket_timeout'] = memcache_pool_socket_timeout
if memcache_security_strategy is not None:
conf['memcache_security_strategy'] = memcache_security_strategy
if memcache_secret_key is not None:
conf['memcache_secret_key'] = memcache_secret_key
return conf
@ -281,7 +318,8 @@ class BaseExternalOauth2TokenMiddlewareTest(base.BaseAuthTokenTestCase,
exp_time=None,
cert_thumb=None,
metadata=None,
status_code=200
status_code=200,
system_scope=False
):
if auth_method == 'tls_client_auth':
body = 'client_id=%s&token=%s&token_type_hint=access_token' % (
@ -333,6 +371,9 @@ class BaseExternalOauth2TokenMiddlewareTest(base.BaseAuthTokenTestCase,
'acr': '1',
'scope': 'default'
}
if system_scope:
resp['system_scope'] = 'all'
if exp_time is not None:
resp['exp'] = exp_time
else:
@ -384,6 +425,7 @@ class BaseExternalOauth2TokenMiddlewareTest(base.BaseAuthTokenTestCase,
self.assertEqual(project_name, request_environ['HTTP_X_TENANT_NAME'])
self.assertEqual(project_id, request_environ['HTTP_X_TENANT'])
self.assertNotIn('HTTP_OPENSTACK_SYSTEM_SCOPE', request_environ)
self.assertNotIn('HTTP_X_DOMAIN_ID', request_environ)
self.assertNotIn('HTTP_X_DOMAIN_NAME', request_environ)
@ -411,7 +453,40 @@ class BaseExternalOauth2TokenMiddlewareTest(base.BaseAuthTokenTestCase,
self.assertEqual(domain_id, request_environ['HTTP_X_DOMAIN_ID'])
self.assertEqual(domain_name, request_environ['HTTP_X_DOMAIN_NAME'])
self.assertNotIn('HTTP_OPENSTACK_SYSTEM_SCOPE', request_environ)
self.assertNotIn('HTTP_X_PROJECT_ID', request_environ)
self.assertNotIn('HTTP_X_PROJECT_NAME', request_environ)
self.assertNotIn('HTTP_X_PROJECT_DOMAIN_ID', request_environ)
self.assertNotIn('HTTP_X_PROJECT_DOMAIN_NAME', request_environ)
self.assertNotIn('HTTP_X_TENANT_ID', request_environ)
self.assertNotIn('HTTP_X_TENANT_NAME', request_environ)
self.assertNotIn('HTTP_X_TENANT', request_environ)
def _check_env_value_system_scope(self, request_environ,
user_id, user_name,
user_domain_id, user_domain_name,
roles, is_admin=True, system_scope=True):
self.assertEqual('Confirmed',
request_environ['HTTP_X_IDENTITY_STATUS'])
self.assertEqual(roles, request_environ['HTTP_X_ROLES'])
self.assertEqual(roles, request_environ['HTTP_X_ROLE'])
self.assertEqual(user_id, request_environ['HTTP_X_USER_ID'])
self.assertEqual(user_name, request_environ['HTTP_X_USER_NAME'])
self.assertEqual(user_domain_id,
request_environ['HTTP_X_USER_DOMAIN_ID'], )
self.assertEqual(user_domain_name,
request_environ['HTTP_X_USER_DOMAIN_NAME'])
if is_admin:
self.assertEqual('true',
request_environ['HTTP_X_IS_ADMIN_PROJECT'])
else:
self.assertNotIn('HTTP_X_IS_ADMIN_PROJECT', request_environ)
self.assertEqual(user_name, request_environ['HTTP_X_USER'])
self.assertEqual('all', request_environ['HTTP_OPENSTACK_SYSTEM_SCOPE'])
self.assertNotIn('HTTP_X_DOMAIN_ID', request_environ)
self.assertNotIn('HTTP_X_DOMAIN_NAME', request_environ)
self.assertNotIn('HTTP_X_PROJECT_ID', request_environ)
self.assertNotIn('HTTP_X_PROJECT_NAME', request_environ)
self.assertNotIn('HTTP_X_PROJECT_DOMAIN_ID', request_environ)
@ -1679,6 +1754,42 @@ class ExternalOauth2TokenMiddlewareClientSecretBasicTest(
self._user_domain_id, self._user_domain_name,
self._project_domain_id, self._project_domain_name, self._roles)
def test_system_scope_200(self):
conf = copy.deepcopy(self._test_conf)
conf.pop('mapping_project_id')
conf['mapping_system_scope'] = "system.all"
self.set_middleware(conf=conf)
self._default_metadata["system"] = {"all": True}
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=True,
metadata=self._default_metadata,
system_scope=True
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
resp = self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=200,
method='GET', path='/vnfpkgm/v1/vnf_packages',
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self.assertEqual(FakeApp.SUCCESS, resp.body)
self._check_env_value_system_scope(
resp.request.environ, self._user_id, self._user_name,
self._user_domain_id, self._user_domain_name, self._roles)
def test_process_response_401(self):
conf = copy.deepcopy(self._test_conf)
conf.pop('mapping_project_id')
@ -1714,6 +1825,321 @@ class ExternalOauth2TokenMiddlewareClientSecretBasicTest(
'Authorization OAuth 2.0 uri="%s"' % self._audience)
class ExternalAuth2ProtocolTest(BaseExternalOauth2TokenMiddlewareTest):
def setUp(self):
super(ExternalAuth2ProtocolTest, self).setUp()
self._test_client_id = str(uuid.uuid4())
self._test_client_secret = str(uuid.uuid4())
self._auth_method = 'client_secret_basic'
self._test_conf = get_config(
introspect_endpoint=self._introspect_endpoint,
audience=self._audience,
auth_method=self._auth_method,
client_id=self._test_client_id,
client_secret=self._test_client_secret,
thumbprint_verify=False,
mapping_project_id='access_project.id',
mapping_project_name='access_project.name',
mapping_project_domain_id='access_project.domain.id',
mapping_project_domain_name='access_project.domain.name',
mapping_user_id='client_id',
mapping_user_name='username',
mapping_user_domain_id='user_domain.id',
mapping_user_domain_name='user_domain.name',
mapping_roles='roles',
mapping_system_scope='system.all',
mapping_expires_at='exp',
memcached_servers=','.join(MEMCACHED_SERVERS),
memcache_use_advanced_pool=True,
memcache_pool_dead_retry=300,
memcache_pool_maxsize=10,
memcache_pool_unused_timeout=60,
memcache_pool_conn_get_timeout=10,
memcache_pool_socket_timeout=3,
memcache_security_strategy=None,
memcache_secret_key=None
)
uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT
uuid_serv_token_default = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT
uuid_token_bind = self.examples.v3_UUID_TOKEN_BIND
uuid_service_token_bind = self.examples.v3_UUID_SERVICE_TOKEN_BIND
self.token_dict = {
'uuid_token_default': uuid_token_default,
'uuid_service_token_default': uuid_serv_token_default,
'uuid_token_bind': uuid_token_bind,
'uuid_service_token_bind': uuid_service_token_bind,
}
self._token = self.token_dict['uuid_token_default']
self._user_id = str(uuid.uuid4()) + '_user_id'
self._user_name = str(uuid.uuid4()) + '_user_name'
self._user_domain_id = str(uuid.uuid4()) + '_user_domain_id'
self._user_domain_name = str(uuid.uuid4()) + '_user_domain_name'
self._project_id = str(uuid.uuid4()) + '_project_id'
self._project_name = str(uuid.uuid4()) + '_project_name'
self._project_domain_id = str(uuid.uuid4()) + 'project_domain_id'
self._project_domain_name = str(uuid.uuid4()) + 'project_domain_name'
self._roles = 'admin,member,reader'
self._default_metadata = {
'access_project': {
'id': self._project_id,
'name': self._project_name,
'domain': {
'id': self._project_domain_id,
'name': self._project_domain_name
}
},
'user_domain': {
'id': self._user_domain_id,
'name': self._user_domain_name
},
'roles': self._roles,
'client_id': self._user_id,
'username': self._user_name,
'exp': int(time.time()) + 3600
}
self._clear_call_count = 0
cert = self.examples.V3_OAUTH2_MTLS_CERTIFICATE
self._pem_client_cert = cert.decode('ascii')
self._der_client_cert = ssl.PEM_cert_to_DER_cert(self._pem_client_cert)
thumb_sha256 = hashlib.sha256(self._der_client_cert).digest()
self._cert_thumb = jwt.utils.base64url_encode(thumb_sha256).decode(
'ascii')
def test_token_cache_factory_insecure(self):
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
self.assertIsInstance(self.middleware._token_cache, _cache.TokenCache)
def test_token_cache_factory_secure(self):
conf = copy.deepcopy(self._test_conf)
conf["memcache_secret_key"] = "test_key"
conf["memcache_security_strategy"] = "MAC"
self.set_middleware(conf=conf)
self.assertIsInstance(self.middleware._token_cache,
_cache.SecureTokenCache)
conf["memcache_security_strategy"] = "ENCRYPT"
self.set_middleware(conf=conf)
self.assertIsInstance(self.middleware._token_cache,
_cache.SecureTokenCache)
def test_caching_token_on_verify(self):
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
self.middleware._token_cache._env_cache_name = 'cache'
cache = _cache._FakeClient()
self.middleware._token_cache.initialize(env={'cache': cache})
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=True,
metadata=self._default_metadata
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=200,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self.assertThat(1, matchers.Equals(cache.set.call_count))
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=200,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
# Assert that the token wasn't cached again.
self.assertThat(1, matchers.Equals(cache.set.call_count))
def test_caching_token_timeout(self):
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
self.middleware._token_cache._env_cache_name = 'cache'
cache = _cache._FakeClient()
self.middleware._token_cache.initialize(env={'cache': cache})
self._default_metadata['exp'] = int(time.time()) - 3600
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=True,
metadata=self._default_metadata
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=200,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self.assertThat(1, matchers.Equals(cache.set.call_count))
# Confirm that authentication fails due to timeout.
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=401,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
@mock.patch('keystonemiddleware.auth_token._cache.TokenCache.get')
def test_caching_token_type_invalid(self, mock_cache_get):
mock_cache_get.return_value = "test"
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
self.middleware._token_cache._env_cache_name = 'cache'
cache = _cache._FakeClient()
self.middleware._token_cache.initialize(env={'cache': cache})
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=True,
metadata=self._default_metadata
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=401,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
def test_caching_token_not_active(self):
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
self.middleware._token_cache._env_cache_name = 'cache'
cache = _cache._FakeClient()
self.middleware._token_cache.initialize(env={'cache': cache})
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=False,
metadata=self._default_metadata
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=401,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self.assertThat(1, matchers.Equals(cache.set.call_count))
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=401,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
# Assert that the token wasn't cached again.
self.assertThat(1, matchers.Equals(cache.set.call_count))
def test_caching_token_invalid(self):
conf = copy.deepcopy(self._test_conf)
self.set_middleware(conf=conf)
self.middleware._token_cache._env_cache_name = 'cache'
cache = _cache._FakeClient()
self.middleware._token_cache.initialize(env={'cache': cache})
orig_cache_set = cache.set
cache.set = mock.Mock(side_effect=orig_cache_set)
def mock_resp(request, context):
return self._introspect_response(
request, context,
auth_method=self._auth_method,
introspect_client_id=self._test_client_id,
introspect_client_secret=self._test_client_secret,
access_token=self._token,
active=True,
metadata=self._default_metadata
)
self.requests_mock.post(self._introspect_endpoint,
json=mock_resp)
self.requests_mock.get(self._auth_url,
json=VERSION_LIST_v3,
status_code=300)
self.call_middleware(
headers=get_authorization_header(self._token),
expected_status=200,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self.assertThat(1, matchers.Equals(cache.set.call_count))
# Confirm that authentication fails due to invalid token.
self.call_middleware(
headers=get_authorization_header(str(uuid.uuid4()) + '_token'),
expected_status=500,
method='GET', path='/vnfpkgm/v1/vnf_packages',
der_client_cert=self._der_client_cert,
environ={'wsgi.input': FakeWsgiInput(FakeSocket(None))}
)
self._token = self.token_dict['uuid_token_default']
class FilterFactoryTest(utils.BaseTestCase):
def test_filter_factory(self):

View File

@ -0,0 +1,7 @@
---
features:
- |
[`blueprint enhance-oauth2-interoperability <https://blueprints.launchpad.net/keystone/+spec/enhance-oauth2-interoperability>`_]
Added the ability to authenticate using a system-scoped token and the
ability to authenticate using a cached token to the external_oauth2_token
filter.