Merge "SQL and LDAP fixes for get_roles_for_user_and_project user=group ID" into stable/icehouse

This commit is contained in:
Jenkins 2014-05-29 14:17:01 +00:00 committed by Gerrit Code Review
commit 1f15f62e4b
7 changed files with 378 additions and 9 deletions

View File

@ -88,24 +88,19 @@ class Assignment(assignment.Driver):
def _get_roles_for_just_user_and_project(user_id, tenant_id):
self.get_project(tenant_id)
user_dn = self.user._id_to_dn(user_id)
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(tenant_id))
if self.user._dn_to_id(a.user_dn) == user_id]
if common_ldap.is_dn_equal(a.user_dn, user_dn)]
def _get_roles_for_group_and_project(group_id, project_id):
self.get_project(project_id)
group_dn = self.group._id_to_dn(group_id)
# NOTE(marcos-fermin-lobo): In Active Directory, for functions
# such as "self.role.get_role_assignments", it returns
# the key "CN" or "OU" in uppercase.
# The group_dn var has "CN" and "OU" in lowercase.
# For this reason, it is necessary to use the "upper()"
# function so both are consistent.
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(project_id))
if a.user_dn.upper() == group_dn.upper()]
if common_ldap.is_dn_equal(a.user_dn, group_dn)]
if domain_id is not None:
msg = _('Domain metadata not supported by LDAP')

View File

@ -86,6 +86,21 @@ class Assignment(assignment.Driver):
session = sql.get_session()
q = session.query(RoleAssignment)
def _calc_assignment_type():
# Figure out the assignment type we're checking for from the args.
if user_id:
if tenant_id:
return AssignmentType.USER_PROJECT
else:
return AssignmentType.USER_DOMAIN
else:
if tenant_id:
return AssignmentType.GROUP_PROJECT
else:
return AssignmentType.GROUP_DOMAIN
q = q.filter_by(type=_calc_assignment_type())
q = q.filter_by(actor_id=user_id or group_id)
q = q.filter_by(target_id=tenant_id or domain_id)
refs = q.all()

View File

@ -13,6 +13,7 @@
# under the License.
import os.path
import re
import ldap
import ldap.filter
@ -101,6 +102,115 @@ def ldap_scope(scope):
'options': ', '.join(LDAP_SCOPES.keys())})
def prep_case_insensitive(value):
"""Prepare a string for case-insensitive comparison.
This is defined in RFC4518. For simplicity, all this function does is
lowercase all the characters, strip leading and trailing whitespace,
and compress sequences of spaces to a single space.
"""
value = re.sub(r'\s+', ' ', value.strip().lower())
return value
def is_ava_value_equal(attribute_type, val1, val2):
"""Returns True if and only if the AVAs are equal.
When comparing AVAs, the equality matching rule for the attribute type
should be taken into consideration. For simplicity, this implementation
does a case-insensitive comparison.
Note that this function uses prep_case_insenstive so the limitations of
that function apply here.
"""
return prep_case_insensitive(val1) == prep_case_insensitive(val2)
def is_rdn_equal(rdn1, rdn2):
"""Returns True if and only if the RDNs are equal.
* RDNs must have the same number of AVAs.
* Each AVA of the RDNs must be the equal for the same attribute type. The
order isn't significant. Note that an attribute type will only be in one
AVA in an RDN, otherwise the DN wouldn't be valid.
* Attribute types aren't case sensitive. Note that attribute type
comparison is more complicated than implemented. This function only
compares case-insentive. The code should handle multiple names for an
attribute type (e.g., cn, commonName, and 2.5.4.3 are the same).
Note that this function uses is_ava_value_equal to compare AVAs so the
limitations of that function apply here.
"""
if len(rdn1) != len(rdn2):
return False
for attr_type_1, val1, dummy in rdn1:
found = False
for attr_type_2, val2, dummy in rdn2:
if attr_type_1.lower() != attr_type_2.lower():
continue
found = True
if not is_ava_value_equal(attr_type_1, val1, val2):
return False
break
if not found:
return False
return True
def is_dn_equal(dn1, dn2):
"""Returns True if and only if the DNs are equal.
Two DNs are equal if they've got the same number of RDNs and if the RDNs
are the same at each position. See RFC4517.
Note that this function uses is_rdn_equal to compare RDNs so the
limitations of that function apply here.
:param dn1: Either a string DN or a DN parsed by ldap.dn.str2dn.
:param dn2: Either a string DN or a DN parsed by ldap.dn.str2dn.
"""
if not isinstance(dn1, list):
dn1 = ldap.dn.str2dn(dn1)
if not isinstance(dn2, list):
dn2 = ldap.dn.str2dn(dn2)
if len(dn1) != len(dn2):
return False
for rdn1, rdn2 in zip(dn1, dn2):
if not is_rdn_equal(rdn1, rdn2):
return False
return True
def dn_startswith(descendant_dn, dn):
"""Returns True if and only if the descendant_dn is under the dn.
:param descendant_dn: Either a string DN or a DN parsed by ldap.dn.str2dn.
:param dn: Either a string DN or a DN parsed by ldap.dn.str2dn.
"""
if not isinstance(descendant_dn, list):
descendant_dn = ldap.dn.str2dn(descendant_dn)
if not isinstance(dn, list):
dn = ldap.dn.str2dn(dn)
if len(descendant_dn) <= len(dn):
return False
return is_dn_equal(descendant_dn[len(dn):], dn)
_HANDLERS = {}

