LDAP Assignment does not support grant v3 API

The LDAP assignment backend is missing support for several of the v3
APIs. This patch implements Role Grant CRUD for V3 Assignment API:

- Role Grant CRUD
  + create_grant
  + get_grant
  + delete_grant
  + update_grant

- GET /role_assignments
  + list_role_assignments

Closes-Bug: #1248952
Partial-Bug: #1101287
Partial-Bug: #1221805

Change-Id: I1fb247b538e6a11085a18f0103cb8508d58e664f
This commit is contained in:
Marcos Lobo 2013-11-18 14:59:46 +01:00 committed by Adam Young
parent e0b24f9609
commit 829a234931
2 changed files with 323 additions and 33 deletions

View File

@ -91,13 +91,35 @@ class Assignment(assignment.Driver):
(self.project._id_to_dn(tenant_id))
if self.user._dn_to_id(a.user_dn) == user_id]
def _get_roles_for_group_and_project(group_id, project_id):
self.identity_api.get_group(group_id)
self.get_project(project_id)
group_dn = self.group._id_to_dn(group_id)
# NOTE(marcos-fermin-lobo): In Active Directory, for functions
# such as "self.role.get_role_assignments", it returns
# the key "CN" or "OU" in uppercase.
# The group_dn var has "CN" and "OU" in lowercase.
# For this reason, it is necessary to use the "upper()"
# function so both are consistent.
return [self.role._dn_to_id(a.role_dn)
for a in self.role.get_role_assignments
(self.project._id_to_dn(project_id))
if a.user_dn.upper() == group_dn.upper()]
if domain_id is not None:
msg = 'Domain metadata not supported by LDAP'
raise exception.NotImplemented(message=msg)
if tenant_id is None or user_id is None:
if group_id is None and user_id is None:
return {}
metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id)
if tenant_id is None:
return {}
if user_id is None:
metadata_ref = _get_roles_for_group_and_project(group_id,
tenant_id)
else:
metadata_ref = _get_roles_for_just_user_and_project(user_id,
tenant_id)
if not metadata_ref:
return {}
return {'roles': [self._role_to_dict(r, False) for r in metadata_ref]}
@ -145,10 +167,21 @@ class Assignment(assignment.Driver):
role_dn = self._subrole_id_to_dn(role_id, tenant_id)
self.role.add_user(role_id, role_dn, user_dn, user_id, tenant_id)
tenant_dn = self.project._id_to_dn(tenant_id)
return UserRoleAssociation(
role_dn=role_dn,
user_dn=user_dn,
tenant_dn=tenant_dn)
return UserRoleAssociation(role_dn=role_dn,
user_dn=user_dn,
tenant_dn=tenant_dn)
def _add_role_to_group_and_project(self, group_id, tenant_id, role_id):
self.identity_api.get_group(group_id)
self.get_project(tenant_id)
self.get_role(role_id)
group_dn = self.group._id_to_dn(group_id)
role_dn = self._subrole_id_to_dn(role_id, tenant_id)
self.role.add_user(role_id, role_dn, group_dn, group_id, tenant_id)
tenant_dn = self.project._id_to_dn(tenant_id)
return GroupRoleAssociation(group_dn=group_dn,
role_dn=role_dn,
tenant_dn=tenant_dn)
def _create_metadata(self, user_id, tenant_id, metadata):
return {}
@ -190,6 +223,14 @@ class Assignment(assignment.Driver):
self.project._id_to_dn(tenant_id),
user_id, role_id)
def _remove_role_from_group_and_project(self, group_id, tenant_id,
role_id):
role_dn = self._subrole_id_to_dn(role_id, tenant_id)
return self.role.delete_user(role_dn,
self.group._id_to_dn(group_id),
self.project._id_to_dn(tenant_id),
group_id, role_id)
def update_role(self, role_id, role):
self.get_role(role_id)
return self.role.update(role_id, role)
@ -255,28 +296,112 @@ class Assignment(assignment.Driver):
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
raise exception.NotImplemented()
self.get_role(role_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_project(project_id)
if project_id and inherited_to_projects:
msg = _('Inherited roles can only be assigned to domains')
raise exception.Conflict(type='role grant', details=msg)
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
if user_id is None:
metadata_ref['roles'] = self._add_role_to_group_and_project(
group_id, project_id, role_id)
else:
metadata_ref['roles'] = self.add_role_to_user_and_project(
user_id, project_id, role_id)
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
raise exception.NotImplemented()
role_ref = self.get_role(role_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_project(project_id)
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
role_ids = set(self._roles_from_role_dicts(
metadata_ref.get('roles', []), inherited_to_projects))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return role_ref
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
raise exception.NotImplemented()
if user_id:
self.identity_api.get_user(user_id)
if group_id:
self.identity_api.get_group(group_id)
self.get_role(role_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_project(project_id)
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
try:
if user_id is None:
metadata_ref['roles'] = (
self._remove_role_from_group_and_project(
group_id, project_id, role_id))
else:
metadata_ref['roles'] = self.remove_role_from_user_and_project(
user_id, project_id, role_id)
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None,
inherited_to_projects=False):
raise exception.NotImplemented()
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_project(project_id)
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
return [self.get_role(x) for x in
self._roles_from_role_dicts(metadata_ref.get('roles', []),
inherited_to_projects)]
def get_domain_by_name(self, domain_name):
raise exception.NotImplemented()
def list_role_assignments(self):
raise exception.NotImplemented()
role_assignments = []
for a in self.role.list_role_assignments(self.project.tree_dn):
assignment = {'role_id': self.role._dn_to_id(a.role_dn),
'user_id': self.user._dn_to_id(a.user_dn),
'project_id': self.project._dn_to_id(a.project_dn)}
role_assignments.append(assignment)
return role_assignments
# TODO(termie): turn this into a data object and move logic to driver
@ -551,3 +676,32 @@ class RoleApi(common_ldap.BaseLdap):
finally:
conn.unbind_s()
super(RoleApi, self).delete(role_id)
def list_role_assignments(self, project_tree_dn):
"""Returns a list of all the role assignments linked to project_tree_dn
attribute.
"""
conn = self.get_connection()
query = '(objectClass=%s)' % (self.object_class)
try:
roles = conn.search_s(project_tree_dn,
ldap.SCOPE_SUBTREE,
query)
except ldap.NO_SUCH_OBJECT:
return []
finally:
conn.unbind_s()
res = []
for role_dn, role in roles:
tenant = ldap.dn.str2dn(role_dn)
tenant.pop(0)
# It obtains the tenant DN to construct the UserRoleAssociation
# object.
tenant_dn = ldap.dn.dn2str(tenant)
for user_dn in role[self.member_attribute]:
res.append(UserRoleAssociation(
user_dn=user_dn,
role_dn=role_dn,
tenant_dn=tenant_dn))
return res

View File

@ -124,20 +124,67 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.identity_api.get_user,
self.user_foo['id'])
def test_get_role_grant_by_user_and_project(self):
self.skipTest('Blocked by bug 1101287')
def test_get_role_grants_for_user_and_project_404(self):
self.skipTest('Blocked by bug 1101287')
def test_add_role_grant_to_user_and_project_404(self):
self.skipTest('Blocked by bug 1101287')
def test_remove_role_grant_from_user_and_project(self):
self.skipTest('Blocked by bug 1101287')
self.assignment_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'],
role_id='member')
roles_ref = self.assignment_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'])
self.assertDictEqual(roles_ref[0], self.role_member)
self.assignment_api.delete_grant(user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'],
role_id='member')
roles_ref = self.assignment_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'])
self.assertEqual(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.assignment_api.delete_grant,
user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'],
role_id='member')
def test_get_and_remove_role_grant_by_group_and_project(self):
self.skipTest('Blocked by bug 1101287')
new_domain = self._get_domain_fixture()
new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'enabled': True,
'domain_id': new_domain['id']}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
roles_ref = self.assignment_api.list_grants(
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertEqual(roles_ref, [])
self.assertEqual(len(roles_ref), 0)
self.assignment_api.create_grant(group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.assignment_api.list_grants(
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertNotEmpty(roles_ref)
self.assertDictEqual(roles_ref[0], self.role_member)
self.assignment_api.delete_grant(group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.assignment_api.list_grants(
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertEqual(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.assignment_api.delete_grant,
group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id='member')
def test_delete_user_grant_no_user(self):
self.skipTest('Blocked by bug 1101287')
@ -179,10 +226,61 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.skipTest('N/A: LDAP does not support multiple domains')
def test_list_projects_for_user(self):
self.skipTest('Blocked by bug 1101287')
domain = self._get_domain_fixture()
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertEqual(len(user_projects), 0)
self.assignment_api.create_grant(user_id=user1['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
self.assignment_api.create_grant(user_id=user1['id'],
project_id=self.tenant_baz['id'],
role_id=self.role_member['id'])
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertEqual(len(user_projects), 2)
def test_list_projects_for_user_with_grants(self):
self.skipTest('Blocked by bug 1221805')
domain = self._get_domain_fixture()
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': domain['id']}
self.identity_api.create_user(new_user['id'], new_user)
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id']}
self.identity_api.create_group(group1['id'], group1)
group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id']}
self.identity_api.create_group(group2['id'], group2)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id']}
self.assignment_api.create_project(project1['id'], project1)
project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain['id']}
self.assignment_api.create_project(project2['id'], project2)
self.identity_api.add_user_to_group(new_user['id'],
group1['id'])
self.identity_api.add_user_to_group(new_user['id'],
group2['id'])
self.assignment_api.create_grant(user_id=new_user['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_member['id'])
self.assignment_api.create_grant(user_id=new_user['id'],
project_id=project1['id'],
role_id=self.role_admin['id'])
self.assignment_api.create_grant(group_id=group2['id'],
project_id=project2['id'],
role_id=self.role_admin['id'])
user_projects = self.assignment_api.list_projects_for_user(
new_user['id'])
self.assertEqual(len(user_projects), 2)
def test_create_duplicate_user_name_in_different_domains(self):
self.skipTest('Blocked by bug 1101276')
@ -217,7 +315,33 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.skipTest('N/A: LDAP does not support multiple domains')
def test_list_role_assignments_unfiltered(self):
self.skipTest('Blocked by bug 1221805')
new_domain = self._get_domain_fixture()
new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'password': uuid.uuid4().hex, 'enabled': True,
'domain_id': new_domain['id']}
self.identity_api.create_user(new_user['id'],
new_user)
new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': new_domain['id']}
self.assignment_api.create_project(new_project['id'], new_project)
# First check how many role grant already exist
existing_assignments = len(self.assignment_api.list_role_assignments())
self.assignment_api.create_grant(user_id=new_user['id'],
project_id=new_project['id'],
role_id='other')
self.assignment_api.create_grant(group_id=new_group['id'],
project_id=new_project['id'],
role_id='admin')
# Read back the list of assignments - check it is gone up by 2
after_assignments = len(self.assignment_api.list_role_assignments())
self.assertEqual(after_assignments, existing_assignments + 2)
def test_list_role_assignments_bad_role(self):
self.skipTest('Blocked by bug 1221805')
@ -920,16 +1044,10 @@ class LDAPIdentity(tests.TestCase, BaseLDAPIdentity):
'N/A: LDAP does not support multiple domains')
def test_create_grant_no_user(self):
# The LDAP assignment backend doesn't implement create_grant.
self.assertRaises(
exception.NotImplemented,
super(BaseLDAPIdentity, self).test_create_grant_no_user)
self.skipTest('Blocked by bug 1101287')
def test_create_grant_no_group(self):
# The LDAP assignment backend doesn't implement create_grant.
self.assertRaises(
exception.NotImplemented,
super(BaseLDAPIdentity, self).test_create_grant_no_group)
self.skipTest('Blocked by bug 1101287')
class LDAPIdentityEnabledEmulation(LDAPIdentity):
@ -1044,6 +1162,15 @@ class LdapIdentitySqlAssignment(sql.Base, tests.TestCase, BaseLDAPIdentity):
self.skipTest(
'N/A: Not part of SQL backend')
def test_add_role_grant_to_user_and_project_404(self):
self.skipTest('Blocked by bug 1101287')
def test_get_role_grants_for_user_and_project_404(self):
self.skipTest('Blocked by bug 1101287')
def test_list_projects_for_user_with_grants(self):
self.skipTest('Blocked by bug 1221805')
class MultiLDAPandSQLIdentity(sql.Base, tests.TestCase, BaseLDAPIdentity):
"""Class to test common SQL plus individual LDAP backends.
@ -1274,3 +1401,12 @@ class MultiLDAPandSQLIdentity(sql.Base, tests.TestCase, BaseLDAPIdentity):
self.assertFalse(conf.identity.domain_specific_drivers_enabled)
# ..and make sure a domain-specifc options is also set
self.assertEqual(conf.ldap.url, 'fake://memory1')
def test_add_role_grant_to_user_and_project_404(self):
self.skipTest('Blocked by bug 1101287')
def test_get_role_grants_for_user_and_project_404(self):
self.skipTest('Blocked by bug 1101287')
def test_list_projects_for_user_with_grants(self):
self.skipTest('Blocked by bug 1221805')