Make sure all the auth plugins agree on the shared identity attributes.

Note: this patch also corrected some of the external auth tests where an
auth request consists of two methods with two different identities.

Closes-Bug: #1299012

Change-Id: I5d7dd42d373879322823b16b215f11a015b734f8
This commit is contained in:
guang-yee 2014-04-02 21:41:11 -07:00 committed by Brant Knudson
parent 6ddbdeebb3
commit 33fd4cf8b3
5 changed files with 135 additions and 16 deletions

View File

@ -85,6 +85,44 @@ def get_auth_method(method_name):
return AUTH_METHODS[method_name]
class AuthContext(dict):
"""Retrofitting auth_context to reconcile identity attributes.
The identity attributes must not have conflicting values among the
auth plug-ins. The only exception is `expires_at`, which is set to its
earliest value.
"""
# identity attributes need to be reconciled among the auth plugins
IDENTITY_ATTRIBUTES = frozenset(['user_id', 'project_id',
'access_token_id', 'domain_id',
'expires_at'])
def __setitem__(self, key, val):
if key in self.IDENTITY_ATTRIBUTES and key in self:
existing_val = self[key]
if key == 'expires_at':
# special treatment for 'expires_at', we are going to take
# the earliest expiration instead.
if existing_val != val:
msg = _('"expires_at" has conflicting values %(existing)s '
'and %(new)s. Will use the earliest value.')
LOG.info(msg, {'existing': existing_val, 'new': val})
if existing_val is None or val is None:
val = existing_val or val
else:
val = min(existing_val, val)
elif existing_val != val:
msg = _('Unable to reconcile identity attribute %(attribute)s '
'as it has conflicting values %(new)s and %(old)s') % (
{'attribute': key,
'new': val,
'old': existing_val})
raise exception.Unauthorized(msg)
return super(AuthContext, self).__setitem__(key, val)
# TODO(blk-u): this class doesn't use identity_api directly, but makes it
# available for consumers. Consumers should probably not be getting
# identity_api from this since it's available in global registry, then
@ -323,7 +361,9 @@ class Auth(controller.V3Controller):
try:
auth_info = AuthInfo.create(context, auth=auth)
auth_context = {'extras': {}, 'method_names': [], 'bind': {}}
auth_context = AuthContext(extras={},
method_names=[],
bind={})
self.authenticate(context, auth_info, auth_context)
if auth_context.get('access_token_id'):
auth_info.set_scope(None, auth_context['project_id'], None)

View File

