Implement system reader role for groups

This commit introduces the system reader role to the group API, making
it easier for administrators to delegate subsets of responsibilities
to the API by default. This commit also maintains the ability for
any user to be able to fetch their own group memberships, which
encapsulates a bunch of tests for what regular project users can do
with groups.

Subsequent patches will incorporate:

  - system member test coverage
  - system admin functionality
  - domain reader functionality
  - domain member test coverage
  - domain admin functionality

Change-Id: I24ff27da79bb01322e05c6d8cd37f02693fd5b9f
Related-Bug: 1805369
Related-Bug: 1808859
Related-Bug: 968696
This commit is contained in:
Lance Bragstad 2018-12-17 22:40:04 +00:00
parent f4162e3680
commit feb0d58df4
3 changed files with 470 additions and 12 deletions

View File

@ -12,6 +12,7 @@
# This file handles all flask-restful resources for /v3/groups
import flask
import flask_restful
import functools
from six.moves import http_client
@ -30,6 +31,20 @@ ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
def _build_group_target_enforcement():
target = {}
try:
target['group'] = PROVIDERS.identity_api.get_group(
flask.request.view_args.get('group_id')
)
except exception.NotFound: # nosec
# Defer existance in the event the group doesn't exist, we'll
# check this later anyway.
pass
return target
class GroupsResource(ks_flask.ResourceBase):
collection_key = 'groups'
member_key = 'group'
@ -46,7 +61,10 @@ class GroupsResource(ks_flask.ResourceBase):
GET/HEAD /groups/{group_id}
"""
ENFORCER.enforce_call(action='identity:get_group')
ENFORCER.enforce_call(
action='identity:get_group',
build_target=_build_group_target_enforcement
)
return self.wrap_member(PROVIDERS.identity_api.get_group(group_id))
def _list_groups(self):

View File

@ -10,14 +10,47 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import versionutils
from oslo_policy import policy
from keystone.common.policies import base
SYSTEM_READER_OR_OWNER = (
'(role:reader and system_scope:all) or user_id:%(user_id)s'
)
DEPRECATED_REASON = """
As of the Stein release, the group API understands how to handle system-scoped
tokens in addition to project and domain tokens, making the API more accessible
to users without compromising security or manageability for administrators. The
new default policies for this API account for these changes automatically.
"""
deprecated_get_group = policy.DeprecatedRule(
name=base.IDENTITY % 'get_group',
check_str=base.RULE_ADMIN_REQUIRED
)
deprecated_list_groups = policy.DeprecatedRule(
name=base.IDENTITY % 'list_groups',
check_str=base.RULE_ADMIN_REQUIRED
)
deprecated_list_groups_for_user = policy.DeprecatedRule(
name=base.IDENTITY % 'list_groups_for_user',
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_list_users_in_group = policy.DeprecatedRule(
name=base.IDENTITY % 'list_users_in_group',
check_str=base.RULE_ADMIN_REQUIRED
)
deprecated_check_user_in_group = policy.DeprecatedRule(
name=base.IDENTITY % 'check_user_in_group',
check_str=base.RULE_ADMIN_REQUIRED
)
group_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_group',
check_str=base.RULE_ADMIN_REQUIRED,
check_str=base.SYSTEM_READER,
# FIXME(lbragstad): Groups have traditionally been a resource managed
# by system or cloud administrators. If, or when, keystone supports the
# ability for groups to be created or managed by project
@ -29,25 +62,34 @@ group_policies = [
operations=[{'path': '/v3/groups/{group_id}',
'method': 'GET'},
{'path': '/v3/groups/{group_id}',
'method': 'HEAD'}]),
'method': 'HEAD'}],
deprecated_rule=deprecated_get_group,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_groups',
check_str=base.RULE_ADMIN_REQUIRED,
check_str=base.SYSTEM_READER,
scope_types=['system'],
description='List groups.',
operations=[{'path': '/v3/groups',
'method': 'GET'},
{'path': '/v3/groups',
'method': 'HEAD'}]),
'method': 'HEAD'}],
deprecated_rule=deprecated_list_groups,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_groups_for_user',
check_str=base.RULE_ADMIN_OR_OWNER,
scope_types=['system'],
check_str=SYSTEM_READER_OR_OWNER,
scope_types=['system', 'project'],
description='List groups to which a user belongs.',
operations=[{'path': '/v3/users/{user_id}/groups',
'method': 'GET'},
{'path': '/v3/users/{user_id}/groups',
'method': 'HEAD'}]),
'method': 'HEAD'}],
deprecated_rule=deprecated_list_groups_for_user,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'create_group',
check_str=base.RULE_ADMIN_REQUIRED,
@ -71,13 +113,16 @@ group_policies = [
'method': 'DELETE'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_users_in_group',
check_str=base.RULE_ADMIN_REQUIRED,
check_str=base.SYSTEM_READER,
scope_types=['system'],
description='List members of a specific group.',
operations=[{'path': '/v3/groups/{group_id}/users',
'method': 'GET'},
{'path': '/v3/groups/{group_id}/users',
'method': 'HEAD'}]),
'method': 'HEAD'}],
deprecated_rule=deprecated_list_users_in_group,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'remove_user_from_group',
check_str=base.RULE_ADMIN_REQUIRED,
@ -87,13 +132,16 @@ group_policies = [
'method': 'DELETE'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'check_user_in_group',
check_str=base.RULE_ADMIN_REQUIRED,
check_str=base.SYSTEM_READER,
scope_types=['system'],
description='Check whether a user is a member of a group.',
operations=[{'path': '/v3/groups/{group_id}/users/{user_id}',
'method': 'HEAD'},
{'path': '/v3/groups/{group_id}/users/{user_id}',
'method': 'GET'}]),
'method': 'GET'}],
deprecated_rule=deprecated_check_user_in_group,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'add_user_to_group',
check_str=base.RULE_ADMIN_REQUIRED,

View File

@ -0,0 +1,392 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from six.moves import http_client
from keystone.common import provider_api
import keystone.conf
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class _SystemUserGroupTests(object):
"""Common default functionality for all system users."""
def test_user_can_list_groups(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
r = c.get('/v3/groups', headers=self.headers)
self.assertEqual(1, len(r.json['groups']))
self.assertEqual(group['id'], r.json['groups'][0]['id'])
def test_user_can_get_a_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
r = c.get('/v3/groups/%s' % group['id'], headers=self.headers)
self.assertEqual(group['id'], r.json['group']['id'])
def test_user_can_list_group_members(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
r = c.get(
'/v3/groups/%s/users' % group['id'], headers=self.headers
)
self.assertEqual(1, len(r.json['users']))
self.assertEqual(user['id'], r.json['users'][0]['id'])
def test_user_can_list_groups_for_other_users(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
r = c.get(
'/v3/users/%s/groups' % user['id'], headers=self.headers
)
self.assertEqual(1, len(r.json['groups']))
self.assertEqual(group['id'], r.json['groups'][0]['id'])
def test_user_can_check_if_user_in_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.get(
'/v3/groups/%s/users/%s' % (group['id'], user['id']),
headers=self.headers,
expected_status_code=http_client.NO_CONTENT
)
def test_user_cannot_get_non_existent_group_not_found(self):
with self.test_client() as c:
c.get(
'/v3/groups/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.NOT_FOUND
)
class _SystemMemberAndReaderGroupTests(object):
"""Common default functionality for system readers and system members."""
def test_user_cannot_create_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
create = {
'group': {
'name': uuid.uuid4().hex,
'domain_id': domain['id']
}
}
with self.test_client() as c:
c.post(
'/v3/groups', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
update = {'group': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/groups/%s' % group['id'], json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.delete(
'/v3/groups/%s' % group['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_add_users_to_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.put(
'/v3/groups/%s/users/%s' % (group['id'], user['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_remove_users_from_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.delete(
'/v3/groups/%s/users/%s' % (group['id'], user['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserGroupTests,
_SystemMemberAndReaderGroupTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_reader = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_reader
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.reader_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_reader['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class ProjectUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin):
def setUp(self):
super(ProjectUserTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
user = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(user)['id']
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex, unit.new_project_ref(domain_id=self.domain_id)
)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=self.user_id,
project_id=project['id']
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=user['password'],
project_id=project['id']
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_can_get_list_their_own_groups(self):
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=self.domain_id)
)
PROVIDERS.identity_api.add_user_to_group(self.user_id, group['id'])
with self.test_client() as c:
r = c.get(
'/v3/users/%s/groups' % self.user_id, headers=self.headers
)
self.assertEqual(1, len(r.json['groups']))
self.assertEqual(group['id'], r.json['groups'][0]['id'])
def test_user_cannot_list_groups_for_other_users(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.get(
'/v3/users/%s/groups' % user['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_groups(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.get(
'/v3/groups', headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_get_a_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
with self.test_client() as c:
c.get(
'/v3/groups/%s' % group['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_group_members(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.get(
'/v3/groups/%s/users' % group['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_check_if_user_in_group(self):
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=domain['id'])
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=domain['id'])
)
PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
with self.test_client() as c:
c.get(
'/v3/groups/%s/users/%s' % (group['id'], user['id']),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_get_non_existent_group_forbidden(self):
with self.test_client() as c:
c.get(
'/v3/groups/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)