Merge "LDAP fix for get_roles_for_user_and_project user=group ID"
This commit is contained in:
commit
ace80725fe
|
@ -89,10 +89,11 @@ class Assignment(assignment.Driver):
|
|||
|
||||
def _get_roles_for_just_user_and_project(user_id, tenant_id):
|
||||
self.get_project(tenant_id)
|
||||
user_dn = self.user._id_to_dn(user_id)
|
||||
return [self.role._dn_to_id(a.role_dn)
|
||||
for a in self.role.get_role_assignments
|
||||
(self.project._id_to_dn(tenant_id))
|
||||
if self.user._dn_to_id(a.user_dn) == user_id]
|
||||
if common_ldap.is_dn_equal(a.user_dn, user_dn)]
|
||||
|
||||
def _get_roles_for_group_and_project(group_id, project_id):
|
||||
self.get_project(project_id)
|
||||
|
@ -106,7 +107,7 @@ class Assignment(assignment.Driver):
|
|||
return [self.role._dn_to_id(a.role_dn)
|
||||
for a in self.role.get_role_assignments
|
||||
(self.project._id_to_dn(project_id))
|
||||
if a.user_dn.upper() == group_dn.upper()]
|
||||
if common_ldap.is_dn_equal(a.user_dn, group_dn)]
|
||||
|
||||
if domain_id is not None:
|
||||
msg = _('Domain metadata not supported by LDAP')
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
import abc
|
||||
import os.path
|
||||
import re
|
||||
|
||||
import codecs
|
||||
import ldap
|
||||
|
@ -141,6 +142,115 @@ def ldap_scope(scope):
|
|||
'options': ', '.join(LDAP_SCOPES.keys())})
|
||||
|
||||
|
||||
def prep_case_insensitive(value):
|
||||
"""Prepare a string for case-insensitive comparison.
|
||||
|
||||
This is defined in RFC4518. For simplicity, all this function does is
|
||||
lowercase all the characters, strip leading and trailing whitespace,
|
||||
and compress sequences of spaces to a single space.
|
||||
"""
|
||||
value = re.sub(r'\s+', ' ', value.strip().lower())
|
||||
return value
|
||||
|
||||
|
||||
def is_ava_value_equal(attribute_type, val1, val2):
|
||||
"""Returns True if and only if the AVAs are equal.
|
||||
|
||||
When comparing AVAs, the equality matching rule for the attribute type
|
||||
should be taken into consideration. For simplicity, this implementation
|
||||
does a case-insensitive comparison.
|
||||
|
||||
Note that this function uses prep_case_insenstive so the limitations of
|
||||
that function apply here.
|
||||
|
||||
"""
|
||||
|
||||
return prep_case_insensitive(val1) == prep_case_insensitive(val2)
|
||||
|
||||
|
||||
def is_rdn_equal(rdn1, rdn2):
|
||||
"""Returns True if and only if the RDNs are equal.
|
||||
|
||||
* RDNs must have the same number of AVAs.
|
||||
* Each AVA of the RDNs must be the equal for the same attribute type. The
|
||||
order isn't significant. Note that an attribute type will only be in one
|
||||
AVA in an RDN, otherwise the DN wouldn't be valid.
|
||||
* Attribute types aren't case sensitive. Note that attribute type
|
||||
comparison is more complicated than implemented. This function only
|
||||
compares case-insentive. The code should handle multiple names for an
|
||||
attribute type (e.g., cn, commonName, and 2.5.4.3 are the same).
|
||||
|
||||
Note that this function uses is_ava_value_equal to compare AVAs so the
|
||||
limitations of that function apply here.
|
||||
|
||||
"""
|
||||
|
||||
if len(rdn1) != len(rdn2):
|
||||
return False
|
||||
|
||||
for attr_type_1, val1, dummy in rdn1:
|
||||
found = False
|
||||
for attr_type_2, val2, dummy in rdn2:
|
||||
if attr_type_1.lower() != attr_type_2.lower():
|
||||
continue
|
||||
|
||||
found = True
|
||||
if not is_ava_value_equal(attr_type_1, val1, val2):
|
||||
return False
|
||||
break
|
||||
if not found:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def is_dn_equal(dn1, dn2):
|
||||
"""Returns True if and only if the DNs are equal.
|
||||
|
||||
Two DNs are equal if they've got the same number of RDNs and if the RDNs
|
||||
are the same at each position. See RFC4517.
|
||||
|
||||
Note that this function uses is_rdn_equal to compare RDNs so the
|
||||
limitations of that function apply here.
|
||||
|
||||
:param dn1: Either a string DN or a DN parsed by ldap.dn.str2dn.
|
||||
:param dn2: Either a string DN or a DN parsed by ldap.dn.str2dn.
|
||||
|
||||
"""
|
||||
|
||||
if not isinstance(dn1, list):
|
||||
dn1 = ldap.dn.str2dn(dn1)
|
||||
if not isinstance(dn2, list):
|
||||
dn2 = ldap.dn.str2dn(dn2)
|
||||
|
||||
if len(dn1) != len(dn2):
|
||||
return False
|
||||
|
||||
for rdn1, rdn2 in zip(dn1, dn2):
|
||||
if not is_rdn_equal(rdn1, rdn2):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def dn_startswith(descendant_dn, dn):
|
||||
"""Returns True if and only if the descendant_dn is under the dn.
|
||||
|
||||
:param descendant_dn: Either a string DN or a DN parsed by ldap.dn.str2dn.
|
||||
:param dn: Either a string DN or a DN parsed by ldap.dn.str2dn.
|
||||
|
||||
"""
|
||||
|
||||
if not isinstance(descendant_dn, list):
|
||||
descendant_dn = ldap.dn.str2dn(descendant_dn)
|
||||
if not isinstance(dn, list):
|
||||
dn = ldap.dn.str2dn(dn)
|
||||
|
||||
if len(descendant_dn) <= len(dn):
|
||||
return False
|
||||
|
||||
return is_dn_equal(descendant_dn[len(dn):], dn)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class LDAPHandler(object):
|
||||
'''Abstract class which defines methods for a LDAP API provider.
|
||||
|
|
|
@ -62,6 +62,11 @@ def _internal_attr(attr_name, value_or_values):
|
|||
if dn == 'cn=Doe\\5c, John,ou=Users,cn=example,cn=com':
|
||||
return 'CN=Doe\\, John,OU=Users,CN=example,CN=com'
|
||||
|
||||
# NOTE(blk-u): Another special case for this tested value. When a
|
||||
# roleOccupant has an escaped comma, it gets converted to \2C.
|
||||
if dn == 'cn=Doe\\, John,ou=Users,cn=example,cn=com':
|
||||
return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com'
|
||||
|
||||
dn = ldap.dn.str2dn(core.utf8_encode(dn))
|
||||
norm = []
|
||||
for part in dn:
|
||||
|
@ -133,7 +138,9 @@ def _match(key, value, attrs):
|
|||
str_sids = [six.text_type(x) for x in attrs[key]]
|
||||
return six.text_type(value) in str_sids
|
||||
if key != 'objectclass':
|
||||
return _internal_attr(key, value)[0] in attrs[key]
|
||||
check_value = _internal_attr(key, value)[0]
|
||||
norm_values = list(_internal_attr(key, x)[0] for x in attrs[key])
|
||||
return check_value in norm_values
|
||||
# it is an objectclass check, so check subclasses
|
||||
values = _subs(value)
|
||||
for v in values:
|
||||
|
|
|
@ -594,6 +594,34 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
|
|||
|
||||
self.assertThat(ref_list, matchers.Equals([group]))
|
||||
|
||||
def test_user_id_comma_grants(self):
|
||||
"""Even if the user has a , in their ID, can get user and group grants.
|
||||
"""
|
||||
|
||||
# Create a user with a , in their ID
|
||||
# NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
|
||||
user_id = u'Doe, John'
|
||||
user = {
|
||||
'id': user_id,
|
||||
'name': self.getUniqueString(),
|
||||
'password': self.getUniqueString(),
|
||||
'domain_id': CONF.identity.default_domain_id,
|
||||
}
|
||||
self.identity_api.create_user(user_id, user)
|
||||
|
||||
# Grant the user a role on a project.
|
||||
|
||||
role_id = 'member'
|
||||
project_id = self.tenant_baz['id']
|
||||
|
||||
self.assignment_api.create_grant(role_id, user_id=user_id,
|
||||
project_id=project_id)
|
||||
|
||||
role_ref = self.assignment_api.get_grant(role_id, user_id=user_id,
|
||||
project_id=project_id)
|
||||
|
||||
self.assertEqual(role_id, role_ref['id'])
|
||||
|
||||
|
||||
class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
|
||||
|
||||
|
@ -1250,45 +1278,6 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
|
|||
user1['id'], CONF.identity.default_domain_id)
|
||||
self.assertEqual(0, len(combined_role_list))
|
||||
|
||||
def test_get_roles_for_user_and_project_user_group_same_id(self):
|
||||
"""When a user has the same ID as a group,
|
||||
get_roles_for_user_and_project returns the roles for the group.
|
||||
|
||||
Overriding this test for LDAP because it works differently. The role
|
||||
for the group is returned. This is bug 1309228.
|
||||
"""
|
||||
|
||||
# Setup: create user, group with same ID, role, and project;
|
||||
# assign the group the role on the project.
|
||||
|
||||
user_group_id = uuid.uuid4().hex
|
||||
|
||||
user1 = {'id': user_group_id, 'name': uuid.uuid4().hex,
|
||||
'domain_id': CONF.identity.default_domain_id, }
|
||||
self.identity_api.create_user(user_group_id, user1)
|
||||
|
||||
group1 = {'id': user_group_id, 'name': uuid.uuid4().hex,
|
||||
'domain_id': CONF.identity.default_domain_id, }
|
||||
self.identity_api.create_group(user_group_id, group1)
|
||||
|
||||
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
|
||||
self.assignment_api.create_role(role1['id'], role1)
|
||||
|
||||
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||
'domain_id': CONF.identity.default_domain_id, }
|
||||
self.assignment_api.create_project(project1['id'], project1)
|
||||
|
||||
self.assignment_api.create_grant(role1['id'],
|
||||
group_id=user_group_id,
|
||||
project_id=project1['id'])
|
||||
|
||||
# Check the roles, shouldn't be any since the user wasn't granted any.
|
||||
roles = self.assignment_api.get_roles_for_user_and_project(
|
||||
user_group_id, project1['id'])
|
||||
|
||||
self.assertEqual([role1['id']], roles,
|
||||
'role for group is %s' % role1['id'])
|
||||
|
||||
def test_list_projects_for_alternate_domain(self):
|
||||
self.skipTest(
|
||||
'N/A: LDAP does not support multiple domains')
|
||||
|
|
|
@ -0,0 +1,169 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import ldap.dn
|
||||
|
||||
from keystone.common import ldap as ks_ldap
|
||||
from keystone import tests
|
||||
|
||||
|
||||
class DnCompareTest(tests.BaseTestCase):
|
||||
"""Tests for the DN comparison functions in keystone.common.ldap.core."""
|
||||
|
||||
def test_prep(self):
|
||||
# prep_case_insensitive returns the string with spaces at the front and
|
||||
# end if it's already lowercase and no insignificant characters.
|
||||
value = 'lowercase value'
|
||||
self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
|
||||
|
||||
def test_prep_lowercase(self):
|
||||
# prep_case_insensitive returns the string with spaces at the front and
|
||||
# end and lowercases the value.
|
||||
value = 'UPPERCASE VALUE'
|
||||
exp_value = value.lower()
|
||||
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
|
||||
|
||||
def test_prep_insignificant(self):
|
||||
# prep_case_insensitive remove insignificant spaces.
|
||||
value = 'before after'
|
||||
exp_value = 'before after'
|
||||
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
|
||||
|
||||
def test_prep_insignificant_pre_post(self):
|
||||
# prep_case_insensitive remove insignificant spaces.
|
||||
value = ' value '
|
||||
exp_value = 'value'
|
||||
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
|
||||
|
||||
def test_ava_equal_same(self):
|
||||
# is_ava_value_equal returns True if the two values are the same.
|
||||
value = 'val1'
|
||||
self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
|
||||
|
||||
def test_ava_equal_complex(self):
|
||||
# is_ava_value_equal returns True if the two values are the same using
|
||||
# a value that's got different capitalization and insignificant chars.
|
||||
val1 = 'before after'
|
||||
val2 = ' BEFORE afTer '
|
||||
self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
|
||||
|
||||
def test_ava_different(self):
|
||||
# is_ava_value_equal returns False if the values aren't the same.
|
||||
self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
|
||||
|
||||
def test_rdn_same(self):
|
||||
# is_rdn_equal returns True if the two values are the same.
|
||||
rdn = ldap.dn.str2dn('cn=val1')[0]
|
||||
self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
|
||||
|
||||
def test_rdn_diff_length(self):
|
||||
# is_rdn_equal returns False if the RDNs have a different number of
|
||||
# AVAs.
|
||||
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
|
||||
rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
|
||||
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
|
||||
|
||||
def test_rdn_multi_ava_same_order(self):
|
||||
# is_rdn_equal returns True if the RDNs have the same number of AVAs
|
||||
# and the values are the same.
|
||||
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
|
||||
rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
|
||||
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
|
||||
|
||||
def test_rdn_multi_ava_diff_order(self):
|
||||
# is_rdn_equal returns True if the RDNs have the same number of AVAs
|
||||
# and the values are the same, even if in a different order
|
||||
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
|
||||
rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
|
||||
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
|
||||
|
||||
def test_rdn_multi_ava_diff_type(self):
|
||||
# is_rdn_equal returns False if the RDNs have the same number of AVAs
|
||||
# and the attribute types are different.
|
||||
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
|
||||
rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
|
||||
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
|
||||
|
||||
def test_rdn_attr_type_case_diff(self):
|
||||
# is_rdn_equal returns True for same RDNs even when attr type case is
|
||||
# different.
|
||||
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
|
||||
rdn2 = ldap.dn.str2dn('CN=cn1')[0]
|
||||
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
|
||||
|
||||
def test_rdn_attr_type_alias(self):
|
||||
# is_rdn_equal returns False for same RDNs even when attr type alias is
|
||||
# used. Note that this is a limitation since an LDAP server should
|
||||
# consider them equal.
|
||||
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
|
||||
rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
|
||||
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
|
||||
|
||||
def test_dn_same(self):
|
||||
# is_dn_equal returns True if the DNs are the same.
|
||||
dn = 'cn=Babs Jansen,ou=OpenStack'
|
||||
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
|
||||
|
||||
def test_dn_diff_length(self):
|
||||
# is_dn_equal returns False if the DNs don't have the same number of
|
||||
# RDNs
|
||||
dn1 = 'cn=Babs Jansen,ou=OpenStack'
|
||||
dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
|
||||
self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
|
||||
|
||||
def test_dn_equal_rdns(self):
|
||||
# is_dn_equal returns True if the DNs have the same number of RDNs
|
||||
# and each RDN is the same.
|
||||
dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
|
||||
dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
|
||||
self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
|
||||
|
||||
def test_dn_parsed_dns(self):
|
||||
# is_dn_equal can also accept parsed DNs.
|
||||
dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
|
||||
dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
|
||||
self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
|
||||
|
||||
def test_startswith_under_child(self):
|
||||
# dn_startswith returns True if descendant_dn is a child of dn.
|
||||
child = 'cn=Babs Jansen,ou=OpenStack'
|
||||
parent = 'ou=OpenStack'
|
||||
self.assertTrue(ks_ldap.dn_startswith(child, parent))
|
||||
|
||||
def test_startswith_parent(self):
|
||||
# dn_startswith returns False if descendant_dn is a parent of dn.
|
||||
child = 'cn=Babs Jansen,ou=OpenStack'
|
||||
parent = 'ou=OpenStack'
|
||||
self.assertFalse(ks_ldap.dn_startswith(parent, child))
|
||||
|
||||
def test_startswith_same(self):
|
||||
# dn_startswith returns False if DNs are the same.
|
||||
dn = 'cn=Babs Jansen,ou=OpenStack'
|
||||
self.assertFalse(ks_ldap.dn_startswith(dn, dn))
|
||||
|
||||
def test_startswith_not_parent(self):
|
||||
# dn_startswith returns False if descendant_dn is not under the dn
|
||||
child = 'cn=Babs Jansen,ou=OpenStack'
|
||||
parent = 'dc=example.com'
|
||||
self.assertFalse(ks_ldap.dn_startswith(child, parent))
|
||||
|
||||
def test_startswith_descendant(self):
|
||||
# dn_startswith returns True if descendant_dn is a descendant of dn.
|
||||
descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
|
||||
dn = 'ou=OpenStack,dc=example.com'
|
||||
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
|
||||
|
||||
def test_startswith_parsed_dns(self):
|
||||
# dn_startswith also accepts parsed DNs.
|
||||
descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
|
||||
dn = ldap.dn.str2dn('ou=OpenStack')
|
||||
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
|
Loading…
Reference in New Issue