Allow domain users to access the limit API
This commit adds domain-scope to the scope_types for limit policies, allowing domain users to access those APIs when enforce_scope is enabled. This commit also introduces some tests that explicitly show how domain users are expected to behave with the limits API. A subsequent patch will do the same for project users. This commit also modifies the GET /v3/limit policy to allow project users to filter responses by project_id, which isn't entirely useful outside of just calling the API with a project-scoped token. Change-Id: I9b38f3fd2f83efd508b2d9a6c323bbaa7169d4cd Related-Bug: 1805880 Partial-Bug: 1818736
This commit is contained in:
parent
e992b79fa3
commit
f249c9e2b0
|
@ -20,6 +20,7 @@ from keystone.common import json_home
|
|||
from keystone.common import provider_api
|
||||
from keystone.common import rbac_enforcer
|
||||
from keystone.common import validation
|
||||
from keystone import exception
|
||||
from keystone.limit import schema
|
||||
from keystone.server import flask as ks_flask
|
||||
|
||||
|
@ -28,6 +29,27 @@ PROVIDERS = provider_api.ProviderAPIs
|
|||
ENFORCER = rbac_enforcer.RBACEnforcer
|
||||
|
||||
|
||||
def _build_limit_enforcement_target():
|
||||
target = {}
|
||||
try:
|
||||
limit = PROVIDERS.unified_limit_api.get_limit(
|
||||
flask.request.view_args.get('limit_id')
|
||||
)
|
||||
target['limit'] = limit
|
||||
if limit.get('project_id'):
|
||||
project = PROVIDERS.resource_api.get_project(limit['project_id'])
|
||||
target['limit']['project'] = project
|
||||
elif limit.get('domain_id'):
|
||||
domain = PROVIDERS.resource_api.get_domain(limit['domain_id'])
|
||||
target['limit']['domain'] = domain
|
||||
except exception.NotFound: # nosec
|
||||
# Defer the existence check in the event the limit doesn't exist, this
|
||||
# is checked later anyway.
|
||||
pass
|
||||
|
||||
return target
|
||||
|
||||
|
||||
class LimitsResource(ks_flask.ResourceBase):
|
||||
collection_key = 'limits'
|
||||
member_key = 'limit'
|
||||
|
@ -38,27 +60,38 @@ class LimitsResource(ks_flask.ResourceBase):
|
|||
def _list_limits(self):
|
||||
filters = ['service_id', 'region_id', 'resource_name', 'project_id',
|
||||
'domain_id']
|
||||
|
||||
ENFORCER.enforce_call(action='identity:list_limits', filters=filters)
|
||||
|
||||
hints = self.build_driver_hints(filters)
|
||||
project_id_filter = hints.get_exact_filter_by_name('project_id')
|
||||
domain_id_filter = hints.get_exact_filter_by_name('domain_id')
|
||||
if project_id_filter or domain_id_filter:
|
||||
if self.oslo_context.system_scope:
|
||||
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||
else:
|
||||
refs = []
|
||||
else:
|
||||
project_id = self.oslo_context.project_id
|
||||
domain_id = self.oslo_context.domain_id
|
||||
if project_id:
|
||||
hints.add_filter('project_id', project_id)
|
||||
elif domain_id:
|
||||
hints.add_filter('domain_id', domain_id)
|
||||
|
||||
filtered_refs = []
|
||||
if self.oslo_context.system_scope:
|
||||
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||
return self.wrap_collection(refs, hints=hints)
|
||||
filtered_refs = refs
|
||||
elif self.oslo_context.domain_id:
|
||||
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||
projects = PROVIDERS.resource_api.list_projects_in_domain(
|
||||
self.oslo_context.domain_id
|
||||
)
|
||||
project_ids = [project['id'] for project in projects]
|
||||
for limit in refs:
|
||||
if limit.get('project_id'):
|
||||
if limit['project_id'] in project_ids:
|
||||
filtered_refs.append(limit)
|
||||
elif limit.get('domain_id'):
|
||||
if limit['domain_id'] == self.oslo_context.domain_id:
|
||||
filtered_refs.append(limit)
|
||||
elif self.oslo_context.project_id:
|
||||
hints.add_filter('project_id', self.oslo_context.project_id)
|
||||
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||
filtered_refs = refs
|
||||
|
||||
return self.wrap_collection(filtered_refs, hints=hints)
|
||||
|
||||
def _get_limit(self, limit_id):
|
||||
ENFORCER.enforce_call(action='identity:get_limit')
|
||||
ENFORCER.enforce_call(action='identity:get_limit',
|
||||
build_target=_build_limit_enforcement_target)
|
||||
ref = PROVIDERS.unified_limit_api.get_limit(limit_id)
|
||||
return self.wrap_member(ref)
|
||||
|
||||
|
|
|
@ -14,11 +14,23 @@ from oslo_policy import policy
|
|||
|
||||
from keystone.common.policies import base
|
||||
|
||||
SYSTEM_OR_DOMAIN_OR_PROJECT_USER = (
|
||||
'(' + base.SYSTEM_READER + ') or '
|
||||
'('
|
||||
'domain_id:%(target.limit.domain.id)s or '
|
||||
'domain_id:%(target.limit.project.domain_id)s'
|
||||
') or '
|
||||
'('
|
||||
'project_id:%(target.limit.project_id)s and not '
|
||||
'None:%(target.limit.project_id)s'
|
||||
')'
|
||||
)
|
||||
|
||||
limit_policies = [
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'get_limit_model',
|
||||
check_str='',
|
||||
scope_types=['system', 'project'],
|
||||
scope_types=['system', 'domain', 'project'],
|
||||
description='Get limit enforcement model.',
|
||||
operations=[{'path': '/v3/limits/model',
|
||||
'method': 'GET'},
|
||||
|
@ -26,10 +38,8 @@ limit_policies = [
|
|||
'method': 'HEAD'}]),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'get_limit',
|
||||
check_str='(role:reader and system_scope:all) or '
|
||||
'project_id:%(target.limit.project_id)s or '
|
||||
'domain_id:%(target.limit.domain_id)s',
|
||||
scope_types=['system', 'project', 'domain'],
|
||||
check_str=SYSTEM_OR_DOMAIN_OR_PROJECT_USER,
|
||||
scope_types=['system', 'domain', 'project'],
|
||||
description='Show limit details.',
|
||||
operations=[{'path': '/v3/limits/{limit_id}',
|
||||
'method': 'GET'},
|
||||
|
@ -38,7 +48,7 @@ limit_policies = [
|
|||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'list_limits',
|
||||
check_str='',
|
||||
scope_types=['system', 'project'],
|
||||
scope_types=['system', 'domain', 'project'],
|
||||
description='List limits.',
|
||||
operations=[{'path': '/v3/limits',
|
||||
'method': 'GET'},
|
||||
|
|
|
@ -25,8 +25,11 @@ CONF = keystone.conf.CONF
|
|||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
def _create_limit_and_dependencies():
|
||||
"""Create a limit and its dependencies to test with."""
|
||||
def _create_limits_and_dependencies(domain_id=None):
|
||||
"""Create limits and its dependencies for testing."""
|
||||
if not domain_id:
|
||||
domain_id = CONF.identity.default_domain_id
|
||||
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
uuid.uuid4().hex, unit.new_service_ref()
|
||||
)
|
||||
|
@ -41,18 +44,34 @@ def _create_limit_and_dependencies():
|
|||
)
|
||||
registered_limit = registered_limits[0]
|
||||
|
||||
project = PROVIDERS.resource_api.create_project(
|
||||
uuid.uuid4().hex,
|
||||
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
|
||||
domain_limit = unit.new_limit_ref(
|
||||
domain_id=domain_id, service_id=service['id'],
|
||||
resource_name=registered_limit['resource_name'],
|
||||
resource_limit=10, id=uuid.uuid4().hex
|
||||
)
|
||||
|
||||
limit = unit.new_limit_ref(
|
||||
project = PROVIDERS.resource_api.create_project(
|
||||
uuid.uuid4().hex, unit.new_project_ref(domain_id=domain_id)
|
||||
)
|
||||
|
||||
project_limit = unit.new_limit_ref(
|
||||
project_id=project['id'], service_id=service['id'],
|
||||
resource_name=registered_limit['resource_name'],
|
||||
resource_limit=5, id=uuid.uuid4().hex
|
||||
)
|
||||
limits = PROVIDERS.unified_limit_api.create_limits([limit])
|
||||
return limits
|
||||
limits = PROVIDERS.unified_limit_api.create_limits(
|
||||
[domain_limit, project_limit]
|
||||
)
|
||||
|
||||
project_limit_id = None
|
||||
domain_limit_id = None
|
||||
for limit in limits:
|
||||
if limit.get('domain_id'):
|
||||
domain_limit_id = limit['id']
|
||||
else:
|
||||
project_limit_id = limit['id']
|
||||
|
||||
return (project_limit_id, domain_limit_id)
|
||||
|
||||
|
||||
class _UserLimitTests(object):
|
||||
|
@ -63,21 +82,24 @@ class _UserLimitTests(object):
|
|||
c.get('/v3/limits/model', headers=self.headers)
|
||||
|
||||
def test_user_can_get_a_limit(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits/%s' % limit['id'], headers=self.headers)
|
||||
self.assertEqual(limit['id'], r.json['limit']['id'])
|
||||
r = c.get('/v3/limits/%s' % limit_id, headers=self.headers)
|
||||
self.assertEqual(limit_id, r.json['limit']['id'])
|
||||
|
||||
def test_user_can_list_limits(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
project_limit_id, domain_limit_id = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits', headers=self.headers)
|
||||
self.assertTrue(len(r.json['limits']) == 1)
|
||||
self.assertEqual(limit['id'], r.json['limits'][0]['id'])
|
||||
self.assertTrue(len(r.json['limits']) == 2)
|
||||
result = []
|
||||
for limit in r.json['limits']:
|
||||
result.append(limit['id'])
|
||||
|
||||
self.assertIn(project_limit_id, result)
|
||||
self.assertIn(domain_limit_id, result)
|
||||
|
||||
def test_user_cannot_create_limits(self):
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
|
@ -116,25 +138,23 @@ class _UserLimitTests(object):
|
|||
)
|
||||
|
||||
def test_user_cannot_update_limits(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
update = {'limits': {'description': uuid.uuid4().hex}}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/limits/%s' % limit['id'], json=update,
|
||||
'/v3/limits/%s' % limit_id, json=update,
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_limits(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/limits/%s' % limit['id'],
|
||||
'/v3/limits/%s' % limit_id,
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
@ -231,21 +251,24 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
|
|||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
def test_user_can_get_a_limit(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits/%s' % limit['id'], headers=self.headers)
|
||||
self.assertEqual(limit['id'], r.json['limit']['id'])
|
||||
r = c.get('/v3/limits/%s' % limit_id, headers=self.headers)
|
||||
self.assertEqual(limit_id, r.json['limit']['id'])
|
||||
|
||||
def test_user_can_list_limits(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
project_limit_id, domain_limit_id = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits', headers=self.headers)
|
||||
self.assertTrue(len(r.json['limits']) == 1)
|
||||
self.assertEqual(limit['id'], r.json['limits'][0]['id'])
|
||||
self.assertTrue(len(r.json['limits']) == 2)
|
||||
result = []
|
||||
for limit in r.json['limits']:
|
||||
result.append(limit['id'])
|
||||
|
||||
self.assertIn(project_limit_id, result)
|
||||
self.assertIn(domain_limit_id, result)
|
||||
|
||||
def test_user_can_create_limits(self):
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
|
@ -281,20 +304,335 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
|
|||
c.post('/v3/limits', json=create, headers=self.headers)
|
||||
|
||||
def test_user_can_update_limits(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
update = {'limits': {'description': uuid.uuid4().hex}}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/limits/%s' % limit['id'], json=update,
|
||||
'/v3/limits/%s' % limit_id, json=update,
|
||||
headers=self.headers
|
||||
)
|
||||
|
||||
def test_user_can_delete_limits(self):
|
||||
limits = _create_limit_and_dependencies()
|
||||
limit = limits[0]
|
||||
limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete('/v3/limits/%s' % limit['id'], headers=self.headers)
|
||||
c.delete('/v3/limits/%s' % limit_id, headers=self.headers)
|
||||
|
||||
|
||||
class DomainUserTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(DomainUserTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
domain = PROVIDERS.resource_api.create_domain(
|
||||
uuid.uuid4().hex, unit.new_domain_ref()
|
||||
)
|
||||
self.domain_id = domain['id']
|
||||
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
|
||||
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.admin_role_id, user_id=self.user_id,
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=domain_admin['password'],
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
def test_user_can_get_project_limits_within_domain(self):
|
||||
project_limit_id, _ = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
c.get('/v3/limits/%s' % project_limit_id, headers=self.headers)
|
||||
|
||||
def test_user_can_get_domain_limits(self):
|
||||
_, domain_limit_id = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits/%s' % domain_limit_id, headers=self.headers)
|
||||
self.assertEqual(self.domain_id, r.json['limit']['domain_id'])
|
||||
|
||||
def test_user_cannot_get_project_limit_outside_domain(self):
|
||||
project_limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.get(
|
||||
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_get_domain_limits_for_other_domain(self):
|
||||
_, domain_limit_id = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.get(
|
||||
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_can_list_limits_within_domain(self):
|
||||
project_limit_id, domain_limit_id = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits', headers=self.headers)
|
||||
result = []
|
||||
for limit in r.json['limits']:
|
||||
result.append(limit['id'])
|
||||
|
||||
self.assertEqual(2, len(r.json['limits']))
|
||||
self.assertIn(project_limit_id, result)
|
||||
self.assertIn(domain_limit_id, result)
|
||||
|
||||
def test_user_cannot_list_limits_outside_domain(self):
|
||||
_create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/limits', headers=self.headers)
|
||||
self.assertEqual(0, len(r.json['limits']))
|
||||
|
||||
def test_user_cannot_create_limits_for_domain(self):
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
uuid.uuid4().hex, unit.new_service_ref()
|
||||
)
|
||||
|
||||
registered_limit = unit.new_registered_limit_ref(
|
||||
service_id=service['id'], id=uuid.uuid4().hex
|
||||
)
|
||||
registered_limits = (
|
||||
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||
[registered_limit]
|
||||
)
|
||||
)
|
||||
registered_limit = registered_limits[0]
|
||||
|
||||
create = {
|
||||
'limits': [
|
||||
unit.new_limit_ref(
|
||||
domain_id=self.domain_id, service_id=service['id'],
|
||||
resource_name=registered_limit['resource_name'],
|
||||
resource_limit=5
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.post(
|
||||
'/v3/limits', json=create, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_create_limits_for_other_domain(self):
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
uuid.uuid4().hex, unit.new_service_ref()
|
||||
)
|
||||
|
||||
registered_limit = unit.new_registered_limit_ref(
|
||||
service_id=service['id'], id=uuid.uuid4().hex
|
||||
)
|
||||
registered_limits = (
|
||||
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||
[registered_limit]
|
||||
)
|
||||
)
|
||||
registered_limit = registered_limits[0]
|
||||
|
||||
create = {
|
||||
'limits': [
|
||||
unit.new_limit_ref(
|
||||
domain_id=CONF.identity.default_domain_id,
|
||||
service_id=service['id'],
|
||||
resource_name=registered_limit['resource_name'],
|
||||
resource_limit=5
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.post(
|
||||
'/v3/limits', json=create, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_create_limits_for_projects_in_domain(self):
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
uuid.uuid4().hex, unit.new_service_ref()
|
||||
)
|
||||
|
||||
registered_limit = unit.new_registered_limit_ref(
|
||||
service_id=service['id'], id=uuid.uuid4().hex
|
||||
)
|
||||
registered_limits = (
|
||||
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||
[registered_limit]
|
||||
)
|
||||
)
|
||||
registered_limit = registered_limits[0]
|
||||
|
||||
project = PROVIDERS.resource_api.create_project(
|
||||
uuid.uuid4().hex, unit.new_project_ref(domain_id=self.domain_id)
|
||||
)
|
||||
|
||||
create = {
|
||||
'limits': [
|
||||
unit.new_limit_ref(
|
||||
project_id=project['id'],
|
||||
service_id=service['id'],
|
||||
resource_name=registered_limit['resource_name'],
|
||||
resource_limit=5
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.post(
|
||||
'/v3/limits', json=create, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_create_limits_for_projects_outside_domain(self):
|
||||
service = PROVIDERS.catalog_api.create_service(
|
||||
uuid.uuid4().hex, unit.new_service_ref()
|
||||
)
|
||||
|
||||
registered_limit = unit.new_registered_limit_ref(
|
||||
service_id=service['id'], id=uuid.uuid4().hex
|
||||
)
|
||||
registered_limits = (
|
||||
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||
[registered_limit]
|
||||
)
|
||||
)
|
||||
registered_limit = registered_limits[0]
|
||||
|
||||
project = PROVIDERS.resource_api.create_project(
|
||||
uuid.uuid4().hex,
|
||||
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
|
||||
)
|
||||
|
||||
create = {
|
||||
'limits': [
|
||||
unit.new_limit_ref(
|
||||
project_id=project['id'],
|
||||
service_id=service['id'],
|
||||
resource_name=registered_limit['resource_name'],
|
||||
resource_limit=5
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.post(
|
||||
'/v3/limits', json=create, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_update_limits_for_domain(self):
|
||||
_, domain_limit_id = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
update = {'limit': {'resource_limit': 1}}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/limits/%s' % domain_limit_id, json=update,
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_update_limits_for_other_domain(self):
|
||||
_, domain_limit_id = _create_limits_and_dependencies()
|
||||
|
||||
update = {'limit': {'resource_limit': 1}}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/limits/%s' % domain_limit_id, json=update,
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_update_limits_for_projects_in_domain(self):
|
||||
project_limit_id, _ = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
update = {'limit': {'resource_limit': 1}}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||
json=update, expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_update_limits_for_projects_outside_domain(self):
|
||||
project_limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
update = {'limit': {'resource_limit': 1}}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||
json=update, expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_limits_for_domain(self):
|
||||
_, domain_limit_id = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_limits_for_other_domain(self):
|
||||
_, domain_limit_id = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_limits_for_projects_in_domain(self):
|
||||
project_limit_id, _ = _create_limits_and_dependencies(
|
||||
domain_id=self.domain_id
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_limits_for_projects_outside_domain(self):
|
||||
project_limit_id, _ = _create_limits_and_dependencies()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
|
|
@ -560,6 +560,22 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
|||
|
||||
def setUp(self):
|
||||
super(LimitsTestCase, self).setUp()
|
||||
# FIXME(lbragstad): Remove all this duplicated logic once we get all
|
||||
# keystone tests using bootstrap consistently. This is something the
|
||||
# bootstrap utility already does for us.
|
||||
reader_role = {'id': uuid.uuid4().hex, 'name': 'reader'}
|
||||
reader_role = PROVIDERS.role_api.create_role(
|
||||
reader_role['id'], reader_role
|
||||
)
|
||||
|
||||
member_role = {'id': uuid.uuid4().hex, 'name': 'member'}
|
||||
member_role = PROVIDERS.role_api.create_role(
|
||||
member_role['id'], member_role
|
||||
)
|
||||
PROVIDERS.role_api.create_implied_role(self.role_id, member_role['id'])
|
||||
PROVIDERS.role_api.create_implied_role(
|
||||
member_role['id'], reader_role['id']
|
||||
)
|
||||
|
||||
# Most of these tests require system-scoped tokens. Let's have one on
|
||||
# hand so that we can use it in tests when we need it.
|
||||
|
@ -927,6 +943,8 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
|||
|
||||
def test_list_limit_with_project_id_filter(self):
|
||||
# create two limit in different projects for test.
|
||||
self.config_fixture.config(group='oslo_policy',
|
||||
enforce_scope=True)
|
||||
ref1 = unit.new_limit_ref(project_id=self.project_id,
|
||||
service_id=self.service_id,
|
||||
region_id=self.region_id,
|
||||
|
@ -955,21 +973,19 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
|||
self.assertEqual(1, len(limits))
|
||||
self.assertEqual(self.project_2_id, limits[0]['project_id'])
|
||||
|
||||
# if non system scoped request contain project_id filter, keystone
|
||||
# will return an empty list.
|
||||
# any project user can filter by their own project
|
||||
r = self.get(
|
||||
'/limits?project_id=%s' % self.project_id,
|
||||
expected_status=http_client.OK)
|
||||
limits = r.result['limits']
|
||||
self.assertEqual(0, len(limits))
|
||||
self.assertEqual(1, len(limits))
|
||||
self.assertEqual(self.project_id, limits[0]['project_id'])
|
||||
|
||||
# a system scoped request can specify the project_id filter
|
||||
r = self.get(
|
||||
'/limits?project_id=%s' % self.project_id,
|
||||
expected_status=http_client.OK,
|
||||
auth=self.build_authentication_request(
|
||||
user_id=self.user['id'], password=self.user['password'],
|
||||
system=True)
|
||||
token=self.system_admin_token
|
||||
)
|
||||
limits = r.result['limits']
|
||||
self.assertEqual(1, len(limits))
|
||||
|
@ -1046,6 +1062,7 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
|||
else:
|
||||
id1 = r.result['limits'][1]['id']
|
||||
self.get('/limits/fake_id',
|
||||
token=self.system_admin_token,
|
||||
expected_status=http_client.NOT_FOUND)
|
||||
r = self.get('/limits/%s' % id1,
|
||||
expected_status=http_client.OK)
|
||||
|
|
Loading…
Reference in New Issue