Unimplemented get roles by group for project list

The list_projects_for_user() function on LDAP assignment backend only
listed projects with associations across user_id, not across group_ids.
This function admits the group_ids parameter, but never is used on the
body of the function.

Change-Id: I0ff6791e11aa18ffb3a4407e8e5958ac03f2086b
Closes-Bug: #1284639
This commit is contained in:
Marcos Lobo 2014-02-26 10:10:00 +01:00 committed by Marcos Fermin Lobo
parent 455d50e8ae
commit 33a3822ecd
2 changed files with 115 additions and 6 deletions

View File

@ -133,12 +133,16 @@ class Assignment(assignment.Driver):
return self.role.get_all()
def list_projects_for_user(self, user_id, group_ids, hints):
# NOTE(henry-nash): The LDAP backend is being deprecated, so no
# support is provided for projects that the user has a role on solely
# by virtue of group membership.
user_dn = self.user._id_to_dn(user_id)
associations = (self.role.list_project_roles_for_user
(user_dn, self.project.tree_dn))
for group_id in group_ids:
group_dn = self.group._id_to_dn(group_id)
for group_role in self.role.list_project_roles_for_group(
group_dn, self.project.tree_dn):
associations.append(group_role)
# Since the LDAP backend doesn't store the domain_id in the LDAP
# records (and only supports the default domain), we fill in the
# domain_id before we return the list.
@ -590,6 +594,40 @@ class RoleApi(common_ldap.BaseLdap):
tenant_dn=tenant_dn))
return res
def list_project_roles_for_group(self, group_dn, project_subtree):
group_dn_esc = ldap.filter.escape_filter_chars(group_dn)
conn = self.get_connection()
query = '(&(objectClass=%s)(%s=%s))' % (self.object_class,
self.member_attribute,
group_dn_esc)
try:
roles = conn.search_s(project_subtree,
ldap.SCOPE_SUBTREE,
query,
attrlist=['1.1'])
except ldap.NO_SUCH_OBJECT:
# Return no roles rather than raise an exception if the project
# subtree entry doesn't exist because an empty subtree is not
# an error.
return []
finally:
conn.unbind_s()
res = []
for role_dn, _ in roles:
# ldap.dn.str2dn returns a list, where the first
# element is the first RDN.
# For a role assignment, this contains the role ID,
# the remainder is the DN of the project.
project = ldap.dn.str2dn(role_dn)
project.pop(0)
project_dn = ldap.dn.dn2str(project)
res.append(GroupRoleAssociation(
group_dn=group_dn,
role_dn=role_dn,
tenant_dn=project_dn))
return res
def roles_delete_subtree_by_project(self, tenant_dn):
conn = self.get_connection()
query = '(objectClass=%s)' % self.object_class

View File

@ -260,15 +260,86 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertEqual(0, len(user_projects))
self.assertThat(user_projects, matchers.HasLength(0))
# new grant(user1, role_member, tenant_bar)
self.assignment_api.create_grant(user_id=user1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
# new grant(user1, role_member, tenant_baz)
self.assignment_api.create_grant(user_id=user1['id'],
project_id=self.tenant_baz['id'],
role_id=self.role_member['id'])
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertEqual(2, len(user_projects))
self.assertThat(user_projects, matchers.HasLength(2))
# Now, check number of projects through groups
user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
'enabled': True}
self.identity_api.create_user(user2['id'], user2)
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id']}
self.identity_api.create_group(group1['id'], group1)
self.identity_api.add_user_to_group(user2['id'], group1['id'])
# new grant(group1(user2), role_member, tenant_bar)
self.assignment_api.create_grant(group_id=group1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
# new grant(group1(user2), role_member, tenant_baz)
self.assignment_api.create_grant(group_id=group1['id'],
project_id=self.tenant_baz['id'],
role_id=self.role_member['id'])
user_projects = self.assignment_api.list_projects_for_user(user2['id'])
self.assertThat(user_projects, matchers.HasLength(2))
# new grant(group1(user2), role_other, tenant_bar)
self.assignment_api.create_grant(group_id=group1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_other['id'])
user_projects = self.assignment_api.list_projects_for_user(user2['id'])
self.assertThat(user_projects, matchers.HasLength(2))
def test_list_projects_for_user_and_groups(self):
domain = self._get_domain_fixture()
# Create user1
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
# Create new group for user1
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id']}
self.identity_api.create_group(group1['id'], group1)
# Add user1 to group1
self.identity_api.add_user_to_group(user1['id'], group1['id'])
# Now, add grant to user1 and group1 in tenant_bar
self.assignment_api.create_grant(user_id=user1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
self.assignment_api.create_grant(group_id=group1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
# The result is user1 has only one project granted
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertThat(user_projects, matchers.HasLength(1))
# Now, delete user1 grant into tenant_bar and check
self.assignment_api.delete_grant(user_id=user1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
# The result is user1 has only one project granted.
# Granted through group1.
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertThat(user_projects, matchers.HasLength(1))
def test_list_projects_for_user_with_grants(self):
domain = self._get_domain_fixture()
@ -308,7 +379,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
user_projects = self.assignment_api.list_projects_for_user(
new_user['id'])
self.assertEqual(2, len(user_projects))
self.assertEqual(3, len(user_projects))
def test_create_duplicate_user_name_in_different_domains(self):
self.skipTest('Blocked by bug 1101276')