Validate domains unconditionally (bug 1130236)

Ensure that we validate the domain status of user/project for
a user authenticating via the v2 API.

This patch builds on the initial functional change done by Dolph,
and fixes up the tests that broke sure to domain being required in
any tests that setup data directly in the backends.

Fixes Bug #1130236

Change-Id: I66dfd453fb95fa4fa3fde713b663386a2c2ecdf8
This commit is contained in:
Dolph Mathews 2013-02-19 10:39:22 -06:00 committed by Henry Nash
parent a066b69fbe
commit cd3f58a8d0
11 changed files with 134 additions and 50 deletions

View File

@ -91,6 +91,9 @@ class UserAuthInfo(object):
else:
user_ref = self.identity_api.get_user(
context=self.context, user_id=user_id)
domain_ref = self.identity_api.get_domain(
context=self.context, domain_id=user_ref['domain_id'])
self._assert_domain_is_enabled(domain_ref)
except exception.UserNotFound as e:
LOG.exception(e)
raise exception.Unauthorized(e)

View File

@ -20,7 +20,10 @@ from keystone import exception
class DictKvs(dict):
def get(self, key, default=None):
try:
return self[key]
if isinstance(self[key], dict):
return self[key].copy()
else:
return self[key][:]
except KeyError:
if default is not None:
return default

View File

@ -242,25 +242,43 @@ class TestCase(NoModule, unittest.TestCase):
# TODO(termie): doing something from json, probably based on Django's
# loaddata will be much preferred.
if hasattr(self, 'identity_api'):
for domain in fixtures.DOMAINS:
try:
rv = self.identity_api.create_domain(domain['id'], domain)
except (exception.Conflict, exception.NotImplemented):
pass
setattr(self, 'domain_%s' % domain['id'], domain)
for tenant in fixtures.TENANTS:
rv = self.identity_api.create_project(tenant['id'], tenant)
try:
rv = self.identity_api.create_project(tenant['id'], tenant)
except exception.Conflict:
rv = self.identity_api.get_project(tenant['id'])
pass
setattr(self, 'tenant_%s' % tenant['id'], rv)
for role in fixtures.ROLES:
try:
rv = self.identity_api.create_role(role['id'], role)
except exception.Conflict:
rv = self.identity_api.get_role(role['id'])
pass
setattr(self, 'role_%s' % role['id'], rv)
for user in fixtures.USERS:
user_copy = user.copy()
tenants = user_copy.pop('tenants')
rv = self.identity_api.create_user(user['id'],
user_copy.copy())
try:
rv = self.identity_api.create_user(user['id'],
user_copy.copy())
except exception.Conflict:
pass
for tenant_id in tenants:
self.identity_api.add_user_to_project(tenant_id,
user['id'])
try:
self.identity_api.add_user_to_project(tenant_id,
user['id'])
except exception.Conflict:
pass
setattr(self, 'user_%s' % user['id'], user_copy)
for metadata in fixtures.METADATA:

View File

@ -79,6 +79,7 @@ class Auth(controller.V2Controller):
context, auth)
user_ref, tenant_ref, metadata_ref, expiry = auth_info
core.validate_auth_info(self, context, user_ref, tenant_ref)
trust_id = metadata_ref.get('trust_id')
user_ref = self._filter_domain_id(user_ref)
if tenant_ref:
@ -88,9 +89,6 @@ class Auth(controller.V2Controller):
metadata_ref,
expiry)
# FIXME(dolph): domains will not be validated, as we just removed them
core.validate_auth_info(self, context, user_ref, tenant_ref)
if tenant_ref:
catalog_ref = self.catalog_api.get_catalog(
context=context,

View File

@ -79,15 +79,13 @@ def validate_auth_info(self, context, user_ref, tenant_ref):
raise exception.Unauthorized(msg)
# If the user's domain is disabled don't allow them to authenticate
# TODO(dolph): remove this check after default-domain migration
if user_ref.get('domain_id') is not None:
user_domain_ref = self.identity_api.get_domain(
context,
user_ref['domain_id'])
if user_domain_ref and not user_domain_ref.get('enabled', True):
msg = 'Domain is disabled: %s' % user_domain_ref['id']
LOG.warning(msg)
raise exception.Unauthorized(msg)
user_domain_ref = self.identity_api.get_domain(
context,
user_ref['domain_id'])
if user_domain_ref and not user_domain_ref.get('enabled', True):
msg = 'Domain is disabled: %s' % user_domain_ref['id']
LOG.warning(msg)
raise exception.Unauthorized(msg)
if tenant_ref:
# If the project is disabled don't allow them to authenticate
@ -97,16 +95,14 @@ def validate_auth_info(self, context, user_ref, tenant_ref):
raise exception.Unauthorized(msg)
# If the project's domain is disabled don't allow them to authenticate
# TODO(dolph): remove this check after default-domain migration
if tenant_ref.get('domain_id') is not None:
project_domain_ref = self.identity_api.get_domain(
context,
tenant_ref['domain_id'])
if (project_domain_ref and
not project_domain_ref.get('enabled', True)):
msg = 'Domain is disabled: %s' % project_domain_ref['id']
LOG.warning(msg)
raise exception.Unauthorized(msg)
project_domain_ref = self.identity_api.get_domain(
context,
tenant_ref['domain_id'])
if (project_domain_ref and
not project_domain_ref.get('enabled', True)):
msg = 'Domain is disabled: %s' % project_domain_ref['id']
LOG.warning(msg)
raise exception.Unauthorized(msg)
@dependency.provider('token_api')

View File

@ -118,3 +118,13 @@ ROLES = [
}
]
DOMAINS = [
{
'id': DEFAULT_DOMAIN_ID,
'name': 'Default',
'enabled': True,
'description': 'Owns users and tenants (i.e. projects) available '
'on Identity API v2.'
}
]

