Allow domain users to access the limit API

This commit adds domain-scope to the scope_types for limit policies,
allowing domain users to access those APIs when enforce_scope is
enabled. This commit also introduces some tests that explicitly show
how domain users are expected to behave with the limits API. A
subsequent patch will do the same for project users.

This commit also modifies the GET /v3/limit policy to allow project
users to filter responses by project_id, which isn't entirely useful
outside of just calling the API with a project-scoped token.

Change-Id: I9b38f3fd2f83efd508b2d9a6c323bbaa7169d4cd
Related-Bug: 1805880
Partial-Bug: 1818736
This commit is contained in:
Lance Bragstad 2018-11-29 21:06:09 +00:00 committed by Colleen Murphy
parent e992b79fa3
commit f249c9e2b0
4 changed files with 462 additions and 64 deletions

View File

@ -20,6 +20,7 @@ from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common import validation
from keystone import exception
from keystone.limit import schema
from keystone.server import flask as ks_flask
@ -28,6 +29,27 @@ PROVIDERS = provider_api.ProviderAPIs
ENFORCER = rbac_enforcer.RBACEnforcer
def _build_limit_enforcement_target():
target = {}
try:
limit = PROVIDERS.unified_limit_api.get_limit(
flask.request.view_args.get('limit_id')
)
target['limit'] = limit
if limit.get('project_id'):
project = PROVIDERS.resource_api.get_project(limit['project_id'])
target['limit']['project'] = project
elif limit.get('domain_id'):
domain = PROVIDERS.resource_api.get_domain(limit['domain_id'])
target['limit']['domain'] = domain
except exception.NotFound: # nosec
# Defer the existence check in the event the limit doesn't exist, this
# is checked later anyway.
pass
return target
class LimitsResource(ks_flask.ResourceBase):
collection_key = 'limits'
member_key = 'limit'
@ -38,27 +60,38 @@ class LimitsResource(ks_flask.ResourceBase):
def _list_limits(self):
filters = ['service_id', 'region_id', 'resource_name', 'project_id',
'domain_id']
ENFORCER.enforce_call(action='identity:list_limits', filters=filters)
hints = self.build_driver_hints(filters)
project_id_filter = hints.get_exact_filter_by_name('project_id')
domain_id_filter = hints.get_exact_filter_by_name('domain_id')
if project_id_filter or domain_id_filter:
if self.oslo_context.system_scope:
refs = PROVIDERS.unified_limit_api.list_limits(hints)
else:
refs = []
else:
project_id = self.oslo_context.project_id
domain_id = self.oslo_context.domain_id
if project_id:
hints.add_filter('project_id', project_id)
elif domain_id:
hints.add_filter('domain_id', domain_id)
filtered_refs = []
if self.oslo_context.system_scope:
refs = PROVIDERS.unified_limit_api.list_limits(hints)
return self.wrap_collection(refs, hints=hints)
filtered_refs = refs
elif self.oslo_context.domain_id:
refs = PROVIDERS.unified_limit_api.list_limits(hints)
projects = PROVIDERS.resource_api.list_projects_in_domain(
self.oslo_context.domain_id
)
project_ids = [project['id'] for project in projects]
for limit in refs:
if limit.get('project_id'):
if limit['project_id'] in project_ids:
filtered_refs.append(limit)
elif limit.get('domain_id'):
if limit['domain_id'] == self.oslo_context.domain_id:
filtered_refs.append(limit)
elif self.oslo_context.project_id:
hints.add_filter('project_id', self.oslo_context.project_id)
refs = PROVIDERS.unified_limit_api.list_limits(hints)
filtered_refs = refs
return self.wrap_collection(filtered_refs, hints=hints)
def _get_limit(self, limit_id):
ENFORCER.enforce_call(action='identity:get_limit')
ENFORCER.enforce_call(action='identity:get_limit',
build_target=_build_limit_enforcement_target)
ref = PROVIDERS.unified_limit_api.get_limit(limit_id)
return self.wrap_member(ref)

View File

