Pass ?allow_expired

When a service token is present we should bypass the expiry checks and
pass the allow_expired flag to the server. This will let the server
return expired tokens.

This has a very basic policy enforcement that is not backwards
compatible with the current (sensible) default. We will need to discuss
how we can make this work.

Implements bp: allow-expired
Change-Id: If3583ac08e33380f1c52ad50d7d5c74194393480
This commit is contained in:
Jamie Lennox 2016-09-29 09:32:19 +10:00 committed by Steve Martinelli
parent 29a879c0ed
commit 4c6282ff70
8 changed files with 268 additions and 54 deletions

View File

@ -318,10 +318,14 @@ class BaseAuthProtocol(object):
def __init__(self,
app,
log=_LOG,
enforce_token_bind=_BIND_MODE.PERMISSIVE):
enforce_token_bind=_BIND_MODE.PERMISSIVE,
service_token_roles=None,
service_token_roles_required=False):
self.log = log
self._app = app
self._enforce_token_bind = enforce_token_bind
self._service_token_roles = set(service_token_roles or [])
self._service_token_roles_required = service_token_roles_required
@webob.dec.wsgify(RequestClass=_request._AuthTokenRequest)
def __call__(self, req):
@ -350,20 +354,7 @@ class BaseAuthProtocol(object):
"""
user_auth_ref = None
serv_auth_ref = None
if request.user_token:
self.log.debug('Authenticating user token')
try:
data, user_auth_ref = self._do_fetch_token(request.user_token)
self._validate_token(user_auth_ref)
if not request.service_token:
self._confirm_token_bind(user_auth_ref, request)
except ksm_exceptions.InvalidToken:
self.log.info(_LI('Invalid user token'))
request.user_token_valid = False
else:
request.user_token_valid = True
request.token_info = data
allow_expired = False
if request.service_token:
self.log.debug('Authenticating service token')
@ -375,12 +366,56 @@ class BaseAuthProtocol(object):
self.log.info(_LI('Invalid service token'))
request.service_token_valid = False
else:
request.service_token_valid = True
# FIXME(jamielennox): The new behaviour for service tokens is
# that they have to pass the policy check to be allowed.
# Previously any token was accepted here. For now we will
# continue to mark service tokens as valid if they are valid
# but we will only allow service role tokens to do
# allow_expired. In future we should reject any token that
# isn't a service token here.
role_names = set(serv_auth_ref.role_names)
check = self._service_token_roles.intersection(role_names)
role_check_passed = bool(check)
# if service_token_role_required then the service token is only
# valid if the roles check out. Otherwise at this point it is
# true because keystone has already validated it.
if self._service_token_roles_required:
request.service_token_valid = role_check_passed
else:
self.log.warning(_LW('A valid token was submitted as a '
'service token, but it was not a '
'valid service token. This is '
'incorrect but backwards compatible '
'behaviour. This will be removed in '
'future releases.'))
request.service_token_valid = True
# allow_expired always requires passing the role check.
allow_expired = role_check_passed
if request.user_token:
self.log.debug('Authenticating user token')
try:
data, user_auth_ref = self._do_fetch_token(
request.user_token,
allow_expired=allow_expired)
self._validate_token(user_auth_ref,
allow_expired=allow_expired)
if not request.service_token:
self._confirm_token_bind(user_auth_ref, request)
except ksm_exceptions.InvalidToken:
self.log.info(_LI('Invalid user token'))
request.user_token_valid = False
else:
request.user_token_valid = True
request.token_info = data
request.token_auth = _user_plugin.UserAuthPlugin(user_auth_ref,
serv_auth_ref)
def _validate_token(self, auth_ref):
def _validate_token(self, auth_ref, allow_expired=False):
"""Perform the validation steps on the token.
:param auth_ref: The token data
@ -389,7 +424,7 @@ class BaseAuthProtocol(object):
:raises exc.InvalidToken: if token is rejected
"""
# 0 seconds of validity means it is invalid right now
if auth_ref.will_expire_soon(stale_duration=0):
if (not allow_expired) and auth_ref.will_expire_soon(stale_duration=0):
raise ksm_exceptions.InvalidToken(_('Token authorization failed'))
def _do_fetch_token(self, token, **kwargs):
@ -518,10 +553,20 @@ class AuthProtocol(BaseAuthProtocol):
list_opts(),
conf)
token_roles_required = self._conf.get('service_token_roles_required')
if not token_roles_required:
log.warning(_LW('AuthToken middleware is set with '
'keystone_authtoken.service_token_roles_required '
'set to False. This is backwards compatible but '
'deprecated behaviour. Please set this to True.'))
super(AuthProtocol, self).__init__(
app,
log=log,
enforce_token_bind=self._conf.get('enforce_token_bind'))
enforce_token_bind=self._conf.get('enforce_token_bind'),
service_token_roles=self._conf.get('service_token_roles'),
service_token_roles_required=token_roles_required)
# delay_auth_decision means we still allow unauthenticated requests
# through and we let the downstream service make the final decision
@ -674,7 +719,7 @@ class AuthProtocol(BaseAuthProtocol):
if cached:
return cached
def fetch_token(self, token):
def fetch_token(self, token, allow_expired=False):
"""Retrieve a token from either a PKI bundle or the identity server.
:param str token: token id
@ -709,7 +754,9 @@ class AuthProtocol(BaseAuthProtocol):
else:
data = self._validate_offline(token, token_hashes)
if not data:
data = self._identity_server.verify_token(token)
data = self._identity_server.verify_token(
token,
allow_expired=allow_expired)
self._token_cache.set(token_hashes[0], data)
@ -765,8 +812,8 @@ class AuthProtocol(BaseAuthProtocol):
return data
def _validate_token(self, auth_ref):
super(AuthProtocol, self)._validate_token(auth_ref)
def _validate_token(self, auth_ref, **kwargs):
super(AuthProtocol, self)._validate_token(auth_ref, **kwargs)
if auth_ref.version == 'v2.0' and not auth_ref.project_id:
msg = _('Unable to determine service tenancy.')

