Implement base for new RBAC Enforcer
Create the base implementation of the RBAC enforcer with compat code for the legacy mechanism via @protected decorators. Change-Id: I80662d9b23e706b720d56670cb849318e951a3b4 Parital-Bug: #1776504
This commit is contained in:
parent
1bcc8a11c8
commit
bb3b15bbf0
|
@ -1,81 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_policy import policy as common_policy
|
||||
|
||||
from keystone.common import policies
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
|
||||
_ENFORCER = None
|
||||
|
||||
|
||||
def reset():
|
||||
global _ENFORCER
|
||||
_ENFORCER = None
|
||||
|
||||
|
||||
def init():
|
||||
global _ENFORCER
|
||||
if not _ENFORCER:
|
||||
_ENFORCER = common_policy.Enforcer(CONF)
|
||||
register_rules(_ENFORCER)
|
||||
|
||||
|
||||
def get_enforcer():
|
||||
# Here we pass an empty list of arguments because there aren't any
|
||||
# arguments that oslo.config or oslo.policy shouldn't already understand
|
||||
# from the CONF object. This makes things easier here because we don't have
|
||||
# to parse arguments passed in from the command line and remove unexpected
|
||||
# arguments before building a Config object.
|
||||
CONF([], project='keystone')
|
||||
init()
|
||||
return _ENFORCER
|
||||
|
||||
|
||||
def enforce(credentials, action, target, do_raise=True):
|
||||
"""Verify that the action is valid on the target in this context.
|
||||
|
||||
:param credentials: user credentials
|
||||
:param action: string representing the action to be checked, which should
|
||||
be colon separated for clarity.
|
||||
:param target: dictionary representing the object of the action for object
|
||||
creation this should be a dictionary representing the
|
||||
location of the object e.g. {'project_id':
|
||||
object.project_id}
|
||||
:raises keystone.exception.Forbidden: If verification fails.
|
||||
|
||||
Actions should be colon separated for clarity. For example:
|
||||
|
||||
* identity:list_users
|
||||
|
||||
"""
|
||||
init()
|
||||
|
||||
# Add the exception arguments if asked to do a raise
|
||||
extra = {}
|
||||
if do_raise:
|
||||
extra.update(exc=exception.ForbiddenAction, action=action,
|
||||
do_raise=do_raise)
|
||||
|
||||
try:
|
||||
return _ENFORCER.enforce(action, target, credentials, **extra)
|
||||
except common_policy.InvalidScope:
|
||||
raise exception.ForbiddenAction(action=action)
|
||||
|
||||
|
||||
def register_rules(enforcer):
|
||||
enforcer.register_defaults(policies.list_rules())
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from keystone.common.rbac_enforcer import enforcer
|
||||
|
||||
|
||||
__all__ = ('enforcer',)
|
|
@ -0,0 +1,378 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
|
||||
import flask
|
||||
from oslo_log import log
|
||||
from oslo_policy import policy as common_policy
|
||||
from oslo_utils import strutils
|
||||
|
||||
from keystone.common import authorization
|
||||
from keystone.common import context
|
||||
from keystone.common import policies
|
||||
from keystone.common import provider_api
|
||||
from keystone.common import utils
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.i18n import _
|
||||
from keystone.models import token_model
|
||||
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
PROVIDER_APIS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
_POSSIBLE_TARGET_ACTIONS = frozenset([
|
||||
rule.name for
|
||||
rule in policies.list_rules() if not rule.deprecated_for_removal
|
||||
])
|
||||
|
||||
|
||||
class RBACEnforcer(object):
|
||||
"""Enforce RBAC on API calls."""
|
||||
|
||||
__shared_state__ = {}
|
||||
__ENFORCER = None
|
||||
ACTION_STORE_ATTR = 'keystone:RBAC:action_name'
|
||||
|
||||
def __init__(self):
|
||||
# NOTE(morgan): All Enforcer Instances use the same shared state;
|
||||
# BORG pattern.
|
||||
self.__dict__ = self.__shared_state__
|
||||
|
||||
def _enforce(self, credentials, action, target, do_raise=True):
|
||||
"""Verify that the action is valid on the target in this context.
|
||||
|
||||
This method is for cases that exceed the base enforcer
|
||||
functionality (notably for compatibility with `@protected` style
|
||||
decorators.
|
||||
|
||||
:param credentials: user credentials
|
||||
:param action: string representing the action to be checked, which
|
||||
should be colon separated for clarity.
|
||||
:param target: dictionary representing the object of the action for
|
||||
object creation this should be a dictionary
|
||||
representing the location of the object e.g.
|
||||
{'project_id': object.project_id}
|
||||
:raises keystone.exception.Forbidden: If verification fails.
|
||||
|
||||
Actions should be colon separated for clarity. For example:
|
||||
|
||||
* identity:list_users
|
||||
"""
|
||||
# Add the exception arguments if asked to do a raise
|
||||
extra = {}
|
||||
if do_raise:
|
||||
extra.update(exc=exception.ForbiddenAction, action=action,
|
||||
do_raise=do_raise)
|
||||
|
||||
try:
|
||||
return self._enforcer.enforce(
|
||||
rule=action, target=target, creds=credentials, **extra)
|
||||
except common_policy.InvalidScope:
|
||||
raise exception.ForbiddenAction(action=action)
|
||||
|
||||
def _reset(self):
|
||||
# NOTE(morgan): Used for TEST purposes only.
|
||||
self.__ENFORCER = None
|
||||
|
||||
@property
|
||||
def _enforcer(self):
|
||||
# The raw oslo-policy enforcer object
|
||||
if self.__ENFORCER is None:
|
||||
self.__ENFORCER = common_policy.Enforcer(CONF)
|
||||
self.register_rules(self.__ENFORCER)
|
||||
return self.__ENFORCER
|
||||
|
||||
@staticmethod
|
||||
def _extract_filter_values(filters):
|
||||
"""Extract filter data from query params for RBAC enforcement."""
|
||||
filters = filters or []
|
||||
target = {i: flask.request.args[i] for
|
||||
i in filters if i in flask.request.args}
|
||||
if target:
|
||||
if LOG.logger.getEffectiveLevel() <= log.DEBUG:
|
||||
LOG.debug(
|
||||
'RBAC: Adding query filter params (%s)',
|
||||
', '.join(['%s=%s' % (k, v) for k, v in target.items()]))
|
||||
return target
|
||||
|
||||
@staticmethod
|
||||
def _extract_member_target_data(member_target_type, member_target):
|
||||
"""Build some useful target data.
|
||||
|
||||
:param member_target_type: what type of target, e.g. 'user'
|
||||
:type member_target_type: str or None
|
||||
:param member_target: reference of the target data
|
||||
:type member_target: dict or None
|
||||
:returns: constructed target dict or empty dict
|
||||
:rtype: dict
|
||||
"""
|
||||
ret_dict = {}
|
||||
if ((member_target is not None and member_target_type is None) or
|
||||
(member_target is None and member_target_type is not None)):
|
||||
LOG.warning('RBAC: Unknown target type or target reference. '
|
||||
'Rejecting as unauthorized. '
|
||||
'(member_target_type=%(target_type)r, '
|
||||
'member_target=%(target_ref)r)',
|
||||
{'target_type': member_target_type,
|
||||
'target_ref': member_target})
|
||||
# Fast exit.
|
||||
return ret_dict
|
||||
|
||||
if member_target is not None and member_target_type is not None:
|
||||
ret_dict['target'] = {member_target_type: member_target}
|
||||
else:
|
||||
# Try and do some magic loading based upon the resource we've
|
||||
# matched in our route. This is mostly so we can have a level of
|
||||
# automatic pulling in the resource; strictly for some added
|
||||
# DRY capabilities. In an ideal world the target is always passed
|
||||
# in explicitly.
|
||||
if flask.request.endpoint:
|
||||
# This only works for cases of Flask-RESTful, or carefully
|
||||
# crafted endpoints that live on a class. Ultimately, there
|
||||
# should be more protection against something wonky
|
||||
# here.
|
||||
resource = flask.current_app.view_functions[
|
||||
flask.request.endpoint].view_class
|
||||
member_name = getattr(resource, 'member_name', None)
|
||||
func = getattr(
|
||||
resource, 'get_member_from_driver', None)
|
||||
if member_name is not None and callable(func):
|
||||
key = '%s_id' % member_name
|
||||
if key in (flask.request.view_args or {}):
|
||||
# NOTE(morgan): For most correct setup, instantiate the
|
||||
# view_class. There is no current support for passing
|
||||
# extra args to the constructor of the view_class like
|
||||
# .as_view() method would actually do. In this case
|
||||
# perform a simple instantiation to represent the
|
||||
# `self` pass to the unbound method.
|
||||
#
|
||||
# TODO(morgan): add (future) support for passing class
|
||||
# instantiation args.
|
||||
ret_dict['target'] = {
|
||||
member_name: func(
|
||||
resource(),
|
||||
flask.request.view_args[key])[member_name]}
|
||||
return ret_dict
|
||||
|
||||
@staticmethod
|
||||
def _extract_policy_check_credentials():
|
||||
# Pull out the auth context
|
||||
return flask.request.environ.get(authorization.AUTH_CONTEXT_ENV, {})
|
||||
|
||||
@classmethod
|
||||
def _extract_subject_token_target_data(cls):
|
||||
ret_dict = {}
|
||||
window_seconds = 0
|
||||
# NOTE(morgan): Populate the subject token data into
|
||||
# the policy dict at "target.token". In all liklyhood
|
||||
# it is un-interesting to populate this data outside
|
||||
# of the auth paths.
|
||||
target = 'token'
|
||||
subject_token = flask.request.headers.get('X-Subject-Token')
|
||||
if subject_token is not None:
|
||||
allow_expired = (strutils.bool_from_string(
|
||||
flask.request.args.get('allow_expired', False),
|
||||
default=False))
|
||||
if allow_expired:
|
||||
window_seconds = CONF.token.allow_expired_window
|
||||
token_ref = token_model.KeystoneToken(
|
||||
token_id=subject_token,
|
||||
token_data=PROVIDER_APIS.token_provider_api.validate_token(
|
||||
subject_token,
|
||||
window_seconds=window_seconds))
|
||||
# TODO(morgan): Expand extracted data from the subject token.
|
||||
ret_dict[target] = {}
|
||||
ret_dict[target]['user_id'] = token_ref.user_id
|
||||
try:
|
||||
user_domain_id = token_ref.user_domain_id
|
||||
except exception.UnexpectedError:
|
||||
user_domain_id = None
|
||||
if user_domain_id:
|
||||
ret_dict[target].setdefault('user', {})
|
||||
ret_dict[target]['user'].setdefault('domain', {})
|
||||
ret_dict[target]['user']['domain']['id'] = user_domain_id
|
||||
return ret_dict
|
||||
|
||||
@staticmethod
|
||||
def _get_oslo_req_context():
|
||||
return flask.request.environ.get(context.REQUEST_CONTEXT_ENV, None)
|
||||
|
||||
@classmethod
|
||||
def _assert_is_authenticated(cls):
|
||||
ctx = cls._get_oslo_req_context()
|
||||
if ctx is None:
|
||||
LOG.warning('RBAC: Error reading the request context generated by '
|
||||
'the Auth Middleware (there is no context). Rejecting '
|
||||
'request as unauthorized.')
|
||||
raise exception.Unauthorized(
|
||||
_('Internal error processing authentication and '
|
||||
'authorization.'))
|
||||
if not ctx.authenticated:
|
||||
raise exception.Unauthorized(
|
||||
_('auth_context did not decode anything useful'))
|
||||
|
||||
@classmethod
|
||||
def _shared_admin_auth_token_set(cls):
|
||||
ctx = cls._get_oslo_req_context()
|
||||
return getattr(ctx, 'is_admin', False)
|
||||
|
||||
@classmethod
|
||||
def enforce_call(cls, enforcer=None, action=None, target_attr=None,
|
||||
member_target_type=None, member_target=None,
|
||||
filters=None):
|
||||
"""Enforce RBAC on the current request.
|
||||
|
||||
This will do some legwork and then instantiate the Enforcer if an
|
||||
enforcer is not passed in.
|
||||
|
||||
:param enforcer: A pre-instantiated Enforcer object (optional)
|
||||
:type enforcer: :class:`RBACEnforcer`
|
||||
:param action: the name of the rule/policy enforcement to be checked
|
||||
against, e.g. `identity:get_user` (optional may be
|
||||
replaced by decorating the method/function with
|
||||
`policy_enforcer_action`.
|
||||
:type action: str
|
||||
:param target_attr: complete override of the target data. This will
|
||||
replace all other generated target data meaning
|
||||
`member_target_type` and `member_target` are
|
||||
ignored. This will also prevent extraction of
|
||||
data from the X-Subject-Token. The `target` dict
|
||||
should contain a series of key-value pairs such
|
||||
as `{'user': user_ref_dict}`.
|
||||
:type target_attr: dict
|
||||
:param member_target_type: the type of the target, e.g. 'user'. Both
|
||||
this and `member_target` must be passed if
|
||||
either is passed.
|
||||
:type member_target_type: str
|
||||
:param member_target: the (dict form) reference of the member object.
|
||||
Both this and `member_target_type` must be passed
|
||||
if either is passed.
|
||||
:type member_target: dict
|
||||
:param filters: A variable number of optional string filters, these are
|
||||
used to extract values from the query params. The
|
||||
filters are added to the reques data that is passed to
|
||||
the enforcer and may be used to determine policy
|
||||
action. In practice these are mainly supplied in the
|
||||
various "list" APIs and are un-used in the default
|
||||
supplied policies.
|
||||
:type filters: iterable
|
||||
"""
|
||||
# NOTE(morgan) everything in the policy_dict may be used by the policy
|
||||
# DSL to action on RBAC and request information/response data.
|
||||
policy_dict = {}
|
||||
|
||||
# If "action" has not explicitly been overridden, see if it is set in
|
||||
# Flask.g app-context (per-request thread local) meaning the
|
||||
# @policy_enforcer_action decorator was used.
|
||||
action = action or getattr(flask.g, cls.ACTION_STORE_ATTR, None)
|
||||
if action not in _POSSIBLE_TARGET_ACTIONS:
|
||||
LOG.warning('RBAC: Unknown/No enforcement action name. Rejecting '
|
||||
'as unauthorized, this is a programming error and a '
|
||||
'bug should be filed with as much information about '
|
||||
'the request that caused this as possible.')
|
||||
raise exception.Unauthorized(
|
||||
message=_(
|
||||
'Internal RBAC enforcement error, no rule/action name to '
|
||||
'lookup'))
|
||||
|
||||
# Assert we are actually authenticated
|
||||
cls._assert_is_authenticated()
|
||||
|
||||
# Check if "is_admin", this is in support of the old "admin auth token"
|
||||
# middleware with a shared "admin" token for auth
|
||||
if cls._shared_admin_auth_token_set():
|
||||
LOG.warning('RBAC: Bypassing authorization')
|
||||
return
|
||||
|
||||
# NOTE(morgan): !!! ORDER OF THESE OPERATIONS IS IMPORTANT !!!
|
||||
# The lowest priority values are set first and the highest priority
|
||||
# values are set last.
|
||||
|
||||
# Get the Target Data Set.
|
||||
if target_attr is None:
|
||||
policy_dict.update(cls._extract_member_target_data(
|
||||
member_target_type, member_target))
|
||||
|
||||
# Special Case, extract and add subject_token data.
|
||||
subj_token_target_data = cls._extract_subject_token_target_data()
|
||||
if subj_token_target_data:
|
||||
policy_dict.setdefault('target', {}).update(
|
||||
subj_token_target_data)
|
||||
else:
|
||||
policy_dict['target'] = target_attr
|
||||
|
||||
# Pull the data from the view args (path based params) to generate
|
||||
# appropriate input/target attributes, we take an explicit copy here
|
||||
# to ensure we're not somehow corrupting
|
||||
policy_dict.update(flask.request.view_args or {})
|
||||
|
||||
# Generate the filter_attr dataset.
|
||||
policy_dict.update(cls._extract_filter_values(filters))
|
||||
|
||||
# Extract the cred data
|
||||
creds = cls._extract_policy_check_credentials()
|
||||
flattened = utils.flatten_dict(policy_dict)
|
||||
if LOG.logger.getEffectiveLevel() <= log.DEBUG:
|
||||
# LOG the Args
|
||||
args_str = ', '.join(
|
||||
['%s=%s' % (k, v) for
|
||||
k, v in (flask.request.view_args or {}).items()])
|
||||
args_str = strutils.mask_password(args_str)
|
||||
LOG.debug('RBAC: Authorizing `%(action)s(%(args)s)`',
|
||||
{'action': action, 'args': args_str})
|
||||
|
||||
# LOG the Cred Data
|
||||
cred_str = ', '.join(
|
||||
['%s=%s' % (k, v) for k, v in creds.items()])
|
||||
cred_str = strutils.mask_password(cred_str)
|
||||
LOG.debug('RBAC: Policy Enforcement Cred Data '
|
||||
'`%(action)s creds(%(cred_str)s)`',
|
||||
{'action': action, 'cred_str': cred_str})
|
||||
|
||||
# Log the Target Data
|
||||
target_str = ', '.join(
|
||||
['%s=%s' % (k, v) for k, v in flattened.items()])
|
||||
target_str = strutils.mask_password(target_str)
|
||||
LOG.debug('RBAC: Policy Enforcement Target Data '
|
||||
'`%(action)s => target(%(target_str)s)`',
|
||||
{'action': action, 'target_str': target_str})
|
||||
|
||||
# Instantiate the enforcer object if needed.
|
||||
enforcer_obj = enforcer or cls()
|
||||
enforcer_obj._enforce(
|
||||
credentials=creds, action=action, target=flattened)
|
||||
LOG.debug('RBAC: Authorization granted')
|
||||
|
||||
@classmethod
|
||||
def policy_enforcer_action(cls, action):
|
||||
"""Decorator to set policy enforcement action name."""
|
||||
if action not in _POSSIBLE_TARGET_ACTIONS:
|
||||
raise ValueError('PROGRAMMING ERROR: Action must reference a '
|
||||
'valid Keystone policy enforcement name.')
|
||||
|
||||
def wrapper(f):
|
||||
@functools.wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
# Set the action in g on a known attr so we can reference it
|
||||
# later.
|
||||
setattr(flask.g, cls.ACTION_STORE_ATTR, action)
|
||||
return f(*args, **kwargs)
|
||||
return inner
|
||||
return wrapper
|
||||
|
||||
@staticmethod
|
||||
def register_rules(enforcer):
|
||||
enforcer.register_defaults(policies.list_rules())
|
|
@ -0,0 +1,47 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# NOTE(morgan): This entire module is to provide compatibility for the old
|
||||
# @protected style decorator enforcement. All new enforcement should directly
|
||||
# reference the Enforcer object itself.
|
||||
from keystone.common.rbac_enforcer import enforcer
|
||||
from keystone import conf
|
||||
|
||||
|
||||
CONF = conf.CONF
|
||||
|
||||
|
||||
# NOTE(morgan): Shared-state enforcer object
|
||||
_ENFORCER = enforcer.RBACEnforcer()
|
||||
|
||||
|
||||
def reset():
|
||||
_ENFORCER._reset()
|
||||
|
||||
|
||||
def get_enforcer():
|
||||
"""Entrypoint that must return the raw oslo.policy enforcer obj.
|
||||
|
||||
This is utilized by the command-line policy tools.
|
||||
|
||||
:returns: :class:`oslo_policy.policy.Enforcer`
|
||||
"""
|
||||
# Here we pass an empty list of arguments because there aren't any
|
||||
# arguments that oslo.config or oslo.policy shouldn't already understand
|
||||
# from the CONF object. This makes things easier here because we don't have
|
||||
# to parse arguments passed in from the command line and remove unexpected
|
||||
# arguments before building a Config object.
|
||||
CONF([], project='keystone')
|
||||
return _ENFORCER._enforcer
|
||||
|
||||
|
||||
enforce = _ENFORCER._enforce
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
from oslo_log import log
|
||||
|
||||
from keystone.common import policy
|
||||
from keystone.common.rbac_enforcer import policy
|
||||
from keystone import exception
|
||||
from keystone.policy.backends import base
|
||||
|
||||
|
|
|
@ -12,8 +12,10 @@
|
|||
|
||||
import abc
|
||||
import collections
|
||||
import functools
|
||||
|
||||
from flask import blueprints
|
||||
from flask import g
|
||||
import flask_restful
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
@ -23,6 +25,19 @@ LOG = log.getLogger(__name__)
|
|||
ResourceMap = collections.namedtuple('resource_map', 'resource, urls, kwargs')
|
||||
|
||||
|
||||
_ENFORCEMENT_CHECK_ATTR = 'keystone:RBAC:enforcement_called'
|
||||
|
||||
|
||||
def _initialize_rbac_enforcement_check():
|
||||
setattr(g, _ENFORCEMENT_CHECK_ATTR, False)
|
||||
|
||||
|
||||
def _assert_rbac_enforcement_called():
|
||||
# assert is intended to be used to ensure code during development works
|
||||
# as expected, it is fine to be optimized out with `python -O`
|
||||
assert getattr(g, _ENFORCEMENT_CHECK_ATTR, False) # nosec
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class APIBase(object):
|
||||
|
||||
|
@ -119,7 +134,7 @@ class APIBase(object):
|
|||
'Adding resource routes to API %(name)s: '
|
||||
'[%(urls)r %(kwargs)r]',
|
||||
{'name': self._name, 'urls': r.urls, 'kwargs': r.kwargs})
|
||||
self._blueprint.add_resource(r.resource, *r.urls, **r.kwargs)
|
||||
self.blueprint.add_resource(r.resource, *r.urls, **r.kwargs)
|
||||
|
||||
def _register_before_request_functions(self, functions=None):
|
||||
"""Register functions to be executed in the `before request` phase.
|
||||
|
@ -145,6 +160,7 @@ class APIBase(object):
|
|||
assert not self.__before_request_functions_added, msg # nosec
|
||||
# register global before request functions
|
||||
# e.g. self.__blueprint.before_request(function)
|
||||
self.__blueprint.before_request(_initialize_rbac_enforcement_check)
|
||||
|
||||
# Add passed-in functions
|
||||
for f in functions:
|
||||
|
@ -175,12 +191,31 @@ class APIBase(object):
|
|||
assert not self.__after_request_functions_added, msg # nosec
|
||||
# register global after request functions
|
||||
# e.g. self.__blueprint.after_request(function)
|
||||
self.__blueprint.after_request(_assert_rbac_enforcement_called)
|
||||
|
||||
# Add Passed-In Functions
|
||||
for f in functions:
|
||||
self.__blueprint.after_request(f)
|
||||
self.__after_request_functions_added = True
|
||||
|
||||
@staticmethod
|
||||
def unenforced_api(f):
|
||||
"""Decorate a resource method to mark is as an unenforced API.
|
||||
|
||||
Explicitly exempts an API from receiving the enforced API check,
|
||||
specifically for cases such as user self-service password changes (or
|
||||
other APIs that must work without already having a token).
|
||||
|
||||
This decorator may also be used if the API has extended enforcement
|
||||
logic/varying enforcement logic (such as some of the AUTH paths) where
|
||||
the full enforcement will be implemented directly within the methods.
|
||||
"""
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
setattr(g, _ENFORCEMENT_CHECK_ATTR, True)
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
@classmethod
|
||||
def instantiate_and_register_to_app(cls, flask_app):
|
||||
"""Build the API object and register to the passed in flask_app.
|
||||
|
|
|
@ -0,0 +1,564 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import flask
|
||||
from flask import blueprints
|
||||
from flask import g
|
||||
import flask_restful
|
||||
import mock
|
||||
from oslo_policy import policy
|
||||
|
||||
from keystone.common import authorization
|
||||
from keystone.common import context
|
||||
from keystone.common import provider_api
|
||||
from keystone.common import rbac_enforcer
|
||||
from keystone import exception
|
||||
from keystone.models import token_model
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import rest
|
||||
|
||||
|
||||
PROVIDER_APIS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class TestRBACEnforcer(unit.TestCase):
|
||||
|
||||
def test_enforcer_shared_state(self):
|
||||
enforcer = rbac_enforcer.enforcer.RBACEnforcer()
|
||||
enforcer2 = rbac_enforcer.enforcer.RBACEnforcer()
|
||||
|
||||
self.assertIsNotNone(enforcer._enforcer)
|
||||
self.assertEqual(enforcer._enforcer, enforcer2._enforcer)
|
||||
setattr(enforcer, '_test_attr', uuid.uuid4().hex)
|
||||
self.assertEqual(enforcer._test_attr, enforcer2._test_attr)
|
||||
|
||||
def test_enforcer_auto_instantiated(self):
|
||||
enforcer = rbac_enforcer.enforcer.RBACEnforcer()
|
||||
# Check that the enforcer instantiates the oslo_policy enforcer object
|
||||
# on demand.
|
||||
self.assertIsNotNone(enforcer._enforcer)
|
||||
enforcer._reset()
|
||||
self.assertIsNotNone(enforcer._enforcer)
|
||||
|
||||
|
||||
class _TestRBACEnforcerBase(rest.RestfulTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(_TestRBACEnforcerBase, self).setUp()
|
||||
self._setup_enforcer_object()
|
||||
self._setup_dynamic_flask_blueprint_api()
|
||||
self._setup_flask_restful_api()
|
||||
|
||||
def _setup_enforcer_object(self):
|
||||
self.enforcer = rbac_enforcer.enforcer.RBACEnforcer()
|
||||
self.cleanup_instance('enforcer')
|
||||
|
||||
def register_new_rules(enforcer):
|
||||
rules = self._testing_policy_rules()
|
||||
enforcer.register_defaults(rules)
|
||||
|
||||
self.useFixture(fixtures.MockPatchObject(
|
||||
self.enforcer, 'register_rules', register_new_rules))
|
||||
|
||||
# Set the possible actions to our limited list
|
||||
original_actions = rbac_enforcer.enforcer._POSSIBLE_TARGET_ACTIONS
|
||||
rbac_enforcer.enforcer._POSSIBLE_TARGET_ACTIONS = frozenset([
|
||||
rule.name for rule in self._testing_policy_rules()])
|
||||
|
||||
# RESET the FrozenSet of possible target actions to the original
|
||||
# value
|
||||
self.addCleanup(setattr,
|
||||
rbac_enforcer.enforcer,
|
||||
'_POSSIBLE_TARGET_ACTIONS',
|
||||
original_actions)
|
||||
|
||||
# Force a reset on the enforcer to load up new policy rules.
|
||||
self.enforcer._reset()
|
||||
|
||||
def _setup_dynamic_flask_blueprint_api(self):
|
||||
# Create a dynamic flask blueprint with a known prefix
|
||||
api = uuid.uuid4().hex
|
||||
url_prefix = '/_%s_TEST' % api
|
||||
blueprint = blueprints.Blueprint(api, __name__, url_prefix=url_prefix)
|
||||
self.url_prefix = url_prefix
|
||||
self.flask_blueprint = blueprint
|
||||
self.cleanup_instance('flask_blueprint', 'url_prefix')
|
||||
|
||||
def _setup_flask_restful_api(self):
|
||||
self.restful_api_url_prefix = '/_%s_TEST' % uuid.uuid4().hex
|
||||
self.restful_api = flask_restful.Api(self.public_app.app,
|
||||
self.restful_api_url_prefix)
|
||||
user = self.user_req_admin
|
||||
|
||||
# Very Basic Restful Resource
|
||||
class RestfulResource(flask_restful.Resource):
|
||||
def get(self, argument_id):
|
||||
return {'argument': {
|
||||
'id': argument_id,
|
||||
'value': 'TEST',
|
||||
'owner_id': user['id']}}
|
||||
|
||||
self.restful_api_resource = RestfulResource
|
||||
self.restful_api.add_resource(
|
||||
RestfulResource, '/argument/<string:argument_id>')
|
||||
self.cleanup_instance('restful_api', 'restful_resource',
|
||||
'restful_api_url_prefix')
|
||||
|
||||
def _register_blueprint_to_app(self):
|
||||
# TODO(morgan): remove the need for webtest, but for now just unwrap
|
||||
# by one layer. Once everything is converted to flask, we can fix
|
||||
# the tests to eliminate "webtest".
|
||||
self.public_app.app.register_blueprint(
|
||||
self.flask_blueprint, url_prefix=self.url_prefix)
|
||||
|
||||
def _auth_json(self):
|
||||
return {
|
||||
'auth': {
|
||||
'identity': {
|
||||
'methods': ['password'],
|
||||
'password': {
|
||||
'user': {
|
||||
'name': self.user_req_admin['name'],
|
||||
'password': self.user_req_admin['password'],
|
||||
'domain': {
|
||||
'id': self.user_req_admin['domain_id']
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
'scope': {
|
||||
'project': {
|
||||
'id': self.tenant_service['id']
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def _testing_policy_rules(self):
|
||||
test_policy_rules = [
|
||||
policy.RuleDefault(
|
||||
name='example:subject_token',
|
||||
check_str='user_id:%(target.token.user_id)s',
|
||||
scope_types=['project'],
|
||||
),
|
||||
policy.RuleDefault(
|
||||
name='example:target',
|
||||
check_str='user_id:%(target.myuser.id)s',
|
||||
scope_types=['project'],
|
||||
),
|
||||
policy.RuleDefault(
|
||||
name='example:inferred_member_data',
|
||||
check_str='user_id:%(target.argument.owner_id)s',
|
||||
scope_types=['project'],
|
||||
),
|
||||
policy.RuleDefault(
|
||||
name='example:with_filter',
|
||||
check_str='user_id:%(user)s',
|
||||
scope_types=['project'],
|
||||
),
|
||||
policy.RuleDefault(
|
||||
name='example:allowed',
|
||||
check_str='',
|
||||
scope_types=['project'],
|
||||
),
|
||||
policy.RuleDefault(
|
||||
name='example:denied',
|
||||
check_str='false:false',
|
||||
scope_types=['project'],
|
||||
),
|
||||
]
|
||||
return test_policy_rules
|
||||
|
||||
|
||||
class TestRBACEnforcerRestAdminAuthToken(_TestRBACEnforcerBase):
|
||||
|
||||
def config_overrides(self):
|
||||
super(TestRBACEnforcerRestAdminAuthToken, self).config_overrides()
|
||||
self.config_fixture.config(admin_token='ADMIN')
|
||||
|
||||
def test_enforcer_is_admin_check_with_token(self):
|
||||
# Admin-shared token passed and valid, "is_admin" should be true.
|
||||
with self.test_client() as c:
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={authorization.AUTH_TOKEN_HEADER: 'ADMIN'})
|
||||
self.assertTrue(self.enforcer._shared_admin_auth_token_set())
|
||||
|
||||
def test_enforcer_is_admin_check_without_token(self):
|
||||
with self.test_client() as c:
|
||||
# Admin-shared token passed and invalid, "is_admin" should be false
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={authorization.AUTH_TOKEN_HEADER: 'BOGUS'})
|
||||
self.assertFalse(self.enforcer._shared_admin_auth_token_set())
|
||||
|
||||
# Admin-shared token not passed, "is_admin" should be false
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex))
|
||||
self.assertFalse(self.enforcer._shared_admin_auth_token_set())
|
||||
|
||||
def test_enforce_call_is_admin(self):
|
||||
with self.test_client() as c:
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={authorization.AUTH_TOKEN_HEADER: 'ADMIN'})
|
||||
with mock.patch.object(self.enforcer, '_enforce') as mock_method:
|
||||
self.enforcer.enforce_call(action='example:allowed')
|
||||
mock_method.assert_not_called()
|
||||
|
||||
|
||||
class TestRBACEnforcerRest(_TestRBACEnforcerBase):
|
||||
|
||||
def test_extract_subject_token_target_data(self):
|
||||
path = '/v3/auth/tokens'
|
||||
body = self._auth_json()
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.post(
|
||||
path,
|
||||
json=body,
|
||||
follow_redirects=True,
|
||||
expected_status_code=201)
|
||||
|
||||
token_id = r.headers['X-Subject-Token']
|
||||
|
||||
c.get('/v3', headers={'X-Auth-Token': token_id,
|
||||
'X-Subject-Token': token_id})
|
||||
token_ref = token_model.KeystoneToken(
|
||||
token_id=token_id,
|
||||
token_data=PROVIDER_APIS.token_provider_api.validate_token(
|
||||
token_id
|
||||
)
|
||||
)
|
||||
subj_token_data = (
|
||||
self.enforcer._extract_subject_token_target_data())
|
||||
subj_token_data = subj_token_data['token']
|
||||
self.assertEqual(token_ref.user_id, subj_token_data['user_id'])
|
||||
self.assertIn('user', subj_token_data)
|
||||
self.assertIn('domain', subj_token_data['user'])
|
||||
self.assertEqual(token_ref.user_domain_id,
|
||||
subj_token_data['user']['domain']['id'])
|
||||
|
||||
def test_extract_filter_data(self):
|
||||
# Test that we are extracting useful filter data from the
|
||||
# request context. The tested function validates tha
|
||||
# extract_filter_attr only adds the passed filter values to the
|
||||
# policy dict, all other query-params are ignored.
|
||||
|
||||
path = uuid.uuid4().hex
|
||||
|
||||
@self.flask_blueprint.route('/%s' % path)
|
||||
def return_nothing_interesting():
|
||||
return 'OK', 200
|
||||
|
||||
self._register_blueprint_to_app()
|
||||
|
||||
with self.test_client() as c:
|
||||
expected_param = uuid.uuid4().hex
|
||||
unexpected_param = uuid.uuid4().hex
|
||||
get_path = '/'.join([self.url_prefix, path])
|
||||
# Populate the query-string with two params, one that should
|
||||
# exist and one that should not in the resulting policy
|
||||
# dict.
|
||||
qs = '%(expected)s=EXPECTED&%(unexpected)s=UNEXPECTED' % {
|
||||
'expected': expected_param,
|
||||
'unexpected': unexpected_param
|
||||
}
|
||||
# Perform the get with the query-string
|
||||
c.get('%(path)s?%(qs)s' % {'path': get_path, 'qs': qs})
|
||||
# Extract the filter values.
|
||||
extracted_filter = self.enforcer._extract_filter_values(
|
||||
[expected_param])
|
||||
# Unexpected param is not in the extracted values
|
||||
# Expected param is in the extracted values
|
||||
# Expected param has the expected value
|
||||
self.assertNotIn(extracted_filter, unexpected_param)
|
||||
self.assertIn(expected_param, expected_param)
|
||||
self.assertEqual(extracted_filter[expected_param], 'EXPECTED')
|
||||
|
||||
def test_retrive_oslo_req_context(self):
|
||||
# Test to ensure 'get_oslo_req_context' is pulling the request context
|
||||
# from the environ as expected. The only way to really test is an
|
||||
# instance check.
|
||||
with self.test_client() as c:
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex))
|
||||
oslo_req_context = self.enforcer._get_oslo_req_context()
|
||||
self.assertIsInstance(oslo_req_context, context.RequestContext)
|
||||
|
||||
def test_is_authenticated_check(self):
|
||||
# Check that the auth_context is in-fact decoded as expected.
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
self.enforcer._assert_is_authenticated()
|
||||
c.get('/', expected_status_code=300)
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
self.enforcer._assert_is_authenticated)
|
||||
oslo_ctx = self.enforcer._get_oslo_req_context()
|
||||
# Set authenticated to a false value that is not None
|
||||
oslo_ctx.authenticated = False
|
||||
self.assertRaises(exception.Unauthorized,
|
||||
self.enforcer._assert_is_authenticated)
|
||||
|
||||
def test_extract_policy_check_credentials(self):
|
||||
# Make sure extracting the creds is the same as what is in the request
|
||||
# environment.
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
extracted_creds = self.enforcer._extract_policy_check_credentials()
|
||||
self.assertEqual(
|
||||
flask.request.environ.get(authorization.AUTH_CONTEXT_ENV),
|
||||
extracted_creds)
|
||||
|
||||
def test_extract_member_target_data_inferred(self):
|
||||
self.restful_api_resource.member_name = 'argument'
|
||||
member_from_driver = self.restful_api_resource.get
|
||||
self.restful_api_resource.get_member_from_driver = member_from_driver
|
||||
|
||||
argument_id = uuid.uuid4().hex
|
||||
|
||||
with self.test_client() as c:
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
argument_id))
|
||||
extracted = self.enforcer._extract_member_target_data(
|
||||
member_target_type=None, member_target=None)
|
||||
self.assertDictEqual(extracted['target'],
|
||||
self.restful_api_resource().get(argument_id))
|
||||
|
||||
def test_extract_member_target_data_supplied_target(self):
|
||||
# Test extract member target data with member_target and
|
||||
# member_target_type supplied.
|
||||
member_type = uuid.uuid4().hex
|
||||
member_target = {uuid.uuid4().hex: {uuid.uuid4().hex}}
|
||||
extracted = self.enforcer._extract_member_target_data(
|
||||
member_target_type=member_type, member_target=member_target)
|
||||
self.assertDictEqual({'target': {member_type: member_target}},
|
||||
extracted)
|
||||
|
||||
def test_extract_member_target_data_bad_input(self):
|
||||
# Test Extract Member Target Data with only "member_target" and only
|
||||
# "member_target_type" and ensure empty dict is returned.
|
||||
self.assertEqual({}, self.enforcer._extract_member_target_data(
|
||||
member_target=None, member_target_type=uuid.uuid4().hex))
|
||||
self.assertEqual({}, self.enforcer._extract_member_target_data(
|
||||
member_target={}, member_target_type=None))
|
||||
|
||||
def test_policy_enforcer_action_decorator(self):
|
||||
# Create a method that has an action pre-registered
|
||||
action = 'example:allowed'
|
||||
|
||||
@self.flask_blueprint.route('')
|
||||
@self.enforcer.policy_enforcer_action(action)
|
||||
def nothing_interesting():
|
||||
return 'OK', 200
|
||||
|
||||
self._register_blueprint_to_app()
|
||||
|
||||
with self.test_client() as c:
|
||||
c.get('%s' % self.url_prefix)
|
||||
self.assertEqual(
|
||||
action, getattr(g, self.enforcer.ACTION_STORE_ATTR))
|
||||
|
||||
def test_policy_enforcer_action_invalid_action_decorator(self):
|
||||
# If the "action" is not a registered policy enforcement point, check
|
||||
# that a ValueError is raised.
|
||||
def _decorator_fails():
|
||||
# Create a method that has an action pre-registered, but the
|
||||
# action is bogus
|
||||
action = uuid.uuid4().hex
|
||||
|
||||
@self.flask_blueprint.route('')
|
||||
@self.enforcer.policy_enforcer_action(action)
|
||||
def nothing_interesting():
|
||||
return 'OK', 200
|
||||
|
||||
self.assertRaises(ValueError, _decorator_fails)
|
||||
|
||||
def test_enforce_call_invalid_action(self):
|
||||
self.assertRaises(exception.Unauthorized, self.enforcer.enforce_call,
|
||||
action=uuid.uuid4().hex)
|
||||
|
||||
def test_enforce_call_not_is_authenticated(self):
|
||||
with self.test_client() as c:
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex))
|
||||
# Patch the enforcer to return an empty oslo context.
|
||||
with mock.patch.object(self.enforcer, '_get_oslo_req_context',
|
||||
return_value=None):
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.enforcer.enforce_call, action='example:allowed')
|
||||
|
||||
# Explicitly set "authenticated" on the context to false.
|
||||
ctx = self.enforcer._get_oslo_req_context()
|
||||
ctx.authenticated = False
|
||||
self.assertRaises(
|
||||
exception.Unauthorized,
|
||||
self.enforcer.enforce_call, action='example:allowed')
|
||||
|
||||
def test_enforce_call_explicit_target_attr(self):
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
# check the enforcer properly handles explicitly passed in targets
|
||||
# no subject-token processing is done in this case.
|
||||
#
|
||||
# TODO(morgan): confirm if subject-token-processing can/should
|
||||
# occur in this form without causing issues.
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id,
|
||||
'X-Subject-Token': token_id})
|
||||
target = {'myuser': {'id': self.user_req_admin['id']}}
|
||||
self.enforcer.enforce_call(action='example:target',
|
||||
target_attr=target)
|
||||
# Ensure extracting the subject-token data is not happening.
|
||||
self.assertRaises(
|
||||
exception.ForbiddenAction,
|
||||
self.enforcer.enforce_call,
|
||||
action='example:subject_token',
|
||||
target_attr=target)
|
||||
|
||||
def test_enforce_call_with_subject_token_data(self):
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
# Check that the enforcer passes if user_id and subject token
|
||||
# user_id are the same. example:deprecated should also pass
|
||||
# since it is open enforcement.
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id,
|
||||
'X-Subject-Token': token_id})
|
||||
self.enforcer.enforce_call(action='example:subject_token')
|
||||
|
||||
def test_enforce_call_with_member_target_type_and_member_target(self):
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
# check the enforcer properly handles passed in member_target_type
|
||||
# and member_target. This form still extracts data from the subject
|
||||
# token.
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id,
|
||||
'X-Subject-Token': token_id})
|
||||
target_type = 'myuser'
|
||||
target = {'id': self.user_req_admin['id']}
|
||||
self.enforcer.enforce_call(action='example:target',
|
||||
member_target_type=target_type,
|
||||
member_target=target)
|
||||
# Ensure we're still extracting the subject-token data
|
||||
self.enforcer.enforce_call(action='example:subject_token')
|
||||
|
||||
def test_enforce_call_inferred_member_target_data(self):
|
||||
# Check that inferred "get" works as expected for the member target
|
||||
|
||||
# setup the restful resource for an inferred "get"
|
||||
self.restful_api_resource.member_name = 'argument'
|
||||
member_from_driver = self.restful_api_resource.get
|
||||
self.restful_api_resource.get_member_from_driver = member_from_driver
|
||||
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
# check the enforcer properly handles inferred member data get
|
||||
# This form still extracts data from the subject token.
|
||||
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
|
||||
uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id,
|
||||
'X-Subject-Token': token_id})
|
||||
self.enforcer.enforce_call(action='example:inferred_member_data')
|
||||
# Ensure we're still extracting the subject-token data
|
||||
self.enforcer.enforce_call(action='example:subject_token')
|
||||
|
||||
def test_enforce_call_with_filter_values(self):
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
# Check that the enforcer passes if a filter is supplied *and*
|
||||
# the filter name is passed to enforce_call
|
||||
c.get('%s/argument/%s?user=%s' % (
|
||||
self.restful_api_url_prefix, uuid.uuid4().hex,
|
||||
self.user_req_admin['id']),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
self.enforcer.enforce_call(action='example:with_filter',
|
||||
filters=['user'])
|
||||
|
||||
# With No Filters passed into enforce_call
|
||||
self.assertRaises(
|
||||
exception.ForbiddenAction,
|
||||
self.enforcer.enforce_call,
|
||||
action='example:with_filter')
|
||||
|
||||
# With No Filters in the PATH
|
||||
c.get('%s/argument/%s' % (
|
||||
self.restful_api_url_prefix, uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
self.assertRaises(
|
||||
exception.ForbiddenAction,
|
||||
self.enforcer.enforce_call,
|
||||
action='example:with_filter',
|
||||
filters=['user'])
|
||||
|
||||
# With no filters in the path and no filters passed to enforce_call
|
||||
c.get('%s/argument/%s' % (
|
||||
self.restful_api_url_prefix, uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
self.assertRaises(
|
||||
exception.ForbiddenAction,
|
||||
self.enforcer.enforce_call,
|
||||
action='example:with_filter')
|
||||
|
||||
def test_enforce_call_with_pre_instantiated_enforcer(self):
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
enforcer = rbac_enforcer.enforcer.RBACEnforcer()
|
||||
with self.test_client() as c:
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
# Check the enforcer behaves as expected with a pre-instantiated
|
||||
# enforcer passed into .enforce_call()
|
||||
c.get('%s/argument/%s' % (
|
||||
self.restful_api_url_prefix, uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
self.enforcer.enforce_call(action='example:allowed',
|
||||
enforcer=enforcer)
|
||||
self.assertRaises(exception.ForbiddenAction,
|
||||
self.enforcer.enforce_call,
|
||||
action='example:denied',
|
||||
enforcer=enforcer)
|
|
@ -14,7 +14,7 @@
|
|||
import fixtures
|
||||
from oslo_policy import opts
|
||||
|
||||
from keystone.common import policy
|
||||
from keystone.common.rbac_enforcer import policy
|
||||
|
||||
|
||||
class Policy(fixtures.Fixture):
|
||||
|
@ -29,5 +29,4 @@ class Policy(fixtures.Fixture):
|
|||
opts.set_defaults(self._config_fixture.conf)
|
||||
self._config_fixture.config(group='oslo_policy',
|
||||
policy_file=self._policy_file)
|
||||
policy.init()
|
||||
self.addCleanup(policy.reset)
|
||||
|
|
|
@ -24,7 +24,7 @@ import six
|
|||
from testtools import matchers
|
||||
|
||||
from keystone.common import policies
|
||||
from keystone.common import policy
|
||||
from keystone.common.rbac_enforcer import policy
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.tests import unit
|
||||
|
@ -56,7 +56,7 @@ class PolicyFileTestCase(unit.TestCase):
|
|||
policy.enforce(empty_credentials, action, self.target)
|
||||
with open(self.tmpfilename, "w") as policyfile:
|
||||
policyfile.write("""{"example:test": ["false:false"]}""")
|
||||
policy._ENFORCER.clear()
|
||||
policy._ENFORCER._enforcer.clear()
|
||||
self.assertRaises(exception.ForbiddenAction, policy.enforce,
|
||||
empty_credentials, action, self.target)
|
||||
|
||||
|
@ -84,7 +84,7 @@ class PolicyTestCase(unit.TestCase):
|
|||
|
||||
def _set_rules(self):
|
||||
these_rules = common_policy.Rules.from_dict(self.rules)
|
||||
policy._ENFORCER.set_rules(these_rules)
|
||||
policy._ENFORCER._enforcer.set_rules(these_rules)
|
||||
|
||||
def test_enforce_nonexistent_action_throws(self):
|
||||
action = "example:noexist"
|
||||
|
@ -132,13 +132,12 @@ class PolicyScopeTypesEnforcementTestCase(unit.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
super(PolicyScopeTypesEnforcementTestCase, self).setUp()
|
||||
policy.init()
|
||||
rule = common_policy.RuleDefault(
|
||||
name='foo',
|
||||
check_str='',
|
||||
scope_types=['system']
|
||||
)
|
||||
policy._ENFORCER.register_default(rule)
|
||||
policy._ENFORCER._enforcer.register_default(rule)
|
||||
self.credentials = {}
|
||||
self.action = 'foo'
|
||||
self.target = {}
|
||||
|
@ -203,11 +202,10 @@ class PolicyJsonTestCase(unit.TestCase):
|
|||
'is_admin_project': True, 'project_id': None,
|
||||
'domain_id': uuid.uuid4().hex}
|
||||
|
||||
# Since we are moving policy.json defaults to code, we instead call
|
||||
# `policy.init()` which does the enforce setup for us with the added
|
||||
# bonus of registering the in code default policies.
|
||||
policy.init()
|
||||
result = policy._ENFORCER.enforce(action, target, credentials)
|
||||
# The enforcer is setup behind the scenes and registers the in code
|
||||
# default policies.
|
||||
result = policy._ENFORCER._enforcer.enforce(action, target,
|
||||
credentials)
|
||||
self.assertTrue(result)
|
||||
|
||||
domain_policy = unit.dirs.etc('policy.v3cloudsample.json')
|
||||
|
|
|
@ -33,8 +33,8 @@ from testtools import testcase
|
|||
|
||||
from keystone import auth
|
||||
from keystone.auth.plugins import totp
|
||||
from keystone.common import policy
|
||||
from keystone.common import provider_api
|
||||
from keystone.common.rbac_enforcer import policy
|
||||
from keystone.common import utils
|
||||
import keystone.conf
|
||||
from keystone.credential.providers import fernet as credential_fernet
|
||||
|
|
Loading…
Reference in New Issue