diff --git a/keystone/api/domains.py b/keystone/api/domains.py index 38509b8559..d55a6b3c53 100644 --- a/keystone/api/domains.py +++ b/keystone/api/domains.py @@ -31,6 +31,20 @@ ENFORCER = rbac_enforcer.RBACEnforcer PROVIDERS = provider_api.ProviderAPIs +def _build_domain_enforcement_target(): + target = {} + try: + target['domain'] = PROVIDERS.resource_api.get_domain( + flask.request.view_args.get('domain_id') + ) + except exception.NotFound: # nosec + # Defer existence in the event the domain doesn't exist, we'll + # check this later anyway. + pass + + return target + + def _build_enforcement_target(allow_non_existing=False): target = {} if flask.request.view_args: @@ -76,8 +90,12 @@ class DomainResource(ks_flask.ResourceBase): return self._list_domains() def _get_domain(self, domain_id): - ENFORCER.enforce_call(action='identity:get_domain') - return self.wrap_member(PROVIDERS.resource_api.get_domain(domain_id)) + ENFORCER.enforce_call( + action='identity:get_domain', + build_target=_build_domain_enforcement_target + ) + domain = PROVIDERS.resource_api.get_domain(domain_id) + return self.wrap_member(domain) def _list_domains(self): filters = ['name', 'enabled'] diff --git a/keystone/common/policies/domain.py b/keystone/common/policies/domain.py index 87f4654d8e..2a8238ed12 100644 --- a/keystone/common/policies/domain.py +++ b/keystone/common/policies/domain.py @@ -47,15 +47,14 @@ deprecated_delete_domain = policy.DeprecatedRule( domain_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'get_domain', + # NOTE(lbragstad): This policy allows system, domain, and + # project-scoped tokens. check_str=( '(role:reader and system_scope:all) or ' + 'token.domain.id:%(target.domain.id)s or ' 'token.project.domain.id:%(target.domain.id)s' ), - # NOTE(lbragstad): This policy allows system-scope and project-scoped - # tokens because it should be possible for users who have a token - # scoped to a project within a domain to list the domain itself, at - # least according to the legacy policy. - scope_types=['system', 'project'], + scope_types=['system', 'domain', 'project'], description='Show domain details.', operations=[{'path': '/v3/domains/{domain_id}', 'method': 'GET'}], diff --git a/keystone/tests/unit/protection/v3/test_domains.py b/keystone/tests/unit/protection/v3/test_domains.py index 9d5f775ca5..43136a6c74 100644 --- a/keystone/tests/unit/protection/v3/test_domains.py +++ b/keystone/tests/unit/protection/v3/test_domains.py @@ -125,6 +125,107 @@ class _SystemMemberAndReaderDomainTests(object): ) +class _DomainAndProjectUserDomainTests(object): + + def test_user_can_get_a_domain(self): + with self.test_client() as c: + r = c.get('/v3/domains/%s' % self.domain_id, headers=self.headers) + self.assertEqual(self.domain_id, r.json['domain']['id']) + + def test_user_cannot_get_a_domain_they_are_not_authorized_to_access(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + with self.test_client() as c: + c.get( + '/v3/domains/%s' % domain['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_list_domains(self): + with self.test_client() as c: + c.get( + '/v3/domains', headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_filter_domains_by_name(self): + domain_name = uuid.uuid4().hex + domain = unit.new_domain_ref(name=domain_name) + domain = PROVIDERS.resource_api.create_domain(domain['id'], domain) + + PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + with self.test_client() as c: + c.get( + '/v3/domains?name=%s' % domain_name, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_filter_domains_by_enabled(self): + with self.test_client() as c: + c.get( + '/v3/domains?enabled=true', headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + c.get( + '/v3/domains?enabled=false', headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_update_a_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + update = {'domain': {'description': uuid.uuid4().hex}} + with self.test_client() as c: + c.patch( + '/v3/domains/%s' % domain['id'], json=update, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_create_a_domain(self): + create = {'domain': {'name': uuid.uuid4().hex}} + + with self.test_client() as c: + c.post( + '/v3/domains', json=create, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_a_domain(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + with self.test_client() as c: + update = {'domain': {'enabled': False}} + path = '/v3/domains/%s' % domain['id'] + c.patch( + path, json=update, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + c.delete( + path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_get_non_existant_domain_forbidden(self): + + with self.test_client() as c: + c.get( + '/v3/domains/%s' % uuid.uuid4().hex, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + class SystemReaderTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, _SystemUserDomainTests, @@ -247,3 +348,39 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap, path = '/v3/domains/%s' % domain['id'] c.patch(path, json=update, headers=self.headers) c.delete(path, headers=self.headers) + + +class DomainUserTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _DomainAndProjectUserDomainTests): + + def setUp(self): + super(DomainUserTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_user = unit.new_user_ref(domain_id=self.domain_id) + self.domain_user_id = PROVIDERS.identity_api.create_user( + domain_user + )['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=self.domain_user_id, + domain_id=self.domain_id + ) + + auth = self.build_authentication_request( + user_id=self.domain_user_id, password=domain_user['password'], + domain_id=self.domain_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id}