Limit exception handling to calls within override_role

Motivation: prevents false positives caused by test
exceptions matching the expected exception before or
after the ``override_role`` context is called.

This patchset changes expected_error_codes behavior [0] by checking
errors explicitly outside the override_role context. This is done
by introducing a new function to rbac_rule_validation that is
used for validating that the expected exception isn't raised too
early (before ``override_role`` call) or too late (after
``override_call``) or at all (which is a bad test).

This means that exceptions raised prior to override_role
call result in failure. The same goes for exceptions raised
after override_role -- except for those that are an instance
of BasePatroleException (which is valid for things like
RbacMalformedResponse getting raised intentionally).

The new exception that is introduced is called
RbacOverrideRoleException.

Unit tests are added for all validation scenarios described
above.

[0] https://storyboard.openstack.org/#!/story/2003297
Story: 2003297
Task: 24246

Co-Authored-By: Felipe Monteiro <felipe.monteiro@att.com>
Change-Id: Iae9a58640463093f6dda20d40261b20051be2820
This commit is contained in:
Mykola Yakovliev 2018-08-06 15:34:22 -05:00 committed by Felipe Monteiro
parent 842845eaef
commit 11376ab7ff
6 changed files with 394 additions and 20 deletions

View File

@ -16,12 +16,16 @@
from tempest.lib import exceptions
class RbacConflictingPolicies(exceptions.TempestException):
class BasePatroleException(exceptions.TempestException):
message = "An unknown RBAC exception occurred"
class RbacConflictingPolicies(BasePatroleException):
message = ("Conflicting policies preventing this action from being "
"performed.")
class RbacMalformedResponse(exceptions.TempestException):
class RbacMalformedResponse(BasePatroleException):
message = ("The response body is missing the expected %(attribute)s due "
"to policy enforcement failure.")
@ -37,25 +41,25 @@ class RbacMalformedResponse(exceptions.TempestException):
super(RbacMalformedResponse, self).__init__(**kwargs)
class RbacResourceSetupFailed(exceptions.TempestException):
class RbacResourceSetupFailed(BasePatroleException):
message = "RBAC resource setup failed"
class RbacOverPermissionException(exceptions.TempestException):
class RbacOverPermissionException(BasePatroleException):
"""Raised when the expected result is failure but the actual result is
pass.
"""
message = "Unauthorized action was allowed to be performed"
class RbacUnderPermissionException(exceptions.TempestException):
class RbacUnderPermissionException(BasePatroleException):
"""Raised when the expected result is pass but the actual result is
failure.
"""
message = "Authorized action was not allowed to be performed"
class RbacExpectedWrongException(exceptions.TempestException):
class RbacExpectedWrongException(BasePatroleException):
"""Raised when the expected exception does not match the actual exception
raised, when both are instances of Forbidden or NotFound, indicating
the test provides a wrong argument to `expected_error_codes`.
@ -64,16 +68,30 @@ class RbacExpectedWrongException(exceptions.TempestException):
"instead. Actual exception: %(exception)s")
class RbacInvalidServiceException(exceptions.TempestException):
class RbacInvalidServiceException(BasePatroleException):
"""Raised when an invalid service is passed to ``rbac_rule_validation``
decorator.
"""
message = "Attempted to test an invalid service"
class RbacParsingException(exceptions.TempestException):
class RbacParsingException(BasePatroleException):
message = "Attempted to test an invalid policy file or action"
class RbacInvalidErrorCode(exceptions.TempestException):
class RbacInvalidErrorCode(BasePatroleException):
message = "Unsupported error code passed in test"
class RbacOverrideRoleException(BasePatroleException):
"""Raised when override_role is used incorrectly or fails somehow.
Used for safeguarding against false positives that might occur when the
expected exception isn't raised inside the ``override_role`` context.
Specifically, when:
* ``override_role`` isn't called
* an exception is raised before ``override_role`` context
* an exception is raised after ``override_role`` context
"""
message = "Override role failure or incorrect usage"

View File

@ -180,6 +180,7 @@ def action(service, rule='', rules=None,
expected_exception, irregular_msg = _get_exception_type(
exp_error_code)
caught_exception = None
test_status = 'Allowed'
try:
@ -193,13 +194,16 @@ def action(service, rule='', rules=None,
LOG.error(msg)
except (expected_exception,
rbac_exceptions.RbacConflictingPolicies,
rbac_exceptions.RbacMalformedResponse) as e:
rbac_exceptions.RbacMalformedResponse) as actual_exception:
caught_exception = actual_exception
test_status = 'Denied'
if irregular_msg:
LOG.warning(irregular_msg,
test_func.__name__,
', '.join(rules),
service)
if allowed:
msg = ("Role %s was not allowed to perform the following "
"actions: %s. Expected allowed actions: %s. "
@ -209,8 +213,10 @@ def action(service, rule='', rules=None,
sorted(disallowed_rules)))
LOG.error(msg)
raise rbac_exceptions.RbacUnderPermissionException(
"%s Exception was: %s" % (msg, e))
"%s Exception was: %s" % (msg, actual_exception))
except Exception as actual_exception:
caught_exception = actual_exception
if _check_for_expected_mismatch_exception(expected_exception,
actual_exception):
LOG.error('Expected and actual exceptions do not match. '
@ -249,6 +255,14 @@ def action(service, rule='', rules=None,
"Allowed" if allowed else "Denied",
test_status)
# Sanity-check that ``override_role`` was called to eliminate
# false-positives and bad test flows resulting from exceptions
# getting raised too early, too late or not at all, within
# the scope of an RBAC test.
_validate_override_role_called(
test_obj,
actual_exception=caught_exception)
return wrapper
return decorator
@ -389,7 +403,7 @@ def _get_exception_type(expected_error_code=_DEFAULT_ERROR_CODE):
irregular_msg = ("NotFound exception was caught for test %s. Expected "
"policies which may have caused the error: %s. The "
"service %s throws a 404 instead of a 403, which is "
"irregular.")
"irregular")
return expected_exception, irregular_msg
@ -431,8 +445,63 @@ def _format_extra_target_data(test_obj, extra_target_data):
def _check_for_expected_mismatch_exception(expected_exception,
actual_exception):
"""Checks that ``expected_exception`` matches ``actual_exception``.
Since Patrole must handle 403/404 it is important that the expected and
actual error codes match.
:param excepted_exception: Expected exception for test.
:param actual_exception: Actual exception raised by test.
:returns: True if match, else False.
:rtype: boolean
"""
permission_exceptions = (lib_exc.Forbidden, lib_exc.NotFound)
if isinstance(actual_exception, permission_exceptions):
if not isinstance(actual_exception, expected_exception.__class__):
return True
return False
def _validate_override_role_called(test_obj, actual_exception):
"""Validates that :func:`rbac_utils.RbacUtils.override_role` is called
during each Patrole test.
Useful for validating that the expected exception isn't raised too early
(before ``override_role`` call) or too late (after ``override_call``) or
at all (which is a bad test).
:param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``.
:param actual_exception: Actual exception raised by test.
:raises RbacOverrideRoleException: If ``override_role`` isn't called, is
called too early, or is called too late.
"""
called = test_obj._validate_override_role_called()
base_msg = ('This error is unrelated to RBAC and is due to either '
'an API or override role failure. Exception: %s' %
actual_exception)
if not called:
if actual_exception is not None:
msg = ('Caught exception (%s) but it was raised before the '
'`override_role` context. ' % actual_exception.__class__)
else:
msg = 'Test missing required `override_role` call. '
msg += base_msg
LOG.error(msg)
raise rbac_exceptions.RbacOverrideRoleException(msg)
else:
exc_caught_in_ctx = test_obj._validate_override_role_caught_exc()
# This block is only executed if ``override_role`` is called. If
# an exception is raised and the exception wasn't raised in the
# ``override_role`` context and if the exception isn't a valid
# exception type (instance of ``BasePatroleException``), then this is
# a legitimate error.
if (not exc_caught_in_ctx and
actual_exception is not None and
not isinstance(actual_exception,
rbac_exceptions.BasePatroleException)):
msg = ('Caught exception (%s) but it was raised after the '
'`override_role` context. ' % actual_exception.__class__)
msg += base_msg
LOG.error(msg)
raise rbac_exceptions.RbacOverrideRoleException(msg)

View File

@ -14,6 +14,7 @@
# under the License.
from contextlib import contextmanager
import sys
import time
from oslo_log import log as logging
@ -95,11 +96,17 @@ class RbacUtils(object):
# if the API call above threw an exception, any code below this
# point in the test is not executed.
"""
test_obj._set_override_role_called()
self._override_role(test_obj, True)
try:
# Execute the test.
yield
finally:
# Check whether an exception was raised. If so, remember that
# for future validation.
exc = sys.exc_info()[0]
if exc is not None:
test_obj._set_override_role_caught_exc()
# This code block is always executed, no matter the result of the
# test. Automatically switch back to the admin role for test clean
# up.
@ -222,6 +229,11 @@ class RbacUtilsMixin(object):
cls.setup_rbac_utils()
"""
# Shows if override_role was called.
__override_role_called = False
# Shows if exception raised during override_role.
__override_role_caught_exc = False
@classmethod
def get_auth_providers(cls):
"""Returns list of auth_providers used within test.
@ -238,7 +250,7 @@ class RbacUtilsMixin(object):
"deprecated and will be removed in the S "
"release. Patrole tests will always be enabled "
"following installation of the Patrole Tempest "
"plugin. Use a regex to skip tests.")
"plugin. Use a regex to skip tests")
versionutils.report_deprecated_feature(LOG, deprecation_msg)
raise cls.skipException(
'Patrole testing not enabled so skipping %s.' % cls.__name__)
@ -247,6 +259,33 @@ class RbacUtilsMixin(object):
def setup_rbac_utils(cls):
cls.rbac_utils = RbacUtils(cls)
def _set_override_role_called(self):
"""Helper for tracking whether ``override_role`` was called."""
self.__override_role_called = True
def _set_override_role_caught_exc(self):
"""Helper for tracking whether exception was thrown inside
``override_role``.
"""
self.__override_role_caught_exc = True
def _validate_override_role_called(self):
"""Idempotently validate that ``override_role`` is called and reset
its value to False for sequential tests.
"""
was_called = self.__override_role_called
self.__override_role_called = False
return was_called
def _validate_override_role_caught_exc(self):
"""Idempotently validate that exception was caught inside
``override_role``, so that, by process of elimination, it can be
determined whether one was thrown outside (which is invalid).
"""
caught_exception = self.__override_role_caught_exc
self.__override_role_caught_exc = False
return caught_exception
def is_admin():
"""Verifies whether the current test role equals the admin role.

View File

@ -16,6 +16,7 @@
"""Fixtures for Patrole tests."""
from __future__ import absolute_import
from contextlib import contextmanager
import fixtures
import mock
import time
@ -117,6 +118,17 @@ class RbacUtilsFixture(fixtures.Fixture):
new_role = 'member' if role_toggle else 'admin'
self.set_roles(['admin', 'member'], [new_role])
@contextmanager
def real_override_role(self, test_obj):
"""Actual call to ``override_role``.
Useful for ensuring all the necessary mocks are performed before
the method in question is called.
"""
_rbac_utils = rbac_utils.RbacUtils(test_obj)
with _rbac_utils.override_role(test_obj):
yield
def set_roles(self, roles, roles_on_project=None):
"""Set the list of available roles in the system.

