Add support for enforce_call to set value on flask.g
When enforce_call is called, flask.g has the appropriate value set in the known attribute location so that it can be determined if enforce_call was used within a REST API request. Change-Id: I08ecd2be0a80248df7041596437adb6238835153 Partial-Bug: #1776504
This commit is contained in:
parent
bb3b15bbf0
commit
fb0299f661
|
@ -37,6 +37,7 @@ _POSSIBLE_TARGET_ACTIONS = frozenset([
|
|||
rule.name for
|
||||
rule in policies.list_rules() if not rule.deprecated_for_removal
|
||||
])
|
||||
_ENFORCEMENT_CHECK_ATTR = 'keystone:RBAC:enforcement_called'
|
||||
|
||||
|
||||
class RBACEnforcer(object):
|
||||
|
@ -288,6 +289,12 @@ class RBACEnforcer(object):
|
|||
'Internal RBAC enforcement error, no rule/action name to '
|
||||
'lookup'))
|
||||
|
||||
# Mark flask.g as "enforce_call" has been called. This should occur
|
||||
# before anything except the "is this a valid action" check, ensuring
|
||||
# all proper "after request" checks pass, showing that the API has
|
||||
# enforcement.
|
||||
setattr(flask.g, _ENFORCEMENT_CHECK_ATTR, True)
|
||||
|
||||
# Assert we are actually authenticated
|
||||
cls._assert_is_authenticated()
|
||||
|
||||
|
|
|
@ -20,22 +20,24 @@ import flask_restful
|
|||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from keystone.common.rbac_enforcer import enforcer
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
ResourceMap = collections.namedtuple('resource_map', 'resource, urls, kwargs')
|
||||
|
||||
|
||||
_ENFORCEMENT_CHECK_ATTR = 'keystone:RBAC:enforcement_called'
|
||||
|
||||
|
||||
def _initialize_rbac_enforcement_check():
|
||||
setattr(g, _ENFORCEMENT_CHECK_ATTR, False)
|
||||
setattr(g, enforcer._ENFORCEMENT_CHECK_ATTR, False)
|
||||
|
||||
|
||||
def _assert_rbac_enforcement_called():
|
||||
# assert is intended to be used to ensure code during development works
|
||||
# as expected, it is fine to be optimized out with `python -O`
|
||||
assert getattr(g, _ENFORCEMENT_CHECK_ATTR, False) # nosec
|
||||
msg = ('PROGRAMMING ERROR: enforcement (`keystone.common.rbac_enforcer.'
|
||||
'enforcer.RBACKEnforcer.enforce_call()`) has not been called; API '
|
||||
'is unenforced.')
|
||||
assert getattr(g, enforcer._ENFORCEMENT_CHECK_ATTR, False), msg # nosec
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
|
@ -212,7 +214,7 @@ class APIBase(object):
|
|||
"""
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
setattr(g, _ENFORCEMENT_CHECK_ATTR, True)
|
||||
setattr(g, enforcer._ENFORCEMENT_CHECK_ATTR, True)
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
|
|
@ -562,3 +562,44 @@ class TestRBACEnforcerRest(_TestRBACEnforcerBase):
|
|||
self.enforcer.enforce_call,
|
||||
action='example:denied',
|
||||
enforcer=enforcer)
|
||||
|
||||
def test_enforce_call_sets_enforcement_attr(self):
|
||||
# Ensure calls to enforce_call set the value on flask.g that indicates
|
||||
# enforce_call has actually been called
|
||||
token_path = '/v3/auth/tokens'
|
||||
auth_json = self._auth_json()
|
||||
with self.test_client() as c:
|
||||
# setup/initial call. Note that the request must hit the flask
|
||||
# app to have access to g (without an explicit app-context push)
|
||||
r = c.post(token_path, json=auth_json, expected_status_code=201)
|
||||
token_id = r.headers.get('X-Subject-Token')
|
||||
c.get('%s/argument/%s' % (
|
||||
self.restful_api_url_prefix, uuid.uuid4().hex),
|
||||
headers={'X-Auth-Token': token_id})
|
||||
|
||||
# Ensure the attribute is not set
|
||||
self.assertFalse(
|
||||
hasattr(
|
||||
flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR)
|
||||
)
|
||||
# Set the value to false, like the resource have done automatically
|
||||
setattr(
|
||||
flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR, False)
|
||||
# Enforce
|
||||
self.enforcer.enforce_call(action='example:allowed')
|
||||
# Verify the attribute has been set to true.
|
||||
self.assertEqual(
|
||||
getattr(flask.g,
|
||||
rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR),
|
||||
True)
|
||||
# Reset Attribute and check that attribute is still set even if
|
||||
# enforcement results in forbidden.
|
||||
setattr(
|
||||
flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR, False)
|
||||
self.assertRaises(exception.ForbiddenAction,
|
||||
self.enforcer.enforce_call,
|
||||
action='example:denied')
|
||||
self.assertEqual(
|
||||
getattr(flask.g,
|
||||
rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR),
|
||||
True)
|
||||
|
|
Loading…
Reference in New Issue