diff --git a/keystone/api/users.py b/keystone/api/users.py index 0c4a5f0d22..f31b5bf32f 100644 --- a/keystone/api/users.py +++ b/keystone/api/users.py @@ -162,9 +162,16 @@ class UserResource(ks_flask.ResourceBase): """ filters = ('domain_id', 'enabled', 'idp_id', 'name', 'protocol_id', 'unique_id', 'password_expires_at') + target = None + if self.oslo_context.domain_id: + target = {'domain_id': self.oslo_context.domain_id} hints = self.build_driver_hints(filters) - ENFORCER.enforce_call(action='identity:list_users', filters=filters) + ENFORCER.enforce_call( + action='identity:list_users', filters=filters, target_attr=target + ) domain = self._get_domain_id_for_list_request() + if domain is None and self.oslo_context.domain_id: + domain = self.oslo_context.domain_id refs = PROVIDERS.identity_api.list_users( domain_scope=domain, hints=hints) return self.wrap_collection(refs, hints=hints) diff --git a/keystone/common/policies/base.py b/keystone/common/policies/base.py index 6375070e1c..00b24e2c63 100644 --- a/keystone/common/policies/base.py +++ b/keystone/common/policies/base.py @@ -44,6 +44,7 @@ RULE_TRUST_OWNER = 'user_id:%(trust.trustor_user_id)s' # automatically by scope_types in oslo.policy's RuleDefault objects. SYSTEM_READER = 'role:reader and system_scope:all' SYSTEM_ADMIN = 'role:admin and system_scope:all' +DOMAIN_READER = 'role:reader and domain_id:%(target.domain_id)s' rules = [ diff --git a/keystone/common/policies/user.py b/keystone/common/policies/user.py index 346719677f..fd289bdc4b 100644 --- a/keystone/common/policies/user.py +++ b/keystone/common/policies/user.py @@ -15,10 +15,14 @@ from oslo_policy import policy from keystone.common.policies import base -# Allow access for system readers or users attempting to list their owner user -# reference. -SYSTEM_READER_OR_USER = ( - '(' + base.SYSTEM_READER + ') or user_id:%(target.user.id)s' +SYSTEM_READER_OR_DOMAIN_READER_OR_USER = ( + '(' + base.SYSTEM_READER + ') or ' + '(role:reader and token.domain.id:%(target.user.domain_id)s) or ' + 'user_id:%(target.user.id)s' +) + +SYSTEM_READER_OR_DOMAIN_READER = ( + '(' + base.SYSTEM_READER + ') or (' + base.DOMAIN_READER + ')' ) DEPRECATED_REASON = """ @@ -52,8 +56,8 @@ deprecated_delete_user = policy.DeprecatedRule( user_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'get_user', - check_str=SYSTEM_READER_OR_USER, - scope_types=['system', 'project'], + check_str=SYSTEM_READER_OR_DOMAIN_READER_OR_USER, + scope_types=['system', 'domain', 'project'], description='Show user details.', operations=[{'path': '/v3/users/{user_id}', 'method': 'GET'}, @@ -64,17 +68,8 @@ user_policies = [ deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_users', - check_str=base.SYSTEM_READER, - # FIXME(lbragstad): Since listing users has traditionally always been a - # system-level API call, let's maintain that pattern here. A system - # administrator should be able to list all users in the deployment, - # which is what's supported today. Project and domain administrators - # should also be able to list users, but they should only see users - # within their project or domain. Otherwise it would be possible for - # project and domain administrators to see users unrelated to their - # project or domain, which would be a security issue. Once we have that - # support in place, we should update scope_types to include 'project'. - scope_types=['system'], + check_str=SYSTEM_READER_OR_DOMAIN_READER, + scope_types=['system', 'domain'], description='List users.', operations=[{'path': '/v3/users', 'method': 'GET'}, diff --git a/keystone/tests/unit/protection/v3/test_users.py b/keystone/tests/unit/protection/v3/test_users.py index 79c811fd82..e2648676fd 100644 --- a/keystone/tests/unit/protection/v3/test_users.py +++ b/keystone/tests/unit/protection/v3/test_users.py @@ -132,6 +132,167 @@ class _SystemMemberAndReaderUserTests(object): ) +class _DomainReaderUserTests(object): + """Functionality for all domain readers.""" + + def test_user_can_get_user_within_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + + with self.test_client() as c: + r = c.get('/v3/users/%s' % user['id'], headers=self.headers) + self.assertEqual(user['id'], r.json['user']['id']) + + def test_user_cannot_get_user_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + c.get( + '/v3/users/%s' % user['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_can_list_users_within_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + + with self.test_client() as c: + r = c.get('/v3/users', headers=self.headers) + self.assertEqual(2, len(r.json['users'])) + user_ids = [] + for user in r.json['users']: + user_ids.append(user['id']) + self.assertIn(self.user_id, user_ids) + self.assertIn(user['id'], user_ids) + + def test_user_cannot_list_users_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + r = c.get('/v3/users', headers=self.headers) + user_ids = [] + for u in r.json['users']: + user_ids.append(u['id']) + self.assertNotIn(user['id'], user_ids) + + def test_user_cannot_create_users_within_domain(self): + create = { + 'user': { + 'domain_id': self.domain_id, + 'name': uuid.uuid4().hex + } + } + + with self.test_client() as c: + c.post( + '/v3/users', json=create, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_create_users_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + create = { + 'user': { + 'domain_id': domain['id'], + 'name': uuid.uuid4().hex + } + } + + with self.test_client() as c: + c.post( + '/v3/users', json=create, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_update_users_within_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + + update = {'user': {'email': uuid.uuid4().hex}} + with self.test_client() as c: + c.patch( + '/v3/users/%s' % user['id'], json=update, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_update_users_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + update = {'user': {'email': uuid.uuid4().hex}} + with self.test_client() as c: + c.patch( + '/v3/users/%s' % user['id'], json=update, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_update_non_existent_user_forbidden(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + + update = {'user': {'email': uuid.uuid4().hex}} + with self.test_client() as c: + c.patch( + '/v3/users/%s' % user['id'], json=update, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_users_within_domain(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=self.domain_id) + ) + + with self.test_client() as c: + c.delete( + '/v3/users/%s' % user['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_users_in_other_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + c.delete( + '/v3/users/%s' % user['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_non_existent_user_forbidden(self): + with self.test_client() as c: + c.delete( + '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + class SystemReaderTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, _CommonUserTests, @@ -272,3 +433,38 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap, '/v3/users/%s' % uuid.uuid4().hex, headers=self.headers, expected_status_code=http_client.NOT_FOUND ) + + +class DomainReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _CommonUserTests, + _DomainReaderUserTests): + + def setUp(self): + super(DomainReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_reader = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + domain_id=self.domain_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, password=domain_reader['password'], + domain_id=self.domain_id, + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} diff --git a/releasenotes/notes/bug-1805406-252b45d443af20b3.yaml b/releasenotes/notes/bug-1805406-252b45d443af20b3.yaml index 95b8e249ce..61131dd2a5 100644 --- a/releasenotes/notes/bug-1805406-252b45d443af20b3.yaml +++ b/releasenotes/notes/bug-1805406-252b45d443af20b3.yaml @@ -20,15 +20,17 @@ deprecations: - | [`bug 1805406 `_] The user policies have been deprecated. The ``identity:get_user`` now uses - ``(role:reader and system_scope:all) or user_id:%(target.user.id)s`` + ``(role:reader and system_scope:all) or (role:reader and + token.domain.id:%(target.user.domain_id)s) or user_id:%(target.user.id)s`` instead of ``rule:admin_or_owner``. The ``identity:list_users`` policy now - uses ``role:reader and system_scope:all`` instead of + uses ``(role:reader and system_scope:all) or (role:reader + and domain_id:%(target.domain_id)s)`` instead of ``rule:admin_required``. The ``identity:create_user``, ``identity:update_user``, and ``identity:delete_user`` policies now use ``role:admin and system_scope:all`` instead of ``rule:admin_required``. These new defaults automatically account - for system-scope and support a read-only role, making it easier - for system administrators to delegate subsets of responsibility + for system-scope, domain-scope, and support a read-only role, making it easier + for system and domain administrators to delegate subsets of responsibility without compromising security. Please consider these new defaults if your deployment overrides the user policies. security: