Add support for enforce_call to set value on flask.g

When enforce_call is called, flask.g has the appropriate value set
in the known attribute location so that it can be determined if
enforce_call was used within a REST API request.

Change-Id: I08ecd2be0a80248df7041596437adb6238835153
Partial-Bug: #1776504
This commit is contained in:
Morgan Fainberg 2018-06-26 10:26:20 -07:00
parent bb3b15bbf0
commit fb0299f661
3 changed files with 56 additions and 6 deletions

View File

@ -37,6 +37,7 @@ _POSSIBLE_TARGET_ACTIONS = frozenset([
rule.name for
rule in policies.list_rules() if not rule.deprecated_for_removal
])
_ENFORCEMENT_CHECK_ATTR = 'keystone:RBAC:enforcement_called'
class RBACEnforcer(object):
@ -288,6 +289,12 @@ class RBACEnforcer(object):
'Internal RBAC enforcement error, no rule/action name to '
'lookup'))
# Mark flask.g as "enforce_call" has been called. This should occur
# before anything except the "is this a valid action" check, ensuring
# all proper "after request" checks pass, showing that the API has
# enforcement.
setattr(flask.g, _ENFORCEMENT_CHECK_ATTR, True)
# Assert we are actually authenticated
cls._assert_is_authenticated()

View File

@ -20,22 +20,24 @@ import flask_restful
from oslo_log import log
import six
from keystone.common.rbac_enforcer import enforcer
LOG = log.getLogger(__name__)
ResourceMap = collections.namedtuple('resource_map', 'resource, urls, kwargs')
_ENFORCEMENT_CHECK_ATTR = 'keystone:RBAC:enforcement_called'
def _initialize_rbac_enforcement_check():
setattr(g, _ENFORCEMENT_CHECK_ATTR, False)
setattr(g, enforcer._ENFORCEMENT_CHECK_ATTR, False)
def _assert_rbac_enforcement_called():
# assert is intended to be used to ensure code during development works
# as expected, it is fine to be optimized out with `python -O`
assert getattr(g, _ENFORCEMENT_CHECK_ATTR, False) # nosec
msg = ('PROGRAMMING ERROR: enforcement (`keystone.common.rbac_enforcer.'
'enforcer.RBACKEnforcer.enforce_call()`) has not been called; API '
'is unenforced.')
assert getattr(g, enforcer._ENFORCEMENT_CHECK_ATTR, False), msg # nosec
@six.add_metaclass(abc.ABCMeta)
@ -212,7 +214,7 @@ class APIBase(object):
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
setattr(g, _ENFORCEMENT_CHECK_ATTR, True)
setattr(g, enforcer._ENFORCEMENT_CHECK_ATTR, True)
return f(*args, **kwargs)
return wrapper

View File

@ -562,3 +562,44 @@ class TestRBACEnforcerRest(_TestRBACEnforcerBase):
self.enforcer.enforce_call,
action='example:denied',
enforcer=enforcer)
def test_enforce_call_sets_enforcement_attr(self):
# Ensure calls to enforce_call set the value on flask.g that indicates
# enforce_call has actually been called
token_path = '/v3/auth/tokens'
auth_json = self._auth_json()
with self.test_client() as c:
# setup/initial call. Note that the request must hit the flask
# app to have access to g (without an explicit app-context push)
r = c.post(token_path, json=auth_json, expected_status_code=201)
token_id = r.headers.get('X-Subject-Token')
c.get('%s/argument/%s' % (
self.restful_api_url_prefix, uuid.uuid4().hex),
headers={'X-Auth-Token': token_id})
# Ensure the attribute is not set
self.assertFalse(
hasattr(
flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR)
)
# Set the value to false, like the resource have done automatically
setattr(
flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR, False)
# Enforce
self.enforcer.enforce_call(action='example:allowed')
# Verify the attribute has been set to true.
self.assertEqual(
getattr(flask.g,
rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR),
True)
# Reset Attribute and check that attribute is still set even if
# enforcement results in forbidden.
setattr(
flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR, False)
self.assertRaises(exception.ForbiddenAction,
self.enforcer.enforce_call,
action='example:denied')
self.assertEqual(
getattr(flask.g,
rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR),
True)