# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import uuid import fixtures import flask from flask import blueprints import flask_restful import mock from oslo_policy import policy from keystone.common import authorization from keystone.common import context from keystone.common import provider_api from keystone.common import rbac_enforcer from keystone import exception from keystone.tests import unit from keystone.tests.unit import rest PROVIDER_APIS = provider_api.ProviderAPIs class TestRBACEnforcer(unit.TestCase): def test_enforcer_shared_state(self): enforcer = rbac_enforcer.enforcer.RBACEnforcer() enforcer2 = rbac_enforcer.enforcer.RBACEnforcer() self.assertIsNotNone(enforcer._enforcer) self.assertEqual(enforcer._enforcer, enforcer2._enforcer) setattr(enforcer, '_test_attr', uuid.uuid4().hex) self.assertEqual(enforcer._test_attr, enforcer2._test_attr) def test_enforcer_auto_instantiated(self): enforcer = rbac_enforcer.enforcer.RBACEnforcer() # Check that the enforcer instantiates the oslo_policy enforcer object # on demand. self.assertIsNotNone(enforcer._enforcer) enforcer._reset() self.assertIsNotNone(enforcer._enforcer) class _TestRBACEnforcerBase(rest.RestfulTestCase): def setUp(self): super(_TestRBACEnforcerBase, self).setUp() self._setup_enforcer_object() self._setup_dynamic_flask_blueprint_api() self._setup_flask_restful_api() def _setup_enforcer_object(self): self.enforcer = rbac_enforcer.enforcer.RBACEnforcer() self.cleanup_instance('enforcer') def register_new_rules(enforcer): rules = self._testing_policy_rules() enforcer.register_defaults(rules) self.useFixture(fixtures.MockPatchObject( self.enforcer, 'register_rules', register_new_rules)) # Set the possible actions to our limited list original_actions = rbac_enforcer.enforcer._POSSIBLE_TARGET_ACTIONS rbac_enforcer.enforcer._POSSIBLE_TARGET_ACTIONS = frozenset([ rule.name for rule in self._testing_policy_rules()]) # RESET the FrozenSet of possible target actions to the original # value self.addCleanup(setattr, rbac_enforcer.enforcer, '_POSSIBLE_TARGET_ACTIONS', original_actions) # Force a reset on the enforcer to load up new policy rules. self.enforcer._reset() def _setup_dynamic_flask_blueprint_api(self): # Create a dynamic flask blueprint with a known prefix api = uuid.uuid4().hex url_prefix = '/_%s_TEST' % api blueprint = blueprints.Blueprint(api, __name__, url_prefix=url_prefix) self.url_prefix = url_prefix self.flask_blueprint = blueprint self.cleanup_instance('flask_blueprint', 'url_prefix') def _driver_simulation_get_method(self, argument_id): user = self.user_req_admin return {'id': argument_id, 'value': 'TEST', 'owner_id': user['id']} def _setup_flask_restful_api(self): self.restful_api_url_prefix = '/_%s_TEST' % uuid.uuid4().hex self.restful_api = flask_restful.Api(self.public_app.app, self.restful_api_url_prefix) driver_simulation_method = self._driver_simulation_get_method # Very Basic Restful Resource class RestfulResource(flask_restful.Resource): def get(self, argument_id=None): if argument_id is not None: return self._get_argument(argument_id) return self._list_arguments() def _get_argument(self, argument_id): return {'argument': driver_simulation_method(argument_id)} def _list_arguments(self): return {'arguments': []} self.restful_api_resource = RestfulResource self.restful_api.add_resource( RestfulResource, '/argument/', '/argument') self.cleanup_instance('restful_api', 'restful_resource', 'restful_api_url_prefix') def _register_blueprint_to_app(self): # TODO(morgan): remove the need for webtest, but for now just unwrap # by one layer. Once everything is converted to flask, we can fix # the tests to eliminate "webtest". self.public_app.app.register_blueprint( self.flask_blueprint, url_prefix=self.url_prefix) def _auth_json(self): return { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'name': self.user_req_admin['name'], 'password': self.user_req_admin['password'], 'domain': { 'id': self.user_req_admin['domain_id'] } } } }, 'scope': { 'project': { 'id': self.tenant_service['id'] } } } } def _testing_policy_rules(self): test_policy_rules = [ policy.RuleDefault( name='example:subject_token', check_str='user_id:%(target.token.user_id)s', scope_types=['project'], ), policy.RuleDefault( name='example:target', check_str='user_id:%(target.myuser.id)s', scope_types=['project'], ), policy.RuleDefault( name='example:inferred_member_data', check_str='user_id:%(target.argument.owner_id)s', scope_types=['project'], ), policy.RuleDefault( name='example:with_filter', check_str='user_id:%(user)s', scope_types=['project'], ), policy.RuleDefault( name='example:allowed', check_str='', scope_types=['project'], ), policy.RuleDefault( name='example:denied', check_str='false:false', scope_types=['project'], ), ] return test_policy_rules class TestRBACEnforcerRestAdminAuthToken(_TestRBACEnforcerBase): def config_overrides(self): super(TestRBACEnforcerRestAdminAuthToken, self).config_overrides() self.config_fixture.config(admin_token='ADMIN') def test_enforcer_is_admin_check_with_token(self): # Admin-shared token passed and valid, "is_admin" should be true. with self.test_client() as c: c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={authorization.AUTH_TOKEN_HEADER: 'ADMIN'}) self.assertTrue(self.enforcer._shared_admin_auth_token_set()) def test_enforcer_is_admin_check_without_token(self): with self.test_client() as c: # Admin-shared token passed and invalid, "is_admin" should be false c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={authorization.AUTH_TOKEN_HEADER: 'BOGUS'}) self.assertFalse(self.enforcer._shared_admin_auth_token_set()) # Admin-shared token not passed, "is_admin" should be false c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex)) self.assertFalse(self.enforcer._shared_admin_auth_token_set()) def test_enforce_call_is_admin(self): with self.test_client() as c: c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={authorization.AUTH_TOKEN_HEADER: 'ADMIN'}) with mock.patch.object(self.enforcer, '_enforce') as mock_method: self.enforcer.enforce_call(action='example:allowed') mock_method.assert_not_called() class TestRBACEnforcerRest(_TestRBACEnforcerBase): def test_extract_subject_token_target_data(self): path = '/v3/auth/tokens' body = self._auth_json() with self.test_client() as c: r = c.post( path, json=body, follow_redirects=True, expected_status_code=201) token_id = r.headers['X-Subject-Token'] c.get('/v3', headers={'X-Auth-Token': token_id, 'X-Subject-Token': token_id}) token = PROVIDER_APIS.token_provider_api.validate_token(token_id) subj_token_data = ( self.enforcer._extract_subject_token_target_data()) subj_token_data = subj_token_data['token'] self.assertEqual(token.user_id, subj_token_data['user_id']) self.assertIn('user', subj_token_data) self.assertIn('domain', subj_token_data['user']) self.assertEqual(token.user_domain['id'], subj_token_data['user']['domain']['id']) def test_extract_filter_data(self): # Test that we are extracting useful filter data from the # request context. The tested function validates tha # extract_filter_attr only adds the passed filter values to the # policy dict, all other query-params are ignored. path = uuid.uuid4().hex @self.flask_blueprint.route('/%s' % path) def return_nothing_interesting(): return 'OK', 200 self._register_blueprint_to_app() with self.test_client() as c: expected_param = uuid.uuid4().hex unexpected_param = uuid.uuid4().hex get_path = '/'.join([self.url_prefix, path]) # Populate the query-string with two params, one that should # exist and one that should not in the resulting policy # dict. qs = '%(expected)s=EXPECTED&%(unexpected)s=UNEXPECTED' % { 'expected': expected_param, 'unexpected': unexpected_param } # Perform the get with the query-string c.get('%(path)s?%(qs)s' % {'path': get_path, 'qs': qs}) # Extract the filter values. extracted_filter = self.enforcer._extract_filter_values( [expected_param]) # Unexpected param is not in the extracted values # Expected param is in the extracted values # Expected param has the expected value self.assertNotIn(extracted_filter, unexpected_param) self.assertIn(expected_param, expected_param) self.assertEqual(extracted_filter[expected_param], 'EXPECTED') def test_retrive_oslo_req_context(self): # Test to ensure 'get_oslo_req_context' is pulling the request context # from the environ as expected. The only way to really test is an # instance check. with self.test_client() as c: c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex)) oslo_req_context = self.enforcer._get_oslo_req_context() self.assertIsInstance(oslo_req_context, context.RequestContext) def test_is_authenticated_check(self): # Check that the auth_context is in-fact decoded as expected. token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id}) self.enforcer._assert_is_authenticated() c.get('/', expected_status_code=300) self.assertRaises(exception.Unauthorized, self.enforcer._assert_is_authenticated) oslo_ctx = self.enforcer._get_oslo_req_context() # Set authenticated to a false value that is not None oslo_ctx.authenticated = False self.assertRaises(exception.Unauthorized, self.enforcer._assert_is_authenticated) def test_extract_policy_check_credentials(self): # Make sure extracting the creds is the same as what is in the request # environment. token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id}) extracted_creds = self.enforcer._extract_policy_check_credentials() self.assertEqual( flask.request.environ.get(authorization.AUTH_CONTEXT_ENV), extracted_creds) def test_extract_member_target_data_inferred(self): # NOTE(morgan): Setup the "resource" object with a 'member_name' attr # and the 'get_member_from_driver' binding to the 'get' method. The # enforcer here will look for 'get_member_from_driver' (callable) and # the 'member_name' (e.g. 'user') so it can automatically populate # the target dict with the member information. This is mostly compat # with current @protected (ease of use). For most cases the target # should be explicitly passed to .enforce_call, but for ease of # converting / use, the automatic population of data has been added. self.restful_api_resource.member_key = 'argument' member_from_driver = self._driver_simulation_get_method self.restful_api_resource.get_member_from_driver = member_from_driver argument_id = uuid.uuid4().hex with self.test_client() as c: c.get('%s/argument/%s' % (self.restful_api_url_prefix, argument_id)) extracted = self.enforcer._extract_member_target_data( member_target_type=None, member_target=None) self.assertDictEqual(extracted['target'], self.restful_api_resource().get(argument_id)) def test_view_args_populated_in_policy_dict(self): # Setup the "resource" object and make a call that has view arguments # (substituted values in the URL). Make sure to use an policy enforcer # that properly checks (substitutes in) a value that is not in "target" # path but in the main policy dict path. def _enforce_mock_func(credentials, action, target, do_raise=True): if 'argument_id' not in target: raise exception.ForbiddenAction(action=action) self.useFixture(fixtures.MockPatchObject( self.enforcer, '_enforce', _enforce_mock_func)) argument_id = uuid.uuid4().hex # Check with a call that will populate view_args. with self.test_client() as c: path = '/v3/auth/tokens' body = self._auth_json() r = c.post( path, json=body, follow_redirects=True, expected_status_code=201) token_id = r.headers['X-Subject-Token'] c.get('%s/argument/%s' % (self.restful_api_url_prefix, argument_id), headers={'X-Auth-Token': token_id}) # Use any valid policy as _enforce is mockpatched out self.enforcer.enforce_call(action='example:allowed') c.get('%s/argument' % self.restful_api_url_prefix, headers={'X-Auth-Token': token_id}) self.assertRaises(exception.ForbiddenAction, self.enforcer.enforce_call, action='example:allowed') def test_extract_member_target_data_supplied_target(self): # Test extract member target data with member_target and # member_target_type supplied. member_type = uuid.uuid4().hex member_target = {uuid.uuid4().hex: {uuid.uuid4().hex}} extracted = self.enforcer._extract_member_target_data( member_target_type=member_type, member_target=member_target) self.assertDictEqual({'target': {member_type: member_target}}, extracted) def test_extract_member_target_data_bad_input(self): # Test Extract Member Target Data with only "member_target" and only # "member_target_type" and ensure empty dict is returned. self.assertEqual({}, self.enforcer._extract_member_target_data( member_target=None, member_target_type=uuid.uuid4().hex)) self.assertEqual({}, self.enforcer._extract_member_target_data( member_target={}, member_target_type=None)) def test_call_build_enforcement_target(self): assertIn = self.assertIn assertEq = self.assertEqual ref_uuid = uuid.uuid4().hex def _enforce_mock_func(credentials, action, target, do_raise=True): assertIn('target.domain.id', target) assertEq(target['target.domain.id'], ref_uuid) def _build_enforcement_target(): return {'domain': {'id': ref_uuid}} self.useFixture(fixtures.MockPatchObject( self.enforcer, '_enforce', _enforce_mock_func)) argument_id = uuid.uuid4().hex with self.test_client() as c: path = '/v3/auth/tokens' body = self._auth_json() r = c.post( path, json=body, follow_redirects=True, expected_status_code=201) token_id = r.headers['X-Subject-Token'] c.get('%s/argument/%s' % (self.restful_api_url_prefix, argument_id), headers={'X-Auth-Token': token_id}) self.enforcer.enforce_call( action='example:allowed', build_target=_build_enforcement_target) def test_policy_enforcer_action_decorator(self): # Create a method that has an action pre-registered action = 'example:allowed' @self.flask_blueprint.route('') @self.enforcer.policy_enforcer_action(action) def nothing_interesting(): return 'OK', 200 self._register_blueprint_to_app() with self.test_client() as c: c.get('%s' % self.url_prefix) self.assertEqual( action, getattr(flask.g, self.enforcer.ACTION_STORE_ATTR)) def test_policy_enforcer_action_invalid_action_decorator(self): # If the "action" is not a registered policy enforcement point, check # that a ValueError is raised. def _decorator_fails(): # Create a method that has an action pre-registered, but the # action is bogus action = uuid.uuid4().hex @self.flask_blueprint.route('') @self.enforcer.policy_enforcer_action(action) def nothing_interesting(): return 'OK', 200 self.assertRaises(ValueError, _decorator_fails) def test_enforce_call_invalid_action(self): self.assertRaises(exception.Forbidden, self.enforcer.enforce_call, action=uuid.uuid4().hex) def test_enforce_call_not_is_authenticated(self): with self.test_client() as c: c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex)) # Patch the enforcer to return an empty oslo context. with mock.patch.object(self.enforcer, '_get_oslo_req_context', return_value=None): self.assertRaises( exception.Unauthorized, self.enforcer.enforce_call, action='example:allowed') # Explicitly set "authenticated" on the context to false. ctx = self.enforcer._get_oslo_req_context() ctx.authenticated = False self.assertRaises( exception.Unauthorized, self.enforcer.enforce_call, action='example:allowed') def test_enforce_call_explicit_target_attr(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') # check the enforcer properly handles explicitly passed in targets # no subject-token processing is done in this case. # # TODO(morgan): confirm if subject-token-processing can/should # occur in this form without causing issues. c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id, 'X-Subject-Token': token_id}) target = {'myuser': {'id': self.user_req_admin['id']}} self.enforcer.enforce_call(action='example:target', target_attr=target) # Ensure extracting the subject-token data is not happening. self.assertRaises( exception.ForbiddenAction, self.enforcer.enforce_call, action='example:subject_token', target_attr=target) def test_enforce_call_with_subject_token_data(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') # Check that the enforcer passes if user_id and subject token # user_id are the same. example:deprecated should also pass # since it is open enforcement. c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id, 'X-Subject-Token': token_id}) self.enforcer.enforce_call(action='example:subject_token') def test_enforce_call_with_member_target_type_and_member_target(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') # check the enforcer properly handles passed in member_target_type # and member_target. This form still extracts data from the subject # token. c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id, 'X-Subject-Token': token_id}) target_type = 'myuser' target = {'id': self.user_req_admin['id']} self.enforcer.enforce_call(action='example:target', member_target_type=target_type, member_target=target) # Ensure we're still extracting the subject-token data self.enforcer.enforce_call(action='example:subject_token') def test_enforce_call_inferred_member_target_data(self): # Check that inferred "get" works as expected for the member target # setup the restful resource for an inferred "get" self.restful_api_resource.member_key = 'argument' member_from_driver = self._driver_simulation_get_method self.restful_api_resource.get_member_from_driver = member_from_driver token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') # check the enforcer properly handles inferred member data get # This form still extracts data from the subject token. c.get('%s/argument/%s' % (self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id, 'X-Subject-Token': token_id}) self.enforcer.enforce_call(action='example:inferred_member_data') # Ensure we're still extracting the subject-token data self.enforcer.enforce_call(action='example:subject_token') def test_enforce_call_with_filter_values(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') # Check that the enforcer passes if a filter is supplied *and* # the filter name is passed to enforce_call c.get('%s/argument/%s?user=%s' % ( self.restful_api_url_prefix, uuid.uuid4().hex, self.user_req_admin['id']), headers={'X-Auth-Token': token_id}) self.enforcer.enforce_call(action='example:with_filter', filters=['user']) # With No Filters passed into enforce_call self.assertRaises( exception.ForbiddenAction, self.enforcer.enforce_call, action='example:with_filter') # With No Filters in the PATH c.get('%s/argument/%s' % ( self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id}) self.assertRaises( exception.ForbiddenAction, self.enforcer.enforce_call, action='example:with_filter', filters=['user']) # With no filters in the path and no filters passed to enforce_call c.get('%s/argument/%s' % ( self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id}) self.assertRaises( exception.ForbiddenAction, self.enforcer.enforce_call, action='example:with_filter') def test_enforce_call_with_pre_instantiated_enforcer(self): token_path = '/v3/auth/tokens' auth_json = self._auth_json() enforcer = rbac_enforcer.enforcer.RBACEnforcer() with self.test_client() as c: r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') # Check the enforcer behaves as expected with a pre-instantiated # enforcer passed into .enforce_call() c.get('%s/argument/%s' % ( self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id}) self.enforcer.enforce_call(action='example:allowed', enforcer=enforcer) self.assertRaises(exception.ForbiddenAction, self.enforcer.enforce_call, action='example:denied', enforcer=enforcer) def test_enforce_call_sets_enforcement_attr(self): # Ensure calls to enforce_call set the value on flask.g that indicates # enforce_call has actually been called token_path = '/v3/auth/tokens' auth_json = self._auth_json() with self.test_client() as c: # setup/initial call. Note that the request must hit the flask # app to have access to g (without an explicit app-context push) r = c.post(token_path, json=auth_json, expected_status_code=201) token_id = r.headers.get('X-Subject-Token') c.get('%s/argument/%s' % ( self.restful_api_url_prefix, uuid.uuid4().hex), headers={'X-Auth-Token': token_id}) # Ensure the attribute is not set self.assertFalse( hasattr( flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR) ) # Set the value to false, like the resource have done automatically setattr( flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR, False) # Enforce self.enforcer.enforce_call(action='example:allowed') # Verify the attribute has been set to true. self.assertEqual( getattr(flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR), True) # Reset Attribute and check that attribute is still set even if # enforcement results in forbidden. setattr( flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR, False) self.assertRaises(exception.ForbiddenAction, self.enforcer.enforce_call, action='example:denied') self.assertEqual( getattr(flask.g, rbac_enforcer.enforcer._ENFORCEMENT_CHECK_ATTR), True)