LDAP fix for get_roles_for_user_and_project user=group ID

When there was a role assigned to a group with the same ID as a
user, the LDAP assignment backend would incorrectly return the
assignment to the group when requesting roles for the user via
the get_roles_for_user_and_project method.

With this change, assignments to a group with the same ID are not
returned for the user when calling get_roles_for_user_and_project.

Functions were added to compare DNs more accurately based on the
LDAP RFCs.

The fakeldap code was changed to normalize the values when
comparing values for checking if the values match the filter.

Co-Authored By: Nathan Kinder <nkinder@redhat.com>
Co-Authored By: Adam Young <ayoung@redhat.com>

Change-Id: Ia6f1ae2e3af1e968f1a393bd4f2f38812a88a5d0
Closes-Bug: #1309228
This commit is contained in:
Brant Knudson 2014-05-21 17:30:42 -05:00
parent 72f046fd09
commit 729dcad738
5 changed files with 318 additions and 42 deletions

View File

@ -89,10 +89,11 @@ class Assignment(assignment.Driver):
def _get_roles_for_just_user_and_project(user_id, tenant_id):
self.get_project(tenant_id)
user_dn = self.user._id_to_dn(user_id)
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(tenant_id))
if self.user._dn_to_id(a.user_dn) == user_id]
if common_ldap.is_dn_equal(a.user_dn, user_dn)]
def _get_roles_for_group_and_project(group_id, project_id):
self.get_project(project_id)
@ -106,7 +107,7 @@ class Assignment(assignment.Driver):
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(project_id))
if a.user_dn.upper() == group_dn.upper()]
if common_ldap.is_dn_equal(a.user_dn, group_dn)]
if domain_id is not None:
msg = _('Domain metadata not supported by LDAP')

View File

