Prevent domains creation for the default LDAP+SQL

This patch is supposed to fix the bug 'Unable to delete domain if user
from other domain was added', but because of the recent changes this
should be done by configuring domain-specifc LDAPs. However the original
scenario must not be reproducible.

The rule is that if you have the standard LDAP driver for identity
(without configuring domain-specifc LDAPs), then you can really only
have the default domain. However currently it's possible to create
domains with the folowing configuration:

[identity]
driver = keystone.identity.backends.ldap.Identity
[assignment]
driver = keystone.assignment.backends.sql.Assignment

At the same time such new domains can not be updated nor deleted. This
fix prevents new domains creation for such configuration.

Also this fix adds an additional test to MultiLDAPandSQLIdentity to make
sure that the original bug has gone.

Closes-Bug: #1262360

Change-Id: I3baee40f74cefaae601aea34e75630cc618ac31a
This commit is contained in:
Alexey Miroshkin 2014-08-26 15:34:22 +04:00
parent 1cacde3f59
commit 0da55a921f
5 changed files with 62 additions and 0 deletions

View File

@ -320,6 +320,9 @@ class Manager(manager.Manager):
@notifications.created('domain')
def create_domain(self, domain_id, domain):
if (not self.identity_api.multiple_domains_supported and
domain_id != CONF.identity.default_domain_id):
raise exception.Forbidden(_('Multiple domains are not supported'))
domain.setdefault('enabled', True)
domain['enabled'] = clean.domain_enabled(domain['enabled'])
ret = self.driver.create_domain(domain_id, domain)

View File

@ -838,6 +838,11 @@ class Driver(object):
"""Indicates if Driver supports domains."""
return True
@property
def multiple_domains_supported(self):
return (self.is_domain_aware() or
CONF.identity.domain_specific_drivers_enabled)
def generates_uuids(self):
"""Indicates if Driver generates UUIDs as the local entity ID."""
return True

View File

@ -215,6 +215,19 @@ def skip_if_cache_disabled(*sections):
return wrapper
def skip_if_no_multiple_domains_support(f):
"""This decorator is used to skip a test if an identity driver
does not support multiple domains.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
test_obj = args[0]
if not test_obj.identity_api.multiple_domains_supported:
raise testcase.TestSkipped('No multiple domains support')
return f(*args, **kwargs)
return wrapper
class UnexpectedExit(Exception):
pass

View File

@ -2014,6 +2014,7 @@ class IdentityTests(object):
self.assertIn(self.tenant_mtu['id'], project_ids)
self.assertIn(self.tenant_service['id'], project_ids)
@tests.skip_if_no_multiple_domains_support
def test_list_projects_for_alternate_domain(self):
domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_domain(domain1['id'], domain1)
@ -2480,6 +2481,7 @@ class IdentityTests(object):
group1['id'],
group1)
@tests.skip_if_no_multiple_domains_support
def test_project_crud(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
@ -2559,6 +2561,7 @@ class IdentityTests(object):
self.assignment_api.get_domain,
domain['id'])
@tests.skip_if_no_multiple_domains_support
def test_create_domain_case_sensitivity(self):
# create a ref with a lowercase name
ref = {
@ -2694,6 +2697,7 @@ class IdentityTests(object):
self.assertEqual(3, len(user_projects))
@tests.skip_if_cache_disabled('assignment')
@tests.skip_if_no_multiple_domains_support
def test_domain_rename_invalidates_get_domain_by_name_cache(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
@ -2763,6 +2767,7 @@ class IdentityTests(object):
domain_id)
@tests.skip_if_cache_disabled('assignment')
@tests.skip_if_no_multiple_domains_support
def test_project_rename_invalidates_get_project_by_name_cache(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
@ -2782,6 +2787,7 @@ class IdentityTests(object):
domain['id'])
@tests.skip_if_cache_disabled('assignment')
@tests.skip_if_no_multiple_domains_support
def test_cache_layer_project_crud(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
@ -2936,6 +2942,7 @@ class IdentityTests(object):
group_id=uuid.uuid4().hex,
project_id=self.tenant_bar['id'])
@tests.skip_if_no_multiple_domains_support
def test_get_default_domain_by_name(self):
domain_name = 'default'

View File

@ -1459,6 +1459,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
self.assignment_api.get_domain,
domain['id'])
@tests.skip_if_no_multiple_domains_support
def test_create_domain_case_sensitivity(self):
# domains are read-only, so case sensitivity isn't an issue
ref = {
@ -1963,6 +1964,14 @@ class LdapIdentitySqlAssignment(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.assertThat(new_member_assignments,
matchers.Equals(expected_member_assignments))
def test_create_domain(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
self.assertRaises(exception.Forbidden,
self.assignment_api.create_domain,
domain['id'],
domain)
class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
"""Class to test mapping of default LDAP backend.
@ -2420,3 +2429,28 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
'role_id': MEMBER_ROLE_ID}]
self.assertThat(new_member_assignments,
matchers.Equals(expected_member_assignments))
def test_delete_domain_with_user_added(self):
domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'enabled': True}
project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'description': uuid.uuid4().hex, 'enabled': True
}
self.assignment_api.create_domain(domain['id'], domain)
self.assignment_api.create_project(project['id'], project)
project_ref = self.assignment_api.get_project(project['id'])
self.assertDictEqual(project_ref, project)
self.assignment_api.create_grant(user_id=self.user_foo['id'],
project_id=project['id'],
role_id=self.role_member['id'])
self.assignment_api.delete_grant(user_id=self.user_foo['id'],
project_id=project['id'],
role_id=self.role_member['id'])
domain['enabled'] = False
self.assignment_api.update_domain(domain['id'], domain)
self.assignment_api.delete_domain(domain['id'])
self.assertRaises(exception.DomainNotFound,
self.assignment_api.get_domain,
domain['id'])