patrole/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py

1090 lines
46 KiB
Python

# Copyright 2017 AT&T Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import fixtures
import functools
from oslo_config import cfg
from tempest.lib import exceptions
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
CONF = cfg.CONF
class BaseRBACRuleValidationTest(base.TestCase):
test_roles = ['member']
def setUp(self):
super(BaseRBACRuleValidationTest, self).setUp()
self.rbac_utils_fixture = self.useFixture(
patrole_fixtures.RbacUtilsMixinFixture(
rbac_test_roles=self.test_roles))
self.test_obj = self.rbac_utils_fixture.test_obj
class BaseRBACMultiRoleRuleValidationTest(BaseRBACRuleValidationTest):
test_roles = ['member', 'anotherrole']
class RBACRuleValidationTest(BaseRBACRuleValidationTest):
"""Test suite for validating fundamental functionality for the
``rbac_rule_validation`` decorator.
"""
def setUp(self):
super(RBACRuleValidationTest, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_have_permission_no_exc(self, mock_authority,
mock_log):
"""Test that having permission and no exception thrown is success.
Positive test case success scenario.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
pass
test_policy(self.test_obj)
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_lack_permission_throw_exc(self, mock_authority,
mock_log):
"""Test that having no permission and exception thrown is success.
Negative test case success scenario.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
raise exceptions.Forbidden()
test_policy(self.test_obj)
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_forbidden_negative(self, mock_authority,
mock_log):
"""Test RbacUnderPermissionException error is thrown and have
permission fails.
Negative test case: if Forbidden is thrown and the user should be
allowed to perform the action, then the RbacUnderPermissionException
exception should be raised.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
raise exceptions.Forbidden()
test_re = (r"User with roles \['member'\] was not allowed to perform "
r"the following actions: \[%s\].*" % (mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_rbac_failed_response_body_positive(
self, mock_authority, mock_log):
"""Test BasePatroleResponseBodyException error is thrown without
permission passes.
Positive test case: if subclass of BasePatroleResponseBodyException is
thrown and the user is not allowed to perform the action, then this is
a success.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
def _do_test(exception_cls, **kwargs):
@rbac_rv.action(mock.sentinel.service,
rules=[mock.sentinel.action])
def test_policy(*args):
raise exception_cls(**kwargs)
mock_log.error.assert_not_called()
mock_log.warning.assert_not_called()
_do_test(rbac_exceptions.RbacMissingAttributeResponseBody,
attribute=mock.sentinel.attr)
_do_test(rbac_exceptions.RbacPartialResponseBody,
body=mock.sentinel.body)
_do_test(rbac_exceptions.RbacEmptyResponseBody)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_soft_authorization_exceptions(
self, mock_authority, mock_log):
"""Test RbacUnderPermissionException error is thrown when any of the
soft authorization-related exceptions are raised by a test.
Negative test case: if subclass of BasePatroleResponseBodyException is
thrown and the user is allowed to perform the action, then this is an
expected failure.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
def _do_test(exception_cls, **kwargs):
@rbac_rv.action(mock.sentinel.service,
rules=[mock.sentinel.action])
def test_policy(*args):
raise exception_cls(**kwargs)
test_re = (r".*User with roles \[%s\] was not allowed to "
r"perform the following actions: \[%s\].*" % (
', '.join("'%s'" % r for r in self.test_roles),
mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re,
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
_do_test(rbac_exceptions.RbacMissingAttributeResponseBody,
attribute=mock.sentinel.attr)
_do_test(rbac_exceptions.RbacPartialResponseBody,
body=mock.sentinel.body)
_do_test(rbac_exceptions.RbacEmptyResponseBody)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_expect_not_found_but_raises_forbidden(self, mock_authority,
mock_log):
"""Test that expecting 404 but getting 403 works for all scenarios.
Tests the following scenarios:
1) Test no permission and 404 is expected but 403 is thrown throws
exception.
2) Test have permission and 404 is expected but 403 is thrown throws
exception.
"""
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action],
expected_error_codes=[404])
def test_policy(*args):
raise exceptions.Forbidden('Test message')
error_re = r'Expected .* to be raised but .* was raised instead'
for allowed in [True, False]:
mock_authority.PolicyAuthority.return_value.allowed.\
return_value = allowed
self.assertRaisesRegex(
rbac_exceptions.RbacExpectedWrongException, error_re,
test_policy, self.test_obj)
self.assertTrue(mock_log.error.called)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_expect_not_found_and_raise_not_found(self, mock_authority,
mock_log):
"""Test that expecting 404 and getting 404 works for all scenarios.
Tests the following scenarios:
1) Test no permission and 404 is expected and 404 is thrown succeeds.
2) Test have permission and 404 is expected and 404 is thrown fails.
In both cases, a LOG.warning is called with the "irregular message"
that signals to user that a 404 was expected and caught.
"""
policy_names = ['foo:bar']
@rbac_rv.action(mock.sentinel.service, rules=policy_names,
expected_error_codes=[404])
def test_policy(*args):
raise exceptions.NotFound()
expected_errors = [
(r"User with roles \['member'\] was not allowed to perform the "
r"following actions: \['%s'\].*" % policy_names[0]),
None
]
for pos, allowed in enumerate([True, False]):
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = allowed
error_re = expected_errors[pos]
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
test_policy(self.test_obj)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
"NotFound exception was caught for test %s. Expected policies "
"which may have caused the error: %s. The service %s throws a "
"404 instead of a 403, which is irregular",
test_policy.__name__,
', '.join(policy_names),
mock.sentinel.service)
mock_log.warning.reset_mock()
mock_log.error.reset_mock()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_overpermission_negative(self, mock_authority,
mock_log):
"""Test that RbacOverPermissionException is correctly handled.
Tests that case where no exception is thrown but the Patrole framework
says that the role should not be allowed to perform the policy action.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy_expect_forbidden(*args):
pass
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action],
expected_error_codes=[404])
def test_policy_expect_not_found(*args):
pass
for test_policy in (
test_policy_expect_forbidden, test_policy_expect_not_found):
error_re = r".*OverPermission: .* \[%s\]$" % mock.sentinel.action
self.assertRaisesRegex(rbac_exceptions.RbacOverPermissionException,
error_re, test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
mock_log.error.reset_mock()
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_invalid_policy_rule_raises_parsing_exception(
self, mock_authority):
"""Test that invalid policy action causes test to raise an exception.
"""
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = rbac_exceptions.RbacParsingException
@rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
def test_policy(*args):
pass
error_re = 'Attempted to test an invalid policy file or action'
self.assertRaisesRegex(rbac_exceptions.RbacParsingException, error_re,
test_policy, self.test_obj)
mock_authority.PolicyAuthority.assert_called_once_with(
mock.sentinel.project_id, mock.sentinel.user_id,
mock.sentinel.service, extra_target_data={})
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_get_exception_type_404(self, _):
"""Test that getting a 404 exception type returns NotFound."""
expected_exception = exceptions.NotFound
expected_irregular_msg = (
"NotFound exception was caught for test %s. Expected policies "
"which may have caused the error: %s. The service %s throws a "
"404 instead of a 403, which is irregular")
actual_exception, actual_irregular_msg = \
rbac_rv._get_exception_type(404)
self.assertEqual(expected_exception, actual_exception)
self.assertEqual(expected_irregular_msg, actual_irregular_msg)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_get_exception_type_403(self, _):
"""Test that getting a 403 exception type returns Forbidden."""
expected_exception = exceptions.Forbidden
expected_irregular_msg = None
actual_exception, actual_irregular_msg = \
rbac_rv._get_exception_type(403)
self.assertEqual(expected_exception, actual_exception)
self.assertEqual(expected_irregular_msg, actual_irregular_msg)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
def test_exception_thrown_when_type_is_not_int(self, mock_log, _):
"""Test that non-integer exception type raises error."""
self.assertRaises(rbac_exceptions.RbacInvalidErrorCode,
rbac_rv._get_exception_type, "403")
mock_log.error.assert_called_once_with("Please pass an expected error "
"code. Currently supported "
"codes: [403, 404]")
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
def test_exception_thrown_when_type_is_403_or_404(self, mock_log, _):
"""Test that unsupported exceptions throw error."""
invalid_exceptions = [200, 400, 500]
for exc in invalid_exceptions:
self.assertRaises(rbac_exceptions.RbacInvalidErrorCode,
rbac_rv._get_exception_type, exc)
mock_log.error.assert_called_once_with(
"Please pass an expected error code. Currently supported "
"codes: [403, 404]")
mock_log.error.reset_mock()
class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
RBACRuleValidationTest):
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_forbidden_negative(self, mock_authority,
mock_log):
"""Test RbacUnderPermissionException error is thrown and have
permission fails.
Negative test case: if Forbidden is thrown and the user should be
allowed to perform the action, then the RbacUnderPermissionException
exception should be raised.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
raise exceptions.Forbidden()
test_re = (r"User with roles \['member', 'anotherrole'\] was not "
r"allowed to perform the following actions: \[%s\].*" %
(mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_expect_not_found_and_raise_not_found(self, mock_authority,
mock_log):
"""Test that expecting 404 and getting 404 works for all scenarios.
Tests the following scenarios:
1) Test no permission and 404 is expected and 404 is thrown succeeds.
2) Test have permission and 404 is expected and 404 is thrown fails.
In both cases, a LOG.warning is called with the "irregular message"
that signals to user that a 404 was expected and caught.
"""
policy_names = ['foo:bar']
@rbac_rv.action(mock.sentinel.service, rules=policy_names,
expected_error_codes=[404])
def test_policy(*args):
raise exceptions.NotFound()
expected_errors = [
(r"User with roles \['member', 'anotherrole'\] was not allowed to "
r"perform the following actions: \['%s'\].*" % policy_names[0]),
None
]
for pos, allowed in enumerate([True, False]):
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = allowed
error_re = expected_errors[pos]
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
test_policy(self.test_obj)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
"NotFound exception was caught for test %s. Expected policies "
"which may have caused the error: %s. The service %s throws a "
"404 instead of a 403, which is irregular",
test_policy.__name__,
', '.join(policy_names),
mock.sentinel.service)
mock_log.warning.reset_mock()
mock_log.error.reset_mock()
class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
"""Test class for validating the RBAC log, dedicated to just logging
Patrole RBAC validation work flows.
"""
def setUp(self):
super(RBACRuleValidationLoggingTest, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
@mock.patch.object(rbac_rv, 'RBACLOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rbac_report_logging_disabled(self, mock_authority, mock_rbaclog):
"""Test case to ensure that we DON'T write logs when enable_reporting
is False
"""
self.useFixture(
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
pass
test_policy(self.test_obj)
self.assertFalse(mock_rbaclog.info.called)
@mock.patch.object(rbac_rv, 'RBACLOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rbac_report_logging_enabled(self, mock_authority, mock_rbaclog):
"""Test case to ensure that we DO write logs when enable_reporting is
True
"""
self.useFixture(
patrole_fixtures.ConfPatcher(enable_reporting=True,
group='patrole_log'))
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
policy_names = ['foo:bar', 'baz:qux']
@rbac_rv.action(mock.sentinel.service, rules=policy_names)
def test_policy(*args):
pass
test_policy(self.test_obj)
mock_rbaclog.info.assert_called_once_with(
"[Service]: %s, [Test]: %s, [Rules]: %s, "
"[Expected]: %s, [Actual]: %s",
mock.sentinel.service,
'test_policy',
', '.join(policy_names),
"Allowed",
"Allowed")
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_with_callable_rule(self, mock_authority,
mock_log):
"""Test that a callable as the rule is evaluated correctly."""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service,
rules=[lambda: mock.sentinel.action])
def test_policy(*args):
pass
test_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
mock.sentinel.action,
self.test_obj.get_all_needed_roles(CONF.patrole.rbac_test_roles))
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_with_conditional_callable_rule(
self, mock_authority, mock_log):
"""Test that a complex callable with conditional logic as the rule is
evaluated correctly.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
expected_roles = self.test_obj.get_all_needed_roles(
CONF.patrole.rbac_test_roles)
def partial_func(x):
return "foo" if x == "bar" else "qux"
foo_callable = functools.partial(partial_func, "bar")
bar_callable = functools.partial(partial_func, "baz")
@rbac_rv.action(mock.sentinel.service,
rules=[foo_callable])
def test_foo_policy(*args):
pass
@rbac_rv.action(mock.sentinel.service,
rules=[bar_callable])
def test_bar_policy(*args):
pass
test_foo_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"foo",
expected_roles)
policy_authority.allowed.reset_mock()
test_bar_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"qux",
expected_roles)
mock_log.error.assert_not_called()
class RBACMultiRoleRuleValidationLoggingTest(
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationLoggingTest):
pass
class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
def setUp(self):
super(RBACRuleValidationNegativeTest, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_invalid_service_raises_exc(self, mock_authority):
"""Test that invalid service raises the appropriate exception."""
mock_authority.PolicyAuthority.return_value.allowed.side_effect = (
rbac_exceptions.RbacInvalidServiceException)
@rbac_rv.action(mock.sentinel.service, mock.sentinel.action)
def test_policy(*args):
pass
self.assertRaises(rbac_exceptions.RbacInvalidServiceException,
test_policy, self.test_obj)
class RBACMultiRoleRuleValidationNegativeTest(
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationNegativeTest):
pass
class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
"""Test suite for validating multi-policy support for the
``rbac_rule_validation`` decorator.
"""
def setUp(self):
super(RBACRuleValidationTestMultiPolicy, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
def _assert_policy_authority_called_with(self, rules, mock_authority):
m_authority = mock_authority.PolicyAuthority.return_value
m_authority.allowed.assert_has_calls([
mock.call(
rule,
self.test_obj.get_all_needed_roles(
CONF.patrole.rbac_test_roles)
) for rule in rules
])
m_authority.allowed.reset_mock()
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_have_permission_success(
self, mock_authority):
"""Test that when expected result is authorized and test passes that
the overall evaluation succeeds.
"""
mock_authority.PolicyAuthority.return_value.allowed.\
return_value = True
rules = [mock.sentinel.action1, mock.sentinel.action2]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=[403, 403])
def test_policy(*args):
pass
test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_overpermission_failure(
self, mock_authority, mock_log):
"""Test that when expected result is unauthorized and test passes that
the overall evaluation results in an RbacOverPermissionException
getting raised.
"""
rules = [
mock.sentinel.action1, mock.sentinel.action2, mock.sentinel.action3
]
exp_ecodes = [403, 403, 403]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
pass
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.side_effect = (
allowed_list)
error_re = r".*OverPermission: .* \[%s\]$" % fail_on_action
self.assertRaisesRegex(
rbac_exceptions.RbacOverPermissionException, error_re,
test_policy, self.test_obj)
mock_log.debug.assert_any_call(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
mock_log.error.reset_mock()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([True, True, False], mock.sentinel.action3)
_do_test([False, True, True], mock.sentinel.action1)
_do_test([True, False, True], mock.sentinel.action2)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_forbidden_success(
self, mock_authority, mock_log):
"""Test that when the expected result is unauthorized and the test
fails that the overall evaluation results in success.
"""
rules = [
mock.sentinel.action1, mock.sentinel.action2, mock.sentinel.action3
]
exp_ecodes = [403, 403, 403]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
raise exceptions.Forbidden()
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
mock_log.error.assert_not_called()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([True, True, False], mock.sentinel.action3)
_do_test([False, True, True], mock.sentinel.action1)
_do_test([True, False, True], mock.sentinel.action2)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_forbidden_failure(
self, mock_authority, mock_log):
"""Test that when the expected result is authorized and the test
fails (with a Forbidden error code) that the overall evaluation
results in a RbacUnderPermissionException getting raised.
"""
# NOTE: Avoid mock.sentinel here due to weird sorting with them.
rules = ['action1', 'action2', 'action3']
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=[403, 403, 403])
def test_policy(*args):
raise exceptions.Forbidden()
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = True
error_re = ("User with roles ['member'] was not allowed to perform "
"the following actions: %s. Expected allowed actions: %s. "
"Expected disallowed actions: []." %
(rules, rules)).replace('[', r'\[').replace(']', r'\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_actions_forbidden(
self, mock_authority, mock_log):
"""Test that when the expected result is Forbidden because
two of the actions fail and the first action specifies 403,
verify that the overall evaluation results in success.
"""
rules = [
mock.sentinel.action1, mock.sentinel.action2, mock.sentinel.action3
]
exp_ecodes = [403, 403, 404]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
raise exceptions.Forbidden()
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
mock_log.error.assert_not_called()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([False, True, False], mock.sentinel.action1)
_do_test([False, False, True], mock.sentinel.action1)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_actions_notfound(
self, mock_authority, mock_log):
"""Test that when the expected result is not found because
two of the actions fail and the first action specifies 404,
verify that the overall evaluation results in success.
"""
rules = [
'mock.sentinel.action1', 'mock.sentinel.action2',
'mock.sentinel.action3', 'mock.sentinel.action4'
]
exp_ecodes = [403, 404, 403, 403]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
raise exceptions.NotFound()
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 404, fail_on_action)
mock_log.error.assert_not_called()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([True, False, False, True], 'mock.sentinel.action2')
_do_test([True, False, True, False], 'mock.sentinel.action2')
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_defaults_to_correct_error_codes(
self, mock_authority, mock_log):
"""Test omission of expected_error_codes defaults to [403] * len(rules)
"""
mock_authority.PolicyAuthority.return_value.allowed.\
return_value = False
expected_log = "%s: Expecting %d to be raised for policy name: %s"
# Validate with single rule => expected_error_codes == [403].
rules = [mock.sentinel.action1]
@rbac_rv.action(mock.sentinel.service, rules=rules)
def test_policy(*args):
raise exceptions.Forbidden()
test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
# Assert that 403 is expected.
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
self.assertTrue(
any([(expected_log, 'test_policy', 403, rules[0]) in mock_calls]))
# Validate with multiple rules => expected_error_codes == [403, 403].
rules = [mock.sentinel.action1, mock.sentinel.action2]
@rbac_rv.action(mock.sentinel.service, rules=rules)
def test_policy(*args):
raise exceptions.Forbidden()
test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
# Assert that 403 is expected.
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
self.assertTrue(
any([(expected_log, 'test_policy', 403, rules[0]) in mock_calls]))
def test_prepare_multi_policy_allowed_usages(self):
def _do_test(rules, ecodes, exp_rules, exp_ecodes):
rule_list, ec_list = rbac_rv._prepare_multi_policy(rules, ecodes)
self.assertEqual(rule_list, exp_rules)
self.assertEqual(ec_list, exp_ecodes)
# Validate that expected_error_codes defaults to 403 when no values
# are provided.
_do_test(["rule1"], None, ["rule1"], [403])
# Validate that `len(rules) == len(expected_error_codes)` works when
# both == 1.
_do_test(["rule1"], [403], ["rule1"], [403])
# Validate that `len(rules) == len(expected_error_codes)` works when
# both are > 1.
_do_test(["rule1", "rule2"], [403, 404],
["rule1", "rule2"], [403, 404])
# Validate that when only a default expected_error_code argument is
# provided, that default value and other default values (403) are
# filled into the expected_error_codes list.
# Example:
# @rbac_rv.action(service, rules=[<rule>, <rule>])
# def test_policy(*args):
# ...
_do_test(["rule1", "rule2"], None,
["rule1", "rule2"], [403, 403])
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
def test_prepare_multi_policy_disallowed_usages(self, mock_log):
def _do_test(rules, ecodes):
rule_list, ec_list = rbac_rv._prepare_multi_policy(rules, ecodes)
error_re = ("The `expected_error_codes` list is not the same length"
" as the `rules` list.")
# When len(rules) > 1 then len(expected_error_codes) must be same len.
self.assertRaisesRegex(ValueError, error_re, _do_test,
["rule1", "rule2"], [403])
# When len(expected_error_codes) > 1 len(rules) must be same len.
self.assertRaisesRegex(ValueError, error_re, _do_test, ["rule1"],
[403, 404])
error_re = ("The `rules` list must be provided if using the "
"`expected_error_codes` list.")
# When expected_error_codes is provided rules must be as well.
self.assertRaisesRegex(ValueError, error_re, _do_test, None, [404])
class RBACMultiRoleRuleValidationTestMultiPolicy(
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationTestMultiPolicy):
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_forbidden_failure(
self, mock_authority, mock_log):
"""Test that when the expected result is authorized and the test
fails (with a Forbidden error code) that the overall evaluation
results in a RbacUnderPermissionException getting raised.
"""
# NOTE: Avoid mock.sentinel here due to weird sorting with them.
rules = ['action1', 'action2', 'action3']
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=[403, 403, 403])
def test_policy(*args):
raise exceptions.Forbidden()
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = True
error_re = ("User with roles ['member', 'anotherrole'] was not "
"allowed to perform the following actions: %s. Expected "
"allowed actions: %s. Expected disallowed actions: []." %
(rules, rules)).replace('[', r'\[').replace(']', r'\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
"""Class for validating that untimely exceptions (outside
``override_role`` is called) result in test failures.
This regression tests false positives caused by test exceptions matching
the expected exception before or after the ``override_role`` context is
called. Also tests case where ``override_role`` is never called which is
an invalid Patrole test.
"""
def setUp(self):
super(RBACOverrideRoleValidationTest, self).setUp()
self.parent_class = self.test_obj.__class__
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_inside_ctx(self,
mock_authority):
"""Test success case when the expected exception is raised within the
override_role context.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
with self_.override_role():
raise exceptions.NotFound()
child_test = ChildRbacTest()
child_test.test_called()
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_patrole_exception_ignored(
self, mock_authority):
"""Test success case where Patrole exception is raised (which is
valid in case of e.g. BasePatroleResponseBodyException) after
override_role passes.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
True
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
with self_.override_role():
pass
# Instances of BasePatroleResponseBodyException don't count as
# they are part of the validation work flow.
raise rbac_exceptions.BasePatroleResponseBodyException()
child_test = ChildRbacTest()
self.assertRaises(rbac_exceptions.RbacUnderPermissionException,
child_test.test_called)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_before_ctx(self,
mock_authority):
"""Test failure case when an exception that happens before
``override_role`` context, even if it is the expected exception,
raises ``RbacOverrideRoleException``.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
# This behavior should work for supported (NotFound/Forbidden) and
# miscellaneous exceptions alike.
for exception_type in (exceptions.NotFound,
Exception):
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called_before(self_):
raise exception_type()
child_test = ChildRbacTest()
test_re = ".*before.*"
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test.test_called_before)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_after_ctx(self,
mock_authority):
"""Test failure case when an exception that happens before
``override_role`` context, even if it is the expected exception,
raises ``RbacOverrideRoleException``.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
# This behavior should work for supported (NotFound/Forbidden) and
# miscellaneous exceptions alike.
for exception_type in (exceptions.NotFound,
Exception):
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called_after(self_):
with self_.override_role():
pass
# Simulates a test tearDown failure or some such.
raise exception_type()
child_test = ChildRbacTest()
test_re = ".*after.*"
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test.test_called_after)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_never_called(self, mock_authority):
"""Test failure case where override_role is **never** called."""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_never_called(self_):
pass
child_test = ChildRbacTest()
test_re = ".*missing required `override_role` call.*"
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test.test_never_called)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_sequential_test_calls(
self, mock_authority):
"""Test success/failure scenarios above across sequential test calls.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule1"],
expected_error_codes=[404])
def test_called(self_):
with self_.override_role():
raise exceptions.NotFound()
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule2"],
expected_error_codes=[404])
def test_called_before(self_):
raise exceptions.NotFound()
test_re = ".*before.*"
# Test case where override role is called in first test but *not* in
# second test.
child_test1 = ChildRbacTest()
child_test1.test_called()
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test1.test_called_before)
# Test case where override role is *not* called in first test but is
# in second test.
child_test2 = ChildRbacTest()
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test2.test_called_before)
child_test2.test_called()