@ -31,11 +31,12 @@ class AuthMethodHandler(object):
"""Authenticate user and return an authentication context.
:param context: keystone's request context
:auth_payload: the content of the authentication for a given method
:auth_context: user authentication context, a dictionary shared
by all plugins. It contains "method_names" and "extras"
by default. "method_names" is a list and "extras" is
a dictionary.
:param auth_payload: the content of the authentication for a given
method
:param auth_context: user authentication context, a dictionary shared
by all plugins. It contains "method_names" and
"extras" by default. "method_names" is a list and
"extras" is a dictionary.
If successful, plugin must set ``user_id`` in ``auth_context``.
``method_name`` is used to convey any additional authentication methods

View File

@ -106,7 +106,7 @@ class Password(auth.AuthMethodHandler):
method = METHOD_NAME
def authenticate(self, context, auth_payload, user_context):
def authenticate(self, context, auth_payload, auth_context):
"""Try to authenticate against the identity backend."""
user_info = UserAuthInfo.create(auth_payload)
@ -123,5 +123,4 @@ class Password(auth.AuthMethodHandler):
msg = _('Invalid username or password')
raise exception.Unauthorized(msg)
if 'user_id' not in user_context:
user_context['user_id'] = user_info.user_id
auth_context['user_id'] = user_info.user_id

View File

@ -350,7 +350,19 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase):
body=auth)
return r.headers.get('X-Subject-Token')
def v3_noauth_request(self, path, **kwargs):
# request does not require auth token header
path = '/v3' + path
return self.admin_request(path=path, **kwargs)
def v3_request(self, path, **kwargs):
# TODO(gyee): need to fix all the v3 auth tests. They should not
# require the token header.
# check to see if caller requires token for the API call.
if kwargs.pop('noauth', None):
return self.v3_noauth_request(path, **kwargs)
# Check if the caller has passed in auth details for
# use in requesting the token
auth_arg = kwargs.pop('auth', None)

View File

@ -14,6 +14,7 @@
import copy
import datetime
import operator
import uuid
from keystoneclient.common import cms
@ -2116,9 +2117,9 @@ class TestAuthJSON(test_v3.RestfulTestCase):
# note that they do not have to match
api = auth.controllers.Auth()
auth_data = self.build_authentication_request(
user_domain_id=self.domain['id'],
username=self.user['name'],
password=self.user['password'])['auth']
user_domain_id=self.default_domain_user['domain_id'],
username=self.default_domain_user['name'],
password=self.default_domain_user['password'])['auth']
context, auth_info, auth_context = self.build_external_auth_request(
self.default_domain_user['name'], auth_data=auth_data)
@ -2163,7 +2164,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
r = self.post('/auth/tokens', body=auth_data)
r = self.post('/auth/tokens', body=auth_data, noauth=True)
token = self.assertValidUnscopedTokenResponse(r)
self.assertNotIn('bind', token)
@ -2176,7 +2177,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
resp = self.post('/auth/tokens', body=auth_data)
resp = self.post('/auth/tokens', body=auth_data, noauth=True)
token = resp.headers.get('X-Subject-Token')
headers = {'X-Subject-Token': token}
@ -2192,7 +2193,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
r = self.post('/auth/tokens', body=auth_data)
r = self.post('/auth/tokens', body=auth_data, noauth=True)
# the unscoped token should have bind information in it
token = self.assertValidUnscopedTokenResponse(r)
@ -2203,7 +2204,7 @@ class TestAuthJSON(test_v3.RestfulTestCase):
# using unscoped token with remote user succeeds
auth_params = {'token': token, 'project_id': self.project_id}
auth_data = self.build_authentication_request(**auth_params)
r = self.post('/auth/tokens', body=auth_data)
r = self.post('/auth/tokens', body=auth_data, noauth=True)
token = self.assertValidProjectScopedTokenResponse(r)
# the bind information should be carried over from the original token
@ -2228,6 +2229,12 @@ class TestAuthJSON(test_v3.RestfulTestCase):
self.assertEqual(bind['kerberos'], self.default_domain_user['name'])
v2_token_id = v2_token_data['access']['token']['id']
# NOTE(gyee): self.get() will try to obtain an auth token if one
# is not provided. When REMOTE_USER is present in the request
# environment, the external user auth plugin is used in conjunction
# with the password auth for the admin user. Therefore, we need to
# cleanup the REMOTE_USER information from the previous call.
del self.admin_app.extra_environ['REMOTE_USER']
headers = {'X-Subject-Token': v2_token_id}
resp = self.get('/auth/tokens', headers=headers)
token_data = resp.result
@ -2332,6 +2339,18 @@ class TestAuthJSON(test_v3.RestfulTestCase):
project_domain_id=domain['id'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
def test_auth_methods_with_different_identities_fails(self):
# get the token for a user. This is self.user which is different from
# self.default_domain_user.
token = self.get_scoped_token()
# try both password and token methods with different identities and it
# should fail
auth_data = self.build_authentication_request(
token=token,
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
self.post('/auth/tokens', body=auth_data, expected_status=401)
class TestAuthXML(TestAuthJSON):
content_type = 'xml'
@ -3048,3 +3067,51 @@ class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase):
'environment': {}}
r = auth_controller.validate_token(context)
self.assertEqual(200, r.status_code)
class TestAuthContext(tests.TestCase):
def setUp(self):
super(TestAuthContext, self).setUp()
self.auth_context = auth.controllers.AuthContext()
def test_pick_lowest_expires_at(self):
expires_at_1 = timeutils.isotime(timeutils.utcnow())
expires_at_2 = timeutils.isotime(timeutils.utcnow() +
datetime.timedelta(seconds=10))
# make sure auth_context picks the lowest value
self.auth_context['expires_at'] = expires_at_1
self.auth_context['expires_at'] = expires_at_2
self.assertEqual(expires_at_1, self.auth_context['expires_at'])
def test_identity_attribute_conflict(self):
for identity_attr in auth.controllers.AuthContext.IDENTITY_ATTRIBUTES:
self.auth_context[identity_attr] = uuid.uuid4().hex
if identity_attr == 'expires_at':
# 'expires_at' is a special case. Will test it in a separate
# test case.
continue
self.assertRaises(exception.Unauthorized,
operator.setitem,
self.auth_context,
identity_attr,
uuid.uuid4().hex)
def test_identity_attribute_conflict_with_none_value(self):
identity_attr = list(
auth.controllers.AuthContext.IDENTITY_ATTRIBUTES)[0]
self.auth_context[identity_attr] = None
self.assertRaises(exception.Unauthorized,
operator.setitem,
self.auth_context,
identity_attr,
uuid.uuid4().hex)
def test_non_identity_attribute_conflict_override(self):
# for attributes Keystone doesn't know about, make sure they can be
# freely manipulated
attr_name = uuid.uuid4().hex
attr_val_1 = uuid.uuid4().hex
attr_val_2 = uuid.uuid4().hex
self.auth_context[attr_name] = attr_val_1
self.auth_context[attr_name] = attr_val_2
self.assertEqual(attr_val_2, self.auth_context[attr_name])