View File

@ -51,6 +51,19 @@ def _process_attr(attr_name, value_or_values):
def normalize_dn(dn):
# Capitalize the attribute names as an LDAP server might.
# NOTE(blk-u): Special case for this tested value, used with
# test_user_id_comma. The call to str2dn here isn't always correct
# here, because `dn` is escaped for an LDAP filter. str2dn() normally
# works only because there's no special characters in `dn`.
if dn == 'cn=Doe\\5c, John,ou=Users,cn=example,cn=com':
return 'CN=Doe\\, John,OU=Users,CN=example,CN=com'
# NOTE(blk-u): Another special case for this tested value. When a
# roleOccupant has an escaped comma, it gets converted to \2C.
if dn == 'cn=Doe\\, John,ou=Users,cn=example,cn=com':
return 'CN=Doe\\2C John,OU=Users,CN=example,CN=com'
dn = ldap.dn.str2dn(dn)
norm = []
for part in dn:
@ -118,7 +131,9 @@ def _match(key, value, attrs):
str_sids = [str(x) for x in attrs[key]]
return str(value) in str_sids
if key != 'objectclass':
return _process_attr(key, value)[0] in attrs[key]
check_value = _process_attr(key, value)[0]
norm_values = list(_process_attr(key, x)[0] for x in attrs[key])
return check_value in norm_values
# it is an objectclass check, so check subclasses
values = _subs(value)
for v in values:

View File

@ -1377,6 +1377,43 @@ class IdentityTests(object):
self.assertIn(role_list[1]['id'], combined_role_list)
self.assertIn(role_list[2]['id'], combined_role_list)
def test_get_roles_for_user_and_project_user_group_same_id(self):
"""When a user has the same ID as a group,
get_roles_for_user_and_project returns only the roles for the user and
not the group.
"""
# Setup: create user, group with same ID, role, and project;
# assign the group the role on the project.
user_group_id = uuid.uuid4().hex
user1 = {'id': user_group_id, 'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID, }
self.identity_api.create_user(user_group_id, user1)
group1 = {'id': user_group_id, 'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID, }
self.identity_api.create_group(user_group_id, group1)
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role1['id'], role1)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': DEFAULT_DOMAIN_ID, }
self.assignment_api.create_project(project1['id'], project1)
self.assignment_api.create_grant(role1['id'],
group_id=user_group_id,
project_id=project1['id'])
# Check the roles, shouldn't be any since the user wasn't granted any.
roles = self.assignment_api.get_roles_for_user_and_project(
user_group_id, project1['id'])
self.assertEqual([], roles, 'role for group is %s' % role1['id'])
def test_delete_role_with_user_and_group_grants(self):
role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role1['id'], role1)

View File

@ -547,6 +547,34 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
self.skipTest("Using arbitrary attributes doesn't work under LDAP")
def test_user_id_comma_grants(self):
"""Even if the user has a , in their ID, can get user and group grants.
"""
# Create a user with a , in their ID
# NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
user_id = u'Doe, John'
user = {
'id': user_id,
'name': self.getUniqueString(),
'password': self.getUniqueString(),
'domain_id': CONF.identity.default_domain_id,
}
self.identity_api.create_user(user_id, user)
# Grant the user a role on a project.
role_id = 'member'
project_id = self.tenant_baz['id']
self.assignment_api.create_grant(role_id, user_id=user_id,
project_id=project_id)
role_ref = self.assignment_api.get_grant(role_id, user_id=user_id,
project_id=project_id)
self.assertEqual(role_id, role_ref['id'])
class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
def setUp(self):

View File

@ -0,0 +1,169 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ldap.dn
from keystone.common import ldap as ks_ldap
from keystone import tests
class DnCompareTest(tests.BaseTestCase):
"""Tests for the DN comparison functions in keystone.common.ldap.core."""
def test_prep(self):
# prep_case_insensitive returns the string with spaces at the front and
# end if it's already lowercase and no insignificant characters.
value = 'lowercase value'
self.assertEqual(value, ks_ldap.prep_case_insensitive(value))
def test_prep_lowercase(self):
# prep_case_insensitive returns the string with spaces at the front and
# end and lowercases the value.
value = 'UPPERCASE VALUE'
exp_value = value.lower()
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
def test_prep_insignificant(self):
# prep_case_insensitive remove insignificant spaces.
value = 'before after'
exp_value = 'before after'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
def test_prep_insignificant_pre_post(self):
# prep_case_insensitive remove insignificant spaces.
value = ' value '
exp_value = 'value'
self.assertEqual(exp_value, ks_ldap.prep_case_insensitive(value))
def test_ava_equal_same(self):
# is_ava_value_equal returns True if the two values are the same.
value = 'val1'
self.assertTrue(ks_ldap.is_ava_value_equal('cn', value, value))
def test_ava_equal_complex(self):
# is_ava_value_equal returns True if the two values are the same using
# a value that's got different capitalization and insignificant chars.
val1 = 'before after'
val2 = ' BEFORE afTer '
self.assertTrue(ks_ldap.is_ava_value_equal('cn', val1, val2))
def test_ava_different(self):
# is_ava_value_equal returns False if the values aren't the same.
self.assertFalse(ks_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
def test_rdn_same(self):
# is_rdn_equal returns True if the two values are the same.
rdn = ldap.dn.str2dn('cn=val1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn, rdn))
def test_rdn_diff_length(self):
# is_rdn_equal returns False if the RDNs have a different number of
# AVAs.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_same_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_order(self):
# is_rdn_equal returns True if the RDNs have the same number of AVAs
# and the values are the same, even if in a different order
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_multi_ava_diff_type(self):
# is_rdn_equal returns False if the RDNs have the same number of AVAs
# and the attribute types are different.
rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_case_diff(self):
# is_rdn_equal returns True for same RDNs even when attr type case is
# different.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('CN=cn1')[0]
self.assertTrue(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_rdn_attr_type_alias(self):
# is_rdn_equal returns False for same RDNs even when attr type alias is
# used. Note that this is a limitation since an LDAP server should
# consider them equal.
rdn1 = ldap.dn.str2dn('cn=cn1')[0]
rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
self.assertFalse(ks_ldap.is_rdn_equal(rdn1, rdn2))
def test_dn_same(self):
# is_dn_equal returns True if the DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn, dn))
def test_dn_diff_length(self):
# is_dn_equal returns False if the DNs don't have the same number of
# RDNs
dn1 = 'cn=Babs Jansen,ou=OpenStack'
dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
self.assertFalse(ks_ldap.is_dn_equal(dn1, dn2))
def test_dn_equal_rdns(self):
# is_dn_equal returns True if the DNs have the same number of RDNs
# and each RDN is the same.
dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
self.assertTrue(ks_ldap.is_dn_equal(dn1, dn2))
def test_dn_parsed_dns(self):
# is_dn_equal can also accept parsed DNs.
dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
self.assertTrue(ks_ldap.is_dn_equal(dn_str1, dn_str2))
def test_startswith_under_child(self):
# dn_startswith returns True if descendant_dn is a child of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
def test_startswith_parent(self):
# dn_startswith returns False if descendant_dn is a parent of dn.
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(parent, child))
def test_startswith_same(self):
# dn_startswith returns False if DNs are the same.
dn = 'cn=Babs Jansen,ou=OpenStack'
self.assertFalse(ks_ldap.dn_startswith(dn, dn))
def test_startswith_not_parent(self):
# dn_startswith returns False if descendant_dn is not under the dn
child = 'cn=Babs Jansen,ou=OpenStack'
parent = 'dc=example.com'
self.assertFalse(ks_ldap.dn_startswith(child, parent))
def test_startswith_descendant(self):
# dn_startswith returns True if descendant_dn is a descendant of dn.
descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
dn = 'ou=OpenStack,dc=example.com'
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))
def test_startswith_parsed_dns(self):
# dn_startswith also accepts parsed DNs.
descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
dn = ldap.dn.str2dn('ou=OpenStack')
self.assertTrue(ks_ldap.dn_startswith(descendant, dn))