diff --git a/etc/policy.json b/etc/policy.json index 4aad4e8c38..2c82f9946e 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -66,6 +66,8 @@ "identity:create_grant": [["rule:admin_required"]], "identity:revoke_grant": [["rule:admin_required"]], + "identity:list_role_assignments": [["rule:admin_required"]], + "identity:get_policy": [["rule:admin_required"]], "identity:list_policies": [["rule:admin_required"]], "identity:create_policy": [["rule:admin_required"]], diff --git a/keystone/common/controller.py b/keystone/common/controller.py index 13aeee579d..3ca1bf8b20 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -280,7 +280,8 @@ class V3Controller(V2Controller): if attr in context['query_string']: value = context['query_string'][attr] - return [r for r in refs if _attr_match(r[attr], value)] + return [r for r in refs if _attr_match( + flatten(r).get(attr), value)] return refs def _require_matching_id(self, value, ref): diff --git a/keystone/identity/backends/kvs.py b/keystone/identity/backends/kvs.py index 2eea08cf10..77a86789a2 100644 --- a/keystone/identity/backends/kvs.py +++ b/keystone/identity/backends/kvs.py @@ -180,6 +180,45 @@ class Identity(kvs.Base, identity.Driver): else: self.update_metadata(user_id, tenant_id, metadata_ref) + def list_role_assignments(self): + """List the role assignments. + + The kvs backend stores role assignments as key-values: + + "metadata-{target}-{actor}", with the value being a role list + + i.e. "metadata-MyProjectID-MyUserID" [role1, role2] + + ...so we enumerate the list and extract the targets, actors + and roles. + + """ + assignment_list = [] + metadata_keys = filter(lambda x: x.startswith("metadata-"), + self.db.keys()) + for key in metadata_keys: + template = {} + meta_id1 = key.split('-')[1] + meta_id2 = key.split('-')[2] + try: + self.get_project(meta_id1) + template['project_id'] = meta_id1 + except exception.NotFound: + template['domain_id'] = meta_id1 + try: + self._get_user(meta_id2) + template['user_id'] = meta_id2 + except exception.NotFound: + template['group_id'] = meta_id2 + + entry = self.db.get(key) + for r in entry.get('roles', []): + role_assignment = template.copy() + role_assignment['role_id'] = r + assignment_list.append(role_assignment) + + return assignment_list + # CRUD def create_user(self, user_id, user): try: diff --git a/keystone/identity/backends/sql.py b/keystone/identity/backends/sql.py index f81feb1d1a..2f9d89f3f2 100644 --- a/keystone/identity/backends/sql.py +++ b/keystone/identity/backends/sql.py @@ -355,6 +355,43 @@ class Identity(sql.Base, identity.Driver): self.update_metadata(user_id, project_id, metadata_ref, domain_id, group_id) + def list_role_assignments(self): + + # TODO(henry-nash): The current implementation is really simulating + # us having a common role assignment table, rather than having the + # four different grant tables we have today. When we move to role + # assignment as a first class entity, we should create the single + # assignment table, simplifying the logic of this (and many other) + # functions. + + session = self.get_session() + assignment_list = [] + refs = session.query(UserDomainGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'user_id': x.user_id, + 'domain_id': x.domain_id, + 'role_id': r}) + refs = session.query(UserProjectGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'user_id': x.user_id, + 'project_id': x.project_id, + 'role_id': r}) + refs = session.query(GroupDomainGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'group_id': x.group_id, + 'domain_id': x.domain_id, + 'role_id': r}) + refs = session.query(GroupProjectGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'group_id': x.group_id, + 'project_id': x.project_id, + 'role_id': r}) + return assignment_list + def list_projects(self): session = self.get_session() tenant_refs = session.query(Project).all() diff --git a/keystone/identity/controllers.py b/keystone/identity/controllers.py index f798e3dc35..9271f3d90d 100644 --- a/keystone/identity/controllers.py +++ b/keystone/identity/controllers.py @@ -16,6 +16,7 @@ """Workflow Logic the Identity service.""" +import copy import urllib import urlparse import uuid @@ -794,3 +795,195 @@ class RoleV3(controller.V3Controller): self._delete_tokens_for_user(user_id) else: self._delete_tokens_for_group(group_id) + + +class RoleAssignmentV3(controller.V3Controller): + + # TODO(henry-nash): The current implementation does not provide a full + # first class entity for role-assignment. There is no role_assignment_id + # and only the list_role_assignment call is supported. Further, since it + # is not a first class entity, the links for the individual entities + # reference the individual role grant APIs. + + collection_name = 'role_assignments' + member_name = 'role_assignment' + + @classmethod + def wrap_member(cls, context, ref): + # NOTE(henry-nash): Since we are not yet a true collection, we override + # the wrapper as have already included the links in the entities + pass + + def _format_entity(self, entity): + formatted_entity = {} + if 'user_id' in entity: + formatted_entity['user'] = {'id': entity['user_id']} + actor_link = '/users/%s' % entity['user_id'] + if 'group_id' in entity: + formatted_entity['group'] = {'id': entity['group_id']} + actor_link = '/groups/%s' % entity['group_id'] + if 'role_id' in entity: + formatted_entity['role'] = {'id': entity['role_id']} + if 'project_id' in entity: + formatted_entity['scope'] = ( + {'project': {'id': entity['project_id']}}) + target_link = '/projects/%s' % entity['project_id'] + if 'domain_id' in entity: + formatted_entity['scope'] = ( + {'domain': {'id': entity['domain_id']}}) + target_link = '/domains/%s' % entity['domain_id'] + + formatted_entity.setdefault('links', {}) + formatted_entity['links']['assignment'] = ( + self.base_url(target_link + actor_link + + '/roles/%s' % entity['role_id'])) + return formatted_entity + + def _expand_indirect_assignments(self, refs): + """Processes entity list into all-direct assignments. + + For any group role assignments in the list, create a role assignment + entity for each member of that group, and then remove the group + assignment entity itself from the list. + + For any new entity created by virtue of group membership, add in an + additional link to that membership. + + """ + def _get_group_members(ref): + """Get a list of group members. + + Get the list of group members. If this fails with + GroupNotFound, then log this as a warning, but allow + overall processing to continue. + + """ + try: + members = self.identity_api.list_users_in_group( + ref['group']['id']) + except exception.GroupNotFound: + members = [] + # The group is missing, which should not happen since + # group deletion should remove any related assignments, so + # log a warning + if 'domain' in ref: + target = 'Domain: %s' % ref['domain'].get('domain_id') + elif 'project' in ref: + target = 'Project: %s' % ref['project'].get('project_id') + else: + # Should always be a domain or project, but since to get + # here things have gone astray, let's be cautious. + target = 'Unknown' + LOG.warning( + _('Group %(group)s not found for role-assignment - ' + '%(target)s with Role: %(role)s') % { + 'group': ref['group_id'], 'target': target, + 'role': ref.get('role_id')}) + return members + + def _build_equivalent_user_assignment(user, group_id, template): + """Create a user assignment equivalent to the group one. + + The template has had the 'group' entity removed, so + substitute a 'user' one, modify the 'assignment' link + to match, and add a 'membership' link. + + """ + user_entry = copy.deepcopy(template) + user_entry['user'] = {'id': user['id']} + scope = user_entry.get('scope') + if 'domain' in scope: + target_link = ( + '/domains/%s' % scope['domain']['id']) + else: + target_link = ( + '/projects/%s' % scope['project']['id']) + user_entry['links']['assignment'] = ( + self.base_url('%s/users/%s/roles/%s' % + (target_link, m['id'], + user_entry['role']['id']))) + user_entry['links']['membership'] = ( + self.base_url('/groups/%s/users/%s' % + (group_id, user['id']))) + return user_entry + + # Scan the list of entities for any group assignments, expanding + # them into equivalent user entities. Due to potential large + # expansion of group entities, rather than modify the + # list we are enumerating, we build a new one as we go. + new_refs = [] + for r in refs: + if 'group' in r: + # As it is a group role assignment, first get the list of + # members. + + members = _get_group_members(r) + + # Now replace that group role assignment entry with an + # equivalent user role assignment for each of the group members + + base_entry = copy.deepcopy(r) + group_id = base_entry['group']['id'] + base_entry.pop('group') + for m in members: + user_entry = _build_equivalent_user_assignment( + m, group_id, base_entry) + new_refs.append(user_entry) + else: + new_refs.append(r) + + return new_refs + + def _query_filter_is_true(self, filter_value): + """Determine if bool query param is 'True'. + + We treat this the same way as we do for policy + enforcement: + + {bool_param}=0 is treated as False + + Any other value is considered to be equivalent to + True, including the absence of a value + + """ + + if (isinstance(filter_value, basestring) and + filter_value == '0'): + val = False + else: + val = True + return val + + @controller.filterprotected('group.id', 'role.id', + 'scope.domain.id', 'scope.project.id', + 'user.id') + def list_role_assignments(self, context, filters): + + # TODO(henry-nash): This implementation uses the standard filtering + # in the V3.wrap_collection. Given the large number of individual + # assignments, this is pretty inefficient. An alternative would be + # to pass the filters into the driver call, so that the list size is + # kept a minimum. + + refs = self.identity_api.list_role_assignments() + formatted_refs = [self._format_entity(x) for x in refs] + + if ('effective' in context['query_string'] and + self._query_filter_is_true( + context['query_string']['effective'])): + + formatted_refs = self._expand_indirect_assignments(formatted_refs) + + return self.wrap_collection(context, formatted_refs, filters) + + @controller.protected + def get_role_assignment(self, context): + raise exception.NotImplemented() + + @controller.protected + def update_role_assignment(self, context): + raise exception.NotImplemented() + + @controller.protected + def delete_role_assignment(self, context): + raise exception.NotImplemented() diff --git a/keystone/identity/core.py b/keystone/identity/core.py index a254470e9c..77870fda95 100644 --- a/keystone/identity/core.py +++ b/keystone/identity/core.py @@ -504,6 +504,10 @@ class Driver(object): """ raise exception.NotImplemented() + def list_role_assignments(self): + + raise exception.NotImplemented() + # group crud def create_group(self, group_id, group): diff --git a/keystone/identity/routers.py b/keystone/identity/routers.py index 32eada5e1e..ab71eb4fbc 100644 --- a/keystone/identity/routers.py +++ b/keystone/identity/routers.py @@ -173,3 +173,7 @@ def append_v3_routers(mapper, routers): controller=role_controller, action='revoke_grant', conditions=dict(method=['DELETE'])) + + routers.append( + router.Router(controllers.RoleAssignmentV3(), + 'role_assignments', 'role_assignment')) diff --git a/tests/test_backend.py b/tests/test_backend.py index ea40cd8b1a..f8d04b6ebf 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -482,6 +482,72 @@ class IdentityTests(object): self.identity_api.get_project, 'fake2') + def test_list_role_assignments_unfiltered(self): + """Test for unfiltered listing role assignments. + + Test Plan: + - Create a domain, with a user, group & project + - Find how many role assignments already exist (from default + fixtures) + - Create a grant of each type (user/group on project/domain) + - Check the number of assignments has gone up by 4 and that + the entries we added are in the list returned + + """ + new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(new_domain['id'], new_domain) + new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, 'enabled': True, + 'domain_id': new_domain['id']} + self.identity_api.create_user(new_user['id'], + new_user) + new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], + 'name': uuid.uuid4().hex} + self.identity_api.create_group(new_group['id'], new_group) + new_project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': new_domain['id']} + self.identity_api.create_project(new_project['id'], new_project) + + # First check how many role grant already exist + existing_assignments = len(self.identity_api.list_role_assignments()) + + # Now create the grants (roles are defined in default_fixtures) + self.identity_api.create_grant(user_id=new_user['id'], + domain_id=new_domain['id'], + role_id='member') + self.identity_api.create_grant(user_id=new_user['id'], + project_id=new_project['id'], + role_id='other') + self.identity_api.create_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='admin') + self.identity_api.create_grant(group_id=new_group['id'], + project_id=new_project['id'], + role_id='admin') + + # Read back the list of assignments - check it is gone up by 4 + assignment_list = self.identity_api.list_role_assignments() + self.assertEquals(len(assignment_list), existing_assignments + 4) + + # Now check that each of our four new entries are in the list + self.assertIn( + {'user_id': new_user['id'], 'domain_id': new_domain['id'], + 'role_id': 'member'}, + assignment_list) + self.assertIn( + {'user_id': new_user['id'], 'project_id': new_project['id'], + 'role_id': 'other'}, + assignment_list) + self.assertIn( + {'group_id': new_group['id'], 'domain_id': new_domain['id'], + 'role_id': 'admin'}, + assignment_list) + self.assertIn( + {'group_id': new_group['id'], 'project_id': new_project['id'], + 'role_id': 'admin'}, + assignment_list) + def test_add_duplicate_role_grant(self): roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index 577a6ef04a..a38f1ece4c 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py @@ -458,6 +458,9 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): def test_get_and_remove_correct_role_grant_from_a_mix(self): raise nose.exc.SkipTest('Blocked by bug 1101287') + def test_list_role_assignments_unfiltered(self): + raise nose.exc.SkipTest('Blocked by bug 1195019') + def test_project_crud(self): # NOTE(topol): LDAP implementation does not currently support the # updating of a project name so this method override diff --git a/tests/test_v3.py b/tests/test_v3.py index 93293b6c91..52d201bc61 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -715,6 +715,87 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.assertEqual(ref['name'], entity['name']) return entity + def assertValidRoleAssignmentListResponse(self, resp, ref=None, + expected_length=None): + + entities = resp.result.get('role_assignments') + + if expected_length is not None: + self.assertEqual(len(entities), expected_length) + elif ref is not None: + # we're at least expecting the ref + self.assertNotEmpty(entities) + + # collections should have relational links + self.assertValidListLinks(resp.result.get('links')) + + for entity in entities: + self.assertIsNotNone(entity) + self.assertValidRoleAssignment(entity) + if ref: + self.assertValidRoleAssignment(entity, ref) + return entities + + def assertValidRoleAssignment(self, entity, ref=None, url=None): + self.assertIsNotNone(entity.get('role')) + self.assertIsNotNone(entity.get('scope')) + + # Only one of user or group should be present + self.assertIsNotNone(entity.get('user') or + entity.get('group')) + self.assertIsNone(entity.get('user') and + entity.get('group')) + + # Only one of domain or project should be present + self.assertIsNotNone(entity['scope'].get('project') or + entity['scope'].get('domain')) + self.assertIsNone(entity['scope'].get('project') and + entity['scope'].get('domain')) + + if entity['scope'].get('project'): + self.assertIsNotNone(entity['scope']['project'].get('id')) + else: + self.assertIsNotNone(entity['scope']['domain'].get('id')) + self.assertIsNotNone(entity.get('links')) + self.assertIsNotNone(entity['links'].get('assignment')) + + if ref: + if ref.get('user'): + self.assertEqual(ref['user']['id'], entity['user']['id']) + if ref.get('group'): + self.assertEqual(ref['group']['id'], entity['group']['id']) + if ref.get('role'): + self.assertEqual(ref['role']['id'], entity['role']['id']) + if ref['scope'].get('project'): + self.assertEqual(ref['scope']['project']['id'], + entity['scope']['project']['id']) + if ref['scope'].get('domain'): + self.assertEqual(ref['scope']['domain']['id'], + entity['scope']['domain']['id']) + if url: + self.assertIn(url, entity['links']['assignment']) + + def assertRoleAssignmentInListResponse( + self, resp, ref, link_url=None, expected=1): + + found_count = 0 + for entity in resp.result.get('role_assignments'): + try: + self.assertValidRoleAssignment( + entity, ref=ref, url=link_url) + except Exception: + # It doesn't match, so let's go onto the next one + pass + else: + found_count += 1 + self.assertEqual(found_count, expected) + + def assertRoleAssignmentNotInListResponse( + self, resp, ref, link_url=None): + + self.assertRoleAssignmentInListResponse( + resp, ref=ref, link_url=link_url, expected=0) + # policy validation def assertValidPolicyListResponse(self, resp, *args, **kwargs): diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py index 1e10070e0a..af8890fb03 100644 --- a/tests/test_v3_identity.py +++ b/tests/test_v3_identity.py @@ -627,3 +627,444 @@ class IdentityTestCase(test_v3.RestfulTestCase): r = self.get(collection_url) self.assertValidRoleListResponse(r, expected_length=0) self.assertIn(collection_url, r.result['links']['self']) + + def _build_role_assignment_url_and_entity( + self, role_id, user_id=None, group_id=None, domain_id=None, + project_id=None): + + if user_id and domain_id: + url = ('/domains/%(domain_id)s/users/%(user_id)s' + '/roles/%(role_id)s' % { + 'domain_id': domain_id, + 'user_id': user_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'user': {'id': user_id}, + 'scope': {'domain': {'id': domain_id}}} + elif user_id and project_id: + url = ('/projects/%(project_id)s/users/%(user_id)s' + '/roles/%(role_id)s' % { + 'project_id': project_id, + 'user_id': user_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'user': {'id': user_id}, + 'scope': {'project': {'id': project_id}}} + if group_id and domain_id: + url = ('/domains/%(domain_id)s/groups/%(group_id)s' + '/roles/%(role_id)s' % { + 'domain_id': domain_id, + 'group_id': group_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'group': {'id': group_id}, + 'scope': {'domain': {'id': domain_id}}} + elif group_id and project_id: + url = ('/projects/%(project_id)s/groups/%(group_id)s' + '/roles/%(role_id)s' % { + 'project_id': project_id, + 'group_id': group_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'group': {'id': group_id}, + 'scope': {'project': {'id': project_id}}} + return (url, entity) + + def test_get_role_assignments(self): + """Call ``GET /role_assignments``. + + The sample data set up already has a user, group and project + that is part of self.domain. We use these plus a new user + we create as our data set, making sure we ignore any + role assignments that are already in existence. + + Since we don't yet support a first class entity for role + assignments, we are only testing the LIST API. To create + and delete the role assignments we use the old grant APIs. + + Test Plan: + - Create extra user for tests + - Get a list of all existing role assignments + - Add a new assignment for each of the four combinations, i.e. + group+domain, user+domain, group+project, user+project, using + the same role each time + - Get a new list of all role assignments, checking these four new + ones have been added + - Then delete the four we added + - Get a new list of all role assignments, checking the four have + been removed + + """ + + # Since the default fixtures already assign some roles to the + # user it creates, we also need a new user that will not have any + # existing assignments + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + + collection_url = '/role_assignments' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertIn(collection_url, r.result['links']['self']) + existing_assignments = len(r.result.get('role_assignments')) + + # Now add one of each of the four types of assignment, making sure + # that we get them all back. + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gd_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role_id) + self.put(ud_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + gp_url, gp_entity = self._build_role_assignment_url_and_entity( + project_id=self.project_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gp_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 3) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + up_url, up_entity = self._build_role_assignment_url_and_entity( + project_id=self.project_id, user_id=self.user1['id'], + role_id=self.role_id) + self.put(up_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 4) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + + # Now delete the four we added and make sure they are removed + # from the collection. + + self.delete(gd_url) + self.delete(ud_url) + self.delete(gp_url) + self.delete(up_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments) + self.assertRoleAssignmentNotInListResponse(r, gd_entity) + self.assertRoleAssignmentNotInListResponse(r, ud_entity) + self.assertRoleAssignmentNotInListResponse(r, gp_entity) + self.assertRoleAssignmentNotInListResponse(r, up_entity) + + def test_get_effective_role_assignments(self): + """Call ``GET /role_assignments?effective``. + + Test Plan: + - Create two extra user for tests + - Add these users to a group + - Add a role assignment for the group on a domain + - Get a list of all role assignments, checking one has been added + - Then get a list of all effective role assignments - the group + assignment should have turned into assignments on the domain + for each of the group members. + + """ + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + self.user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user2['id'], self.user2) + self.identity_api.add_user_to_group(self.user1['id'], self.group['id']) + self.identity_api.add_user_to_group(self.user2['id'], self.group['id']) + + collection_url = '/role_assignments' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertIn(collection_url, r.result['links']['self']) + existing_assignments = len(r.result.get('role_assignments')) + + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gd_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + # Now re-read the collection asking for effective roles - this + # should mean the group assignment is translated into the two + # member user assignments + collection_url = '/role_assignments?effective' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role_id) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user2['id'], + role_id=self.role_id) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + def test_check_effective_values_for_role_assignments(self): + """Call ``GET /role_assignments?effective=value``. + + Check the various ways of specifying the 'effective' + query parameter. If the 'effective' query parameter + is included then this should always be treated as + as meaning 'True' unless it is specified as: + + {url}?effective=0 + + This is by design to match the agreed way of handling + policy checking on query/filter parameters. + + Test Plan: + - Create two extra user for tests + - Add these users to a group + - Add a role assignment for the group on a domain + - Get a list of all role assignments, checking one has been added + - Then issue various request with different ways of defining + the 'effective' query parameter. As we have tested the + correctness of the data coming back when we get effective roles + in other tests, here we just use the count of entities to + know if we are getting effective roles or not + + """ + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + self.user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user2['id'], self.user2) + self.identity_api.add_user_to_group(self.user1['id'], self.group['id']) + self.identity_api.add_user_to_group(self.user2['id'], self.group['id']) + + collection_url = '/role_assignments' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + existing_assignments = len(r.result.get('role_assignments')) + + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gd_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + # Now re-read the collection asking for effective roles, + # using the most common way of defining "effective'. This + # should mean the group assignment is translated into the two + # member user assignments + collection_url = '/role_assignments?effective' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + # Now set 'effective' to false explicitly - should get + # back the regular roles + collection_url = '/role_assignments?effective=0' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + # Now try setting 'effective' to 'False' explicitly- this is + # NOT supported as a way of setting a query or filter + # parameter to false by design. Hence we should get back + # effective roles. + collection_url = '/role_assignments?effective=False' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + # Now set 'effective' to True explicitly + collection_url = '/role_assignments?effective=True' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + + def test_filtered_role_assignments(self): + """Call ``GET /role_assignments?filters``. + + Test Plan: + - Create extra users, group, role and project for tests + - Make the following assignments: + Give group1, role1 on project1 and domain + Give user1, role2 on project1 and domain + Make User1 a member of Group1 + - Test a series of single filter list calls, checking that + the correct results are obtained + - Test a multi-filtered list call + - Test listing all effective roles for a given user + - Test the equivalent of the list of roles in a project scoped + token (all effective roles for a user on a project) + + """ + + # Since the default fixtures already assign some roles to the + # user it creates, we also need a new user that will not have any + # existing assignments + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + self.user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user2['id'], self.user2) + self.group1 = self.new_group_ref( + domain_id=self.domain['id']) + self.identity_api.create_group(self.group1['id'], self.group1) + self.identity_api.add_user_to_group(self.user1['id'], + self.group1['id']) + self.identity_api.add_user_to_group(self.user2['id'], + self.group1['id']) + self.project1 = self.new_project_ref( + domain_id=self.domain['id']) + self.identity_api.create_project(self.project1['id'], self.project1) + self.role1 = self.new_role_ref() + self.identity_api.create_role(self.role1['id'], self.role1) + self.role2 = self.new_role_ref() + self.identity_api.create_role(self.role2['id'], self.role2) + + # Now add one of each of the four types of assignment + + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group1['id'], + role_id=self.role1['id']) + self.put(gd_url) + + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role2['id']) + self.put(ud_url) + + gp_url, gp_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], group_id=self.group1['id'], + role_id=self.role1['id']) + self.put(gp_url) + + up_url, up_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], user_id=self.user1['id'], + role_id=self.role2['id']) + self.put(up_url) + + # Now list by various filters to make sure we get back the right ones + + collection_url = ('/role_assignments?scope.project.id=%s' % + self.project1['id']) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + collection_url = ('/role_assignments?scope.domain.id=%s' % + self.domain['id']) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + collection_url = '/role_assignments?user.id=%s' % self.user1['id'] + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + collection_url = '/role_assignments?group.id=%s' % self.group1['id'] + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + collection_url = '/role_assignments?role.id=%s' % self.role1['id'] + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + # Let's try combining two filers together.... + + collection_url = ( + '/role_assignments?user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': self.user1['id'], + 'project_id': self.project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 1) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + + # Now for a harder one - filter for user with effective + # roles - this should return role assignment that were directly + # assigned as well as by virtue of group membership + + collection_url = ('/role_assignments?effective&user.id=%s' % + self.user1['id']) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 4) + # Should have the two direct roles... + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + # ...and the two via group membership... + up1_url, up1_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], user_id=self.user1['id'], + role_id=self.role1['id']) + ud1_url, ud1_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role1['id']) + self.assertRoleAssignmentInListResponse(r, up1_entity, + link_url=up1_url) + self.assertRoleAssignmentInListResponse(r, ud1_entity, + link_url=ud1_url) + + # ...and for the grand-daddy of them all, simulate the request + # that would generate the list of effective roles in a project + # scoped token. + + collection_url = ( + '/role_assignments?effective&user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': self.user1['id'], + 'project_id': self.project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + # Should have one direct role and one from group membership... + up1_url, up1_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], user_id=self.user1['id'], + role_id=self.role1['id']) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, up1_entity, + link_url=up1_url)