Merge "Implement base for new RBAC Enforcer"

This commit is contained in:
Zuul 2018-07-13 08:49:04 +00:00 committed by Gerrit Code Review
commit a5b55d17ce
11 changed files with 1053 additions and 97 deletions

View File

@ -1,81 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy as common_policy
from keystone.common import policies
import keystone.conf
from keystone import exception
CONF = keystone.conf.CONF
_ENFORCER = None
def reset():
global _ENFORCER
_ENFORCER = None
def init():
global _ENFORCER
if not _ENFORCER:
_ENFORCER = common_policy.Enforcer(CONF)
register_rules(_ENFORCER)
def get_enforcer():
# Here we pass an empty list of arguments because there aren't any
# arguments that oslo.config or oslo.policy shouldn't already understand
# from the CONF object. This makes things easier here because we don't have
# to parse arguments passed in from the command line and remove unexpected
# arguments before building a Config object.
CONF([], project='keystone')
init()
return _ENFORCER
def enforce(credentials, action, target, do_raise=True):
"""Verify that the action is valid on the target in this context.
:param credentials: user credentials
:param action: string representing the action to be checked, which should
be colon separated for clarity.
:param target: dictionary representing the object of the action for object
creation this should be a dictionary representing the
location of the object e.g. {'project_id':
object.project_id}
:raises keystone.exception.Forbidden: If verification fails.
Actions should be colon separated for clarity. For example:
* identity:list_users
"""
init()
# Add the exception arguments if asked to do a raise
extra = {}
if do_raise:
extra.update(exc=exception.ForbiddenAction, action=action,
do_raise=do_raise)
try:
return _ENFORCER.enforce(action, target, credentials, **extra)
except common_policy.InvalidScope:
raise exception.ForbiddenAction(action=action)
def register_rules(enforcer):
enforcer.register_defaults(policies.list_rules())

View File

@ -0,0 +1,16 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common.rbac_enforcer import enforcer
__all__ = ('enforcer',)

View File