@ -14,11 +14,23 @@ from oslo_policy import policy
from keystone.common.policies import base
SYSTEM_OR_DOMAIN_OR_PROJECT_USER = (
'(' + base.SYSTEM_READER + ') or '
'('
'domain_id:%(target.limit.domain.id)s or '
'domain_id:%(target.limit.project.domain_id)s'
') or '
'('
'project_id:%(target.limit.project_id)s and not '
'None:%(target.limit.project_id)s'
')'
)
limit_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_limit_model',
check_str='',
scope_types=['system', 'project'],
scope_types=['system', 'domain', 'project'],
description='Get limit enforcement model.',
operations=[{'path': '/v3/limits/model',
'method': 'GET'},
@ -26,10 +38,8 @@ limit_policies = [
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_limit',
check_str='(role:reader and system_scope:all) or '
'project_id:%(target.limit.project_id)s or '
'domain_id:%(target.limit.domain_id)s',
scope_types=['system', 'project', 'domain'],
check_str=SYSTEM_OR_DOMAIN_OR_PROJECT_USER,
scope_types=['system', 'domain', 'project'],
description='Show limit details.',
operations=[{'path': '/v3/limits/{limit_id}',
'method': 'GET'},
@ -38,7 +48,7 @@ limit_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_limits',
check_str='',
scope_types=['system', 'project'],
scope_types=['system', 'domain', 'project'],
description='List limits.',
operations=[{'path': '/v3/limits',
'method': 'GET'},

View File

@ -25,8 +25,11 @@ CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
def _create_limit_and_dependencies():
"""Create a limit and its dependencies to test with."""
def _create_limits_and_dependencies(domain_id=None):
"""Create limits and its dependencies for testing."""
if not domain_id:
domain_id = CONF.identity.default_domain_id
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
@ -41,18 +44,34 @@ def _create_limit_and_dependencies():
)
registered_limit = registered_limits[0]
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
domain_limit = unit.new_limit_ref(
domain_id=domain_id, service_id=service['id'],
resource_name=registered_limit['resource_name'],
resource_limit=10, id=uuid.uuid4().hex
)
limit = unit.new_limit_ref(
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex, unit.new_project_ref(domain_id=domain_id)
)
project_limit = unit.new_limit_ref(
project_id=project['id'], service_id=service['id'],
resource_name=registered_limit['resource_name'],
resource_limit=5, id=uuid.uuid4().hex
)
limits = PROVIDERS.unified_limit_api.create_limits([limit])
return limits
limits = PROVIDERS.unified_limit_api.create_limits(
[domain_limit, project_limit]
)
project_limit_id = None
domain_limit_id = None
for limit in limits:
if limit.get('domain_id'):
domain_limit_id = limit['id']
else:
project_limit_id = limit['id']
return (project_limit_id, domain_limit_id)
class _UserLimitTests(object):
@ -63,21 +82,24 @@ class _UserLimitTests(object):
c.get('/v3/limits/model', headers=self.headers)
def test_user_can_get_a_limit(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
limit_id, _ = _create_limits_and_dependencies()
with self.test_client() as c:
r = c.get('/v3/limits/%s' % limit['id'], headers=self.headers)
self.assertEqual(limit['id'], r.json['limit']['id'])
r = c.get('/v3/limits/%s' % limit_id, headers=self.headers)
self.assertEqual(limit_id, r.json['limit']['id'])
def test_user_can_list_limits(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
project_limit_id, domain_limit_id = _create_limits_and_dependencies()
with self.test_client() as c:
r = c.get('/v3/limits', headers=self.headers)
self.assertTrue(len(r.json['limits']) == 1)
self.assertEqual(limit['id'], r.json['limits'][0]['id'])
self.assertTrue(len(r.json['limits']) == 2)
result = []
for limit in r.json['limits']:
result.append(limit['id'])
self.assertIn(project_limit_id, result)
self.assertIn(domain_limit_id, result)
def test_user_cannot_create_limits(self):
service = PROVIDERS.catalog_api.create_service(
@ -116,25 +138,23 @@ class _UserLimitTests(object):
)
def test_user_cannot_update_limits(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
limit_id, _ = _create_limits_and_dependencies()
update = {'limits': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/limits/%s' % limit['id'], json=update,
'/v3/limits/%s' % limit_id, json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_limits(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
limit_id, _ = _create_limits_and_dependencies()
with self.test_client() as c:
c.delete(
'/v3/limits/%s' % limit['id'],
'/v3/limits/%s' % limit_id,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
@ -231,21 +251,24 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
self.headers = {'X-Auth-Token': self.token_id}
def test_user_can_get_a_limit(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
limit_id, _ = _create_limits_and_dependencies()
with self.test_client() as c:
r = c.get('/v3/limits/%s' % limit['id'], headers=self.headers)
self.assertEqual(limit['id'], r.json['limit']['id'])
r = c.get('/v3/limits/%s' % limit_id, headers=self.headers)
self.assertEqual(limit_id, r.json['limit']['id'])
def test_user_can_list_limits(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
project_limit_id, domain_limit_id = _create_limits_and_dependencies()
with self.test_client() as c:
r = c.get('/v3/limits', headers=self.headers)
self.assertTrue(len(r.json['limits']) == 1)
self.assertEqual(limit['id'], r.json['limits'][0]['id'])
self.assertTrue(len(r.json['limits']) == 2)
result = []
for limit in r.json['limits']:
result.append(limit['id'])
self.assertIn(project_limit_id, result)
self.assertIn(domain_limit_id, result)
def test_user_can_create_limits(self):
service = PROVIDERS.catalog_api.create_service(
@ -281,20 +304,335 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
c.post('/v3/limits', json=create, headers=self.headers)
def test_user_can_update_limits(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
limit_id, _ = _create_limits_and_dependencies()
update = {'limits': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/limits/%s' % limit['id'], json=update,
'/v3/limits/%s' % limit_id, json=update,
headers=self.headers
)
def test_user_can_delete_limits(self):
limits = _create_limit_and_dependencies()
limit = limits[0]
limit_id, _ = _create_limits_and_dependencies()
with self.test_client() as c:
c.delete('/v3/limits/%s' % limit['id'], headers=self.headers)
c.delete('/v3/limits/%s' % limit_id, headers=self.headers)
class DomainUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin):
def setUp(self):
super(DomainUserTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.admin_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=domain_admin['password'],
domain_id=self.domain_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_can_get_project_limits_within_domain(self):
project_limit_id, _ = _create_limits_and_dependencies(
domain_id=self.domain_id
)
with self.test_client() as c:
c.get('/v3/limits/%s' % project_limit_id, headers=self.headers)
def test_user_can_get_domain_limits(self):
_, domain_limit_id = _create_limits_and_dependencies(
domain_id=self.domain_id
)
with self.test_client() as c:
r = c.get('/v3/limits/%s' % domain_limit_id, headers=self.headers)
self.assertEqual(self.domain_id, r.json['limit']['domain_id'])
def test_user_cannot_get_project_limit_outside_domain(self):
project_limit_id, _ = _create_limits_and_dependencies()
with self.test_client() as c:
c.get(
'/v3/limits/%s' % project_limit_id, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_get_domain_limits_for_other_domain(self):
_, domain_limit_id = _create_limits_and_dependencies()
with self.test_client() as c:
c.get(
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_can_list_limits_within_domain(self):
project_limit_id, domain_limit_id = _create_limits_and_dependencies(
domain_id=self.domain_id
)
with self.test_client() as c:
r = c.get('/v3/limits', headers=self.headers)
result = []
for limit in r.json['limits']:
result.append(limit['id'])
self.assertEqual(2, len(r.json['limits']))
self.assertIn(project_limit_id, result)
self.assertIn(domain_limit_id, result)
def test_user_cannot_list_limits_outside_domain(self):
_create_limits_and_dependencies()
with self.test_client() as c:
r = c.get('/v3/limits', headers=self.headers)
self.assertEqual(0, len(r.json['limits']))
def test_user_cannot_create_limits_for_domain(self):
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
registered_limit = unit.new_registered_limit_ref(
service_id=service['id'], id=uuid.uuid4().hex
)
registered_limits = (
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit]
)
)
registered_limit = registered_limits[0]
create = {
'limits': [
unit.new_limit_ref(
domain_id=self.domain_id, service_id=service['id'],
resource_name=registered_limit['resource_name'],
resource_limit=5
)
]
}
with self.test_client() as c:
c.post(
'/v3/limits', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_limits_for_other_domain(self):
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
registered_limit = unit.new_registered_limit_ref(
service_id=service['id'], id=uuid.uuid4().hex
)
registered_limits = (
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit]
)
)
registered_limit = registered_limits[0]
create = {
'limits': [
unit.new_limit_ref(
domain_id=CONF.identity.default_domain_id,
service_id=service['id'],
resource_name=registered_limit['resource_name'],
resource_limit=5
)
]
}
with self.test_client() as c:
c.post(
'/v3/limits', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_limits_for_projects_in_domain(self):
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
registered_limit = unit.new_registered_limit_ref(
service_id=service['id'], id=uuid.uuid4().hex
)
registered_limits = (
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit]
)
)
registered_limit = registered_limits[0]
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex, unit.new_project_ref(domain_id=self.domain_id)
)
create = {
'limits': [
unit.new_limit_ref(
project_id=project['id'],
service_id=service['id'],
resource_name=registered_limit['resource_name'],
resource_limit=5
)
]
}
with self.test_client() as c:
c.post(
'/v3/limits', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_limits_for_projects_outside_domain(self):
service = PROVIDERS.catalog_api.create_service(
uuid.uuid4().hex, unit.new_service_ref()
)
registered_limit = unit.new_registered_limit_ref(
service_id=service['id'], id=uuid.uuid4().hex
)
registered_limits = (
PROVIDERS.unified_limit_api.create_registered_limits(
[registered_limit]
)
)
registered_limit = registered_limits[0]
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
create = {
'limits': [
unit.new_limit_ref(
project_id=project['id'],
service_id=service['id'],
resource_name=registered_limit['resource_name'],
resource_limit=5
)
]
}
with self.test_client() as c:
c.post(
'/v3/limits', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_limits_for_domain(self):
_, domain_limit_id = _create_limits_and_dependencies(
domain_id=self.domain_id
)
update = {'limit': {'resource_limit': 1}}
with self.test_client() as c:
c.patch(
'/v3/limits/%s' % domain_limit_id, json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_limits_for_other_domain(self):
_, domain_limit_id = _create_limits_and_dependencies()
update = {'limit': {'resource_limit': 1}}
with self.test_client() as c:
c.patch(
'/v3/limits/%s' % domain_limit_id, json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_limits_for_projects_in_domain(self):
project_limit_id, _ = _create_limits_and_dependencies(
domain_id=self.domain_id
)
update = {'limit': {'resource_limit': 1}}
with self.test_client() as c:
c.patch(
'/v3/limits/%s' % project_limit_id, headers=self.headers,
json=update, expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_limits_for_projects_outside_domain(self):
project_limit_id, _ = _create_limits_and_dependencies()
update = {'limit': {'resource_limit': 1}}
with self.test_client() as c:
c.patch(
'/v3/limits/%s' % project_limit_id, headers=self.headers,
json=update, expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_limits_for_domain(self):
_, domain_limit_id = _create_limits_and_dependencies(
domain_id=self.domain_id
)
with self.test_client() as c:
c.delete(
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_limits_for_other_domain(self):
_, domain_limit_id = _create_limits_and_dependencies()
with self.test_client() as c:
c.delete(
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_limits_for_projects_in_domain(self):
project_limit_id, _ = _create_limits_and_dependencies(
domain_id=self.domain_id
)
with self.test_client() as c:
c.delete(
'/v3/limits/%s' % project_limit_id, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_limits_for_projects_outside_domain(self):
project_limit_id, _ = _create_limits_and_dependencies()
with self.test_client() as c:
c.delete(
'/v3/limits/%s' % project_limit_id, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)

View File

@ -560,6 +560,22 @@ class LimitsTestCase(test_v3.RestfulTestCase):
def setUp(self):
super(LimitsTestCase, self).setUp()
# FIXME(lbragstad): Remove all this duplicated logic once we get all
# keystone tests using bootstrap consistently. This is something the
# bootstrap utility already does for us.
reader_role = {'id': uuid.uuid4().hex, 'name': 'reader'}
reader_role = PROVIDERS.role_api.create_role(
reader_role['id'], reader_role
)
member_role = {'id': uuid.uuid4().hex, 'name': 'member'}
member_role = PROVIDERS.role_api.create_role(
member_role['id'], member_role
)
PROVIDERS.role_api.create_implied_role(self.role_id, member_role['id'])
PROVIDERS.role_api.create_implied_role(
member_role['id'], reader_role['id']
)
# Most of these tests require system-scoped tokens. Let's have one on
# hand so that we can use it in tests when we need it.
@ -927,6 +943,8 @@ class LimitsTestCase(test_v3.RestfulTestCase):
def test_list_limit_with_project_id_filter(self):
# create two limit in different projects for test.
self.config_fixture.config(group='oslo_policy',
enforce_scope=True)
ref1 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id,
region_id=self.region_id,
@ -955,21 +973,19 @@ class LimitsTestCase(test_v3.RestfulTestCase):
self.assertEqual(1, len(limits))
self.assertEqual(self.project_2_id, limits[0]['project_id'])
# if non system scoped request contain project_id filter, keystone
# will return an empty list.
# any project user can filter by their own project
r = self.get(
'/limits?project_id=%s' % self.project_id,
expected_status=http_client.OK)
limits = r.result['limits']
self.assertEqual(0, len(limits))
self.assertEqual(1, len(limits))
self.assertEqual(self.project_id, limits[0]['project_id'])
# a system scoped request can specify the project_id filter
r = self.get(
'/limits?project_id=%s' % self.project_id,
expected_status=http_client.OK,
auth=self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'],
system=True)
token=self.system_admin_token
)
limits = r.result['limits']
self.assertEqual(1, len(limits))
@ -1046,6 +1062,7 @@ class LimitsTestCase(test_v3.RestfulTestCase):
else:
id1 = r.result['limits'][1]['id']
self.get('/limits/fake_id',
token=self.system_admin_token,
expected_status=http_client.NOT_FOUND)
r = self.get('/limits/%s' % id1,
expected_status=http_client.OK)