Convert Roles API to flask native dispatching

Convert Roles and Implied Roles (all paths under /v3/roles) to
flask native dispatching. This change does not convert
/v3/role_inferences to flask native dispatching.

Change-Id: I114380e96c6a2b3c167676fa1525e4470560b541
Partial-Bug: #1776504
This commit is contained in:
Morgan Fainberg 2018-08-08 13:12:22 -07:00
parent 56d9c30f8f
commit cfc5a730b7
5 changed files with 337 additions and 279 deletions

View File

@ -20,6 +20,7 @@ from keystone.api import os_revoke
from keystone.api import os_simple_cert
from keystone.api import regions
from keystone.api import registered_limits
from keystone.api import roles
from keystone.api import services
from keystone.api import trusts
@ -34,6 +35,7 @@ __all__ = (
'os_simple_cert',
'regions',
'registered_limits',
'roles',
'services',
'trusts',
)
@ -49,6 +51,7 @@ __apis__ = (
os_simple_cert,
regions,
registered_limits,
roles,
services,
trusts,
)

332
keystone/api/roles.py Normal file
View File

@ -0,0 +1,332 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This file handles all flask-restful resources for /v3/roles
import flask
import flask_restful
from six.moves import http_client
from keystone.assignment import schema
from keystone.common import json_home
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone.common import validation
import keystone.conf
from keystone.server import flask as ks_flask
CONF = keystone.conf.CONF
ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
class RoleResource(ks_flask.ResourceBase):
collection_key = 'roles'
member_key = 'role'
def __init__(self):
super(RoleResource, self).__init__()
self.get_member_from_driver = PROVIDERS.role_api.get_role
def _is_domain_role(self, role):
return bool(role.get('domain_id'))
def get(self, role_id=None):
"""Get role or list roles.
GET/HEAD /v3/roles
GET/HEAD /v3/roles/{role_id}
"""
if role_id is not None:
return self._get_role(role_id)
return self._list_roles()
def _get_role(self, role_id):
err = None
role = {}
try:
role = PROVIDERS.role_api.get_role(role_id)
except Exception as e: # nosec
# We don't raise out here, we raise out after enforcement, this
# ensures we do not leak role existence. Do nothing yet, process
# enforcement before raising out an error.
err = e
finally:
# NOTE(morgan): There are a couple of cases to be aware of here
# if there is an exception (e is not None), then we are enforcing
# on "get_role" to be safe. If the role is not a "domain_role",
# we are enforcing on "get_role". If the role is "domain_role" we
# are inforcing on "get_domain_role"
if err is not None or not self._is_domain_role(role):
ENFORCER.enforce_call(action='identity:get_role')
if err:
# reraise the error after enforcement if needed.
raise err
else:
ENFORCER.enforce_call(action='identity:get_domain_role',
member_target_type='role',
member_target=role)
return self.wrap_member(role)
def _list_roles(self):
filters = ['name', 'domain_id']
domain_filter = flask.request.args.get('domain_id')
if domain_filter:
ENFORCER.enforce_call(action='identity:list_domain_roles',
filters=filters)
else:
ENFORCER.enforce_call(action='identity:list_roles',
filters=filters)
hints = self.build_driver_hints(filters)
if not domain_filter:
# NOTE(jamielennox): To handle the default case of not domain_id
# defined the role_assignment backend does some hackery to
# distinguish between global and domain scoped roles. This backend
# behaviour relies upon a value of domain_id being set (not just
# defaulting to None). Manually set the filter if its not
# provided.
hints.add_filter('domain_id', None)
refs = PROVIDERS.role_api.list_roles(hints=hints)
return self.wrap_collection(refs)
def post(self):
"""Create role.
POST /v3/roles
"""
role = self.request_body_json.get('role', {})
if self._is_domain_role(role):
ENFORCER.enforce_call(action='identity:create_domain_role')
else:
ENFORCER.enforce_call(action='identity:create_role')
validation.lazy_validate(schema.role_create, role)
if role['name'] == CONF.member_role_name:
# Use the configured member role ID when creating the configured
# member role name. This avoids the potential of creating
# a "member" role with an unexpected ID.
role['id'] = CONF.member_role_id
else:
role = self._assign_unique_id(role)
role = self._normalize_dict(role)
ref = PROVIDERS.role_api.create_role(
role['id'], role, initiator=self.audit_initiator)
return self.wrap_member(ref), http_client.CREATED
def patch(self, role_id):
"""Update role.
PATCH /v3/roles/{role_id}
"""
err = None
role = {}
try:
role = PROVIDERS.role_api.get_role(role_id)
except Exception as e: # nosec
# We don't raise out here, we raise out after enforcement, this
# ensures we do not leak role existence. Do nothing yet, process
# enforcement before raising out an error.
err = e
finally:
if err is not None or not self._is_domain_role(role):
ENFORCER.enforce_call(action='identity:update_role')
if err:
raise err
else:
ENFORCER.enforce_call(action='identity:update_domain_role',
member_target_type='role',
member_target=role)
request_body_role = self.request_body_json.get('role', {})
validation.lazy_validate(schema.role_update, request_body_role)
self._require_matching_id(request_body_role)
ref = PROVIDERS.role_api.update_role(
role_id, request_body_role, initiator=self.audit_initiator)
return self.wrap_member(ref)
def delete(self, role_id):
"""Delete role.
DELETE /v3/roles/{role_id}
"""
err = None
role = {}
try:
role = PROVIDERS.role_api.get_role(role_id)
except Exception as e: # nosec
# We don't raise out here, we raise out after enforcement, this
# ensures we do not leak role existence. Do nothing yet, process
# enforcement before raising out an error.
err = e
finally:
if err is not None or not self._is_domain_role(role):
ENFORCER.enforce_call(action='identity:delete_role')
if err:
raise err
else:
ENFORCER.enforce_call(action='identity:delete_domain_role',
member_target_type='role',
member_target=role)
PROVIDERS.role_api.delete_role(role_id, initiator=self.audit_initiator)
return None, http_client.NO_CONTENT
def _build_enforcement_target_ref():
ref = {}
if flask.request.view_args:
ref['prior_role'] = PROVIDERS.role_api.get_role(
flask.request.view_args.get('prior_role_id'))
if flask.request.view_args.get('implied_role_id'):
ref['implied_role'] = PROVIDERS.role_api.get_role(
flask.request.view_args['implied_role_id'])
return ref
def _build_prior_role_response_data(prior_role_id, prior_role_name):
return {
'id': prior_role_id,
'links': {
'self': ks_flask.base_url(path='/roles/%s' % prior_role_id)
},
'name': prior_role_name}
def _build_implied_role_response_data(implied_role):
return {
'id': implied_role['id'],
'links': {
'self': ks_flask.base_url(
path='/roles/%s' % implied_role['id'])
},
'name': implied_role['name']}
def _role_inference_response(prior_role_id):
prior_role = PROVIDERS.role_api.get_role(prior_role_id)
response = {
'role_inference': {
'prior_role': _build_prior_role_response_data(
prior_role_id, prior_role['name'])}}
return response
class RoleImplicationListResource(flask_restful.Resource):
def get(self, prior_role_id):
"""List Implied Roles.
GET/HEAD /v3/roles/{prior_role_id}/implies
"""
ENFORCER.enforce_call(action='identity:list_implied_roles',
target_attr=_build_enforcement_target_ref())
ref = PROVIDERS.role_api.list_implied_roles(prior_role_id)
implied_ids = [r['implied_role_id'] for r in ref]
response_json = _role_inference_response(prior_role_id)
response_json['role_inference']['implies'] = []
for implied_id in implied_ids:
implied_role = PROVIDERS.role_api.get_role(implied_id)
response_json['role_inference']['implies'].append(
_build_implied_role_response_data(implied_role))
response_json['links'] = {
'self': ks_flask.base_url(
path='/roles/%s/implies' % prior_role_id)}
return response_json
class RoleImplicationResource(flask_restful.Resource):
def head(self, prior_role_id, implied_role_id=None):
# TODO(morgan): deprecate "check_implied_role" policy, as a user must
# have both check_implied_role and get_implied_role to use the head
# action. This enforcement of HEAD is historical for
# consistent policy enforcement behavior even if it is superfluous.
# Alternatively we can keep check_implied_role and reference
# ._get_implied_role instead.
ENFORCER.enforce_call(action='identity:check_implied_role',
target_attr=_build_enforcement_target_ref())
self.get(prior_role_id, implied_role_id)
# NOTE(morgan): Our API here breaks HTTP Spec. This should be evaluated
# for a future fix. This should just return the above "get" however,
# we document and implment this as a NO_CONTENT response. NO_CONTENT
# here is incorrect. It is maintained as is for API contract reasons.
return None, http_client.NO_CONTENT
def get(self, prior_role_id, implied_role_id):
"""Get implied role.
GET/HEAD /v3/roles/{prior_role_id}/implies/{implied_role_id}
"""
ENFORCER.enforce_call(
action='identity:get_implied_role',
target_attr=_build_enforcement_target_ref())
return self._get_implied_role(prior_role_id, implied_role_id)
def _get_implied_role(self, prior_role_id, implied_role_id):
# Isolate this logic so it can be re-used without added enforcement
PROVIDERS.role_api.get_implied_role(
prior_role_id, implied_role_id)
implied_role_ref = PROVIDERS.role_api.get_role(implied_role_id)
response_json = _role_inference_response(prior_role_id)
response_json['role_inference'][
'implies'] = _build_implied_role_response_data(
implied_role_ref)
response_json['links'] = {
'self': ks_flask.base_url(
path='/roles/%(prior)s/implies/%(implies)s' % {
'prior': prior_role_id, 'implies': implied_role_id})}
return response_json
def put(self, prior_role_id, implied_role_id):
"""Create implied role.
PUT /v3/roles/{prior_role_id}/implies/{implied_role_id}
"""
ENFORCER.enforce_call(action='identity:create_implied_role',
target_attr=_build_enforcement_target_ref())
PROVIDERS.role_api.create_implied_role(prior_role_id, implied_role_id)
response_json = self._get_implied_role(prior_role_id, implied_role_id)
return response_json, http_client.CREATED
def delete(self, prior_role_id, implied_role_id):
"""Delete implied role.
DELETE /v3/roles/{prior_role_id}/implies/{implied_role_id}
"""
ENFORCER.enforce_call(action='identity:delete_implied_role',
target_attr=_build_enforcement_target_ref())
PROVIDERS.role_api.delete_implied_role(prior_role_id, implied_role_id)
return None, http_client.NO_CONTENT
class RoleAPI(ks_flask.APIBase):
_name = 'roles'
_import_name = __name__
resources = [RoleResource]
resource_mapping = [
ks_flask.construct_resource_map(
resource=RoleImplicationListResource,
url='/roles/<string:prior_role_id>/implies',
resource_kwargs={},
rel='implied_roles',
path_vars={'prior_role_id': json_home.Parameters.ROLE_ID}),
ks_flask.construct_resource_map(
resource=RoleImplicationResource,
resource_kwargs={},
url=('/roles/<string:prior_role_id>/'
'implies/<string:implied_role_id>'),
rel='implied_role',
path_vars={
'prior_role_id': json_home.Parameters.ROLE_ID,
'implied_role_id': json_home.Parameters.ROLE_ID})
]
APIs = (RoleAPI,)