View File

@ -44,7 +44,7 @@ class _RequestStrategy(object):
def __init__(self, adap, include_service_catalog=None):
self._include_service_catalog = include_service_catalog
def verify_token(self, user_token):
def verify_token(self, user_token, allow_expired=False):
pass
@_convert_fetch_cert_exception
@ -73,7 +73,8 @@ class _V2RequestStrategy(_RequestStrategy):
super(_V2RequestStrategy, self).__init__(adap, **kwargs)
self._client = v2_client.Client(session=adap)
def verify_token(self, token):
def verify_token(self, token, allow_expired=False):
# NOTE(jamielennox): allow_expired is ignored on V2
auth_ref = self._client.tokens.validate_access_info(token)
if not auth_ref:
@ -100,10 +101,11 @@ class _V3RequestStrategy(_RequestStrategy):
super(_V3RequestStrategy, self).__init__(adap, **kwargs)
self._client = v3_client.Client(session=adap)
def verify_token(self, token):
def verify_token(self, token, allow_expired=False):
auth_ref = self._client.tokens.validate(
token,
include_catalog=self._include_service_catalog)
include_catalog=self._include_service_catalog,
allow_expired=allow_expired)
if not auth_ref:
msg = _('Failed to fetch token data from identity server')
@ -197,13 +199,14 @@ class IdentityServer(object):
msg = _('No compatible apis supported by server')
raise ksm_exceptions.ServiceError(msg)
def verify_token(self, user_token, retry=True):
def verify_token(self, user_token, retry=True, allow_expired=False):
"""Authenticate user token with identity server.
:param user_token: user's token id
:param retry: flag that forces the middleware to retry
user authentication when an indeterminate
response is received. Optional.
:param allow_expired: Allow retrieving an expired token.
:returns: access info received from identity server on success
:rtype: :py:class:`keystoneauth1.access.AccessInfo`
:raises exc.InvalidToken: if token is rejected
@ -211,7 +214,9 @@ class IdentityServer(object):
"""
try:
auth_ref = self._request_strategy.verify_token(user_token)
auth_ref = self._request_strategy.verify_token(
user_token,
allow_expired=allow_expired)
except ksa_exceptions.NotFound as e:
self._LOG.warning(_LW('Authorization failed for token'))
self._LOG.warning(_LW('Identity response: %s'), e.response.text)

View File