View File

@ -12,9 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import mock
from oslo_config import cfg
import fixtures
from tempest.lib import exceptions
from tempest import manager
from tempest import test
@ -23,7 +26,7 @@ from tempest.tests import base
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
from patrole_tempest_plugin import rbac_utils
from patrole_tempest_plugin.tests.unit import fixtures
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
CONF = cfg.CONF
@ -43,10 +46,12 @@ class BaseRBACRuleValidationTest(base.TestCase):
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.useFixture(
fixtures.ConfPatcher(rbac_test_role='Member', group='patrole'))
patrole_fixtures.ConfPatcher(rbac_test_role='Member',
group='patrole'))
# Disable patrole log for unit tests.
self.useFixture(
fixtures.ConfPatcher(enable_reporting=False, group='patrole_log'))
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))
class RBACRuleValidationTest(BaseRBACRuleValidationTest):
@ -54,6 +59,12 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
``rbac_rule_validation`` decorator.
"""
def setUp(self):
super(RBACRuleValidationTest, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_have_permission_no_exc(self, mock_authority,
@ -269,7 +280,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
mock_log.warning.assert_called_with(
"NotFound exception was caught for test %s. Expected policies "
"which may have caused the error: %s. The service %s throws a "
"404 instead of a 403, which is irregular.",
"404 instead of a 403, which is irregular",
test_policy.__name__,
', '.join(policy_names),
mock.sentinel.service)
@ -334,7 +345,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
expected_irregular_msg = (
"NotFound exception was caught for test %s. Expected policies "
"which may have caused the error: %s. The service %s throws a "
"404 instead of a 403, which is irregular.")
"404 instead of a 403, which is irregular")
actual_exception, actual_irregular_msg = \
rbac_rv._get_exception_type(404)
@ -385,6 +396,12 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
Patrole RBAC validation work flows.
"""
def setUp(self):
super(RBACRuleValidationLoggingTest, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
@mock.patch.object(rbac_rv, 'RBACLOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rbac_report_logging_disabled(self, mock_authority, mock_rbaclog):
@ -392,7 +409,8 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
is False
"""
self.useFixture(
fixtures.ConfPatcher(enable_reporting=False, group='patrole_log'))
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@ -410,7 +428,8 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
True
"""
self.useFixture(
fixtures.ConfPatcher(enable_reporting=True, group='patrole_log'))
patrole_fixtures.ConfPatcher(enable_reporting=True,
group='patrole_log'))
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
policy_names = ['foo:bar', 'baz:qux']
@ -432,6 +451,12 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
def setUp(self):
super(RBACRuleValidationNegativeTest, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_invalid_service_raises_exc(self, mock_authority):
"""Test that invalid service raises the appropriate exception."""
@ -451,6 +476,12 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
``rbac_rule_validation`` decorator.
"""
def setUp(self):
super(RBACRuleValidationTestMultiPolicy, self).setUp()
# This behavior is tested in separate test class below.
self.useFixture(fixtures.MockPatchObject(
rbac_rv, '_validate_override_role_called'))
def _assert_policy_authority_called_with(self, rules, mock_authority):
m_authority = mock_authority.PolicyAuthority.return_value
m_authority.allowed.assert_has_calls([
@ -708,3 +739,189 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
# When expected_error_codes is provided rules must be as well.
self.assertRaisesRegex(ValueError, error_re, _do_test,
None, None, None, [404])
class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
"""Class for validating that untimely exceptions (outside
``override_role`` is called) result in test failures.
This regression tests false positives caused by test exceptions matching
the expected exception before or after the ``override_role`` context is
called. Also tests case where ``override_role`` is never called which is
an invalid Patrole test.
"""
def setUp(self):
super(RBACOverrideRoleValidationTest, self).setUp()
# Mixin automatically initializes __override_role_called to False.
class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
def runTest(self):
pass
# Stub out problematic function calls.
FakeRbacTest.os_primary = mock.Mock(spec=manager.Manager)
FakeRbacTest.rbac_utils = self.useFixture(
patrole_fixtures.RbacUtilsFixture())
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
project_id=mock.sentinel.project_id)
setattr(FakeRbacTest.os_primary, 'credentials', mock_creds)
setattr(FakeRbacTest.os_primary, 'auth_provider', mock.Mock())
self.parent_class = FakeRbacTest
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_inside_ctx(self,
mock_authority):
"""Test success case when the expected exception is raised within the
override_role context.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
with self_.rbac_utils.real_override_role(self_):
raise exceptions.NotFound()
child_test = ChildRbacTest()
child_test.test_called()
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_patrole_exception_ignored(
self, mock_authority):
"""Test success case where Patrole exception is raised (which is
valid in case of e.g. RbacMalformedException) after override_role
passes.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
True
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
with self_.rbac_utils.real_override_role(self_):
pass
# Instances of BasePatroleException don't count as they are
# part of the validation work flow.
raise rbac_exceptions.BasePatroleException()
child_test = ChildRbacTest()
self.assertRaises(rbac_exceptions.BasePatroleException,
child_test.test_called)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_before_ctx(self,
mock_authority):
"""Test failure case when an exception that happens before
``override_role`` context, even if it is the expected exception,
raises ``RbacOverrideRoleException``.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
# This behavior should work for supported (NotFound/Forbidden) and
# miscellaneous exceptions alike.
for exception_type in (exceptions.NotFound,
Exception):
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called_before(self_):
raise exception_type()
child_test = ChildRbacTest()
test_re = ".*before.*"
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test.test_called_before)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_after_ctx(self,
mock_authority):
"""Test failure case when an exception that happens before
``override_role`` context, even if it is the expected exception,
raises ``RbacOverrideRoleException``.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
# This behavior should work for supported (NotFound/Forbidden) and
# miscellaneous exceptions alike.
for exception_type in (exceptions.NotFound,
Exception):
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called_after(self_):
with self_.rbac_utils.real_override_role(self_):
pass
# Simulates a test tearDown failure or some such.
raise exception_type()
child_test = ChildRbacTest()
test_re = ".*after.*"
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test.test_called_after)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_never_called(self, mock_authority):
"""Test failure case where override_role is **never** called."""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_never_called(self_):
pass
child_test = ChildRbacTest()
test_re = ".*missing required `override_role` call.*"
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test.test_never_called)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_sequential_test_calls(
self, mock_authority):
"""Test success/failure scenarios above across sequential test calls.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
False
class ChildRbacTest(self.parent_class):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule1"],
expected_error_codes=[404])
def test_called(self_):
with self_.rbac_utils.real_override_role(self_):
raise exceptions.NotFound()
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule2"],
expected_error_codes=[404])
def test_called_before(self_):
raise exceptions.NotFound()
test_re = ".*before.*"
# Test case where override role is called in first test but *not* in
# second test.
child_test1 = ChildRbacTest()
child_test1.test_called()
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test1.test_called_before)
# Test case where override role is *not* called in first test but is
# in second test.
child_test2 = ChildRbacTest()
self.assertRaisesRegex(rbac_exceptions.RbacOverrideRoleException,
test_re, child_test2.test_called_before)
child_test2.test_called()

View File

@ -0,0 +1,19 @@
---
features:
- |
Add new exception called ``RbacOverrideRoleException``. Used for
safeguarding against false positives that might occur when the expected
exception isn't raised inside the ``override_role`` context. Specifically,
when:
* ``override_role`` isn't called
* an exception is raised before ``override_role`` context
* an exception is raised after ``override_role`` context
fixes:
- |
Previously, the ``rbac_rule_validation.action`` decorator could catch
expected exceptions with no regard to where the error happened. Such
behavior could cause false-positive results. To prevent this from
happening from now on, if an exception happens outside of the
``override_role`` context, it will cause
``rbac_exceptions.RbacOverrideRoleException`` to be raised.