From c56611ff58cc732a021dd28cd252a0a87ee3aaa6 Mon Sep 17 00:00:00 2001 From: Vishakha Agarwal Date: Fri, 22 Feb 2019 00:51:40 +0530 Subject: [PATCH] Implement domain reader for role_assignments This change adds tests cases for the default roles keystone supports at install time. It also modifies the policies for the role_assignments API to be more self-service by properly checking for scopes if accessed with a domain-scoped tokens. This gives domain users the power to query role assignments within the domain they have authorization on without exposing other assignment information in the deployment, domains, or projects. Subsequent patches will: - add functionality for domain members - add functionality for domain admins - add functionality for project readers - add functionality for project members - add functionality for project admins - remove the obsolete policies from policy.v3cloudsample.json Co-Authored-By: Lance Bragstad Partial-Bug: 1750673 Change-Id: I0c6d202a315d4683e2589f0d9121e93c97fb13e4 (cherry picked from commit 425d48ec0aa44b46c628d8c238bcf97f315d0f05) --- keystone/api/role_assignments.py | 26 +- keystone/common/policies/role_assignment.py | 9 +- .../unit/protection/v3/test_assignment.py | 311 ++++++++++++++++++ 3 files changed, 342 insertions(+), 4 deletions(-) diff --git a/keystone/api/role_assignments.py b/keystone/api/role_assignments.py index 76cf55fc9a..d1cfd90c48 100644 --- a/keystone/api/role_assignments.py +++ b/keystone/api/role_assignments.py @@ -49,9 +49,31 @@ class RoleAssignmentsResource(ks_flask.ResourceBase): 'group.id', 'role.id', 'scope.domain.id', 'scope.project.id', 'scope.OS-INHERIT:inherited_to', 'user.id', 'scope.system' ] + target = None + if self.oslo_context.domain_id: + target = {'domain_id': self.oslo_context.domain_id} ENFORCER.enforce_call(action='identity:list_role_assignments', - filters=filters) - return self._build_role_assignments_list() + filters=filters, + target_attr=target) + + assignments = self._build_role_assignments_list() + + if self.oslo_context.domain_id: + domain_assignments = [] + for assignment in assignments['role_assignments']: + domain_id = assignment['scope'].get('domain', {}).get('id') + project_id = assignment['scope'].get('project', {}).get('id') + if domain_id == self.oslo_context.domain_id: + domain_assignments.append(assignment) + continue + elif project_id: + project = PROVIDERS.resource_api.get_project(project_id) + if project.get('domain_id') == self.oslo_context.domain_id: + domain_assignments.append(assignment) + + assignments['role_assignments'] = domain_assignments + + return assignments def _list_role_assignments_for_tree(self): filters = [ diff --git a/keystone/common/policies/role_assignment.py b/keystone/common/policies/role_assignment.py index 86ad9046b6..2dda64d00a 100644 --- a/keystone/common/policies/role_assignment.py +++ b/keystone/common/policies/role_assignment.py @@ -15,6 +15,11 @@ from oslo_policy import policy from keystone.common.policies import base +SYSTEM_READER_OR_DOMAIN_READER = ( + '(' + base.SYSTEM_READER + ') or ' + '(role:reader and domain_id:%(target.domain_id)s)' +) + deprecated_list_role_assignments = policy.DeprecatedRule( name=base.IDENTITY % 'list_role_assignments', check_str=base.RULE_ADMIN_REQUIRED @@ -31,7 +36,7 @@ account for these changes automatically. role_assignment_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_role_assignments', - check_str=base.SYSTEM_READER, + check_str=SYSTEM_READER_OR_DOMAIN_READER, # FIXME(lbragstad): This API will behave differently depending on the # token scope used to call the API. A system administrator should be # able to list all role assignment across the entire deployment. A @@ -40,7 +45,7 @@ role_assignment_policies = [ # make keystone smart enough to handle those cases in code, we can add # 'project' to the scope_types below. For now, this should be a system # administrator only operation to maintain backwards compatibility. - scope_types=['system'], + scope_types=['system', 'domain'], description='List role assignments.', operations=[{'path': '/v3/role_assignments', 'method': 'GET'}, diff --git a/keystone/tests/unit/protection/v3/test_assignment.py b/keystone/tests/unit/protection/v3/test_assignment.py index 1827447c4b..3e85f1d885 100644 --- a/keystone/tests/unit/protection/v3/test_assignment.py +++ b/keystone/tests/unit/protection/v3/test_assignment.py @@ -649,6 +649,275 @@ class _SystemUserTests(object): self.assertIn(assignment, expected) +class _DomainUserTests(object): + """Common functionality for domain users.""" + + def _setup_test_role_assignments_for_domain(self): + # Populate role assignment within `self.domain_id` so that we can + # assert users can view assignments within the domain they have + # authorization on + role_id = self.bootstrapper.reader_role_id + + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + ) + + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + ) + + project = PROVIDERS.resource_api.create_project( + uuid.uuid4().hex, + unit.new_project_ref(domain_id=self.domain_id) + ) + + # create a user+project role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, user_id=user['id'], project_id=project['id'] + ) + + # create a user+domain role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, user_id=user['id'], domain_id=self.domain_id + ) + + # create a group+project role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, group_id=group['id'], project_id=project['id'] + ) + + # create a group+domain role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, group_id=group['id'], domain_id=self.domain_id + ) + + return { + 'user_id': user['id'], + 'group_id': group['id'], + 'project_id': project['id'], + 'role_id': role_id, + } + + def test_user_can_list_all_assignments_in_their_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + self.expected.append({ + 'user_id': domain_assignments['user_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'user_id': domain_assignments['user_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'group_id': domain_assignments['group_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'group_id': domain_assignments['group_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + }) + + with self.test_client() as c: + r = c.get('/v3/role_assignments', headers=self.headers) + self.assertEqual( + len(self.expected), len(r.json['role_assignments']) + ) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, self.expected) + + def test_user_can_filter_role_assignments_by_project_in_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + expected = [ + { + 'user_id': domain_assignments['user_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + }, + { + 'group_id': domain_assignments['group_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + } + ] + + project_id = domain_assignments['project_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.project.id=%s' % project_id, + headers=self.headers + ) + self.assertEqual(len(expected), len(r.json['role_assignments'])) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, expected) + + def test_user_can_filter_role_assignments_by_domain(self): + # This shouldn't really provide any more value than just calling GET + # /v3/role_assignments with a domain-scoped token, but we test it + # anyway. + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + self.expected.append({ + 'user_id': domain_assignments['user_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'group_id': domain_assignments['group_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.domain.id=%s' % self.domain_id, + headers=self.headers + ) + self.assertEqual( + len(self.expected), len(r.json['role_assignments']) + ) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, self.expected) + + def test_user_can_filter_role_assignments_by_user_of_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + expected = [ + { + 'user_id': domain_assignments['user_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }, + { + 'user_id': domain_assignments['user_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + } + ] + + user_id = domain_assignments['user_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?user.id=%s' % user_id, + headers=self.headers + ) + self.assertEqual(len(expected), len(r.json['role_assignments'])) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, expected) + + def test_user_can_filter_role_assignments_by_group_of_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + expected = [ + { + 'group_id': domain_assignments['group_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }, + { + 'group_id': domain_assignments['group_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + } + ] + + group_id = domain_assignments['group_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?group.id=%s' % group_id, + headers=self.headers + ) + self.assertEqual(len(expected), len(r.json['role_assignments'])) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, expected) + + def test_user_cannot_filter_role_assignments_by_system(self): + self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.system=all', + headers=self.headers + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + def test_user_cannot_filter_role_assignments_by_other_domain(self): + assignments = self._setup_test_role_assignments() + domain = assignments['domain_id'] + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.domain.id=%s' % domain, + headers=self.headers + ) + self.assertEqual([], r.json['role_assignments']) + + def test_user_cannot_filter_role_assignments_by_other_domain_project(self): + assignments = self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + # This project is in an entirely separate domain that this user doesn't + # have authorization to access, so they should only see an empty list + project_id = assignments['project_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.project.id=%s' % project_id, + headers=self.headers + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + def test_user_cannot_filter_role_assignments_by_other_domain_user(self): + assignments = self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + # This user doesn't have any role assignments on self.domain_id, so the + # domain user of self.domain_id should only see an empty list of role + # assignments. + user_id = assignments['user_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?user.id=%s' % user_id, + headers=self.headers + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + def test_user_cannot_filter_role_assignments_by_other_domain_group(self): + assignments = self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + # This group doesn't have any role assignments on self.domain_id, so + # the domain user of self.domain_id should only see an empty list of + # role assignments. + group_id = assignments['group_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?group.id=%s' % group_id, + headers=self.headers, + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + class SystemReaderTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, _AssignmentTestUtilities, @@ -758,3 +1027,45 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap, r = c.post('/v3/auth/tokens', json=auth) self.token_id = r.headers['X-Subject-Token'] self.headers = {'X-Auth-Token': self.token_id} + + +class DomainReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _AssignmentTestUtilities, + _DomainUserTests): + + def setUp(self): + super(DomainReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_reader = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + domain_id=self.domain_id + ) + self.expected = [ + # assignment of the user running the test case + { + 'user_id': self.user_id, + 'domain_id': self.domain_id, + 'role_id': self.bootstrapper.reader_role_id + }] + + auth = self.build_authentication_request( + user_id=self.user_id, password=domain_reader['password'], + domain_id=self.domain_id, + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id}