@ -0,0 +1,378 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import flask
from oslo_log import log
from oslo_policy import policy as common_policy
from oslo_utils import strutils
from keystone.common import authorization
from keystone.common import context
from keystone.common import policies
from keystone.common import provider_api
from keystone.common import utils
import keystone.conf
from keystone import exception
from keystone.i18n import _
from keystone.models import token_model
CONF = keystone.conf.CONF
LOG = log.getLogger(__name__)
PROVIDER_APIS = provider_api.ProviderAPIs
_POSSIBLE_TARGET_ACTIONS = frozenset([
rule.name for
rule in policies.list_rules() if not rule.deprecated_for_removal
])
class RBACEnforcer(object):
"""Enforce RBAC on API calls."""
__shared_state__ = {}
__ENFORCER = None
ACTION_STORE_ATTR = 'keystone:RBAC:action_name'
def __init__(self):
# NOTE(morgan): All Enforcer Instances use the same shared state;
# BORG pattern.
self.__dict__ = self.__shared_state__
def _enforce(self, credentials, action, target, do_raise=True):
"""Verify that the action is valid on the target in this context.
This method is for cases that exceed the base enforcer
functionality (notably for compatibility with `@protected` style
decorators.
:param credentials: user credentials
:param action: string representing the action to be checked, which
should be colon separated for clarity.
:param target: dictionary representing the object of the action for
object creation this should be a dictionary
representing the location of the object e.g.
{'project_id': object.project_id}
:raises keystone.exception.Forbidden: If verification fails.
Actions should be colon separated for clarity. For example:
* identity:list_users
"""
# Add the exception arguments if asked to do a raise
extra = {}
if do_raise:
extra.update(exc=exception.ForbiddenAction, action=action,
do_raise=do_raise)
try:
return self._enforcer.enforce(
rule=action, target=target, creds=credentials, **extra)
except common_policy.InvalidScope:
raise exception.ForbiddenAction(action=action)
def _reset(self):
# NOTE(morgan): Used for TEST purposes only.
self.__ENFORCER = None
@property
def _enforcer(self):
# The raw oslo-policy enforcer object
if self.__ENFORCER is None:
self.__ENFORCER = common_policy.Enforcer(CONF)
self.register_rules(self.__ENFORCER)
return self.__ENFORCER
@staticmethod
def _extract_filter_values(filters):
"""Extract filter data from query params for RBAC enforcement."""
filters = filters or []
target = {i: flask.request.args[i] for
i in filters if i in flask.request.args}
if target:
if LOG.logger.getEffectiveLevel() <= log.DEBUG:
LOG.debug(
'RBAC: Adding query filter params (%s)',
', '.join(['%s=%s' % (k, v) for k, v in target.items()]))
return target
@staticmethod
def _extract_member_target_data(member_target_type, member_target):
"""Build some useful target data.
:param member_target_type: what type of target, e.g. 'user'
:type member_target_type: str or None
:param member_target: reference of the target data
:type member_target: dict or None
:returns: constructed target dict or empty dict
:rtype: dict
"""
ret_dict = {}
if ((member_target is not None and member_target_type is None) or
(member_target is None and member_target_type is not None)):
LOG.warning('RBAC: Unknown target type or target reference. '
'Rejecting as unauthorized. '
'(member_target_type=%(target_type)r, '
'member_target=%(target_ref)r)',
{'target_type': member_target_type,
'target_ref': member_target})
# Fast exit.
return ret_dict
if member_target is not None and member_target_type is not None:
ret_dict['target'] = {member_target_type: member_target}
else:
# Try and do some magic loading based upon the resource we've
# matched in our route. This is mostly so we can have a level of
# automatic pulling in the resource; strictly for some added
# DRY capabilities. In an ideal world the target is always passed
# in explicitly.
if flask.request.endpoint:
# This only works for cases of Flask-RESTful, or carefully
# crafted endpoints that live on a class. Ultimately, there
# should be more protection against something wonky
# here.
resource = flask.current_app.view_functions[
flask.request.endpoint].view_class
member_name = getattr(resource, 'member_name', None)
func = getattr(
resource, 'get_member_from_driver', None)
if member_name is not None and callable(func):
key = '%s_id' % member_name
if key in (flask.request.view_args or {}):
# NOTE(morgan): For most correct setup, instantiate the
# view_class. There is no current support for passing
# extra args to the constructor of the view_class like
# .as_view() method would actually do. In this case
# perform a simple instantiation to represent the
# `self` pass to the unbound method.
#
# TODO(morgan): add (future) support for passing class
# instantiation args.
ret_dict['target'] = {
member_name: func(
resource(),
flask.request.view_args[key])[member_name]}
return ret_dict
@staticmethod
def _extract_policy_check_credentials():
# Pull out the auth context
return flask.request.environ.get(authorization.AUTH_CONTEXT_ENV, {})
@classmethod
def _extract_subject_token_target_data(cls):
ret_dict = {}
window_seconds = 0
# NOTE(morgan): Populate the subject token data into
# the policy dict at "target.token". In all liklyhood
# it is un-interesting to populate this data outside
# of the auth paths.
target = 'token'
subject_token = flask.request.headers.get('X-Subject-Token')
if subject_token is not None:
allow_expired = (strutils.bool_from_string(
flask.request.args.get('allow_expired', False),
default=False))
if allow_expired:
window_seconds = CONF.token.allow_expired_window
token_ref = token_model.KeystoneToken(
token_id=subject_token,
token_data=PROVIDER_APIS.token_provider_api.validate_token(
subject_token,
window_seconds=window_seconds))
# TODO(morgan): Expand extracted data from the subject token.
ret_dict[target] = {}
ret_dict[target]['user_id'] = token_ref.user_id
try:
user_domain_id = token_ref.user_domain_id
except exception.UnexpectedError:
user_domain_id = None
if user_domain_id:
ret_dict[target].setdefault('user', {})
ret_dict[target]['user'].setdefault('domain', {})
ret_dict[target]['user']['domain']['id'] = user_domain_id
return ret_dict
@staticmethod
def _get_oslo_req_context():
return flask.request.environ.get(context.REQUEST_CONTEXT_ENV, None)
@classmethod
def _assert_is_authenticated(cls):
ctx = cls._get_oslo_req_context()
if ctx is None:
LOG.warning('RBAC: Error reading the request context generated by '
'the Auth Middleware (there is no context). Rejecting '
'request as unauthorized.')
raise exception.Unauthorized(
_('Internal error processing authentication and '
'authorization.'))
if not ctx.authenticated:
raise exception.Unauthorized(
_('auth_context did not decode anything useful'))
@classmethod
def _shared_admin_auth_token_set(cls):
ctx = cls._get_oslo_req_context()
return getattr(ctx, 'is_admin', False)
@classmethod
def enforce_call(cls, enforcer=None, action=None, target_attr=None,
member_target_type=None, member_target=None,
filters=None):
"""Enforce RBAC on the current request.
This will do some legwork and then instantiate the Enforcer if an
enforcer is not passed in.
:param enforcer: A pre-instantiated Enforcer object (optional)
:type enforcer: :class:`RBACEnforcer`
:param action: the name of the rule/policy enforcement to be checked
against, e.g. `identity:get_user` (optional may be
replaced by decorating the method/function with
`policy_enforcer_action`.
:type action: str
:param target_attr: complete override of the target data. This will
replace all other generated target data meaning
`member_target_type` and `member_target` are
ignored. This will also prevent extraction of
data from the X-Subject-Token. The `target` dict
should contain a series of key-value pairs such
as `{'user': user_ref_dict}`.
:type target_attr: dict
:param member_target_type: the type of the target, e.g. 'user'. Both
this and `member_target` must be passed if
either is passed.
:type member_target_type: str
:param member_target: the (dict form) reference of the member object.
Both this and `member_target_type` must be passed
if either is passed.
:type member_target: dict
:param filters: A variable number of optional string filters, these are
used to extract values from the query params. The
filters are added to the reques data that is passed to
the enforcer and may be used to determine policy
action. In practice these are mainly supplied in the
various "list" APIs and are un-used in the default
supplied policies.
:type filters: iterable
"""
# NOTE(morgan) everything in the policy_dict may be used by the policy
# DSL to action on RBAC and request information/response data.
policy_dict = {}
# If "action" has not explicitly been overridden, see if it is set in
# Flask.g app-context (per-request thread local) meaning the
# @policy_enforcer_action decorator was used.
action = action or getattr(flask.g, cls.ACTION_STORE_ATTR, None)
if action not in _POSSIBLE_TARGET_ACTIONS:
LOG.warning('RBAC: Unknown/No enforcement action name. Rejecting '
'as unauthorized, this is a programming error and a '
'bug should be filed with as much information about '
'the request that caused this as possible.')
raise exception.Unauthorized(
message=_(
'Internal RBAC enforcement error, no rule/action name to '
'lookup'))
# Assert we are actually authenticated
cls._assert_is_authenticated()
# Check if "is_admin", this is in support of the old "admin auth token"
# middleware with a shared "admin" token for auth
if cls._shared_admin_auth_token_set():
LOG.warning('RBAC: Bypassing authorization')
return
# NOTE(morgan): !!! ORDER OF THESE OPERATIONS IS IMPORTANT !!!
# The lowest priority values are set first and the highest priority
# values are set last.
# Get the Target Data Set.
if target_attr is None:
policy_dict.update(cls._extract_member_target_data(
member_target_type, member_target))
# Special Case, extract and add subject_token data.
subj_token_target_data = cls._extract_subject_token_target_data()
if subj_token_target_data:
policy_dict.setdefault('target', {}).update(
subj_token_target_data)
else:
policy_dict['target'] = target_attr
# Pull the data from the view args (path based params) to generate
# appropriate input/target attributes, we take an explicit copy here
# to ensure we're not somehow corrupting
policy_dict.update(flask.request.view_args or {})
# Generate the filter_attr dataset.
policy_dict.update(cls._extract_filter_values(filters))
# Extract the cred data
creds = cls._extract_policy_check_credentials()
flattened = utils.flatten_dict(policy_dict)
if LOG.logger.getEffectiveLevel() <= log.DEBUG:
# LOG the Args
args_str = ', '.join(
['%s=%s' % (k, v) for
k, v in (flask.request.view_args or {}).items()])
args_str = strutils.mask_password(args_str)
LOG.debug('RBAC: Authorizing `%(action)s(%(args)s)`',
{'action': action, 'args': args_str})
# LOG the Cred Data
cred_str = ', '.join(
['%s=%s' % (k, v) for k, v in creds.items()])
cred_str = strutils.mask_password(cred_str)
LOG.debug('RBAC: Policy Enforcement Cred Data '
'`%(action)s creds(%(cred_str)s)`',
{'action': action, 'cred_str': cred_str})
# Log the Target Data
target_str = ', '.join(
['%s=%s' % (k, v) for k, v in flattened.items()])
target_str = strutils.mask_password(target_str)
LOG.debug('RBAC: Policy Enforcement Target Data '
'`%(action)s => target(%(target_str)s)`',
{'action': action, 'target_str': target_str})
# Instantiate the enforcer object if needed.
enforcer_obj = enforcer or cls()
enforcer_obj._enforce(
credentials=creds, action=action, target=flattened)
LOG.debug('RBAC: Authorization granted')
@classmethod
def policy_enforcer_action(cls, action):
"""Decorator to set policy enforcement action name."""
if action not in _POSSIBLE_TARGET_ACTIONS:
raise ValueError('PROGRAMMING ERROR: Action must reference a '
'valid Keystone policy enforcement name.')
def wrapper(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# Set the action in g on a known attr so we can reference it
# later.
setattr(flask.g, cls.ACTION_STORE_ATTR, action)
return f(*args, **kwargs)
return inner
return wrapper
@staticmethod
def register_rules(enforcer):
enforcer.register_defaults(policies.list_rules())

View File

@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(morgan): This entire module is to provide compatibility for the old
# @protected style decorator enforcement. All new enforcement should directly
# reference the Enforcer object itself.
from keystone.common.rbac_enforcer import enforcer
from keystone import conf
CONF = conf.CONF
# NOTE(morgan): Shared-state enforcer object
_ENFORCER = enforcer.RBACEnforcer()
def reset():
_ENFORCER._reset()
def get_enforcer():
"""Entrypoint that must return the raw oslo.policy enforcer obj.
This is utilized by the command-line policy tools.
:returns: :class:`oslo_policy.policy.Enforcer`
"""
# Here we pass an empty list of arguments because there aren't any
# arguments that oslo.config or oslo.policy shouldn't already understand
# from the CONF object. This makes things easier here because we don't have
# to parse arguments passed in from the command line and remove unexpected
# arguments before building a Config object.
CONF([], project='keystone')
return _ENFORCER._enforcer
enforce = _ENFORCER._enforce

View File

@ -17,7 +17,7 @@
from oslo_log import log
from keystone.common import policy
from keystone.common.rbac_enforcer import policy
from keystone import exception
from keystone.policy.backends import base

View File

@ -12,8 +12,10 @@
import abc
import collections
import functools
from flask import blueprints
from flask import g
import flask_restful
from oslo_log import log
import six
@ -23,6 +25,19 @@ LOG = log.getLogger(__name__)
ResourceMap = collections.namedtuple('resource_map', 'resource, urls, kwargs')
_ENFORCEMENT_CHECK_ATTR = 'keystone:RBAC:enforcement_called'
def _initialize_rbac_enforcement_check():
setattr(g, _ENFORCEMENT_CHECK_ATTR, False)
def _assert_rbac_enforcement_called():
# assert is intended to be used to ensure code during development works
# as expected, it is fine to be optimized out with `python -O`
assert getattr(g, _ENFORCEMENT_CHECK_ATTR, False) # nosec
@six.add_metaclass(abc.ABCMeta)
class APIBase(object):
@ -119,7 +134,7 @@ class APIBase(object):
'Adding resource routes to API %(name)s: '
'[%(urls)r %(kwargs)r]',
{'name': self._name, 'urls': r.urls, 'kwargs': r.kwargs})
self._blueprint.add_resource(r.resource, *r.urls, **r.kwargs)
self.blueprint.add_resource(r.resource, *r.urls, **r.kwargs)
def _register_before_request_functions(self, functions=None):
"""Register functions to be executed in the `before request` phase.
@ -145,6 +160,7 @@ class APIBase(object):
assert not self.__before_request_functions_added, msg # nosec
# register global before request functions
# e.g. self.__blueprint.before_request(function)
self.__blueprint.before_request(_initialize_rbac_enforcement_check)
# Add passed-in functions
for f in functions:
@ -175,12 +191,31 @@ class APIBase(object):
assert not self.__after_request_functions_added, msg # nosec
# register global after request functions
# e.g. self.__blueprint.after_request(function)
self.__blueprint.after_request(_assert_rbac_enforcement_called)
# Add Passed-In Functions
for f in functions:
self.__blueprint.after_request(f)
self.__after_request_functions_added = True
@staticmethod
def unenforced_api(f):
"""Decorate a resource method to mark is as an unenforced API.
Explicitly exempts an API from receiving the enforced API check,
specifically for cases such as user self-service password changes (or
other APIs that must work without already having a token).
This decorator may also be used if the API has extended enforcement
logic/varying enforcement logic (such as some of the AUTH paths) where
the full enforcement will be implemented directly within the methods.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
setattr(g, _ENFORCEMENT_CHECK_ATTR, True)
return f(*args, **kwargs)
return wrapper
@classmethod
def instantiate_and_register_to_app(cls, flask_app):
"""Build the API object and register to the passed in flask_app.

View File

@ -0,0 +1,564 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import fixtures
import flask
from flask import blueprints
from flask import g
import flask_restful
import mock
from oslo_policy import policy
from keystone.common import authorization
from keystone.common import context
from keystone.common import provider_api
from keystone.common import rbac_enforcer
from keystone import exception
from keystone.models import token_model
from keystone.tests import unit
from keystone.tests.unit import rest
PROVIDER_APIS = provider_api.ProviderAPIs
class TestRBACEnforcer(unit.TestCase):
def test_enforcer_shared_state(self):
enforcer = rbac_enforcer.enforcer.RBACEnforcer()
enforcer2 = rbac_enforcer.enforcer.RBACEnforcer()
self.assertIsNotNone(enforcer._enforcer)
self.assertEqual(enforcer._enforcer, enforcer2._enforcer)
setattr(enforcer, '_test_attr', uuid.uuid4().hex)
self.assertEqual(enforcer._test_attr, enforcer2._test_attr)
def test_enforcer_auto_instantiated(self):
enforcer = rbac_enforcer.enforcer.RBACEnforcer()
# Check that the enforcer instantiates the oslo_policy enforcer object
# on demand.
self.assertIsNotNone(enforcer._enforcer)
enforcer._reset()
self.assertIsNotNone(enforcer._enforcer)
class _TestRBACEnforcerBase(rest.RestfulTestCase):
def setUp(self):
super(_TestRBACEnforcerBase, self).setUp()
self._setup_enforcer_object()
self._setup_dynamic_flask_blueprint_api()
self._setup_flask_restful_api()
def _setup_enforcer_object(self):
self.enforcer = rbac_enforcer.enforcer.RBACEnforcer()
self.cleanup_instance('enforcer')
def register_new_rules(enforcer):
rules = self._testing_policy_rules()
enforcer.register_defaults(rules)
self.useFixture(fixtures.MockPatchObject(
self.enforcer, 'register_rules', register_new_rules))
# Set the possible actions to our limited list
original_actions = rbac_enforcer.enforcer._POSSIBLE_TARGET_ACTIONS
rbac_enforcer.enforcer._POSSIBLE_TARGET_ACTIONS = frozenset([
rule.name for rule in self._testing_policy_rules()])
# RESET the FrozenSet of possible target actions to the original
# value
self.addCleanup(setattr,
rbac_enforcer.enforcer,
'_POSSIBLE_TARGET_ACTIONS',
original_actions)
# Force a reset on the enforcer to load up new policy rules.
self.enforcer._reset()
def _setup_dynamic_flask_blueprint_api(self):
# Create a dynamic flask blueprint with a known prefix
api = uuid.uuid4().hex
url_prefix = '/_%s_TEST' % api
blueprint = blueprints.Blueprint(api, __name__, url_prefix=url_prefix)
self.url_prefix = url_prefix
self.flask_blueprint = blueprint
self.cleanup_instance('flask_blueprint', 'url_prefix')
def _setup_flask_restful_api(self):
self.restful_api_url_prefix = '/_%s_TEST' % uuid.uuid4().hex
self.restful_api = flask_restful.Api(self.public_app.app,
self.restful_api_url_prefix)
user = self.user_req_admin
# Very Basic Restful Resource
class RestfulResource(flask_restful.Resource):
def get(self, argument_id):
return {'argument': {
'id': argument_id,
'value': 'TEST',
'owner_id': user['id']}}
self.restful_api_resource = RestfulResource
self.restful_api.add_resource(
RestfulResource, '/argument/<string:argument_id>')
self.cleanup_instance('restful_api', 'restful_resource',
'restful_api_url_prefix')
def _register_blueprint_to_app(self):
# TODO(morgan): remove the need for webtest, but for now just unwrap
# by one layer. Once everything is converted to flask, we can fix
# the tests to eliminate "webtest".
self.public_app.app.register_blueprint(
self.flask_blueprint, url_prefix=self.url_prefix)
def _auth_json(self):
return {
'auth': {
'identity': {
'methods': ['password'],
'password': {
'user': {
'name': self.user_req_admin['name'],
'password': self.user_req_admin['password'],
'domain': {
'id': self.user_req_admin['domain_id']
}
}
}
},
'scope': {
'project': {
'id': self.tenant_service['id']
}
}
}
}
def _testing_policy_rules(self):
test_policy_rules = [
policy.RuleDefault(
name='example:subject_token',
check_str='user_id:%(target.token.user_id)s',
scope_types=['project'],
),
policy.RuleDefault(
name='example:target',
check_str='user_id:%(target.myuser.id)s',
scope_types=['project'],
),
policy.RuleDefault(
name='example:inferred_member_data',
check_str='user_id:%(target.argument.owner_id)s',
scope_types=['project'],
),
policy.RuleDefault(
name='example:with_filter',
check_str='user_id:%(user)s',
scope_types=['project'],
),
policy.RuleDefault(
name='example:allowed',
check_str='',
scope_types=['project'],
),
policy.RuleDefault(
name='example:denied',
check_str='false:false',
scope_types=['project'],
),
]
return test_policy_rules
class TestRBACEnforcerRestAdminAuthToken(_TestRBACEnforcerBase):
def config_overrides(self):
super(TestRBACEnforcerRestAdminAuthToken, self).config_overrides()
self.config_fixture.config(admin_token='ADMIN')
def test_enforcer_is_admin_check_with_token(self):
# Admin-shared token passed and valid, "is_admin" should be true.
with self.test_client() as c:
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={authorization.AUTH_TOKEN_HEADER: 'ADMIN'})
self.assertTrue(self.enforcer._shared_admin_auth_token_set())
def test_enforcer_is_admin_check_without_token(self):
with self.test_client() as c:
# Admin-shared token passed and invalid, "is_admin" should be false
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={authorization.AUTH_TOKEN_HEADER: 'BOGUS'})
self.assertFalse(self.enforcer._shared_admin_auth_token_set())
# Admin-shared token not passed, "is_admin" should be false
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex))
self.assertFalse(self.enforcer._shared_admin_auth_token_set())
def test_enforce_call_is_admin(self):
with self.test_client() as c:
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={authorization.AUTH_TOKEN_HEADER: 'ADMIN'})
with mock.patch.object(self.enforcer, '_enforce') as mock_method:
self.enforcer.enforce_call(action='example:allowed')
mock_method.assert_not_called()
class TestRBACEnforcerRest(_TestRBACEnforcerBase):
def test_extract_subject_token_target_data(self):
path = '/v3/auth/tokens'
body = self._auth_json()
with self.test_client() as c:
r = c.post(
path,
json=body,
follow_redirects=True,
expected_status_code=201)
token_id = r.headers['X-Subject-Token']
c.get('/v3', headers={'X-Auth-Token': token_id,
'X-Subject-Token': token_id})
token_ref = token_model.KeystoneToken(
token_id=token_id,
token_data=PROVIDER_APIS.token_provider_api.validate_token(
token_id
)
)
subj_token_data = (
self.enforcer._extract_subject_token_target_data())
subj_token_data = subj_token_data['token']
self.assertEqual(token_ref.user_id, subj_token_data['user_id'])
self.assertIn('user', subj_token_data)
self.assertIn('domain', subj_token_data['user'])
self.assertEqual(token_ref.user_domain_id,
subj_token_data['user']['domain']['id'])
def test_extract_filter_data(self):
# Test that we are extracting useful filter data from the
# request context. The tested function validates tha
# extract_filter_attr only adds the passed filter values to the
# policy dict, all other query-params are ignored.
path = uuid.uuid4().hex
@self.flask_blueprint.route('/%s' % path)
def return_nothing_interesting():
return 'OK', 200
self._register_blueprint_to_app()
with self.test_client() as c:
expected_param = uuid.uuid4().hex
unexpected_param = uuid.uuid4().hex
get_path = '/'.join([self.url_prefix, path])
# Populate the query-string with two params, one that should
# exist and one that should not in the resulting policy
# dict.
qs = '%(expected)s=EXPECTED&%(unexpected)s=UNEXPECTED' % {
'expected': expected_param,
'unexpected': unexpected_param
}
# Perform the get with the query-string
c.get('%(path)s?%(qs)s' % {'path': get_path, 'qs': qs})
# Extract the filter values.
extracted_filter = self.enforcer._extract_filter_values(
[expected_param])
# Unexpected param is not in the extracted values
# Expected param is in the extracted values
# Expected param has the expected value
self.assertNotIn(extracted_filter, unexpected_param)
self.assertIn(expected_param, expected_param)
self.assertEqual(extracted_filter[expected_param], 'EXPECTED')
def test_retrive_oslo_req_context(self):
# Test to ensure 'get_oslo_req_context' is pulling the request context
# from the environ as expected. The only way to really test is an
# instance check.
with self.test_client() as c:
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex))
oslo_req_context = self.enforcer._get_oslo_req_context()
self.assertIsInstance(oslo_req_context, context.RequestContext)
def test_is_authenticated_check(self):
# Check that the auth_context is in-fact decoded as expected.
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={'X-Auth-Token': token_id})
self.enforcer._assert_is_authenticated()
c.get('/', expected_status_code=300)
self.assertRaises(exception.Unauthorized,
self.enforcer._assert_is_authenticated)
oslo_ctx = self.enforcer._get_oslo_req_context()
# Set authenticated to a false value that is not None
oslo_ctx.authenticated = False
self.assertRaises(exception.Unauthorized,
self.enforcer._assert_is_authenticated)
def test_extract_policy_check_credentials(self):
# Make sure extracting the creds is the same as what is in the request
# environment.
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={'X-Auth-Token': token_id})
extracted_creds = self.enforcer._extract_policy_check_credentials()
self.assertEqual(
flask.request.environ.get(authorization.AUTH_CONTEXT_ENV),
extracted_creds)
def test_extract_member_target_data_inferred(self):
self.restful_api_resource.member_name = 'argument'
member_from_driver = self.restful_api_resource.get
self.restful_api_resource.get_member_from_driver = member_from_driver
argument_id = uuid.uuid4().hex
with self.test_client() as c:
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
argument_id))
extracted = self.enforcer._extract_member_target_data(
member_target_type=None, member_target=None)
self.assertDictEqual(extracted['target'],
self.restful_api_resource().get(argument_id))
def test_extract_member_target_data_supplied_target(self):
# Test extract member target data with member_target and
# member_target_type supplied.
member_type = uuid.uuid4().hex
member_target = {uuid.uuid4().hex: {uuid.uuid4().hex}}
extracted = self.enforcer._extract_member_target_data(
member_target_type=member_type, member_target=member_target)
self.assertDictEqual({'target': {member_type: member_target}},
extracted)
def test_extract_member_target_data_bad_input(self):
# Test Extract Member Target Data with only "member_target" and only
# "member_target_type" and ensure empty dict is returned.
self.assertEqual({}, self.enforcer._extract_member_target_data(
member_target=None, member_target_type=uuid.uuid4().hex))
self.assertEqual({}, self.enforcer._extract_member_target_data(
member_target={}, member_target_type=None))
def test_policy_enforcer_action_decorator(self):
# Create a method that has an action pre-registered
action = 'example:allowed'
@self.flask_blueprint.route('')
@self.enforcer.policy_enforcer_action(action)
def nothing_interesting():
return 'OK', 200
self._register_blueprint_to_app()
with self.test_client() as c:
c.get('%s' % self.url_prefix)
self.assertEqual(
action, getattr(g, self.enforcer.ACTION_STORE_ATTR))
def test_policy_enforcer_action_invalid_action_decorator(self):
# If the "action" is not a registered policy enforcement point, check
# that a ValueError is raised.
def _decorator_fails():
# Create a method that has an action pre-registered, but the
# action is bogus
action = uuid.uuid4().hex
@self.flask_blueprint.route('')
@self.enforcer.policy_enforcer_action(action)
def nothing_interesting():
return 'OK', 200
self.assertRaises(ValueError, _decorator_fails)
def test_enforce_call_invalid_action(self):
self.assertRaises(exception.Unauthorized, self.enforcer.enforce_call,
action=uuid.uuid4().hex)
def test_enforce_call_not_is_authenticated(self):
with self.test_client() as c:
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex))
# Patch the enforcer to return an empty oslo context.
with mock.patch.object(self.enforcer, '_get_oslo_req_context',
return_value=None):
self.assertRaises(
exception.Unauthorized,
self.enforcer.enforce_call, action='example:allowed')
# Explicitly set "authenticated" on the context to false.
ctx = self.enforcer._get_oslo_req_context()
ctx.authenticated = False
self.assertRaises(
exception.Unauthorized,
self.enforcer.enforce_call, action='example:allowed')
def test_enforce_call_explicit_target_attr(self):
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
# check the enforcer properly handles explicitly passed in targets
# no subject-token processing is done in this case.
#
# TODO(morgan): confirm if subject-token-processing can/should
# occur in this form without causing issues.
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={'X-Auth-Token': token_id,
'X-Subject-Token': token_id})
target = {'myuser': {'id': self.user_req_admin['id']}}
self.enforcer.enforce_call(action='example:target',
target_attr=target)
# Ensure extracting the subject-token data is not happening.
self.assertRaises(
exception.ForbiddenAction,
self.enforcer.enforce_call,
action='example:subject_token',
target_attr=target)
def test_enforce_call_with_subject_token_data(self):
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
# Check that the enforcer passes if user_id and subject token
# user_id are the same. example:deprecated should also pass
# since it is open enforcement.
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={'X-Auth-Token': token_id,
'X-Subject-Token': token_id})
self.enforcer.enforce_call(action='example:subject_token')
def test_enforce_call_with_member_target_type_and_member_target(self):
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
# check the enforcer properly handles passed in member_target_type
# and member_target. This form still extracts data from the subject
# token.
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={'X-Auth-Token': token_id,
'X-Subject-Token': token_id})
target_type = 'myuser'
target = {'id': self.user_req_admin['id']}
self.enforcer.enforce_call(action='example:target',
member_target_type=target_type,
member_target=target)
# Ensure we're still extracting the subject-token data
self.enforcer.enforce_call(action='example:subject_token')
def test_enforce_call_inferred_member_target_data(self):
# Check that inferred "get" works as expected for the member target
# setup the restful resource for an inferred "get"
self.restful_api_resource.member_name = 'argument'
member_from_driver = self.restful_api_resource.get
self.restful_api_resource.get_member_from_driver = member_from_driver
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
# check the enforcer properly handles inferred member data get
# This form still extracts data from the subject token.
c.get('%s/argument/%s' % (self.restful_api_url_prefix,
uuid.uuid4().hex),
headers={'X-Auth-Token': token_id,
'X-Subject-Token': token_id})
self.enforcer.enforce_call(action='example:inferred_member_data')
# Ensure we're still extracting the subject-token data
self.enforcer.enforce_call(action='example:subject_token')
def test_enforce_call_with_filter_values(self):
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
# Check that the enforcer passes if a filter is supplied *and*
# the filter name is passed to enforce_call
c.get('%s/argument/%s?user=%s' % (
self.restful_api_url_prefix, uuid.uuid4().hex,
self.user_req_admin['id']),
headers={'X-Auth-Token': token_id})
self.enforcer.enforce_call(action='example:with_filter',
filters=['user'])
# With No Filters passed into enforce_call
self.assertRaises(
exception.ForbiddenAction,
self.enforcer.enforce_call,
action='example:with_filter')
# With No Filters in the PATH
c.get('%s/argument/%s' % (
self.restful_api_url_prefix, uuid.uuid4().hex),
headers={'X-Auth-Token': token_id})
self.assertRaises(
exception.ForbiddenAction,
self.enforcer.enforce_call,
action='example:with_filter',
filters=['user'])
# With no filters in the path and no filters passed to enforce_call
c.get('%s/argument/%s' % (
self.restful_api_url_prefix, uuid.uuid4().hex),
headers={'X-Auth-Token': token_id})
self.assertRaises(
exception.ForbiddenAction,
self.enforcer.enforce_call,
action='example:with_filter')
def test_enforce_call_with_pre_instantiated_enforcer(self):
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
enforcer = rbac_enforcer.enforcer.RBACEnforcer()
with self.test_client() as c:
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
# Check the enforcer behaves as expected with a pre-instantiated
# enforcer passed into .enforce_call()
c.get('%s/argument/%s' % (
self.restful_api_url_prefix, uuid.uuid4().hex),
headers={'X-Auth-Token': token_id})
self.enforcer.enforce_call(action='example:allowed',
enforcer=enforcer)
self.assertRaises(exception.ForbiddenAction,
self.enforcer.enforce_call,
action='example:denied',
enforcer=enforcer)

View File

@ -14,7 +14,7 @@
import fixtures
from oslo_policy import opts
from keystone.common import policy
from keystone.common.rbac_enforcer import policy
class Policy(fixtures.Fixture):
@ -29,5 +29,4 @@ class Policy(fixtures.Fixture):
opts.set_defaults(self._config_fixture.conf)
self._config_fixture.config(group='oslo_policy',
policy_file=self._policy_file)
policy.init()
self.addCleanup(policy.reset)

View File

@ -24,7 +24,7 @@ import six
from testtools import matchers
from keystone.common import policies
from keystone.common import policy
from keystone.common.rbac_enforcer import policy
import keystone.conf
from keystone import exception
from keystone.tests import unit
@ -56,7 +56,7 @@ class PolicyFileTestCase(unit.TestCase):
policy.enforce(empty_credentials, action, self.target)
with open(self.tmpfilename, "w") as policyfile:
policyfile.write("""{"example:test": ["false:false"]}""")
policy._ENFORCER.clear()
policy._ENFORCER._enforcer.clear()
self.assertRaises(exception.ForbiddenAction, policy.enforce,
empty_credentials, action, self.target)
@ -84,7 +84,7 @@ class PolicyTestCase(unit.TestCase):
def _set_rules(self):
these_rules = common_policy.Rules.from_dict(self.rules)
policy._ENFORCER.set_rules(these_rules)
policy._ENFORCER._enforcer.set_rules(these_rules)
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
@ -132,13 +132,12 @@ class PolicyScopeTypesEnforcementTestCase(unit.TestCase):
def setUp(self):
super(PolicyScopeTypesEnforcementTestCase, self).setUp()
policy.init()
rule = common_policy.RuleDefault(
name='foo',
check_str='',
scope_types=['system']
)
policy._ENFORCER.register_default(rule)
policy._ENFORCER._enforcer.register_default(rule)
self.credentials = {}
self.action = 'foo'
self.target = {}
@ -203,11 +202,10 @@ class PolicyJsonTestCase(unit.TestCase):
'is_admin_project': True, 'project_id': None,
'domain_id': uuid.uuid4().hex}
# Since we are moving policy.json defaults to code, we instead call
# `policy.init()` which does the enforce setup for us with the added
# bonus of registering the in code default policies.
policy.init()
result = policy._ENFORCER.enforce(action, target, credentials)
# The enforcer is setup behind the scenes and registers the in code
# default policies.
result = policy._ENFORCER._enforcer.enforce(action, target,
credentials)
self.assertTrue(result)
domain_policy = unit.dirs.etc('policy.v3cloudsample.json')

View File

@ -33,8 +33,8 @@ from testtools import testcase
from keystone import auth
from keystone.auth.plugins import totp
from keystone.common import policy
from keystone.common import provider_api
from keystone.common.rbac_enforcer import policy
from keystone.common import utils
import keystone.conf
from keystone.credential.providers import fernet as credential_fernet

View File

@ -181,7 +181,7 @@ oslo.policy.policies =
keystone = keystone.common.policies:list_rules
oslo.policy.enforcer =
keystone = keystone.common.policy:get_enforcer
keystone = keystone.common.rbac_enforcer.policy:get_enforcer
keystone.server_middleware =
cors = oslo_middleware:CORS