From 3e948cfab4398c9092d0f738e15fa639ba248135 Mon Sep 17 00:00:00 2001 From: Morgan Fainberg Date: Tue, 17 Jul 2018 13:38:21 -0700 Subject: [PATCH] Move trusts to flask native dispatching Migrate trust APIs from legacy routes/webob to flask native dispatching. Change-Id: I6cba774c7dcf2fc6fdcbdc8f6e80111ccd8036a1 Partial-Bug: #1776504 --- keystone/api/__init__.py | 5 +- keystone/api/trusts.py | 333 ++++++++++++++++++ keystone/server/flask/application.py | 4 +- keystone/server/flask/common.py | 29 +- .../tests/unit/server/test_keystone_flask.py | 5 + keystone/trust/__init__.py | 1 - keystone/trust/controllers.py | 258 -------------- keystone/trust/routers.py | 69 ---- 8 files changed, 367 insertions(+), 337 deletions(-) create mode 100644 keystone/api/trusts.py delete mode 100644 keystone/trust/controllers.py delete mode 100644 keystone/trust/routers.py diff --git a/keystone/api/__init__.py b/keystone/api/__init__.py index d78b91fb70..92a01c2b5a 100644 --- a/keystone/api/__init__.py +++ b/keystone/api/__init__.py @@ -12,6 +12,7 @@ from keystone.api import credentials from keystone.api import discovery +from keystone.api import trusts -__all__ = ('discovery', 'credentials') -__apis__ = (discovery, credentials) +__all__ = ('discovery', 'credentials', 'trusts') +__apis__ = (discovery, credentials, trusts) diff --git a/keystone/api/trusts.py b/keystone/api/trusts.py new file mode 100644 index 0000000000..027efd0d57 --- /dev/null +++ b/keystone/api/trusts.py @@ -0,0 +1,333 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# This file handles all flask-restful resources for /v3/OS-TRUST + +# TODO(morgan): Deprecate /v3/OS-TRUST/trusts path in favour of /v3/trusts. +# /v3/OS-TRUST should remain indefinitely. + +import functools + +import flask +import flask_restful +from six.moves import http_client + +from keystone import assignment +from keystone.common import context +from keystone.common import json_home +from keystone.common import provider_api +from keystone.common import rbac_enforcer +from keystone.common import utils +from keystone.common import validation +from keystone import exception +from keystone.i18n import _ +from keystone.server import flask as ks_flask +from keystone.trust import schema + + +ENFORCER = rbac_enforcer.RBACEnforcer +PROVIDERS = provider_api.ProviderAPIs + +_build_resource_relation = functools.partial( + json_home.build_v3_extension_resource_relation, extension_name='OS-TRUST', + extension_version='1.0') +_build_parameter_relation = functools.partial( + json_home.build_v3_extension_parameter_relation, extension_name='OS-TRUST', + extension_version='1.0') + +TRUST_ID_PARAMETER_RELATION = _build_parameter_relation( + parameter_name='trust_id') + + +def _trustor_trustee_only(trust): + user_id = flask.request.environ.get(context.REQUEST_CONTEXT_ENV).user_id + if user_id not in [trust.get('trustee_user_id'), + trust.get('trustor_user_id')]: + raise exception.ForbiddenAction( + action=_('Requested user has no relation to this trust')) + + +def _normalize_trust_expires_at(trust): + # correct isotime + if trust.get('expires_at') is not None: + trust['expires_at'] = utils.isotime(trust['expires_at'], + subsecond=True) + + +def _normalize_trust_roles(trust): + # fill in role data + trust_full_roles = [] + for trust_role in trust.get('roles', []): + trust_role = trust_role['id'] + try: + matching_role = PROVIDERS.role_api.get_role(trust_role) + # TODO(morgan): Correct the cross-subsystem call here to allow + # for local handling of the role wrapping + full_role = assignment.controllers.RoleV3.wrap_member( + {'environment': flask.request.environ}, + matching_role)['role'] + trust_full_roles.append(full_role) + except exception.RoleNotFound: + pass + + trust['roles'] = trust_full_roles + trust['roles_links'] = { + 'self': ks_flask.base_url() + '/%s/roles' % trust['id'], + 'next': None, + 'previous': None} + + +class TrustResource(ks_flask.ResourceBase): + collection_key = 'trusts' + member_key = 'trust' + api_prefix = '/OS-TRUST' + json_home_resource_rel_func = _build_resource_relation + json_home_parameter_rel_func = _build_parameter_relation + + def _check_unrestricted(self): + token = self.auth_context['token'] + auth_methods = token['methods'] + if 'application_credential' in auth_methods: + td = token.token_data['token'] + if td['application_credential']['restricted']: + action = _("Using method 'application_credential' is not " + "allowed for managing trusts.") + raise exception.ForbiddenAction(action=action) + + def _find_redelegated_trust(self): + # Check if delegated via trust + redelegated_trust = None + if self.oslo_context.is_delegated_auth: + src_trust_id = self.oslo_context.trust_id + if not src_trust_id: + action = _('Redelegation allowed for delegated by trust only') + raise exception.ForbiddenAction(action=action) + redelegated_trust = PROVIDERS.trust_api.get_trust(src_trust_id) + return redelegated_trust + + @staticmethod + def _parse_expiration_date(expiration_date): + if expiration_date is not None: + return utils.parse_expiration_date(expiration_date) + return None + + def _require_trustor_has_role_in_project(self, trust): + trustor_roles = self._get_trustor_roles(trust) + for trust_role in trust['roles']: + matching_roles = [x for x in trustor_roles + if x == trust_role['id']] + if not matching_roles: + raise exception.RoleNotFound(role_id=trust_role['id']) + + def _get_trustor_roles(self, trust): + original_trust = trust.copy() + while original_trust.get('redelegated_trust_id'): + original_trust = PROVIDERS.trust_api.get_trust( + original_trust['redelegated_trust_id']) + + if not ((trust.get('project_id')) in [None, '']): + # Check project exists. + PROVIDERS.resource_api.get_project(trust['project_id']) + # Get a list of roles including any domain specific roles + assignment_list = PROVIDERS.assignment_api.list_role_assignments( + user_id=original_trust['trustor_user_id'], + project_id=original_trust['project_id'], + effective=True, strip_domain_roles=False) + return list({x['role_id'] for x in assignment_list}) + else: + return [] + + def _normalize_role_list(self, trust_roles): + roles = [] + for role in trust_roles: + if role.get('id'): + roles.append({'id': role['id']}) + else: + roles.append( + PROVIDERS.role_api.get_unique_role_by_name(role['name'])) + return roles + + def _get_trust(self, trust_id): + ENFORCER.enforce_call(action='identity:get_trust') + trust = PROVIDERS.trust_api.get_trust(trust_id) + _trustor_trustee_only(trust) + _normalize_trust_expires_at(trust) + _normalize_trust_roles(trust) + return self.wrap_member(trust) + + def _list_trusts(self): + ENFORCER.enforce_call(action='identity:list_trusts') + trusts = [] + trustor_user_id = flask.request.args.get('trustor_user_id') + trustee_user_id = flask.request.args.get('trustee_user_id') + + if not flask.request.args: + # NOTE(morgan): Admin can list all trusts. + ENFORCER.enforce_call(action='admin_required') + trusts += PROVIDERS.trust_api.list_trusts() + + # TODO(morgan): Convert the trustor/trustee checks into policy + # checkstr we can enforce on. This is duplication of code + # behavior/design as the OS-TRUST controller for ease of review/ + # comparison of previous code. Minor optimizations [checks before db + # hits] have been done. + action = _('Cannot list trusts for another user') + if trustor_user_id: + if trustor_user_id != self.oslo_context.user_id: + raise exception.Forbidden(action=action) + + if trustee_user_id: + if trustee_user_id != self.oslo_context.user_id: + raise exception.Forbidden(action=action) + + trusts += PROVIDERS.trust_api.list_trusts_for_trustor(trustor_user_id) + trusts += PROVIDERS.trust_api.list_trusts_for_trustee(trustee_user_id) + + for trust in trusts: + # get_trust returns roles, list_trusts does not + # It seems in some circumstances, roles does not + # exist in the query response, so check first + if 'roles' in trust: + del trust['roles'] + + if trust.get('expires_at') is not None: + trust['expires_at'] = utils.isotime(trust['expires_at'], + subsecond=True) + + return self.wrap_collection(trusts) + + def get(self, trust_id=None): + """Dispatch for GET/HEAD or LIST trusts.""" + if trust_id is not None: + return self._get_trust(trust_id=trust_id) + else: + return self._list_trusts() + + def post(self): + """Create a new trust. + + The User creating the trust must be the trustor. + """ + ENFORCER.enforce_call(action='identity:create_trust') + trust = flask.request.json.get('trust', {}) + validation.lazy_validate(schema.trust_create, trust) + self._check_unrestricted() + + if trust.get('project_id') and not trust.get('roles'): + action = _('At least one role should be specified') + raise exception.ForbiddenAction(action=action) + + if self.oslo_context.user_id != trust.get('trustor_user_id'): + action = _("The authenticated user should match the trustor") + raise exception.ForbiddenAction(action=action) + + # Ensure the trustee exists + PROVIDERS.identity_api.get_user(trust['trustee_user_id']) + + # Normalize roles + trust['roles'] = self._normalize_role_list(trust.get('roles', [])) + self._require_trustor_has_role_in_project(trust) + trust['expires_at'] = self._parse_expiration_date( + trust.get('expires_at')) + trust = self._assign_unique_id(trust) + redelegated_trust = self._find_redelegated_trust() + return_trust = PROVIDERS.trust_api.create_trust( + trust_id=trust['id'], + trust=trust, + roles=trust['roles'], + redelegated_trust=redelegated_trust, + initiator=self.audit_initiator) + _normalize_trust_expires_at(return_trust) + _normalize_trust_roles(return_trust) + return self.wrap_member(return_trust), http_client.CREATED + + def delete(self, trust_id): + ENFORCER.enforce_call(action='identity:delete_trust') + self._check_unrestricted() + + trust = PROVIDERS.trust_api.get_trust(trust_id) + + # TODO(morgan): convert this check to an oslo_policy checkstr that + # can be referenced/enforced on. + if (self.oslo_context.user_id != trust.get('trustor_user_id') and + not self.oslo_context.is_admin): + action = _('Only admin or trustor can delete a trust') + raise exception.ForbiddenAction(action=action) + + PROVIDERS.trust_api.delete_trust(trust_id, + initiator=self.audit_initiator) + return '', http_client.NO_CONTENT + + +# NOTE(morgan): Since this Resource is not being used with the automatic +# URL additions and does not have a collection key/member_key, we use +# the flask-restful Resource, not the keystone ResourceBase +class RolesForTrustListResource(flask_restful.Resource): + def get(self, trust_id): + ENFORCER.enforce_call(action='identity:list_roles_for_trust') + # NOTE(morgan): This duplicates a little of the .get_trust from the + # main resource, as it needs some of the same logic. However, due to + # how flask-restful works, this should be fully encapsulated + trust = PROVIDERS.trust_api.get_trust(trust_id) + _trustor_trustee_only(trust) + _normalize_trust_expires_at(trust) + _normalize_trust_roles(trust) + return {'roles': trust['roles'], + 'links': trust['roles_links']} + + +# NOTE(morgan): Since this Resource is not being used with the automatic +# URL additions and does not have a collection key/member_key, we use +# the flask-restful Resource, not the keystone ResourceBase +class RoleForTrustResource(flask_restful.Resource): + def get(self, trust_id, role_id): + """Get a role that has been assigned to a trust.""" + ENFORCER.enforce_call(action='identity:get_role_for_trust') + trust = PROVIDERS.trust_api.get_trust(trust_id) + _trustor_trustee_only(trust) + if not any(role['id'] == role_id for role in trust['roles']): + raise exception.RoleNotFound(role_id=role_id) + + role = PROVIDERS.role_api.get_role(role_id) + # TODO(morgan): Correct this to allow for local member wrapping of + # RoleV3. + return assignment.controllers.RoleV3.wrap_member( + {'environment': flask.request.environ}, role) + + +class TrustAPI(ks_flask.APIBase): + _name = 'trusts' + _import_name = __name__ + resources = [TrustResource] + resource_mapping = [ + ks_flask.construct_resource_map( + resource=RolesForTrustListResource, + url='/trusts//roles', + resource_kwargs={}, + rel='trust_roles', + path_vars={ + 'trust_id': TRUST_ID_PARAMETER_RELATION}, + resource_relation_func=_build_resource_relation), + ks_flask.construct_resource_map( + resource=RoleForTrustResource, + url='/trusts//roles/', + resource_kwargs={}, + rel='trust_role', + path_vars={ + 'trust_id': TRUST_ID_PARAMETER_RELATION, + 'role_id': json_home.Parameters.ROLE_ID}, + resource_relation_func=_build_resource_relation), + ] + _api_url_prefix = '/OS-TRUST' + + +APIs = (TrustAPI,) diff --git a/keystone/server/flask/application.py b/keystone/server/flask/application.py index 7216c33daf..c76a6c59df 100644 --- a/keystone/server/flask/application.py +++ b/keystone/server/flask/application.py @@ -40,11 +40,10 @@ from keystone.policy import routers as policy_routers from keystone.resource import routers as resource_routers from keystone.revoke import routers as revoke_routers from keystone.token import _simple_cert as simple_cert_ext -from keystone.trust import routers as trust_routers # TODO(morgan): _MOVED_API_PREFIXES to be removed when the legacy dispatch # support is removed. -_MOVED_API_PREFIXES = frozenset(['credentials']) +_MOVED_API_PREFIXES = frozenset(['credentials', 'OS-TRUST']) LOG = log.getLogger(__name__) @@ -56,7 +55,6 @@ ALL_API_ROUTERS = [auth_routers, limit_routers, policy_routers, resource_routers, - trust_routers, revoke_routers, federation_routers, oauth1_routers, diff --git a/keystone/server/flask/common.py b/keystone/server/flask/common.py index 304935f111..09bbaa2278 100644 --- a/keystone/server/flask/common.py +++ b/keystone/server/flask/common.py @@ -30,6 +30,7 @@ from pycadf import resource import six from six.moves import http_client +from keystone.common import authorization from keystone.common import context from keystone.common import driver_hints from keystone.common import json_home @@ -295,15 +296,22 @@ class APIBase(object): for r in self.resources: c_key = getattr(r, 'collection_key', None) m_key = getattr(r, 'member_key', None) + r_pfx = getattr(r, 'api_prefix', None) if not c_key or not m_key: LOG.debug('Unable to add resource %(resource)s to API ' '%(name)s, both `member_key` and `collection_key` ' 'must be implemented. [collection_key(%(col_key)s) ' 'member_key(%(m_key)s)]', - {'resource': r.__class__.view_class.__name__, + {'resource': r.__name__, 'name': self._name, 'col_key': c_key, 'm_key': m_key}) continue + if r_pfx != self._api_url_prefix: + LOG.debug('Unable to add resource %(resource)s to API as the ' + 'API Prefixes do not match: %(apfx)r != %(rpfx)r', + {'resource': r.__name__, + 'rpfx': r_pfx, 'apfx': self._api_url_prefix}) + continue # NOTE(morgan): The Prefix is automatically added by the API, so # we do not add it to the paths here. @@ -311,8 +319,10 @@ class APIBase(object): entity_path = '/%(collection)s/' % { 'collection': c_key, 'member': m_key} # NOTE(morgan): The json-home form of the entity path is different - # from the flask-url routing form. - jh_e_path = _URL_SUBST.sub('{\\1}', entity_path) + # from the flask-url routing form. Must also include the prefix + jh_e_path = '%(pfx)s/%(e_path)s' % { + 'pfx': self._api_url_prefix, + 'e_path': _URL_SUBST.sub('{\\1}', entity_path).lstrip('/')} LOG.debug( 'Adding standard routes to API %(name)s for `%(resource)s` ' @@ -515,6 +525,9 @@ class ResourceBase(flask_restful.Resource): collection_key = None member_key = None + # NOTE(morgan): This must match the string on the API the resource is + # registered to. + api_prefix = '' method_decorators = [] @@ -588,7 +601,11 @@ class ResourceBase(flask_restful.Resource): @classmethod def _add_self_referential_link(cls, ref): - self_link = '/'.join([base_url(), 'v3', cls.collection_key, ref['id']]) + collection_element = cls.collection_key + if cls.api_prefix: + api_prefix = cls.api_prefix.lstrip('/').rstrip('/') + collection_element = '/'.join([api_prefix, cls.collection_key]) + self_link = '/'.join([base_url(), 'v3', collection_element, ref['id']]) ref.setdefault('links', {})['self'] = self_link @classmethod @@ -652,6 +669,10 @@ class ResourceBase(flask_restful.Resource): return refs + @property + def auth_context(self): + return flask.request.environ.get(authorization.AUTH_CONTEXT_ENV, None) + @property def oslo_context(self): return flask.request.environ.get(context.REQUEST_CONTEXT_ENV, None) diff --git a/keystone/tests/unit/server/test_keystone_flask.py b/keystone/tests/unit/server/test_keystone_flask.py index 6260e53a9b..524ce6170c 100644 --- a/keystone/tests/unit/server/test_keystone_flask.py +++ b/keystone/tests/unit/server/test_keystone_flask.py @@ -178,6 +178,11 @@ class TestKeystoneFlaskCommon(rest.RestfulTestCase): def _setup_flask_restful_api(self, **options): self.restful_api_opts = options.copy() + orig_value = _TestResourceWithCollectionInfo.api_prefix + setattr(_TestResourceWithCollectionInfo, + 'api_prefix', options.get('api_url_prefix', '')) + self.addCleanup(setattr, _TestResourceWithCollectionInfo, 'api_prefix', + orig_value) self.restful_api = _TestRestfulAPI(**options) self.public_app.app.register_blueprint(self.restful_api.blueprint) self.cleanup_instance('restful_api') diff --git a/keystone/trust/__init__.py b/keystone/trust/__init__.py index bd7297ea3b..e9af3a5e01 100644 --- a/keystone/trust/__init__.py +++ b/keystone/trust/__init__.py @@ -12,5 +12,4 @@ # License for the specific language governing permissions and limitations # under the License. -from keystone.trust import controllers # noqa from keystone.trust.core import * # noqa diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py deleted file mode 100644 index b33cb28e17..0000000000 --- a/keystone/trust/controllers.py +++ /dev/null @@ -1,258 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from keystone import assignment -from keystone.common import controller -from keystone.common import provider_api -from keystone.common import utils -from keystone.common import validation -from keystone import exception -from keystone.i18n import _ -from keystone.trust import schema - - -PROVIDERS = provider_api.ProviderAPIs - - -def _trustor_trustee_only(trust, user_id): - if user_id not in [trust.get('trustee_user_id'), - trust.get('trustor_user_id')]: - raise exception.ForbiddenAction( - action=_('Requested user has no relation to this trust')) - - -class TrustV3(controller.V3Controller): - collection_name = "trusts" - member_name = "trust" - - @classmethod - def base_url(cls, context, path=None): - """Construct a path and pass it to V3Controller.base_url method.""" - # NOTE(stevemar): Overriding path to /OS-TRUST/trusts so that - # V3Controller.base_url handles setting the self link correctly. - path = '/OS-TRUST/' + cls.collection_name - return super(TrustV3, cls).base_url(context, path=path) - - def get_trust(self, request, trust_id): - trust = PROVIDERS.trust_api.get_trust(trust_id) - _trustor_trustee_only(trust, request.context.user_id) - self._fill_in_roles(request.context_dict, trust) - return TrustV3.wrap_member(request.context_dict, trust) - - def _fill_in_roles(self, context, trust): - if trust.get('expires_at') is not None: - trust['expires_at'] = (utils.isotime - (trust['expires_at'], - subsecond=True)) - - trust_full_roles = [] - for trust_role in trust.get('roles', []): - if isinstance(trust_role, dict): - trust_role = trust_role['id'] - try: - matching_role = PROVIDERS.role_api.get_role(trust_role) - full_role = assignment.controllers.RoleV3.wrap_member( - context, matching_role)['role'] - trust_full_roles.append(full_role) - except exception.RoleNotFound: - pass - - trust['roles'] = trust_full_roles - trust['roles_links'] = { - 'self': (self.base_url(context) + "/%s/roles" % trust['id']), - 'next': None, - 'previous': None} - - def _normalize_role_list(self, trust_roles): - roles = [] - for role in trust_roles: - if role.get('id'): - roles.append({'id': role['id']}) - else: - role_api = PROVIDERS.role_api - roles.append(role_api.get_unique_role_by_name(role['name'])) - return roles - - def _find_redelegated_trust(self, request): - # Check if delegated via trust - if request.context.is_delegated_auth: - # Redelegation case - src_trust_id = request.context.trust_id - if not src_trust_id: - action = _('Redelegation allowed for delegated by trust only') - raise exception.ForbiddenAction(action=action) - - redelegated_trust = PROVIDERS.trust_api.get_trust(src_trust_id) - else: - redelegated_trust = None - return redelegated_trust - - def _check_unrestricted(self, token): - auth_methods = token['methods'] - if 'application_credential' in auth_methods: - td = token.token_data['token'] - if td['application_credential']['restricted']: - action = _("Using method 'application_credential' is not " - "allowed for managing trusts.") - raise exception.ForbiddenAction(action=action) - - @controller.protected() - def create_trust(self, request, trust): - """Create a new trust. - - The user creating the trust must be the trustor. - - """ - validation.lazy_validate(schema.trust_create, trust) - - token = request.auth_context['token'] - self._check_unrestricted(token) - - redelegated_trust = self._find_redelegated_trust(request) - - if trust.get('project_id') and not trust.get('roles'): - action = _('At least one role should be specified') - raise exception.ForbiddenAction(action=action) - - # the creating user must be the trustor - if request.context.user_id != trust.get('trustor_user_id'): - action = _("The authenticated user should match the trustor") - raise exception.ForbiddenAction(action=action) - - # ensure trustee exists - PROVIDERS.identity_api.get_user(trust['trustee_user_id']) - - # Normalize roles - normalized_roles = self._normalize_role_list(trust.get('roles', [])) - trust['roles'] = normalized_roles - self._require_trustor_has_role_in_project(trust) - trust['expires_at'] = self._parse_expiration_date( - trust.get('expires_at')) - trust_id = uuid.uuid4().hex - new_trust = PROVIDERS.trust_api.create_trust( - trust_id, - trust, - normalized_roles, - redelegated_trust, - initiator=request.audit_initiator - ) - self._fill_in_roles(request.context_dict, new_trust) - return TrustV3.wrap_member(request.context_dict, new_trust) - - def _get_trustor_roles(self, trust): - original_trust = trust.copy() - while original_trust.get('redelegated_trust_id'): - original_trust = PROVIDERS.trust_api.get_trust( - original_trust['redelegated_trust_id']) - - if not self._attribute_is_empty(trust, 'project_id'): - PROVIDERS.resource_api.get_project(original_trust['project_id']) - # Get a list of roles including any domain specific roles - assignment_list = PROVIDERS.assignment_api.list_role_assignments( - user_id=original_trust['trustor_user_id'], - project_id=original_trust['project_id'], - effective=True, strip_domain_roles=False) - return list(set([x['role_id'] for x in assignment_list])) - else: - return [] - - def _require_trustor_has_role_in_project(self, trust): - trustor_roles = self._get_trustor_roles(trust) - for trust_role in trust['roles']: - matching_roles = [x for x in trustor_roles - if x == trust_role['id']] - if not matching_roles: - raise exception.RoleNotFound(role_id=trust_role['id']) - - def _parse_expiration_date(self, expiration_date): - if expiration_date is None: - return None - return utils.parse_expiration_date(expiration_date) - - @controller.protected() - def list_trusts(self, request): - trusts = [] - trustor_user_id = request.params.get('trustor_user_id') - trustee_user_id = request.params.get('trustee_user_id') - - if not request.params: - self.assert_admin(request) - trusts += PROVIDERS.trust_api.list_trusts() - - action = _('Cannot list trusts for another user') - if trustor_user_id: - if trustor_user_id != request.context.user_id: - raise exception.Forbidden(action=action) - - trusts += PROVIDERS.trust_api.list_trusts_for_trustor( - trustor_user_id - ) - - if trustee_user_id: - if trustee_user_id != request.context.user_id: - raise exception.ForbiddenAction(action=action) - - trusts += PROVIDERS.trust_api.list_trusts_for_trustee( - trustee_user_id - ) - - for trust in trusts: - # get_trust returns roles, list_trusts does not - # It seems in some circumstances, roles does not - # exist in the query response, so check first - if 'roles' in trust: - del trust['roles'] - - if trust.get('expires_at') is not None: - trust['expires_at'] = utils.isotime(trust['expires_at'], - subsecond=True) - - return TrustV3.wrap_collection(request.context_dict, trusts) - - @controller.protected() - def delete_trust(self, request, trust_id): - token = request.auth_context['token'] - self._check_unrestricted(token) - - trust = PROVIDERS.trust_api.get_trust(trust_id) - - if (request.context.user_id != trust.get('trustor_user_id') and - not request.context.is_admin): - action = _('Only admin or trustor can delete a trust') - raise exception.ForbiddenAction(action=action) - - PROVIDERS.trust_api.delete_trust( - trust_id, initiator=request.audit_initiator - ) - - @controller.protected() - def list_roles_for_trust(self, request, trust_id): - trust = self.get_trust(request, trust_id)['trust'] - return {'roles': trust['roles'], - 'links': trust['roles_links']} - - @controller.protected() - def get_role_for_trust(self, request, trust_id, role_id): - """Get a role that has been assigned to a trust.""" - trust = PROVIDERS.trust_api.get_trust(trust_id) - _trustor_trustee_only(trust, request.context.user_id) - - if not any(role['id'] == role_id for role in trust['roles']): - raise exception.RoleNotFound(role_id=role_id) - - role = PROVIDERS.role_api.get_role(role_id) - return assignment.controllers.RoleV3.wrap_member(request.context_dict, - role) diff --git a/keystone/trust/routers.py b/keystone/trust/routers.py deleted file mode 100644 index 5c50ece1a9..0000000000 --- a/keystone/trust/routers.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""WSGI Routers for the Trust service.""" - -import functools - -from keystone.common import json_home -from keystone.common import wsgi -from keystone.trust import controllers - - -_build_resource_relation = functools.partial( - json_home.build_v3_extension_resource_relation, extension_name='OS-TRUST', - extension_version='1.0') - -TRUST_ID_PARAMETER_RELATION = json_home.build_v3_extension_parameter_relation( - 'OS-TRUST', '1.0', 'trust_id') - - -class Routers(wsgi.RoutersBase): - - _path_prefixes = ('OS-TRUST',) - - def append_v3_routers(self, mapper, routers): - trust_controller = controllers.TrustV3() - - self._add_resource( - mapper, trust_controller, - path='/OS-TRUST/trusts', - get_head_action='list_trusts', - post_action='create_trust', - rel=_build_resource_relation(resource_name='trusts')) - self._add_resource( - mapper, trust_controller, - path='/OS-TRUST/trusts/{trust_id}', - get_head_action='get_trust', - delete_action='delete_trust', - rel=_build_resource_relation(resource_name='trust'), - path_vars={ - 'trust_id': TRUST_ID_PARAMETER_RELATION, - }) - self._add_resource( - mapper, trust_controller, - path='/OS-TRUST/trusts/{trust_id}/roles', - get_head_action='list_roles_for_trust', - rel=_build_resource_relation(resource_name='trust_roles'), - path_vars={ - 'trust_id': TRUST_ID_PARAMETER_RELATION, - }) - self._add_resource( - mapper, trust_controller, - path='/OS-TRUST/trusts/{trust_id}/roles/{role_id}', - get_head_action='get_role_for_trust', - rel=_build_resource_relation(resource_name='trust_role'), - path_vars={ - 'trust_id': TRUST_ID_PARAMETER_RELATION, - 'role_id': json_home.Parameters.ROLE_ID, - })