View File

@ -19,11 +19,8 @@ import functools
from oslo_log import log
from keystone.assignment import schema
from keystone.common import controller
from keystone.common import provider_api
from keystone.common import validation
from keystone.common import wsgi
import keystone.conf
from keystone import exception
from keystone.i18n import _
@ -53,175 +50,6 @@ class ProjectAssignmentV3(controller.V3Controller):
hints=hints)
class RoleV3(controller.V3Controller):
"""The V3 Role CRUD APIs.
To ease complexity (and hence risk) in writing the policy rules for the
role APIs, we create separate policy actions for roles that are domain
specific, as opposed to those that are global. In order to achieve this
each of the role API methods has a wrapper method that checks to see if the
role is global or domain specific.
NOTE (henry-nash): If this separate global vs scoped policy action pattern
becomes repeated for other entities, we should consider encapsulating this
into a specialized router class.
"""
collection_name = 'roles'
member_name = 'role'
def __init__(self):
super(RoleV3, self).__init__()
self.get_member_from_driver = PROVIDERS.role_api.get_role
def _is_domain_role(self, role):
return role.get('domain_id') is not None
def _is_domain_role_target(self, role_id):
try:
role = PROVIDERS.role_api.get_role(role_id)
except exception.RoleNotFound:
# We hide this error since we have not yet carried out a policy
# check - and it maybe that the caller isn't authorized to make
# this call. If so, we want that error to be raised instead.
return None, False
return role, self._is_domain_role(role)
def create_role_wrapper(self, request, role):
if self._is_domain_role(role):
return self.create_domain_role(request, role=role)
else:
return self.create_role(request, role=role)
@controller.protected()
def create_role(self, request, role):
validation.lazy_validate(schema.role_create, role)
return self._create_role(request, role)
@controller.protected()
def create_domain_role(self, request, role):
validation.lazy_validate(schema.role_create, role)
return self._create_role(request, role)
def list_roles_wrapper(self, request):
if request.params.get('domain_id'):
return self.list_domain_roles(request)
else:
return self.list_roles(request)
@controller.filterprotected('name', 'domain_id')
def list_roles(self, request, filters):
return self._list_roles(request, filters)
@controller.filterprotected('name', 'domain_id')
def list_domain_roles(self, request, filters):
return self._list_roles(request, filters)
def get_role_wrapper(self, request, role_id):
role, is_domain_role = self._is_domain_role_target(role_id)
if is_domain_role:
return self.get_domain_role(request, role_id=role_id, role=role)
else:
return self.get_role(request, role_id=role_id, role=role)
@controller.protected()
def get_role(self, request, role_id, role):
if not role:
raise exception.RoleNotFound(role_id=role_id)
return RoleV3.wrap_member(request.context_dict, role)
@controller.protected()
def get_domain_role(self, request, role_id, role):
if not role:
raise exception.RoleNotFound(role_id=role_id)
return RoleV3.wrap_member(request.context_dict, role)
def update_role_wrapper(self, request, role_id, role):
# Since we don't allow you change whether a role is global or domain
# specific, we can ignore the new update attributes and just look at
# the existing role.
_, is_domain_role = self._is_domain_role_target(role_id)
if is_domain_role:
return self.update_domain_role(
request, role_id=role_id, role=role)
else:
return self.update_role(request, role_id=role_id, role=role)
@controller.protected()
def update_role(self, request, role_id, role):
validation.lazy_validate(schema.role_update, role)
return self._update_role(request, role_id, role)
@controller.protected()
def update_domain_role(self, request, role_id, role):
validation.lazy_validate(schema.role_update, role)
return self._update_role(request, role_id, role)
def delete_role_wrapper(self, request, role_id):
_, is_domain_role = self._is_domain_role_target(role_id)
if is_domain_role:
return self.delete_domain_role(request, role_id=role_id)
else:
return self.delete_role(request, role_id=role_id)
@controller.protected()
def delete_role(self, request, role_id):
return self._delete_role(request, role_id)
@controller.protected()
def delete_domain_role(self, request, role_id):
return self._delete_role(request, role_id)
def _create_role(self, request, role):
if role['name'] == CONF.member_role_name:
# Use the configured member role ID when creating the configured
# member role name. This avoids the potential of creating a
# "member" role with an unexpected ID.
role['id'] = CONF.member_role_id
else:
role = self._assign_unique_id(role)
ref = self._normalize_dict(role)
ref = PROVIDERS.role_api.create_role(
ref['id'],
ref,
initiator=request.audit_initiator)
return RoleV3.wrap_member(request.context_dict, ref)
def _list_roles(self, request, filters):
hints = RoleV3.build_driver_hints(request, filters)
refs = PROVIDERS.role_api.list_roles(hints=hints)
return RoleV3.wrap_collection(request.context_dict, refs, hints=hints)
def _update_role(self, request, role_id, role):
self._require_matching_id(role_id, role)
ref = PROVIDERS.role_api.update_role(
role_id, role, initiator=request.audit_initiator
)
return RoleV3.wrap_member(request.context_dict, ref)
def _delete_role(self, request, role_id):
PROVIDERS.role_api.delete_role(role_id,
initiator=request.audit_initiator)
@classmethod
def build_driver_hints(cls, request, supported_filters):
# NOTE(jamielennox): To handle the default case of no domain_id defined
# the role_assignment backend does some hackery to distinguish between
# global and domain scoped roles. This backend behaviour relies upon a
# value of domain_id being set (not just defaulting to None). Manually
# set the empty filter if its not provided.
hints = super(RoleV3, cls).build_driver_hints(request,
supported_filters)
if not request.params.get('domain_id'):
hints.add_filter('domain_id', None)
return hints
class ImpliedRolesV3(controller.V3Controller):
"""The V3 ImpliedRoles CRD APIs. There is no Update."""
@ -254,83 +82,6 @@ class ImpliedRolesV3(controller.V3Controller):
}
return implied_response
def _populate_prior_role_response(self, endpoint, prior_id):
prior_role = PROVIDERS.role_api.get_role(prior_id)
response = {
"role_inference": {
"prior_role": self._prior_role_stanza(
endpoint, prior_id, prior_role['name'])
}
}
return response
def _populate_implied_roles_response(self, endpoint,
prior_id, implied_ids):
response = self._populate_prior_role_response(endpoint, prior_id)
response["role_inference"]['implies'] = []
for implied_id in implied_ids:
implied_role = PROVIDERS.role_api.get_role(implied_id)
implied_response = self._implied_role_stanza(
endpoint, implied_role)
response["role_inference"]['implies'].append(implied_response)
response["links"] = {
"self": endpoint + "/v3/roles/" + prior_id + "/implies"
}
return response
def _populate_implied_role_response(self, endpoint, prior_id, implied_id):
response = self._populate_prior_role_response(endpoint, prior_id)
implied_role = PROVIDERS.role_api.get_role(implied_id)
stanza = self._implied_role_stanza(endpoint, implied_role)
response["role_inference"]['implies'] = stanza
response["links"] = {
"self": (endpoint + "/v3/roles/" + prior_id
+ "/implies/" + implied_id)
}
return response
@controller.protected(callback=_check_implies_role)
def get_implied_role(self, request, prior_role_id, implied_role_id):
ref = PROVIDERS.role_api.get_implied_role(prior_role_id,
implied_role_id)
prior_id = ref['prior_role_id']
implied_id = ref['implied_role_id']
endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url(
request.context_dict, 'public')
response = self._populate_implied_role_response(
endpoint, prior_id, implied_id)
return response
@controller.protected(callback=_check_implies_role)
def check_implied_role(self, request, prior_role_id, implied_role_id):
PROVIDERS.role_api.get_implied_role(prior_role_id, implied_role_id)
@controller.protected(callback=_check_implies_role)
def create_implied_role(self, request, prior_role_id, implied_role_id):
PROVIDERS.role_api.create_implied_role(prior_role_id, implied_role_id)
return wsgi.render_response(
self.get_implied_role(request,
prior_role_id,
implied_role_id),
status=(201, 'Created'))
@controller.protected(callback=_check_implies_role)
def delete_implied_role(self, request, prior_role_id, implied_role_id):
PROVIDERS.role_api.delete_implied_role(prior_role_id, implied_role_id)
@controller.protected(callback=_check_implies_role)
def list_implied_roles(self, request, prior_role_id):
ref = PROVIDERS.role_api.list_implied_roles(prior_role_id)
implied_ids = [r['implied_role_id'] for r in ref]
endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url(
request.context_dict, 'public')
results = self._populate_implied_roles_response(
endpoint, prior_role_id, implied_ids)
return results
@controller.protected()
def list_role_inference_rules(self, request):
refs = PROVIDERS.role_api.list_role_inference_rules()

