diff --git a/keystone/api/groups.py b/keystone/api/groups.py index fb1ab993a9..ecb7bf2ba9 100644 --- a/keystone/api/groups.py +++ b/keystone/api/groups.py @@ -12,6 +12,7 @@ # This file handles all flask-restful resources for /v3/groups +import flask import flask_restful import functools from six.moves import http_client @@ -30,6 +31,20 @@ ENFORCER = rbac_enforcer.RBACEnforcer PROVIDERS = provider_api.ProviderAPIs +def _build_group_target_enforcement(): + target = {} + try: + target['group'] = PROVIDERS.identity_api.get_group( + flask.request.view_args.get('group_id') + ) + except exception.NotFound: # nosec + # Defer existance in the event the group doesn't exist, we'll + # check this later anyway. + pass + + return target + + class GroupsResource(ks_flask.ResourceBase): collection_key = 'groups' member_key = 'group' @@ -46,7 +61,10 @@ class GroupsResource(ks_flask.ResourceBase): GET/HEAD /groups/{group_id} """ - ENFORCER.enforce_call(action='identity:get_group') + ENFORCER.enforce_call( + action='identity:get_group', + build_target=_build_group_target_enforcement + ) return self.wrap_member(PROVIDERS.identity_api.get_group(group_id)) def _list_groups(self): diff --git a/keystone/common/policies/group.py b/keystone/common/policies/group.py index d974abd412..105e3668c8 100644 --- a/keystone/common/policies/group.py +++ b/keystone/common/policies/group.py @@ -10,14 +10,47 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_log import versionutils from oslo_policy import policy from keystone.common.policies import base +SYSTEM_READER_OR_OWNER = ( + '(role:reader and system_scope:all) or user_id:%(user_id)s' +) + +DEPRECATED_REASON = """ +As of the Stein release, the group API understands how to handle system-scoped +tokens in addition to project and domain tokens, making the API more accessible +to users without compromising security or manageability for administrators. The +new default policies for this API account for these changes automatically. +""" + +deprecated_get_group = policy.DeprecatedRule( + name=base.IDENTITY % 'get_group', + check_str=base.RULE_ADMIN_REQUIRED +) +deprecated_list_groups = policy.DeprecatedRule( + name=base.IDENTITY % 'list_groups', + check_str=base.RULE_ADMIN_REQUIRED +) +deprecated_list_groups_for_user = policy.DeprecatedRule( + name=base.IDENTITY % 'list_groups_for_user', + check_str=base.RULE_ADMIN_OR_OWNER +) +deprecated_list_users_in_group = policy.DeprecatedRule( + name=base.IDENTITY % 'list_users_in_group', + check_str=base.RULE_ADMIN_REQUIRED +) +deprecated_check_user_in_group = policy.DeprecatedRule( + name=base.IDENTITY % 'check_user_in_group', + check_str=base.RULE_ADMIN_REQUIRED +) + group_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'get_group', - check_str=base.RULE_ADMIN_REQUIRED, + check_str=base.SYSTEM_READER, # FIXME(lbragstad): Groups have traditionally been a resource managed # by system or cloud administrators. If, or when, keystone supports the # ability for groups to be created or managed by project @@ -29,25 +62,34 @@ group_policies = [ operations=[{'path': '/v3/groups/{group_id}', 'method': 'GET'}, {'path': '/v3/groups/{group_id}', - 'method': 'HEAD'}]), + 'method': 'HEAD'}], + deprecated_rule=deprecated_get_group, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_groups', - check_str=base.RULE_ADMIN_REQUIRED, + check_str=base.SYSTEM_READER, scope_types=['system'], description='List groups.', operations=[{'path': '/v3/groups', 'method': 'GET'}, {'path': '/v3/groups', - 'method': 'HEAD'}]), + 'method': 'HEAD'}], + deprecated_rule=deprecated_list_groups, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_groups_for_user', - check_str=base.RULE_ADMIN_OR_OWNER, - scope_types=['system'], + check_str=SYSTEM_READER_OR_OWNER, + scope_types=['system', 'project'], description='List groups to which a user belongs.', operations=[{'path': '/v3/users/{user_id}/groups', 'method': 'GET'}, {'path': '/v3/users/{user_id}/groups', - 'method': 'HEAD'}]), + 'method': 'HEAD'}], + deprecated_rule=deprecated_list_groups_for_user, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'create_group', check_str=base.RULE_ADMIN_REQUIRED, @@ -71,13 +113,16 @@ group_policies = [ 'method': 'DELETE'}]), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_users_in_group', - check_str=base.RULE_ADMIN_REQUIRED, + check_str=base.SYSTEM_READER, scope_types=['system'], description='List members of a specific group.', operations=[{'path': '/v3/groups/{group_id}/users', 'method': 'GET'}, {'path': '/v3/groups/{group_id}/users', - 'method': 'HEAD'}]), + 'method': 'HEAD'}], + deprecated_rule=deprecated_list_users_in_group, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'remove_user_from_group', check_str=base.RULE_ADMIN_REQUIRED, @@ -87,13 +132,16 @@ group_policies = [ 'method': 'DELETE'}]), policy.DocumentedRuleDefault( name=base.IDENTITY % 'check_user_in_group', - check_str=base.RULE_ADMIN_REQUIRED, + check_str=base.SYSTEM_READER, scope_types=['system'], description='Check whether a user is a member of a group.', operations=[{'path': '/v3/groups/{group_id}/users/{user_id}', 'method': 'HEAD'}, {'path': '/v3/groups/{group_id}/users/{user_id}', - 'method': 'GET'}]), + 'method': 'GET'}], + deprecated_rule=deprecated_check_user_in_group, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.STEIN), policy.DocumentedRuleDefault( name=base.IDENTITY % 'add_user_to_group', check_str=base.RULE_ADMIN_REQUIRED, diff --git a/keystone/tests/unit/protection/v3/test_groups.py b/keystone/tests/unit/protection/v3/test_groups.py new file mode 100644 index 0000000000..7a9d91ef8d --- /dev/null +++ b/keystone/tests/unit/protection/v3/test_groups.py @@ -0,0 +1,392 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from six.moves import http_client + +from keystone.common import provider_api +import keystone.conf +from keystone.tests.common import auth as common_auth +from keystone.tests import unit +from keystone.tests.unit import base_classes +from keystone.tests.unit import ksfixtures + +CONF = keystone.conf.CONF +PROVIDERS = provider_api.ProviderAPIs + + +class _SystemUserGroupTests(object): + """Common default functionality for all system users.""" + + def test_user_can_list_groups(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + r = c.get('/v3/groups', headers=self.headers) + self.assertEqual(1, len(r.json['groups'])) + self.assertEqual(group['id'], r.json['groups'][0]['id']) + + def test_user_can_get_a_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + r = c.get('/v3/groups/%s' % group['id'], headers=self.headers) + self.assertEqual(group['id'], r.json['group']['id']) + + def test_user_can_list_group_members(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + r = c.get( + '/v3/groups/%s/users' % group['id'], headers=self.headers + ) + self.assertEqual(1, len(r.json['users'])) + self.assertEqual(user['id'], r.json['users'][0]['id']) + + def test_user_can_list_groups_for_other_users(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + r = c.get( + '/v3/users/%s/groups' % user['id'], headers=self.headers + ) + self.assertEqual(1, len(r.json['groups'])) + self.assertEqual(group['id'], r.json['groups'][0]['id']) + + def test_user_can_check_if_user_in_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + c.get( + '/v3/groups/%s/users/%s' % (group['id'], user['id']), + headers=self.headers, + expected_status_code=http_client.NO_CONTENT + ) + + def test_user_cannot_get_non_existent_group_not_found(self): + with self.test_client() as c: + c.get( + '/v3/groups/%s' % uuid.uuid4().hex, headers=self.headers, + expected_status_code=http_client.NOT_FOUND + ) + + +class _SystemMemberAndReaderGroupTests(object): + """Common default functionality for system readers and system members.""" + + def test_user_cannot_create_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + create = { + 'group': { + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'] + } + } + + with self.test_client() as c: + c.post( + '/v3/groups', json=create, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_update_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + update = {'group': {'description': uuid.uuid4().hex}} + + with self.test_client() as c: + c.patch( + '/v3/groups/%s' % group['id'], json=update, + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + c.delete( + '/v3/groups/%s' % group['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_add_users_to_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + c.put( + '/v3/groups/%s/users/%s' % (group['id'], user['id']), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_remove_users_from_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + c.delete( + '/v3/groups/%s/users/%s' % (group['id'], user['id']), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + +class SystemReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserGroupTests, + _SystemMemberAndReaderGroupTests): + + def setUp(self): + super(SystemReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + system_reader = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + system_reader + )['id'] + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.bootstrapper.reader_role_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, password=system_reader['password'], + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + +class ProjectUserTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin): + + def setUp(self): + super(ProjectUserTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + + user = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(user)['id'] + + project = PROVIDERS.resource_api.create_project( + uuid.uuid4().hex, unit.new_project_ref(domain_id=self.domain_id) + ) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=self.user_id, + project_id=project['id'] + ) + + auth = self.build_authentication_request( + user_id=self.user_id, + password=user['password'], + project_id=project['id'] + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def test_user_can_get_list_their_own_groups(self): + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=self.domain_id) + ) + + PROVIDERS.identity_api.add_user_to_group(self.user_id, group['id']) + + with self.test_client() as c: + r = c.get( + '/v3/users/%s/groups' % self.user_id, headers=self.headers + ) + self.assertEqual(1, len(r.json['groups'])) + self.assertEqual(group['id'], r.json['groups'][0]['id']) + + def test_user_cannot_list_groups_for_other_users(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + c.get( + '/v3/users/%s/groups' % user['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_list_groups(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + c.get( + '/v3/groups', headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_get_a_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + + with self.test_client() as c: + c.get( + '/v3/groups/%s' % group['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_list_group_members(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + c.get( + '/v3/groups/%s/users' % group['id'], headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_check_if_user_in_group(self): + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=domain['id']) + ) + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=domain['id']) + ) + + PROVIDERS.identity_api.add_user_to_group(user['id'], group['id']) + + with self.test_client() as c: + c.get( + '/v3/groups/%s/users/%s' % (group['id'], user['id']), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_get_non_existent_group_forbidden(self): + with self.test_client() as c: + c.get( + '/v3/groups/%s' % uuid.uuid4().hex, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + )