From fa10d4945ca9658eff02b1d8e917fde50d6576ce Mon Sep 17 00:00:00 2001 From: Henry Nash Date: Fri, 21 Jun 2013 22:50:40 +0100 Subject: [PATCH] Implement GET /role_assignment API call Add support for the GET /role_assignment call as a first step to making role_assignment a first class entity. This patch also enables v3 collection filtering to match against attributes of entities being returned in the list, using the same dot notation (e.g. user.id) that we already support for policy file checking against filters. Limitations: - The current implementation uses the standard v3 collections wrapper mechanism for filtering. Given the potential numbers of role assignments in a large system, this may have performance and resource impacts. A future improvement would pass the filters into the driver layer to keep the internal assignment processing to a minimum. - The LDAP backend is not currently supported Implements bp get-role-assignments Change-Id: I6ff2ea780e39d7097a88214fbb3ddee1b924c30c --- etc/policy.json | 2 + keystone/common/controller.py | 3 +- keystone/identity/backends/kvs.py | 39 +++ keystone/identity/backends/sql.py | 37 +++ keystone/identity/controllers.py | 193 +++++++++++++ keystone/identity/core.py | 4 + keystone/identity/routers.py | 4 + tests/test_backend.py | 66 +++++ tests/test_backend_ldap.py | 3 + tests/test_v3.py | 81 ++++++ tests/test_v3_identity.py | 441 ++++++++++++++++++++++++++++++ 11 files changed, 872 insertions(+), 1 deletion(-) diff --git a/etc/policy.json b/etc/policy.json index 4aad4e8c38..2c82f9946e 100644 --- a/etc/policy.json +++ b/etc/policy.json @@ -66,6 +66,8 @@ "identity:create_grant": [["rule:admin_required"]], "identity:revoke_grant": [["rule:admin_required"]], + "identity:list_role_assignments": [["rule:admin_required"]], + "identity:get_policy": [["rule:admin_required"]], "identity:list_policies": [["rule:admin_required"]], "identity:create_policy": [["rule:admin_required"]], diff --git a/keystone/common/controller.py b/keystone/common/controller.py index 13aeee579d..3ca1bf8b20 100644 --- a/keystone/common/controller.py +++ b/keystone/common/controller.py @@ -280,7 +280,8 @@ class V3Controller(V2Controller): if attr in context['query_string']: value = context['query_string'][attr] - return [r for r in refs if _attr_match(r[attr], value)] + return [r for r in refs if _attr_match( + flatten(r).get(attr), value)] return refs def _require_matching_id(self, value, ref): diff --git a/keystone/identity/backends/kvs.py b/keystone/identity/backends/kvs.py index 2eea08cf10..77a86789a2 100644 --- a/keystone/identity/backends/kvs.py +++ b/keystone/identity/backends/kvs.py @@ -180,6 +180,45 @@ class Identity(kvs.Base, identity.Driver): else: self.update_metadata(user_id, tenant_id, metadata_ref) + def list_role_assignments(self): + """List the role assignments. + + The kvs backend stores role assignments as key-values: + + "metadata-{target}-{actor}", with the value being a role list + + i.e. "metadata-MyProjectID-MyUserID" [role1, role2] + + ...so we enumerate the list and extract the targets, actors + and roles. + + """ + assignment_list = [] + metadata_keys = filter(lambda x: x.startswith("metadata-"), + self.db.keys()) + for key in metadata_keys: + template = {} + meta_id1 = key.split('-')[1] + meta_id2 = key.split('-')[2] + try: + self.get_project(meta_id1) + template['project_id'] = meta_id1 + except exception.NotFound: + template['domain_id'] = meta_id1 + try: + self._get_user(meta_id2) + template['user_id'] = meta_id2 + except exception.NotFound: + template['group_id'] = meta_id2 + + entry = self.db.get(key) + for r in entry.get('roles', []): + role_assignment = template.copy() + role_assignment['role_id'] = r + assignment_list.append(role_assignment) + + return assignment_list + # CRUD def create_user(self, user_id, user): try: diff --git a/keystone/identity/backends/sql.py b/keystone/identity/backends/sql.py index f81feb1d1a..2f9d89f3f2 100644 --- a/keystone/identity/backends/sql.py +++ b/keystone/identity/backends/sql.py @@ -355,6 +355,43 @@ class Identity(sql.Base, identity.Driver): self.update_metadata(user_id, project_id, metadata_ref, domain_id, group_id) + def list_role_assignments(self): + + # TODO(henry-nash): The current implementation is really simulating + # us having a common role assignment table, rather than having the + # four different grant tables we have today. When we move to role + # assignment as a first class entity, we should create the single + # assignment table, simplifying the logic of this (and many other) + # functions. + + session = self.get_session() + assignment_list = [] + refs = session.query(UserDomainGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'user_id': x.user_id, + 'domain_id': x.domain_id, + 'role_id': r}) + refs = session.query(UserProjectGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'user_id': x.user_id, + 'project_id': x.project_id, + 'role_id': r}) + refs = session.query(GroupDomainGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'group_id': x.group_id, + 'domain_id': x.domain_id, + 'role_id': r}) + refs = session.query(GroupProjectGrant).all() + for x in refs: + for r in x.data.get('roles', []): + assignment_list.append({'group_id': x.group_id, + 'project_id': x.project_id, + 'role_id': r}) + return assignment_list + def list_projects(self): session = self.get_session() tenant_refs = session.query(Project).all() diff --git a/keystone/identity/controllers.py b/keystone/identity/controllers.py index f798e3dc35..9271f3d90d 100644 --- a/keystone/identity/controllers.py +++ b/keystone/identity/controllers.py @@ -16,6 +16,7 @@ """Workflow Logic the Identity service.""" +import copy import urllib import urlparse import uuid @@ -794,3 +795,195 @@ class RoleV3(controller.V3Controller): self._delete_tokens_for_user(user_id) else: self._delete_tokens_for_group(group_id) + + +class RoleAssignmentV3(controller.V3Controller): + + # TODO(henry-nash): The current implementation does not provide a full + # first class entity for role-assignment. There is no role_assignment_id + # and only the list_role_assignment call is supported. Further, since it + # is not a first class entity, the links for the individual entities + # reference the individual role grant APIs. + + collection_name = 'role_assignments' + member_name = 'role_assignment' + + @classmethod + def wrap_member(cls, context, ref): + # NOTE(henry-nash): Since we are not yet a true collection, we override + # the wrapper as have already included the links in the entities + pass + + def _format_entity(self, entity): + formatted_entity = {} + if 'user_id' in entity: + formatted_entity['user'] = {'id': entity['user_id']} + actor_link = '/users/%s' % entity['user_id'] + if 'group_id' in entity: + formatted_entity['group'] = {'id': entity['group_id']} + actor_link = '/groups/%s' % entity['group_id'] + if 'role_id' in entity: + formatted_entity['role'] = {'id': entity['role_id']} + if 'project_id' in entity: + formatted_entity['scope'] = ( + {'project': {'id': entity['project_id']}}) + target_link = '/projects/%s' % entity['project_id'] + if 'domain_id' in entity: + formatted_entity['scope'] = ( + {'domain': {'id': entity['domain_id']}}) + target_link = '/domains/%s' % entity['domain_id'] + + formatted_entity.setdefault('links', {}) + formatted_entity['links']['assignment'] = ( + self.base_url(target_link + actor_link + + '/roles/%s' % entity['role_id'])) + return formatted_entity + + def _expand_indirect_assignments(self, refs): + """Processes entity list into all-direct assignments. + + For any group role assignments in the list, create a role assignment + entity for each member of that group, and then remove the group + assignment entity itself from the list. + + For any new entity created by virtue of group membership, add in an + additional link to that membership. + + """ + def _get_group_members(ref): + """Get a list of group members. + + Get the list of group members. If this fails with + GroupNotFound, then log this as a warning, but allow + overall processing to continue. + + """ + try: + members = self.identity_api.list_users_in_group( + ref['group']['id']) + except exception.GroupNotFound: + members = [] + # The group is missing, which should not happen since + # group deletion should remove any related assignments, so + # log a warning + if 'domain' in ref: + target = 'Domain: %s' % ref['domain'].get('domain_id') + elif 'project' in ref: + target = 'Project: %s' % ref['project'].get('project_id') + else: + # Should always be a domain or project, but since to get + # here things have gone astray, let's be cautious. + target = 'Unknown' + LOG.warning( + _('Group %(group)s not found for role-assignment - ' + '%(target)s with Role: %(role)s') % { + 'group': ref['group_id'], 'target': target, + 'role': ref.get('role_id')}) + return members + + def _build_equivalent_user_assignment(user, group_id, template): + """Create a user assignment equivalent to the group one. + + The template has had the 'group' entity removed, so + substitute a 'user' one, modify the 'assignment' link + to match, and add a 'membership' link. + + """ + user_entry = copy.deepcopy(template) + user_entry['user'] = {'id': user['id']} + scope = user_entry.get('scope') + if 'domain' in scope: + target_link = ( + '/domains/%s' % scope['domain']['id']) + else: + target_link = ( + '/projects/%s' % scope['project']['id']) + user_entry['links']['assignment'] = ( + self.base_url('%s/users/%s/roles/%s' % + (target_link, m['id'], + user_entry['role']['id']))) + user_entry['links']['membership'] = ( + self.base_url('/groups/%s/users/%s' % + (group_id, user['id']))) + return user_entry + + # Scan the list of entities for any group assignments, expanding + # them into equivalent user entities. Due to potential large + # expansion of group entities, rather than modify the + # list we are enumerating, we build a new one as we go. + new_refs = [] + for r in refs: + if 'group' in r: + # As it is a group role assignment, first get the list of + # members. + + members = _get_group_members(r) + + # Now replace that group role assignment entry with an + # equivalent user role assignment for each of the group members + + base_entry = copy.deepcopy(r) + group_id = base_entry['group']['id'] + base_entry.pop('group') + for m in members: + user_entry = _build_equivalent_user_assignment( + m, group_id, base_entry) + new_refs.append(user_entry) + else: + new_refs.append(r) + + return new_refs + + def _query_filter_is_true(self, filter_value): + """Determine if bool query param is 'True'. + + We treat this the same way as we do for policy + enforcement: + + {bool_param}=0 is treated as False + + Any other value is considered to be equivalent to + True, including the absence of a value + + """ + + if (isinstance(filter_value, basestring) and + filter_value == '0'): + val = False + else: + val = True + return val + + @controller.filterprotected('group.id', 'role.id', + 'scope.domain.id', 'scope.project.id', + 'user.id') + def list_role_assignments(self, context, filters): + + # TODO(henry-nash): This implementation uses the standard filtering + # in the V3.wrap_collection. Given the large number of individual + # assignments, this is pretty inefficient. An alternative would be + # to pass the filters into the driver call, so that the list size is + # kept a minimum. + + refs = self.identity_api.list_role_assignments() + formatted_refs = [self._format_entity(x) for x in refs] + + if ('effective' in context['query_string'] and + self._query_filter_is_true( + context['query_string']['effective'])): + + formatted_refs = self._expand_indirect_assignments(formatted_refs) + + return self.wrap_collection(context, formatted_refs, filters) + + @controller.protected + def get_role_assignment(self, context): + raise exception.NotImplemented() + + @controller.protected + def update_role_assignment(self, context): + raise exception.NotImplemented() + + @controller.protected + def delete_role_assignment(self, context): + raise exception.NotImplemented() diff --git a/keystone/identity/core.py b/keystone/identity/core.py index a254470e9c..77870fda95 100644 --- a/keystone/identity/core.py +++ b/keystone/identity/core.py @@ -504,6 +504,10 @@ class Driver(object): """ raise exception.NotImplemented() + def list_role_assignments(self): + + raise exception.NotImplemented() + # group crud def create_group(self, group_id, group): diff --git a/keystone/identity/routers.py b/keystone/identity/routers.py index 32eada5e1e..ab71eb4fbc 100644 --- a/keystone/identity/routers.py +++ b/keystone/identity/routers.py @@ -173,3 +173,7 @@ def append_v3_routers(mapper, routers): controller=role_controller, action='revoke_grant', conditions=dict(method=['DELETE'])) + + routers.append( + router.Router(controllers.RoleAssignmentV3(), + 'role_assignments', 'role_assignment')) diff --git a/tests/test_backend.py b/tests/test_backend.py index ea40cd8b1a..f8d04b6ebf 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -482,6 +482,72 @@ class IdentityTests(object): self.identity_api.get_project, 'fake2') + def test_list_role_assignments_unfiltered(self): + """Test for unfiltered listing role assignments. + + Test Plan: + - Create a domain, with a user, group & project + - Find how many role assignments already exist (from default + fixtures) + - Create a grant of each type (user/group on project/domain) + - Check the number of assignments has gone up by 4 and that + the entries we added are in the list returned + + """ + new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(new_domain['id'], new_domain) + new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, 'enabled': True, + 'domain_id': new_domain['id']} + self.identity_api.create_user(new_user['id'], + new_user) + new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], + 'name': uuid.uuid4().hex} + self.identity_api.create_group(new_group['id'], new_group) + new_project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': new_domain['id']} + self.identity_api.create_project(new_project['id'], new_project) + + # First check how many role grant already exist + existing_assignments = len(self.identity_api.list_role_assignments()) + + # Now create the grants (roles are defined in default_fixtures) + self.identity_api.create_grant(user_id=new_user['id'], + domain_id=new_domain['id'], + role_id='member') + self.identity_api.create_grant(user_id=new_user['id'], + project_id=new_project['id'], + role_id='other') + self.identity_api.create_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='admin') + self.identity_api.create_grant(group_id=new_group['id'], + project_id=new_project['id'], + role_id='admin') + + # Read back the list of assignments - check it is gone up by 4 + assignment_list = self.identity_api.list_role_assignments() + self.assertEquals(len(assignment_list), existing_assignments + 4) + + # Now check that each of our four new entries are in the list + self.assertIn( + {'user_id': new_user['id'], 'domain_id': new_domain['id'], + 'role_id': 'member'}, + assignment_list) + self.assertIn( + {'user_id': new_user['id'], 'project_id': new_project['id'], + 'role_id': 'other'}, + assignment_list) + self.assertIn( + {'group_id': new_group['id'], 'domain_id': new_domain['id'], + 'role_id': 'admin'}, + assignment_list) + self.assertIn( + {'group_id': new_group['id'], 'project_id': new_project['id'], + 'role_id': 'admin'}, + assignment_list) + def test_add_duplicate_role_grant(self): roles_ref = self.identity_api.get_roles_for_user_and_project( self.user_foo['id'], self.tenant_bar['id']) diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index 577a6ef04a..a38f1ece4c 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py @@ -458,6 +458,9 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): def test_get_and_remove_correct_role_grant_from_a_mix(self): raise nose.exc.SkipTest('Blocked by bug 1101287') + def test_list_role_assignments_unfiltered(self): + raise nose.exc.SkipTest('Blocked by bug 1195019') + def test_project_crud(self): # NOTE(topol): LDAP implementation does not currently support the # updating of a project name so this method override diff --git a/tests/test_v3.py b/tests/test_v3.py index 93293b6c91..52d201bc61 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -715,6 +715,87 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.assertEqual(ref['name'], entity['name']) return entity + def assertValidRoleAssignmentListResponse(self, resp, ref=None, + expected_length=None): + + entities = resp.result.get('role_assignments') + + if expected_length is not None: + self.assertEqual(len(entities), expected_length) + elif ref is not None: + # we're at least expecting the ref + self.assertNotEmpty(entities) + + # collections should have relational links + self.assertValidListLinks(resp.result.get('links')) + + for entity in entities: + self.assertIsNotNone(entity) + self.assertValidRoleAssignment(entity) + if ref: + self.assertValidRoleAssignment(entity, ref) + return entities + + def assertValidRoleAssignment(self, entity, ref=None, url=None): + self.assertIsNotNone(entity.get('role')) + self.assertIsNotNone(entity.get('scope')) + + # Only one of user or group should be present + self.assertIsNotNone(entity.get('user') or + entity.get('group')) + self.assertIsNone(entity.get('user') and + entity.get('group')) + + # Only one of domain or project should be present + self.assertIsNotNone(entity['scope'].get('project') or + entity['scope'].get('domain')) + self.assertIsNone(entity['scope'].get('project') and + entity['scope'].get('domain')) + + if entity['scope'].get('project'): + self.assertIsNotNone(entity['scope']['project'].get('id')) + else: + self.assertIsNotNone(entity['scope']['domain'].get('id')) + self.assertIsNotNone(entity.get('links')) + self.assertIsNotNone(entity['links'].get('assignment')) + + if ref: + if ref.get('user'): + self.assertEqual(ref['user']['id'], entity['user']['id']) + if ref.get('group'): + self.assertEqual(ref['group']['id'], entity['group']['id']) + if ref.get('role'): + self.assertEqual(ref['role']['id'], entity['role']['id']) + if ref['scope'].get('project'): + self.assertEqual(ref['scope']['project']['id'], + entity['scope']['project']['id']) + if ref['scope'].get('domain'): + self.assertEqual(ref['scope']['domain']['id'], + entity['scope']['domain']['id']) + if url: + self.assertIn(url, entity['links']['assignment']) + + def assertRoleAssignmentInListResponse( + self, resp, ref, link_url=None, expected=1): + + found_count = 0 + for entity in resp.result.get('role_assignments'): + try: + self.assertValidRoleAssignment( + entity, ref=ref, url=link_url) + except Exception: + # It doesn't match, so let's go onto the next one + pass + else: + found_count += 1 + self.assertEqual(found_count, expected) + + def assertRoleAssignmentNotInListResponse( + self, resp, ref, link_url=None): + + self.assertRoleAssignmentInListResponse( + resp, ref=ref, link_url=link_url, expected=0) + # policy validation def assertValidPolicyListResponse(self, resp, *args, **kwargs): diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py index 1e10070e0a..af8890fb03 100644 --- a/tests/test_v3_identity.py +++ b/tests/test_v3_identity.py @@ -627,3 +627,444 @@ class IdentityTestCase(test_v3.RestfulTestCase): r = self.get(collection_url) self.assertValidRoleListResponse(r, expected_length=0) self.assertIn(collection_url, r.result['links']['self']) + + def _build_role_assignment_url_and_entity( + self, role_id, user_id=None, group_id=None, domain_id=None, + project_id=None): + + if user_id and domain_id: + url = ('/domains/%(domain_id)s/users/%(user_id)s' + '/roles/%(role_id)s' % { + 'domain_id': domain_id, + 'user_id': user_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'user': {'id': user_id}, + 'scope': {'domain': {'id': domain_id}}} + elif user_id and project_id: + url = ('/projects/%(project_id)s/users/%(user_id)s' + '/roles/%(role_id)s' % { + 'project_id': project_id, + 'user_id': user_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'user': {'id': user_id}, + 'scope': {'project': {'id': project_id}}} + if group_id and domain_id: + url = ('/domains/%(domain_id)s/groups/%(group_id)s' + '/roles/%(role_id)s' % { + 'domain_id': domain_id, + 'group_id': group_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'group': {'id': group_id}, + 'scope': {'domain': {'id': domain_id}}} + elif group_id and project_id: + url = ('/projects/%(project_id)s/groups/%(group_id)s' + '/roles/%(role_id)s' % { + 'project_id': project_id, + 'group_id': group_id, + 'role_id': role_id}) + entity = {'role': {'id': role_id}, + 'group': {'id': group_id}, + 'scope': {'project': {'id': project_id}}} + return (url, entity) + + def test_get_role_assignments(self): + """Call ``GET /role_assignments``. + + The sample data set up already has a user, group and project + that is part of self.domain. We use these plus a new user + we create as our data set, making sure we ignore any + role assignments that are already in existence. + + Since we don't yet support a first class entity for role + assignments, we are only testing the LIST API. To create + and delete the role assignments we use the old grant APIs. + + Test Plan: + - Create extra user for tests + - Get a list of all existing role assignments + - Add a new assignment for each of the four combinations, i.e. + group+domain, user+domain, group+project, user+project, using + the same role each time + - Get a new list of all role assignments, checking these four new + ones have been added + - Then delete the four we added + - Get a new list of all role assignments, checking the four have + been removed + + """ + + # Since the default fixtures already assign some roles to the + # user it creates, we also need a new user that will not have any + # existing assignments + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + + collection_url = '/role_assignments' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertIn(collection_url, r.result['links']['self']) + existing_assignments = len(r.result.get('role_assignments')) + + # Now add one of each of the four types of assignment, making sure + # that we get them all back. + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gd_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role_id) + self.put(ud_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + gp_url, gp_entity = self._build_role_assignment_url_and_entity( + project_id=self.project_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gp_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 3) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + up_url, up_entity = self._build_role_assignment_url_and_entity( + project_id=self.project_id, user_id=self.user1['id'], + role_id=self.role_id) + self.put(up_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 4) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + + # Now delete the four we added and make sure they are removed + # from the collection. + + self.delete(gd_url) + self.delete(ud_url) + self.delete(gp_url) + self.delete(up_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments) + self.assertRoleAssignmentNotInListResponse(r, gd_entity) + self.assertRoleAssignmentNotInListResponse(r, ud_entity) + self.assertRoleAssignmentNotInListResponse(r, gp_entity) + self.assertRoleAssignmentNotInListResponse(r, up_entity) + + def test_get_effective_role_assignments(self): + """Call ``GET /role_assignments?effective``. + + Test Plan: + - Create two extra user for tests + - Add these users to a group + - Add a role assignment for the group on a domain + - Get a list of all role assignments, checking one has been added + - Then get a list of all effective role assignments - the group + assignment should have turned into assignments on the domain + for each of the group members. + + """ + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + self.user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user2['id'], self.user2) + self.identity_api.add_user_to_group(self.user1['id'], self.group['id']) + self.identity_api.add_user_to_group(self.user2['id'], self.group['id']) + + collection_url = '/role_assignments' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertIn(collection_url, r.result['links']['self']) + existing_assignments = len(r.result.get('role_assignments')) + + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gd_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + # Now re-read the collection asking for effective roles - this + # should mean the group assignment is translated into the two + # member user assignments + collection_url = '/role_assignments?effective' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role_id) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user2['id'], + role_id=self.role_id) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + def test_check_effective_values_for_role_assignments(self): + """Call ``GET /role_assignments?effective=value``. + + Check the various ways of specifying the 'effective' + query parameter. If the 'effective' query parameter + is included then this should always be treated as + as meaning 'True' unless it is specified as: + + {url}?effective=0 + + This is by design to match the agreed way of handling + policy checking on query/filter parameters. + + Test Plan: + - Create two extra user for tests + - Add these users to a group + - Add a role assignment for the group on a domain + - Get a list of all role assignments, checking one has been added + - Then issue various request with different ways of defining + the 'effective' query parameter. As we have tested the + correctness of the data coming back when we get effective roles + in other tests, here we just use the count of entities to + know if we are getting effective roles or not + + """ + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + self.user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user2['id'], self.user2) + self.identity_api.add_user_to_group(self.user1['id'], self.group['id']) + self.identity_api.add_user_to_group(self.user2['id'], self.group['id']) + + collection_url = '/role_assignments' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + existing_assignments = len(r.result.get('role_assignments')) + + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group_id, + role_id=self.role_id) + self.put(gd_url) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + # Now re-read the collection asking for effective roles, + # using the most common way of defining "effective'. This + # should mean the group assignment is translated into the two + # member user assignments + collection_url = '/role_assignments?effective' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + # Now set 'effective' to false explicitly - should get + # back the regular roles + collection_url = '/role_assignments?effective=0' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 1) + # Now try setting 'effective' to 'False' explicitly- this is + # NOT supported as a way of setting a query or filter + # parameter to false by design. Hence we should get back + # effective roles. + collection_url = '/role_assignments?effective=False' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + # Now set 'effective' to True explicitly + collection_url = '/role_assignments?effective=True' + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), + existing_assignments + 2) + + def test_filtered_role_assignments(self): + """Call ``GET /role_assignments?filters``. + + Test Plan: + - Create extra users, group, role and project for tests + - Make the following assignments: + Give group1, role1 on project1 and domain + Give user1, role2 on project1 and domain + Make User1 a member of Group1 + - Test a series of single filter list calls, checking that + the correct results are obtained + - Test a multi-filtered list call + - Test listing all effective roles for a given user + - Test the equivalent of the list of roles in a project scoped + token (all effective roles for a user on a project) + + """ + + # Since the default fixtures already assign some roles to the + # user it creates, we also need a new user that will not have any + # existing assignments + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + self.user1['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user1['id'], self.user1) + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + self.user2['password'] = uuid.uuid4().hex + self.identity_api.create_user(self.user2['id'], self.user2) + self.group1 = self.new_group_ref( + domain_id=self.domain['id']) + self.identity_api.create_group(self.group1['id'], self.group1) + self.identity_api.add_user_to_group(self.user1['id'], + self.group1['id']) + self.identity_api.add_user_to_group(self.user2['id'], + self.group1['id']) + self.project1 = self.new_project_ref( + domain_id=self.domain['id']) + self.identity_api.create_project(self.project1['id'], self.project1) + self.role1 = self.new_role_ref() + self.identity_api.create_role(self.role1['id'], self.role1) + self.role2 = self.new_role_ref() + self.identity_api.create_role(self.role2['id'], self.role2) + + # Now add one of each of the four types of assignment + + gd_url, gd_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, group_id=self.group1['id'], + role_id=self.role1['id']) + self.put(gd_url) + + ud_url, ud_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role2['id']) + self.put(ud_url) + + gp_url, gp_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], group_id=self.group1['id'], + role_id=self.role1['id']) + self.put(gp_url) + + up_url, up_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], user_id=self.user1['id'], + role_id=self.role2['id']) + self.put(up_url) + + # Now list by various filters to make sure we get back the right ones + + collection_url = ('/role_assignments?scope.project.id=%s' % + self.project1['id']) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + collection_url = ('/role_assignments?scope.domain.id=%s' % + self.domain['id']) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + + collection_url = '/role_assignments?user.id=%s' % self.user1['id'] + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + + collection_url = '/role_assignments?group.id=%s' % self.group1['id'] + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + collection_url = '/role_assignments?role.id=%s' % self.role1['id'] + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url) + self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url) + + # Let's try combining two filers together.... + + collection_url = ( + '/role_assignments?user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': self.user1['id'], + 'project_id': self.project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 1) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + + # Now for a harder one - filter for user with effective + # roles - this should return role assignment that were directly + # assigned as well as by virtue of group membership + + collection_url = ('/role_assignments?effective&user.id=%s' % + self.user1['id']) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 4) + # Should have the two direct roles... + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url) + # ...and the two via group membership... + up1_url, up1_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], user_id=self.user1['id'], + role_id=self.role1['id']) + ud1_url, ud1_entity = self._build_role_assignment_url_and_entity( + domain_id=self.domain_id, user_id=self.user1['id'], + role_id=self.role1['id']) + self.assertRoleAssignmentInListResponse(r, up1_entity, + link_url=up1_url) + self.assertRoleAssignmentInListResponse(r, ud1_entity, + link_url=ud1_url) + + # ...and for the grand-daddy of them all, simulate the request + # that would generate the list of effective roles in a project + # scoped token. + + collection_url = ( + '/role_assignments?effective&user.id=%(user_id)s' + '&scope.project.id=%(project_id)s' % { + 'user_id': self.user1['id'], + 'project_id': self.project1['id']}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse(r) + self.assertEqual(len(r.result.get('role_assignments')), 2) + # Should have one direct role and one from group membership... + up1_url, up1_entity = self._build_role_assignment_url_and_entity( + project_id=self.project1['id'], user_id=self.user1['id'], + role_id=self.role1['id']) + self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url) + self.assertRoleAssignmentInListResponse(r, up1_entity, + link_url=up1_url)