View File

@ -19,7 +19,6 @@ import functools
from keystone.assignment import controllers
from keystone.common import json_home
from keystone.common import router
from keystone.common import wsgi
@ -39,7 +38,7 @@ class Public(wsgi.ComposableRouter):
class Routers(wsgi.RoutersBase):
_path_prefixes = ('users', 'roles', 'role_inferences', 'projects',
_path_prefixes = ('users', 'role_inferences', 'projects',
'domains', 'system', 'role_assignments', 'OS-INHERIT')
def append_v3_routers(self, mapper, routers):
@ -54,35 +53,7 @@ class Routers(wsgi.RoutersBase):
'user_id': json_home.Parameters.USER_ID,
})
routers.append(
router.Router(controllers.RoleV3(), 'roles', 'role',
resource_descriptions=self.v3_resources,
method_template='%s_wrapper'))
implied_roles_controller = controllers.ImpliedRolesV3()
self._add_resource(
mapper, implied_roles_controller,
path='/roles/{prior_role_id}/implies',
rel=json_home.build_v3_resource_relation('implied_roles'),
get_head_action='list_implied_roles',
path_vars={
'prior_role_id': json_home.Parameters.ROLE_ID,
}
)
self._add_resource(
mapper, implied_roles_controller,
path='/roles/{prior_role_id}/implies/{implied_role_id}',
put_action='create_implied_role',
delete_action='delete_implied_role',
head_action='check_implied_role',
get_action='get_implied_role',
rel=json_home.build_v3_resource_relation('implied_role'),
path_vars={
'prior_role_id': json_home.Parameters.ROLE_ID,
'implied_role_id': json_home.Parameters.ROLE_ID
}
)
self._add_resource(
mapper, implied_roles_controller,
path='/role_inferences',

View File

@ -50,6 +50,7 @@ _MOVED_API_PREFIXES = frozenset(
'limits',
'regions',
'registered_limits',
'roles',
'services',
]
)