View File

@ -612,9 +612,6 @@ class AuthWithTrust(AuthTest):
return auth_response
def fetch_v3_token_from_trust(self):
self.identity_api.create_domain("default",
{"name": "default",
"id": "default"})
v3_password_data = {
'identity': {
"methods": ["password"],

View File

@ -23,7 +23,6 @@ from keystone.catalog import core
from keystone import config
from keystone import exception
from keystone.openstack.common import timeutils
from keystone import config
from keystone import test
@ -1582,10 +1581,11 @@ class IdentityTests(object):
self.identity_api.create_domain(domain1['id'], domain1)
self.identity_api.create_domain(domain2['id'], domain2)
domains = self.identity_api.list_domains()
self.assertEquals(len(domains), 2)
self.assertEquals(len(domains), 3)
domain_ids = []
for domain in domains:
domain_ids.append(domain.get('id'))
self.assertIn(DEFAULT_DOMAIN_ID, domain_ids)
self.assertIn(domain1['id'], domain_ids)
self.assertIn(domain2['id'], domain_ids)

View File

@ -23,6 +23,9 @@ from keystone import exception
from keystone import test
from keystone import token
import default_fixtures
ROOTDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SSLDIR = "%s/tests/ssl/" % ROOTDIR
CONF = test.CONF
@ -46,6 +49,7 @@ class CertSetupTestCase(test.TestCase):
CONF.signing.keyfile = os.path.join(KEYDIR, "signing_key.pem")
self.load_backends()
self.load_fixtures(default_fixtures)
self.controller = token.controllers.Auth()
def test_can_handle_missing_certs(self):

View File

@ -340,7 +340,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase):
self.assertIn('signed', r.body)
class ATestTokenRevoking(test_v3.RestfulTestCase):
class TestTokenRevoking(test_v3.RestfulTestCase):
"""Test token revoking for relevant v3 identity apis"""
def setUp(self):
@ -360,7 +360,7 @@ class ATestTokenRevoking(test_v3.RestfulTestCase):
- User3 is a member of group2
"""
super(ATestTokenRevoking, self).setUp()
super(TestTokenRevoking, self).setUp()
# Start by creating a couple of domains and projects
self.domainA = self.new_domain_ref()

View File

@ -79,26 +79,81 @@ class IdentityTestCase(test_v3.RestfulTestCase):
def test_disable_domain(self):
"""PATCH /domains/{domain_id} (set enabled=False)"""
self.domain['enabled'] = False
# Create a 2nd set of entities in a 2nd domain
self.domain2 = self.new_domain_ref()
self.identity_api.create_domain(self.domain2['id'], self.domain2)
self.project2 = self.new_project_ref(
domain_id=self.domain2['id'])
self.identity_api.create_project(self.project2['id'], self.project2)
self.user2 = self.new_user_ref(
domain_id=self.domain2['id'],
project_id=self.project2['id'])
self.identity_api.create_user(self.user2['id'], self.user2)
self.identity_api.add_user_to_project(self.project2['id'],
self.user2['id'])
# First check a user in that domain can authenticate, via
# Both v2 and v3
body = {
'auth': {
'passwordCredentials': {
'userId': self.user2['id'],
'password': self.user2['password']
},
'tenantId': self.project2['id']
}
}
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body)
auth_data = self.build_authentication_request(
user_id=self.user2['id'],
password=self.user2['password'],
project_id=self.project2['id'])
resp = self.post('/auth/tokens', body=auth_data)
# Now disable the domain
self.domain2['enabled'] = False
r = self.patch('/domains/%(domain_id)s' % {
'domain_id': self.domain_id},
'domain_id': self.domain2['id']},
body={'domain': {'enabled': False}})
self.assertValidDomainResponse(r, self.domain)
self.assertValidDomainResponse(r, self.domain2)
# check that the project and user are still enabled
# FIXME(gyee): are these tests still valid since user should not
# be able to authenticate into a disabled domain
#r = self.get('/projects/%(project_id)s' % {
# 'project_id': self.project_id})
#self.assertValidProjectResponse(r, self.project)
#self.assertTrue(r.body['project']['enabled'])
# Make sure the user can no longer authenticate, via
# either API
body = {
'auth': {
'passwordCredentials': {
'userId': self.user2['id'],
'password': self.user2['password']
},
'tenantId': self.project2['id']
}
}
resp = self.admin_request(path='/v2.0/tokens',
method='POST',
body=body,
expected_status=401)
#r = self.get('/users/%(user_id)s' % {
# 'user_id': self.user['id']})
#self.assertValidUserResponse(r, self.user)
#self.assertTrue(r.body['user']['enabled'])
# Try looking up in v3 by name and id
auth_data = self.build_authentication_request(
user_id=self.user2['id'],
password=self.user2['password'],
project_id=self.project2['id'])
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
# TODO(dolph): assert that v2 & v3 auth return 401
auth_data = self.build_authentication_request(
username=self.user2['name'],
user_domain_id=self.domain2['id'],
password=self.user2['password'],
project_id=self.project2['id'])
resp = self.post('/auth/tokens', body=auth_data,
expected_status=401)
def test_delete_enabled_domain_fails(self):
"""DELETE /domains/{domain_id}...(when domain enabled)"""