Allow domain users to access the limit API
This commit adds domain-scope to the scope_types for limit policies, allowing domain users to access those APIs when enforce_scope is enabled. This commit also introduces some tests that explicitly show how domain users are expected to behave with the limits API. A subsequent patch will do the same for project users. This commit also modifies the GET /v3/limit policy to allow project users to filter responses by project_id, which isn't entirely useful outside of just calling the API with a project-scoped token. Change-Id: I9b38f3fd2f83efd508b2d9a6c323bbaa7169d4cd Related-Bug: 1805880 Partial-Bug: 1818736
This commit is contained in:
parent
e992b79fa3
commit
f249c9e2b0
|
@ -20,6 +20,7 @@ from keystone.common import json_home
|
||||||
from keystone.common import provider_api
|
from keystone.common import provider_api
|
||||||
from keystone.common import rbac_enforcer
|
from keystone.common import rbac_enforcer
|
||||||
from keystone.common import validation
|
from keystone.common import validation
|
||||||
|
from keystone import exception
|
||||||
from keystone.limit import schema
|
from keystone.limit import schema
|
||||||
from keystone.server import flask as ks_flask
|
from keystone.server import flask as ks_flask
|
||||||
|
|
||||||
|
@ -28,6 +29,27 @@ PROVIDERS = provider_api.ProviderAPIs
|
||||||
ENFORCER = rbac_enforcer.RBACEnforcer
|
ENFORCER = rbac_enforcer.RBACEnforcer
|
||||||
|
|
||||||
|
|
||||||
|
def _build_limit_enforcement_target():
|
||||||
|
target = {}
|
||||||
|
try:
|
||||||
|
limit = PROVIDERS.unified_limit_api.get_limit(
|
||||||
|
flask.request.view_args.get('limit_id')
|
||||||
|
)
|
||||||
|
target['limit'] = limit
|
||||||
|
if limit.get('project_id'):
|
||||||
|
project = PROVIDERS.resource_api.get_project(limit['project_id'])
|
||||||
|
target['limit']['project'] = project
|
||||||
|
elif limit.get('domain_id'):
|
||||||
|
domain = PROVIDERS.resource_api.get_domain(limit['domain_id'])
|
||||||
|
target['limit']['domain'] = domain
|
||||||
|
except exception.NotFound: # nosec
|
||||||
|
# Defer the existence check in the event the limit doesn't exist, this
|
||||||
|
# is checked later anyway.
|
||||||
|
pass
|
||||||
|
|
||||||
|
return target
|
||||||
|
|
||||||
|
|
||||||
class LimitsResource(ks_flask.ResourceBase):
|
class LimitsResource(ks_flask.ResourceBase):
|
||||||
collection_key = 'limits'
|
collection_key = 'limits'
|
||||||
member_key = 'limit'
|
member_key = 'limit'
|
||||||
|
@ -38,27 +60,38 @@ class LimitsResource(ks_flask.ResourceBase):
|
||||||
def _list_limits(self):
|
def _list_limits(self):
|
||||||
filters = ['service_id', 'region_id', 'resource_name', 'project_id',
|
filters = ['service_id', 'region_id', 'resource_name', 'project_id',
|
||||||
'domain_id']
|
'domain_id']
|
||||||
|
|
||||||
ENFORCER.enforce_call(action='identity:list_limits', filters=filters)
|
ENFORCER.enforce_call(action='identity:list_limits', filters=filters)
|
||||||
|
|
||||||
hints = self.build_driver_hints(filters)
|
hints = self.build_driver_hints(filters)
|
||||||
project_id_filter = hints.get_exact_filter_by_name('project_id')
|
|
||||||
domain_id_filter = hints.get_exact_filter_by_name('domain_id')
|
filtered_refs = []
|
||||||
if project_id_filter or domain_id_filter:
|
if self.oslo_context.system_scope:
|
||||||
if self.oslo_context.system_scope:
|
|
||||||
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
|
||||||
else:
|
|
||||||
refs = []
|
|
||||||
else:
|
|
||||||
project_id = self.oslo_context.project_id
|
|
||||||
domain_id = self.oslo_context.domain_id
|
|
||||||
if project_id:
|
|
||||||
hints.add_filter('project_id', project_id)
|
|
||||||
elif domain_id:
|
|
||||||
hints.add_filter('domain_id', domain_id)
|
|
||||||
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||||
return self.wrap_collection(refs, hints=hints)
|
filtered_refs = refs
|
||||||
|
elif self.oslo_context.domain_id:
|
||||||
|
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||||
|
projects = PROVIDERS.resource_api.list_projects_in_domain(
|
||||||
|
self.oslo_context.domain_id
|
||||||
|
)
|
||||||
|
project_ids = [project['id'] for project in projects]
|
||||||
|
for limit in refs:
|
||||||
|
if limit.get('project_id'):
|
||||||
|
if limit['project_id'] in project_ids:
|
||||||
|
filtered_refs.append(limit)
|
||||||
|
elif limit.get('domain_id'):
|
||||||
|
if limit['domain_id'] == self.oslo_context.domain_id:
|
||||||
|
filtered_refs.append(limit)
|
||||||
|
elif self.oslo_context.project_id:
|
||||||
|
hints.add_filter('project_id', self.oslo_context.project_id)
|
||||||
|
refs = PROVIDERS.unified_limit_api.list_limits(hints)
|
||||||
|
filtered_refs = refs
|
||||||
|
|
||||||
|
return self.wrap_collection(filtered_refs, hints=hints)
|
||||||
|
|
||||||
def _get_limit(self, limit_id):
|
def _get_limit(self, limit_id):
|
||||||
ENFORCER.enforce_call(action='identity:get_limit')
|
ENFORCER.enforce_call(action='identity:get_limit',
|
||||||
|
build_target=_build_limit_enforcement_target)
|
||||||
ref = PROVIDERS.unified_limit_api.get_limit(limit_id)
|
ref = PROVIDERS.unified_limit_api.get_limit(limit_id)
|
||||||
return self.wrap_member(ref)
|
return self.wrap_member(ref)
|
||||||
|
|
||||||
|
|
|
@ -14,11 +14,23 @@ from oslo_policy import policy
|
||||||
|
|
||||||
from keystone.common.policies import base
|
from keystone.common.policies import base
|
||||||
|
|
||||||
|
SYSTEM_OR_DOMAIN_OR_PROJECT_USER = (
|
||||||
|
'(' + base.SYSTEM_READER + ') or '
|
||||||
|
'('
|
||||||
|
'domain_id:%(target.limit.domain.id)s or '
|
||||||
|
'domain_id:%(target.limit.project.domain_id)s'
|
||||||
|
') or '
|
||||||
|
'('
|
||||||
|
'project_id:%(target.limit.project_id)s and not '
|
||||||
|
'None:%(target.limit.project_id)s'
|
||||||
|
')'
|
||||||
|
)
|
||||||
|
|
||||||
limit_policies = [
|
limit_policies = [
|
||||||
policy.DocumentedRuleDefault(
|
policy.DocumentedRuleDefault(
|
||||||
name=base.IDENTITY % 'get_limit_model',
|
name=base.IDENTITY % 'get_limit_model',
|
||||||
check_str='',
|
check_str='',
|
||||||
scope_types=['system', 'project'],
|
scope_types=['system', 'domain', 'project'],
|
||||||
description='Get limit enforcement model.',
|
description='Get limit enforcement model.',
|
||||||
operations=[{'path': '/v3/limits/model',
|
operations=[{'path': '/v3/limits/model',
|
||||||
'method': 'GET'},
|
'method': 'GET'},
|
||||||
|
@ -26,10 +38,8 @@ limit_policies = [
|
||||||
'method': 'HEAD'}]),
|
'method': 'HEAD'}]),
|
||||||
policy.DocumentedRuleDefault(
|
policy.DocumentedRuleDefault(
|
||||||
name=base.IDENTITY % 'get_limit',
|
name=base.IDENTITY % 'get_limit',
|
||||||
check_str='(role:reader and system_scope:all) or '
|
check_str=SYSTEM_OR_DOMAIN_OR_PROJECT_USER,
|
||||||
'project_id:%(target.limit.project_id)s or '
|
scope_types=['system', 'domain', 'project'],
|
||||||
'domain_id:%(target.limit.domain_id)s',
|
|
||||||
scope_types=['system', 'project', 'domain'],
|
|
||||||
description='Show limit details.',
|
description='Show limit details.',
|
||||||
operations=[{'path': '/v3/limits/{limit_id}',
|
operations=[{'path': '/v3/limits/{limit_id}',
|
||||||
'method': 'GET'},
|
'method': 'GET'},
|
||||||
|
@ -38,7 +48,7 @@ limit_policies = [
|
||||||
policy.DocumentedRuleDefault(
|
policy.DocumentedRuleDefault(
|
||||||
name=base.IDENTITY % 'list_limits',
|
name=base.IDENTITY % 'list_limits',
|
||||||
check_str='',
|
check_str='',
|
||||||
scope_types=['system', 'project'],
|
scope_types=['system', 'domain', 'project'],
|
||||||
description='List limits.',
|
description='List limits.',
|
||||||
operations=[{'path': '/v3/limits',
|
operations=[{'path': '/v3/limits',
|
||||||
'method': 'GET'},
|
'method': 'GET'},
|
||||||
|
|
|
@ -25,8 +25,11 @@ CONF = keystone.conf.CONF
|
||||||
PROVIDERS = provider_api.ProviderAPIs
|
PROVIDERS = provider_api.ProviderAPIs
|
||||||
|
|
||||||
|
|
||||||
def _create_limit_and_dependencies():
|
def _create_limits_and_dependencies(domain_id=None):
|
||||||
"""Create a limit and its dependencies to test with."""
|
"""Create limits and its dependencies for testing."""
|
||||||
|
if not domain_id:
|
||||||
|
domain_id = CONF.identity.default_domain_id
|
||||||
|
|
||||||
service = PROVIDERS.catalog_api.create_service(
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
uuid.uuid4().hex, unit.new_service_ref()
|
uuid.uuid4().hex, unit.new_service_ref()
|
||||||
)
|
)
|
||||||
|
@ -41,18 +44,34 @@ def _create_limit_and_dependencies():
|
||||||
)
|
)
|
||||||
registered_limit = registered_limits[0]
|
registered_limit = registered_limits[0]
|
||||||
|
|
||||||
project = PROVIDERS.resource_api.create_project(
|
domain_limit = unit.new_limit_ref(
|
||||||
uuid.uuid4().hex,
|
domain_id=domain_id, service_id=service['id'],
|
||||||
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
|
resource_name=registered_limit['resource_name'],
|
||||||
|
resource_limit=10, id=uuid.uuid4().hex
|
||||||
)
|
)
|
||||||
|
|
||||||
limit = unit.new_limit_ref(
|
project = PROVIDERS.resource_api.create_project(
|
||||||
|
uuid.uuid4().hex, unit.new_project_ref(domain_id=domain_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
project_limit = unit.new_limit_ref(
|
||||||
project_id=project['id'], service_id=service['id'],
|
project_id=project['id'], service_id=service['id'],
|
||||||
resource_name=registered_limit['resource_name'],
|
resource_name=registered_limit['resource_name'],
|
||||||
resource_limit=5, id=uuid.uuid4().hex
|
resource_limit=5, id=uuid.uuid4().hex
|
||||||
)
|
)
|
||||||
limits = PROVIDERS.unified_limit_api.create_limits([limit])
|
limits = PROVIDERS.unified_limit_api.create_limits(
|
||||||
return limits
|
[domain_limit, project_limit]
|
||||||
|
)
|
||||||
|
|
||||||
|
project_limit_id = None
|
||||||
|
domain_limit_id = None
|
||||||
|
for limit in limits:
|
||||||
|
if limit.get('domain_id'):
|
||||||
|
domain_limit_id = limit['id']
|
||||||
|
else:
|
||||||
|
project_limit_id = limit['id']
|
||||||
|
|
||||||
|
return (project_limit_id, domain_limit_id)
|
||||||
|
|
||||||
|
|
||||||
class _UserLimitTests(object):
|
class _UserLimitTests(object):
|
||||||
|
@ -63,21 +82,24 @@ class _UserLimitTests(object):
|
||||||
c.get('/v3/limits/model', headers=self.headers)
|
c.get('/v3/limits/model', headers=self.headers)
|
||||||
|
|
||||||
def test_user_can_get_a_limit(self):
|
def test_user_can_get_a_limit(self):
|
||||||
limits = _create_limit_and_dependencies()
|
limit_id, _ = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
r = c.get('/v3/limits/%s' % limit['id'], headers=self.headers)
|
r = c.get('/v3/limits/%s' % limit_id, headers=self.headers)
|
||||||
self.assertEqual(limit['id'], r.json['limit']['id'])
|
self.assertEqual(limit_id, r.json['limit']['id'])
|
||||||
|
|
||||||
def test_user_can_list_limits(self):
|
def test_user_can_list_limits(self):
|
||||||
limits = _create_limit_and_dependencies()
|
project_limit_id, domain_limit_id = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
r = c.get('/v3/limits', headers=self.headers)
|
r = c.get('/v3/limits', headers=self.headers)
|
||||||
self.assertTrue(len(r.json['limits']) == 1)
|
self.assertTrue(len(r.json['limits']) == 2)
|
||||||
self.assertEqual(limit['id'], r.json['limits'][0]['id'])
|
result = []
|
||||||
|
for limit in r.json['limits']:
|
||||||
|
result.append(limit['id'])
|
||||||
|
|
||||||
|
self.assertIn(project_limit_id, result)
|
||||||
|
self.assertIn(domain_limit_id, result)
|
||||||
|
|
||||||
def test_user_cannot_create_limits(self):
|
def test_user_cannot_create_limits(self):
|
||||||
service = PROVIDERS.catalog_api.create_service(
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
|
@ -116,25 +138,23 @@ class _UserLimitTests(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_user_cannot_update_limits(self):
|
def test_user_cannot_update_limits(self):
|
||||||
limits = _create_limit_and_dependencies()
|
limit_id, _ = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
update = {'limits': {'description': uuid.uuid4().hex}}
|
update = {'limits': {'description': uuid.uuid4().hex}}
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
c.patch(
|
c.patch(
|
||||||
'/v3/limits/%s' % limit['id'], json=update,
|
'/v3/limits/%s' % limit_id, json=update,
|
||||||
headers=self.headers,
|
headers=self.headers,
|
||||||
expected_status_code=http_client.FORBIDDEN
|
expected_status_code=http_client.FORBIDDEN
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_user_cannot_delete_limits(self):
|
def test_user_cannot_delete_limits(self):
|
||||||
limits = _create_limit_and_dependencies()
|
limit_id, _ = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
c.delete(
|
c.delete(
|
||||||
'/v3/limits/%s' % limit['id'],
|
'/v3/limits/%s' % limit_id,
|
||||||
headers=self.headers,
|
headers=self.headers,
|
||||||
expected_status_code=http_client.FORBIDDEN
|
expected_status_code=http_client.FORBIDDEN
|
||||||
)
|
)
|
||||||
|
@ -231,21 +251,24 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
|
||||||
self.headers = {'X-Auth-Token': self.token_id}
|
self.headers = {'X-Auth-Token': self.token_id}
|
||||||
|
|
||||||
def test_user_can_get_a_limit(self):
|
def test_user_can_get_a_limit(self):
|
||||||
limits = _create_limit_and_dependencies()
|
limit_id, _ = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
r = c.get('/v3/limits/%s' % limit['id'], headers=self.headers)
|
r = c.get('/v3/limits/%s' % limit_id, headers=self.headers)
|
||||||
self.assertEqual(limit['id'], r.json['limit']['id'])
|
self.assertEqual(limit_id, r.json['limit']['id'])
|
||||||
|
|
||||||
def test_user_can_list_limits(self):
|
def test_user_can_list_limits(self):
|
||||||
limits = _create_limit_and_dependencies()
|
project_limit_id, domain_limit_id = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
r = c.get('/v3/limits', headers=self.headers)
|
r = c.get('/v3/limits', headers=self.headers)
|
||||||
self.assertTrue(len(r.json['limits']) == 1)
|
self.assertTrue(len(r.json['limits']) == 2)
|
||||||
self.assertEqual(limit['id'], r.json['limits'][0]['id'])
|
result = []
|
||||||
|
for limit in r.json['limits']:
|
||||||
|
result.append(limit['id'])
|
||||||
|
|
||||||
|
self.assertIn(project_limit_id, result)
|
||||||
|
self.assertIn(domain_limit_id, result)
|
||||||
|
|
||||||
def test_user_can_create_limits(self):
|
def test_user_can_create_limits(self):
|
||||||
service = PROVIDERS.catalog_api.create_service(
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
|
@ -281,20 +304,335 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
|
||||||
c.post('/v3/limits', json=create, headers=self.headers)
|
c.post('/v3/limits', json=create, headers=self.headers)
|
||||||
|
|
||||||
def test_user_can_update_limits(self):
|
def test_user_can_update_limits(self):
|
||||||
limits = _create_limit_and_dependencies()
|
limit_id, _ = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
update = {'limits': {'description': uuid.uuid4().hex}}
|
update = {'limits': {'description': uuid.uuid4().hex}}
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
c.patch(
|
c.patch(
|
||||||
'/v3/limits/%s' % limit['id'], json=update,
|
'/v3/limits/%s' % limit_id, json=update,
|
||||||
headers=self.headers
|
headers=self.headers
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_user_can_delete_limits(self):
|
def test_user_can_delete_limits(self):
|
||||||
limits = _create_limit_and_dependencies()
|
limit_id, _ = _create_limits_and_dependencies()
|
||||||
limit = limits[0]
|
|
||||||
|
|
||||||
with self.test_client() as c:
|
with self.test_client() as c:
|
||||||
c.delete('/v3/limits/%s' % limit['id'], headers=self.headers)
|
c.delete('/v3/limits/%s' % limit_id, headers=self.headers)
|
||||||
|
|
||||||
|
|
||||||
|
class DomainUserTests(base_classes.TestCaseWithBootstrap,
|
||||||
|
common_auth.AuthTestMixin):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(DomainUserTests, self).setUp()
|
||||||
|
self.loadapp()
|
||||||
|
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||||
|
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||||
|
|
||||||
|
domain = PROVIDERS.resource_api.create_domain(
|
||||||
|
uuid.uuid4().hex, unit.new_domain_ref()
|
||||||
|
)
|
||||||
|
self.domain_id = domain['id']
|
||||||
|
domain_admin = unit.new_user_ref(domain_id=self.domain_id)
|
||||||
|
self.user_id = PROVIDERS.identity_api.create_user(domain_admin)['id']
|
||||||
|
PROVIDERS.assignment_api.create_grant(
|
||||||
|
self.bootstrapper.admin_role_id, user_id=self.user_id,
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
auth = self.build_authentication_request(
|
||||||
|
user_id=self.user_id,
|
||||||
|
password=domain_admin['password'],
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Grab a token using the persona we're testing and prepare headers
|
||||||
|
# for requests we'll be making in the tests.
|
||||||
|
with self.test_client() as c:
|
||||||
|
r = c.post('/v3/auth/tokens', json=auth)
|
||||||
|
self.token_id = r.headers['X-Subject-Token']
|
||||||
|
self.headers = {'X-Auth-Token': self.token_id}
|
||||||
|
|
||||||
|
def test_user_can_get_project_limits_within_domain(self):
|
||||||
|
project_limit_id, _ = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.get('/v3/limits/%s' % project_limit_id, headers=self.headers)
|
||||||
|
|
||||||
|
def test_user_can_get_domain_limits(self):
|
||||||
|
_, domain_limit_id = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
r = c.get('/v3/limits/%s' % domain_limit_id, headers=self.headers)
|
||||||
|
self.assertEqual(self.domain_id, r.json['limit']['domain_id'])
|
||||||
|
|
||||||
|
def test_user_cannot_get_project_limit_outside_domain(self):
|
||||||
|
project_limit_id, _ = _create_limits_and_dependencies()
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.get(
|
||||||
|
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_get_domain_limits_for_other_domain(self):
|
||||||
|
_, domain_limit_id = _create_limits_and_dependencies()
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.get(
|
||||||
|
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_can_list_limits_within_domain(self):
|
||||||
|
project_limit_id, domain_limit_id = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
r = c.get('/v3/limits', headers=self.headers)
|
||||||
|
result = []
|
||||||
|
for limit in r.json['limits']:
|
||||||
|
result.append(limit['id'])
|
||||||
|
|
||||||
|
self.assertEqual(2, len(r.json['limits']))
|
||||||
|
self.assertIn(project_limit_id, result)
|
||||||
|
self.assertIn(domain_limit_id, result)
|
||||||
|
|
||||||
|
def test_user_cannot_list_limits_outside_domain(self):
|
||||||
|
_create_limits_and_dependencies()
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
r = c.get('/v3/limits', headers=self.headers)
|
||||||
|
self.assertEqual(0, len(r.json['limits']))
|
||||||
|
|
||||||
|
def test_user_cannot_create_limits_for_domain(self):
|
||||||
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
|
uuid.uuid4().hex, unit.new_service_ref()
|
||||||
|
)
|
||||||
|
|
||||||
|
registered_limit = unit.new_registered_limit_ref(
|
||||||
|
service_id=service['id'], id=uuid.uuid4().hex
|
||||||
|
)
|
||||||
|
registered_limits = (
|
||||||
|
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||||
|
[registered_limit]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
registered_limit = registered_limits[0]
|
||||||
|
|
||||||
|
create = {
|
||||||
|
'limits': [
|
||||||
|
unit.new_limit_ref(
|
||||||
|
domain_id=self.domain_id, service_id=service['id'],
|
||||||
|
resource_name=registered_limit['resource_name'],
|
||||||
|
resource_limit=5
|
||||||
|
)
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.post(
|
||||||
|
'/v3/limits', json=create, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_create_limits_for_other_domain(self):
|
||||||
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
|
uuid.uuid4().hex, unit.new_service_ref()
|
||||||
|
)
|
||||||
|
|
||||||
|
registered_limit = unit.new_registered_limit_ref(
|
||||||
|
service_id=service['id'], id=uuid.uuid4().hex
|
||||||
|
)
|
||||||
|
registered_limits = (
|
||||||
|
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||||
|
[registered_limit]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
registered_limit = registered_limits[0]
|
||||||
|
|
||||||
|
create = {
|
||||||
|
'limits': [
|
||||||
|
unit.new_limit_ref(
|
||||||
|
domain_id=CONF.identity.default_domain_id,
|
||||||
|
service_id=service['id'],
|
||||||
|
resource_name=registered_limit['resource_name'],
|
||||||
|
resource_limit=5
|
||||||
|
)
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.post(
|
||||||
|
'/v3/limits', json=create, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_create_limits_for_projects_in_domain(self):
|
||||||
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
|
uuid.uuid4().hex, unit.new_service_ref()
|
||||||
|
)
|
||||||
|
|
||||||
|
registered_limit = unit.new_registered_limit_ref(
|
||||||
|
service_id=service['id'], id=uuid.uuid4().hex
|
||||||
|
)
|
||||||
|
registered_limits = (
|
||||||
|
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||||
|
[registered_limit]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
registered_limit = registered_limits[0]
|
||||||
|
|
||||||
|
project = PROVIDERS.resource_api.create_project(
|
||||||
|
uuid.uuid4().hex, unit.new_project_ref(domain_id=self.domain_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
create = {
|
||||||
|
'limits': [
|
||||||
|
unit.new_limit_ref(
|
||||||
|
project_id=project['id'],
|
||||||
|
service_id=service['id'],
|
||||||
|
resource_name=registered_limit['resource_name'],
|
||||||
|
resource_limit=5
|
||||||
|
)
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.post(
|
||||||
|
'/v3/limits', json=create, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_create_limits_for_projects_outside_domain(self):
|
||||||
|
service = PROVIDERS.catalog_api.create_service(
|
||||||
|
uuid.uuid4().hex, unit.new_service_ref()
|
||||||
|
)
|
||||||
|
|
||||||
|
registered_limit = unit.new_registered_limit_ref(
|
||||||
|
service_id=service['id'], id=uuid.uuid4().hex
|
||||||
|
)
|
||||||
|
registered_limits = (
|
||||||
|
PROVIDERS.unified_limit_api.create_registered_limits(
|
||||||
|
[registered_limit]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
registered_limit = registered_limits[0]
|
||||||
|
|
||||||
|
project = PROVIDERS.resource_api.create_project(
|
||||||
|
uuid.uuid4().hex,
|
||||||
|
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
create = {
|
||||||
|
'limits': [
|
||||||
|
unit.new_limit_ref(
|
||||||
|
project_id=project['id'],
|
||||||
|
service_id=service['id'],
|
||||||
|
resource_name=registered_limit['resource_name'],
|
||||||
|
resource_limit=5
|
||||||
|
)
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.post(
|
||||||
|
'/v3/limits', json=create, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_update_limits_for_domain(self):
|
||||||
|
_, domain_limit_id = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
update = {'limit': {'resource_limit': 1}}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.patch(
|
||||||
|
'/v3/limits/%s' % domain_limit_id, json=update,
|
||||||
|
headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_update_limits_for_other_domain(self):
|
||||||
|
_, domain_limit_id = _create_limits_and_dependencies()
|
||||||
|
|
||||||
|
update = {'limit': {'resource_limit': 1}}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.patch(
|
||||||
|
'/v3/limits/%s' % domain_limit_id, json=update,
|
||||||
|
headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_update_limits_for_projects_in_domain(self):
|
||||||
|
project_limit_id, _ = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
update = {'limit': {'resource_limit': 1}}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.patch(
|
||||||
|
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||||
|
json=update, expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_update_limits_for_projects_outside_domain(self):
|
||||||
|
project_limit_id, _ = _create_limits_and_dependencies()
|
||||||
|
|
||||||
|
update = {'limit': {'resource_limit': 1}}
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.patch(
|
||||||
|
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||||
|
json=update, expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_delete_limits_for_domain(self):
|
||||||
|
_, domain_limit_id = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.delete(
|
||||||
|
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_delete_limits_for_other_domain(self):
|
||||||
|
_, domain_limit_id = _create_limits_and_dependencies()
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.delete(
|
||||||
|
'/v3/limits/%s' % domain_limit_id, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_delete_limits_for_projects_in_domain(self):
|
||||||
|
project_limit_id, _ = _create_limits_and_dependencies(
|
||||||
|
domain_id=self.domain_id
|
||||||
|
)
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.delete(
|
||||||
|
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_user_cannot_delete_limits_for_projects_outside_domain(self):
|
||||||
|
project_limit_id, _ = _create_limits_and_dependencies()
|
||||||
|
|
||||||
|
with self.test_client() as c:
|
||||||
|
c.delete(
|
||||||
|
'/v3/limits/%s' % project_limit_id, headers=self.headers,
|
||||||
|
expected_status_code=http_client.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
|
@ -560,6 +560,22 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(LimitsTestCase, self).setUp()
|
super(LimitsTestCase, self).setUp()
|
||||||
|
# FIXME(lbragstad): Remove all this duplicated logic once we get all
|
||||||
|
# keystone tests using bootstrap consistently. This is something the
|
||||||
|
# bootstrap utility already does for us.
|
||||||
|
reader_role = {'id': uuid.uuid4().hex, 'name': 'reader'}
|
||||||
|
reader_role = PROVIDERS.role_api.create_role(
|
||||||
|
reader_role['id'], reader_role
|
||||||
|
)
|
||||||
|
|
||||||
|
member_role = {'id': uuid.uuid4().hex, 'name': 'member'}
|
||||||
|
member_role = PROVIDERS.role_api.create_role(
|
||||||
|
member_role['id'], member_role
|
||||||
|
)
|
||||||
|
PROVIDERS.role_api.create_implied_role(self.role_id, member_role['id'])
|
||||||
|
PROVIDERS.role_api.create_implied_role(
|
||||||
|
member_role['id'], reader_role['id']
|
||||||
|
)
|
||||||
|
|
||||||
# Most of these tests require system-scoped tokens. Let's have one on
|
# Most of these tests require system-scoped tokens. Let's have one on
|
||||||
# hand so that we can use it in tests when we need it.
|
# hand so that we can use it in tests when we need it.
|
||||||
|
@ -927,6 +943,8 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
||||||
|
|
||||||
def test_list_limit_with_project_id_filter(self):
|
def test_list_limit_with_project_id_filter(self):
|
||||||
# create two limit in different projects for test.
|
# create two limit in different projects for test.
|
||||||
|
self.config_fixture.config(group='oslo_policy',
|
||||||
|
enforce_scope=True)
|
||||||
ref1 = unit.new_limit_ref(project_id=self.project_id,
|
ref1 = unit.new_limit_ref(project_id=self.project_id,
|
||||||
service_id=self.service_id,
|
service_id=self.service_id,
|
||||||
region_id=self.region_id,
|
region_id=self.region_id,
|
||||||
|
@ -955,21 +973,19 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
||||||
self.assertEqual(1, len(limits))
|
self.assertEqual(1, len(limits))
|
||||||
self.assertEqual(self.project_2_id, limits[0]['project_id'])
|
self.assertEqual(self.project_2_id, limits[0]['project_id'])
|
||||||
|
|
||||||
# if non system scoped request contain project_id filter, keystone
|
# any project user can filter by their own project
|
||||||
# will return an empty list.
|
|
||||||
r = self.get(
|
r = self.get(
|
||||||
'/limits?project_id=%s' % self.project_id,
|
'/limits?project_id=%s' % self.project_id,
|
||||||
expected_status=http_client.OK)
|
expected_status=http_client.OK)
|
||||||
limits = r.result['limits']
|
limits = r.result['limits']
|
||||||
self.assertEqual(0, len(limits))
|
self.assertEqual(1, len(limits))
|
||||||
|
self.assertEqual(self.project_id, limits[0]['project_id'])
|
||||||
|
|
||||||
# a system scoped request can specify the project_id filter
|
# a system scoped request can specify the project_id filter
|
||||||
r = self.get(
|
r = self.get(
|
||||||
'/limits?project_id=%s' % self.project_id,
|
'/limits?project_id=%s' % self.project_id,
|
||||||
expected_status=http_client.OK,
|
expected_status=http_client.OK,
|
||||||
auth=self.build_authentication_request(
|
token=self.system_admin_token
|
||||||
user_id=self.user['id'], password=self.user['password'],
|
|
||||||
system=True)
|
|
||||||
)
|
)
|
||||||
limits = r.result['limits']
|
limits = r.result['limits']
|
||||||
self.assertEqual(1, len(limits))
|
self.assertEqual(1, len(limits))
|
||||||
|
@ -1046,6 +1062,7 @@ class LimitsTestCase(test_v3.RestfulTestCase):
|
||||||
else:
|
else:
|
||||||
id1 = r.result['limits'][1]['id']
|
id1 = r.result['limits'][1]['id']
|
||||||
self.get('/limits/fake_id',
|
self.get('/limits/fake_id',
|
||||||
|
token=self.system_admin_token,
|
||||||
expected_status=http_client.NOT_FOUND)
|
expected_status=http_client.NOT_FOUND)
|
||||||
r = self.get('/limits/%s' % id1,
|
r = self.get('/limits/%s' % id1,
|
||||||
expected_status=http_client.OK)
|
expected_status=http_client.OK)
|
||||||
|
|
Loading…
Reference in New Issue