@ -181,6 +181,19 @@ _OPTS = [
' only while migrating from a less secure algorithm to a more'
' secure one. Once all the old tokens are expired this option'
' should be set to a single value for better performance.'),
cfg.ListOpt('service_token_roles', default=['service'],
help='A choice of roles that must be present in a service'
' token. Service tokens are allowed to request that an expired'
' token can be used and so this check should tightly control'
' that only actual services should be sending this token.'
' Roles here are applied as an ANY check so any role in this'
' list must be present. For backwards compatibility reasons'
' this currently only affects the allow_expired check.'),
cfg.BoolOpt('service_token_roles_required', default=False,
help='For backwards compatibility reasons we must let valid'
' service tokens pass that don\'t pass the service_token_roles'
' check as valid. Setting this true will become the default'
' in a future release and should be enabled if possible.'),
]

View File

@ -67,7 +67,7 @@ EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = {
'HTTP_X_SERVICE_PROJECT_NAME': 'service_project_name1',
'HTTP_X_SERVICE_USER_ID': 'service_user_id1',
'HTTP_X_SERVICE_USER_NAME': 'service_user_name1',
'HTTP_X_SERVICE_ROLES': 'service_role1,service_role2',
'HTTP_X_SERVICE_ROLES': 'service,service_role2',
}
EXPECTED_V3_DEFAULT_ENV_ADDITIONS = {
@ -1317,6 +1317,63 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertEqual(FAKE_ADMIN_TOKEN_ID, headers['X-Service-Token'])
def test_service_token_with_valid_service_role_not_required(self):
self.conf['service_token_roles'] = ['service']
self.conf['service_token_roles_required'] = False
self.set_middleware(conf=self.conf)
user_token = self.token_dict['uuid_token_default']
service_token = self.token_dict['uuid_service_token_default']
resp = self.call_middleware(headers={'X-Auth-Token': user_token,
'X-Service-Token': service_token})
self.assertEqual('Confirmed',
resp.request.headers['X-Service-Identity-Status'])
def test_service_token_with_invalid_service_role_not_required(self):
self.conf['service_token_roles'] = [uuid.uuid4().hex]
self.conf['service_token_roles_required'] = False
self.set_middleware(conf=self.conf)
user_token = self.token_dict['uuid_token_default']
service_token = self.token_dict['uuid_service_token_default']
resp = self.call_middleware(headers={'X-Auth-Token': user_token,
'X-Service-Token': service_token})
self.assertEqual('Confirmed',
resp.request.headers['X-Service-Identity-Status'])
def test_service_token_with_valid_service_role_required(self):
self.conf['service_token_roles'] = ['service']
self.conf['service_token_roles_required'] = True
self.set_middleware(conf=self.conf)
user_token = self.token_dict['uuid_token_default']
service_token = self.token_dict['uuid_service_token_default']
resp = self.call_middleware(headers={'X-Auth-Token': user_token,
'X-Service-Token': service_token})
self.assertEqual('Confirmed',
resp.request.headers['X-Service-Identity-Status'])
def test_service_token_with_invalid_service_role_required(self):
self.conf['service_token_roles'] = [uuid.uuid4().hex]
self.conf['service_token_roles_required'] = True
self.set_middleware(conf=self.conf)
user_token = self.token_dict['uuid_token_default']
service_token = self.token_dict['uuid_service_token_default']
resp = self.call_middleware(headers={'X-Auth-Token': user_token,
'X-Service-Token': service_token},
expected_status=401)
self.assertEqual('Invalid',
resp.request.headers['X-Service-Identity-Status'])
class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
@ -1503,6 +1560,8 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH,
'revoked_token_hash_sha256':
self.examples.REVOKED_TOKEN_HASH_SHA256,
'uuid_service_token_default':
self.examples.UUID_SERVICE_TOKEN_DEFAULT,
}
self.requests_mock.get(BASE_URI,
@ -1521,6 +1580,7 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.UUID_TOKEN_BIND,
self.examples.UUID_TOKEN_UNKNOWN_BIND,
self.examples.UUID_TOKEN_NO_SERVICE_CATALOG,
self.examples.UUID_SERVICE_TOKEN_DEFAULT,
self.examples.SIGNED_TOKEN_SCOPED_KEY,
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
@ -1579,10 +1639,11 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_user_plugin_token_properties(self):
token = self.examples.UUID_TOKEN_DEFAULT
token_data = self.examples.TOKEN_RESPONSES[token]
service = self.examples.UUID_SERVICE_TOKEN_DEFAULT
resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
'X-Auth-Token': token,
'X-Service-Token': token})
'X-Service-Token': service})
self.assertEqual(FakeApp.SUCCESS, resp.body)
@ -1591,17 +1652,22 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertTrue(token_auth.has_user_token)
self.assertTrue(token_auth.has_service_token)
for t in [token_auth.user, token_auth.service]:
self.assertEqual(token_data.user_id, t.user_id)
self.assertEqual(token_data.tenant_id, t.project_id)
self.assertEqual(token_data.user_id, token_auth.user.user_id)
self.assertEqual(token_data.tenant_id, token_auth.user.project_id)
self.assertThat(t.role_names, matchers.HasLength(2))
self.assertIn('role1', t.role_names)
self.assertIn('role2', t.role_names)
self.assertThat(token_auth.user.role_names, matchers.HasLength(2))
self.assertIn('role1', token_auth.user.role_names)
self.assertIn('role2', token_auth.user.role_names)
self.assertIsNone(t.trust_id)
self.assertIsNone(t.user_domain_id)
self.assertIsNone(t.project_domain_id)
self.assertIsNone(token_auth.user.trust_id)
self.assertIsNone(token_auth.user.user_domain_id)
self.assertIsNone(token_auth.user.project_domain_id)
self.assertThat(token_auth.service.role_names, matchers.HasLength(2))
self.assertIn('service', token_auth.service.role_names)
self.assertIn('service_role2', token_auth.service.role_names)
self.assertIsNone(token_auth.service.trust_id)
class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@ -1699,6 +1765,8 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_v3_TOKEN_HASH_SHA256,
'revoked_token_pkiz_hash':
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
'uuid_service_token_default':
self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT,
}
self.requests_mock.get(BASE_URI,
@ -1813,10 +1881,12 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_user_plugin_token_properties(self):
token = self.examples.v3_UUID_TOKEN_DEFAULT
token_data = self.examples.TOKEN_RESPONSES[token]
service = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT
service_data = self.examples.TOKEN_RESPONSES[service]
resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
'X-Auth-Token': token,
'X-Service-Token': token})
'X-Service-Token': service})
self.assertEqual(FakeApp.SUCCESS, resp.body)
@ -1825,17 +1895,30 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertTrue(token_auth.has_user_token)
self.assertTrue(token_auth.has_service_token)
for t in [token_auth.user, token_auth.service]:
self.assertEqual(token_data.user_id, t.user_id)
self.assertEqual(token_data.project_id, t.project_id)
self.assertEqual(token_data.user_domain_id, t.user_domain_id)
self.assertEqual(token_data.project_domain_id, t.project_domain_id)
self.assertEqual(token_data.user_id, token_auth.user.user_id)
self.assertEqual(token_data.project_id, token_auth.user.project_id)
self.assertEqual(token_data.user_domain_id,
token_auth.user.user_domain_id)
self.assertEqual(token_data.project_domain_id,
token_auth.user.project_domain_id)
self.assertThat(t.role_names, matchers.HasLength(2))
self.assertIn('role1', t.role_names)
self.assertIn('role2', t.role_names)
self.assertThat(token_auth.user.role_names, matchers.HasLength(2))
self.assertIn('role1', token_auth.user.role_names)
self.assertIn('role2', token_auth.user.role_names)
self.assertIsNone(token_auth.user.trust_id)
self.assertIsNone(t.trust_id)
self.assertEqual(service_data.user_id, token_auth.service.user_id)
self.assertEqual(service_data.project_id,
token_auth.service.project_id)
self.assertEqual(service_data.user_domain_id,
token_auth.service.user_domain_id)
self.assertEqual(service_data.project_domain_id,
token_auth.service.project_domain_id)
self.assertThat(token_auth.service.role_names, matchers.HasLength(2))
self.assertIn('service', token_auth.service.role_names)
self.assertIn('service_role2', token_auth.service.role_names)
self.assertIsNone(token_auth.service.trust_id)
def test_expire_stored_in_cache(self):
# tests the upgrade path from storing a tuple vs just the data in the
@ -1858,6 +1941,34 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertIs(False,
req.environ['keystone.token_auth'].user.is_admin_project)
def test_service_token_with_valid_service_role_not_required(self):
s = super(v3AuthTokenMiddlewareTest, self)
s.test_service_token_with_valid_service_role_not_required()
e = self.requests_mock.request_history[3].qs.get('allow_expired')
self.assertEqual(['1'], e)
def test_service_token_with_invalid_service_role_not_required(self):
s = super(v3AuthTokenMiddlewareTest, self)
s.test_service_token_with_invalid_service_role_not_required()
e = self.requests_mock.request_history[3].qs.get('allow_expired')
self.assertIsNone(e)
def test_service_token_with_valid_service_role_required(self):
s = super(v3AuthTokenMiddlewareTest, self)
s.test_service_token_with_valid_service_role_required()
e = self.requests_mock.request_history[3].qs.get('allow_expired')
self.assertEqual(['1'], e)
def test_service_token_with_invalid_service_role_required(self):
s = super(v3AuthTokenMiddlewareTest, self)
s.test_service_token_with_invalid_service_role_required()
e = self.requests_mock.request_history[3].qs.get('allow_expired')
self.assertIsNone(e)
class DelayedAuthTests(BaseAuthTokenMiddlewareTest):

