diff --git a/keystone/common/rbac_enforcer/enforcer.py b/keystone/common/rbac_enforcer/enforcer.py index ec85496926..de7b694257 100644 --- a/keystone/common/rbac_enforcer/enforcer.py +++ b/keystone/common/rbac_enforcer/enforcer.py @@ -327,6 +327,13 @@ class RBACEnforcer(object): # The lowest priority values are set first and the highest priority # values are set last. + # Populate the input attributes (view args) directly to the policy + # dict. This is to allow the policy engine to have access to the + # view args for substitution. This is to mirror the old @protected + # mechanism and ensure current policy files continue to work as + # expected. + policy_dict.update(flask.request.view_args) + # Get the Target Data Set. if target_attr is None: try: diff --git a/keystone/tests/unit/common/test_rbac_enforcer.py b/keystone/tests/unit/common/test_rbac_enforcer.py index 44c16db36d..cb393f11c0 100644 --- a/keystone/tests/unit/common/test_rbac_enforcer.py +++ b/keystone/tests/unit/common/test_rbac_enforcer.py @@ -110,12 +110,20 @@ class _TestRBACEnforcerBase(rest.RestfulTestCase): # Very Basic Restful Resource class RestfulResource(flask_restful.Resource): - def get(self, argument_id): + def get(self, argument_id=None): + if argument_id is not None: + return self._get_argument(argument_id) + return self._list_arguments() + + def _get_argument(self, argument_id): return {'argument': driver_simulation_method(argument_id)} + def _list_arguments(self): + return {'arguments': []} + self.restful_api_resource = RestfulResource self.restful_api.add_resource( - RestfulResource, '/argument/') + RestfulResource, '/argument/', '/argument') self.cleanup_instance('restful_api', 'restful_resource', 'restful_api_url_prefix') @@ -355,6 +363,48 @@ class TestRBACEnforcerRest(_TestRBACEnforcerBase): self.assertDictEqual(extracted['target'], self.restful_api_resource().get(argument_id)) + def test_view_args_populated_in_policy_dict(self): + # Setup the "resource" object and make a call that has view arguments + # (substituted values in the URL). Make sure to use an policy enforcer + # that properly checks (substitutes in) a value that is not in "target" + # path but in the main policy dict path. + + def _enforce_mock_func(credentials, action, target, + do_raise=True): + if 'argument_id' not in target: + raise exception.ForbiddenAction(action=action) + + self.useFixture(fixtures.MockPatchObject( + self.enforcer, '_enforce', _enforce_mock_func)) + + argument_id = uuid.uuid4().hex + + # Check with a call that will populate view_args. + + with self.test_client() as c: + path = '/v3/auth/tokens' + body = self._auth_json() + + r = c.post( + path, + json=body, + follow_redirects=True, + expected_status_code=201) + + token_id = r.headers['X-Subject-Token'] + + c.get('%s/argument/%s' % (self.restful_api_url_prefix, + argument_id), + headers={'X-Auth-Token': token_id}) + + # Use any valid policy as _enforce is mockpatched out + self.enforcer.enforce_call(action='example:allowed') + c.get('%s/argument' % self.restful_api_url_prefix, + headers={'X-Auth-Token': token_id}) + self.assertRaises(exception.ForbiddenAction, + self.enforcer.enforce_call, + action='example:allowed') + def test_extract_member_target_data_supplied_target(self): # Test extract member target data with member_target and # member_target_type supplied.