@ -14,6 +14,7 @@
import abc
import os.path
import re
import codecs
import ldap
@ -141,6 +142,115 @@ def ldap_scope(scope):
'options': ', '.join(LDAP_SCOPES.keys())})
def prep_case_insensitive(value):
"""Prepare a string for case-insensitive comparison.
This is defined in RFC4518. For simplicity, all this function does is
lowercase all the characters, strip leading and trailing whitespace,
and compress sequences of spaces to a single space.
"""
value = re.sub(r'\s+', ' ', value.strip().lower())
return value
def is_ava_value_equal(attribute_type, val1, val2):
"""Returns True if and only if the AVAs are equal.
When comparing AVAs, the equality matching rule for the attribute type
should be taken into consideration. For simplicity, this implementation
does a case-insensitive comparison.
Note that this function uses prep_case_insenstive so the limitations of
that function apply here.
"""
return prep_case_insensitive(val1) == prep_case_insensitive(val2)
def is_rdn_equal(rdn1, rdn2):
"""Returns True if and only if the RDNs are equal.
* RDNs must have the same number of AVAs.
* Each AVA of the RDNs must be the equal for the same attribute type. The
order isn't significant. Note that an attribute type will only be in one
AVA in an RDN, otherwise the DN wouldn't be valid.
* Attribute types aren't case sensitive. Note that attribute type
comparison is more complicated than implemented. This function only
compares case-insentive. The code should handle multiple names for an
attribute type (e.g., cn, commonName, and 2.5.4.3 are the same).
Note that this function uses is_ava_value_equal to compare AVAs so the
limitations of that function apply here.
"""
if len(rdn1) != len(rdn2):
return False
for attr_type_1, val1, dummy in rdn1:
found = False
for attr_type_2, val2, dummy in rdn2:
if attr_type_1.lower() != attr_type_2.lower():
continue
found = True
if not is_ava_value_equal(attr_type_1, val1, val2):
return False
break
if not found:
return False
return True
def is_dn_equal(dn1, dn2):
"""Returns True if and only if the DNs are equal.
Two DNs are equal if they've got the same number of RDNs and if the RDNs
are the same at each position. See RFC4517.
Note that this function uses is_rdn_equal to compare RDNs so the
limitations of that function apply here.
:param dn1: Either a string DN or a DN parsed by ldap.dn.str2dn.
:param dn2: Either a string DN or a DN parsed by ldap.dn.str2dn.
"""
if not isinstance(dn1, list):
dn1 = ldap.dn.str2dn(dn1)
if not isinstance(dn2, list):
dn2 = ldap.dn.str2dn(dn2)
if len(dn1) != len(dn2):
return False
for rdn1, rdn2 in zip(dn1, dn2):
if not is_rdn_equal(rdn1, rdn2):
return False
return True
def dn_startswith(descendant_dn, dn):
"""Returns True if and only if the descendant_dn is under the dn.
:param descendant_dn: Either a string DN or a DN parsed by ldap.dn.str2dn.
:param dn: Either a string DN or a DN parsed by ldap.dn.str2dn.
"""
if not isinstance(descendant_dn, list):
descendant_dn = ldap.dn.str2dn(descendant_dn)
if not isinstance(dn, list):
dn = ldap.dn.str2dn(dn)
if len(descendant_dn) <= len(dn):
return False
return is_dn_equal(descendant_dn[len(dn):], dn)
@six.add_metaclass(abc.ABCMeta)
class LDAPHandler(object):
'''Abstract class which defines methods for a LDAP API provider.

View File

@ -62,6 +62,11 @@ def _internal_attr(attr_name, value_or_values):
if dn == 'cn=Doe\\5c, John,ou=Users,cn=example,cn=com':
return 'CN=Doe\\, John,OU=Users,CN=example,CN=com'
# NOTE(blk-u): Another special case for this tested value. When a
# roleOccupant has an escaped comma, it gets converted to \2C.
if dn == 'cn=Doe\\, John,ou=Users,cn=example,cn=com':
return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com'
dn = ldap.dn.str2dn(core.utf8_encode(dn))
norm = []
for part in dn:
@ -133,7 +138,9 @@ def _match(key, value, attrs):
str_sids = [six.text_type(x) for x in attrs[key]]
return six.text_type(value) in str_sids
if key != 'objectclass':
return _internal_attr(key, value)[0] in attrs[key]
check_value = _internal_attr(key, value)[0]
norm_values = list(_internal_attr(key, x)[0] for x in attrs[key])
return check_value in norm_values
# it is an objectclass check, so check subclasses
values = _subs(value)
for v in values:

View File

@ -594,6 +594,34 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.assertThat(ref_list, matchers.Equals([group]))
def test_user_id_comma_grants(self):
"""Even if the user has a , in their ID, can get user and group grants.
"""
# Create a user with a , in their ID
# NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
user_id = u'Doe, John'
user = {
'id': user_id,
'name': self.getUniqueString(),
'password': self.getUniqueString(),
'domain_id': CONF.identity.default_domain_id,
}
self.identity_api.create_user(user_id, user)
# Grant the user a role on a project.
role_id = 'member'
project_id = self.tenant_baz['id']
self.assignment_api.create_grant(role_id, user_id=user_id,
project_id=project_id)
role_ref = self.assignment_api.get_grant(role_id, user_id=user_id,
project_id=project_id)
self.assertEqual(role_id, role_ref['id'])
class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
@ -1250,45 +1278,6 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
user1['id'], CONF.identity.default_domain_id)
self.assertEqual(0, len(combined_role_list))
def test_get_roles_for_user_and_project_user_group_same_id(self):
"""When a user has the same ID as a group,
get_roles_for_user_and_project returns the roles for the group.
Overriding this test for LDAP because it works differently. The role
for the group is returned. This is bug 1309228.
"""
# Setup: create user, group with same ID, role, and project;
# assign the group the role on the project.
user_group_id = uuid.uuid4().hex
user1 = {'id': user_group_id, 'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id, }
self.identity_api.create_user(user_group_id, user1)
group1 = {'id': user_group_id, 'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id, }
self.identity_api.create_group(user_group_id, group1)
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role1['id'], role1)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id, }
self.assignment_api.create_project(project1['id'], project1)
self.assignment_api.create_grant(role1['id'],
group_id=user_group_id,
project_id=project1['id'])
# Check the roles, shouldn't be any since the user wasn't granted any.
roles = self.assignment_api.get_roles_for_user_and_project(
user_group_id, project1['id'])
self.assertEqual([role1['id']], roles,
'role for group is %s' % role1['id'])
def test_list_projects_for_alternate_domain(self):
self.skipTest(
'N/A: LDAP does not support multiple domains')

View File

@ -0,0 +1,169 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ldap.dn
from keystone.common import ldap as ks_ldap
from keystone import tests
class DnCompareTest(tests.BaseTestCase):
"""Tests for the DN comparison functions in keystone.common.ldap.core."""
def test_prep(self):
# prep_case_insensitive returns the string with spaces at the front and
# end if it's already lowercase and no insignificant characters.
value = 'lowercase value'
self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
def test_prep_lowercase(self):
# prep_case_insensitive returns the string with spaces at the front and
# end and lowercases the value.
value = 'UPPERCASE VALUE'
exp_value = value.lower()
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
def test_prep_insignificant(self):
# prep_case_insensitive remove insignificant spaces.
value = 'before after'
exp_value = 'before after'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
def test_prep_insignificant_pre_post(self):
# prep_case_insensitive remove insignificant spaces.
value = ' value '
exp_value = 'value'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
def test_ava_equal_same(self):
# is_ava_value_equal returns True if the two values are the same.
value = 'val1'
self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
def test_ava_equal_complex(self):
# is_ava_value_equal returns True if the two values are the same using
# a value that's got different capitalization and insignificant chars.
val1 = 'before after'
val2 = ' BEFORE afTer '
self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
def test_ava_different(self):
# is_ava_value_equal returns False if the values aren't the same.
self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
def test_rdn_same(self):
# is_rdn_equal returns True if the two values are the same.
rdn = ldap.dn.str2dn('cn=val1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
def test_rdn_diff_length(self):
# is_rdn_equal returns False if the RDNs have a different number of
# AVAs.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_same_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same, even if in a different order
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_type(self):
# is_rdn_equal returns False if the RDNs have the same number of AVAs
# and the attribute types are different.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_case_diff(self):
# is_rdn_equal returns True for same RDNs even when attr type case is
# different.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('CN=cn1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_alias(self):
# is_rdn_equal returns False for same RDNs even when attr type alias is
# used. Note that this is a limitation since an LDAP server should
# consider them equal.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_dn_same(self):
# is_dn_equal returns True if the DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
def test_dn_diff_length(self):
# is_dn_equal returns False if the DNs don't have the same number of
# RDNs
dn1 = 'cn=Babs Jansen,ou=OpenStack'
dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
def test_dn_equal_rdns(self):
# is_dn_equal returns True if the DNs have the same number of RDNs
# and each RDN is the same.
dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
def test_dn_parsed_dns(self):
# is_dn_equal can also accept parsed DNs.
dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
def test_startswith_under_child(self):
# dn_startswith returns True if descendant_dn is a child of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
def test_startswith_parent(self):
# dn_startswith returns False if descendant_dn is a parent of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(parent, child))
def test_startswith_same(self):
# dn_startswith returns False if DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(dn, dn))
def test_startswith_not_parent(self):
# dn_startswith returns False if descendant_dn is not under the dn
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'dc=example.com'
self.assertFalse(ks_ldap.dn_startswith(child, parent))
def test_startswith_descendant(self):
# dn_startswith returns True if descendant_dn is a descendant of dn.
descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
dn = 'ou=OpenStack,dc=example.com'
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
def test_startswith_parsed_dns(self):
# dn_startswith also accepts parsed DNs.
descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
dn = ldap.dn.str2dn('ou=OpenStack')
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))