View File

@ -72,7 +72,7 @@ class BaseUserPluginTests(object):
def test_with_service_information(self):
token_id, token = self.get_token()
service_id, service = self.get_token()
service_id, service = self.get_token(service=True)
plugin = self.get_plugin(token_id, service_id)
@ -111,10 +111,12 @@ class V2UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
def get_role_names(self, token):
return [x['name'] for x in token['access']['user'].get('roles', [])]
def get_token(self):
def get_token(self, service=False):
token = fixture.V2Token()
token.set_scope()
token.add_role()
if service:
token.add_role('service')
request_headers = {'X-Auth-Token': self.service_token.token_id}
@ -176,12 +178,14 @@ class V3UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
def get_role_names(self, token):
return [x['name'] for x in token['token'].get('roles', [])]
def get_token(self, project=True):
def get_token(self, project=True, service=False):
token_id = uuid.uuid4().hex
token = fixture.V3Token()
if project:
token.set_project_scope()
token.add_role()
if service:
token.add_role('service')
request_headers = {'X-Auth-Token': self.service_token_id,
'X-Subject-Token': token_id}

View File

@ -248,7 +248,7 @@ class Examples(fixtures.Fixture):
SERVICE_USER_NAME = 'service_user_name1'
SERVICE_DOMAIN_ID = 'service_domain_id1'
SERVICE_DOMAIN_NAME = 'service_domain_name1'
SERVICE_ROLE_NAME1 = 'service_role1'
SERVICE_ROLE_NAME1 = 'service'
SERVICE_ROLE_NAME2 = 'service_role2'
self.SERVICE_TYPE = 'identity'

View File

@ -69,6 +69,8 @@ class OptsTestCase(utils.TestCase):
'hash_algorithms',
'auth_type',
'auth_section',
'service_token_roles',
'service_token_roles_required',
]
opt_names = [o.name for (g, l) in result_of_old_opts for o in l]
self.assertThat(opt_names, matchers.HasLength(len(expected_opt_names)))
@ -113,6 +115,8 @@ class OptsTestCase(utils.TestCase):
'hash_algorithms',
'auth_type',
'auth_section',
'service_token_roles',
'service_token_roles_required',
]
opt_names = [o.name for (g, l) in result for o in l]
self.assertThat(opt_names, matchers.HasLength(len(expected_opt_names)))

View File

@ -0,0 +1,30 @@
---
prelude: >
Fetching expired tokens when using a valid service token is now allowed.
This will help with long running operations that must continue between
services longer than the original expiry of the token.
features:
- AuthToken middleware will now allow fetching an expired token when a valid
service token is present. This service token must contain any one of the
roles specified in ``service_token_roles``.
- Service tokens are compared against a list of possible roles for validity.
This will ensure that only services are submitting tokens as an
``X-Service-Token``.
For backwards compatibility, if ``service_token_roles_required`` is not set,
a warning will be emitted. To enforce the check properly, set
``service_token_roles_required`` to ``True``. It currently defaults to
``False``
upgrade:
- Set the ``service_token_roles`` to a list of roles that services may have.
The likely list is ``service`` or ``admin``. Any ``service_token_roles`` may
apply to accept the service token. Ensure service users have one of these
roles so interservice communication continues to work correctly. When verified,
set the ``service_token_roles_required`` flag to ``True`` to enforce this
behaviour. This will become the default setting in future releases.
deprecations:
- For backwards compatibility the ``service_token_roles_required`` option in
``[keystone_authtoken]`` was added. The option defaults to ``False`` and
has been immediately deprecated. This will allow the current behaviour
that service tokens are validated but not checked for roles to continue.
The option should be set to ``True`` as soon as possible. The option will
default to ``True`` in a future release.