Add support for handling multiple error codes

Patrole currently cannot handle the scenario where two possible
error codes can returned by Neutron policy enforcement for a
failed policy check (403 Forbidden and 404 NotFound), depending
on what role is being tested. Patrole framework can only handle
one expected_exception.

This change builds upon the recent multi-policy support to allow
the tester to specify multiple policy actions for one API test.
For each policy action, the tester would need to specify an
error code that is expected if the action should fail. If multiple
policy actions fail, the error code for the first policy action
that fails will be expected to be returned from the service.

This handles the cases in Neutron where Neutron may use a second
policy rule to determine whether or not to return a 403 error
code or a 404 error code. The tester is expected to list out
which policy rules are being tested by the API endpoint test.

Change-Id: I5cd861e184da90bb27f8ba454c94fa4d4f99c269
Closes-Bug: #1772710
This commit is contained in:
Cliff Parsons 2018-05-07 14:03:40 -05:00
parent 2fc2929882
commit 35a77113fc
4 changed files with 247 additions and 14 deletions

View File

@ -33,11 +33,13 @@ CONF = config.CONF
LOG = logging.getLogger(__name__)
_SUPPORTED_ERROR_CODES = [403, 404]
_DEFAULT_ERROR_CODE = 403
RBACLOG = logging.getLogger('rbac_reporting')
def action(service, rule='', rules=None, expected_error_code=403,
def action(service, rule='', rules=None,
expected_error_code=_DEFAULT_ERROR_CODE, expected_error_codes=None,
extra_target_data=None):
"""A decorator for verifying OpenStack policy enforcement.
@ -90,6 +92,25 @@ def action(service, rule='', rules=None, expected_error_code=403,
A 404 should not be provided *unless* the endpoint masks a
``Forbidden`` exception as a ``NotFound`` exception.
:param list expected_error_codes: When the ``rules`` list parameter is
used, then this list indicates the expected error code to use if one
of the rules does not allow the role being tested. This list must
coincide with and its elements remain in the same order as the rules
in the rules list.
Example::
rules=["api_action1", "api_action2"]
expected_error_codes=[404, 403]
a) If api_action1 fails and api_action2 passes, then the expected
error code is 404.
b) if api_action2 fails and api_action1 passes, then the expected
error code is 403.
c) if both api_action1 and api_action2 fail, then the expected error
code is the first error seen (404).
If an error code is missing from the list, it is defaulted to 403.
:param dict extra_target_data: Dictionary, keyed with ``oslo.policy``
generic check names, whose values are string literals that reference
nested ``tempest.test.BaseTestCase`` attributes. Used by
@ -118,7 +139,9 @@ def action(service, rule='', rules=None, expected_error_code=403,
if extra_target_data is None:
extra_target_data = {}
rules = _prepare_rules(rule, rules)
rules, expected_error_codes = _prepare_multi_policy(rule, rules,
expected_error_code,
expected_error_codes)
def decorator(test_func):
role = CONF.patrole.rbac_test_role
@ -141,8 +164,18 @@ def action(service, rule='', rules=None, expected_error_code=403,
disallowed_rules.append(rule)
allowed = allowed and _allowed
exp_error_code = expected_error_code
if disallowed_rules:
# Choose the first disallowed rule and expect the error
# code corresponding to it.
first_error_index = rules.index(disallowed_rules[0])
exp_error_code = expected_error_codes[first_error_index]
LOG.debug("%s: Expecting %d to be raised for policy name: %s",
test_func.__name__, exp_error_code,
disallowed_rules[0])
expected_exception, irregular_msg = _get_exception_type(
expected_error_code)
exp_error_code)
test_status = 'Allowed'
@ -202,7 +235,32 @@ def action(service, rule='', rules=None, expected_error_code=403,
return decorator
def _prepare_rules(rule, rules):
def _prepare_multi_policy(rule, rules, exp_error_code, exp_error_codes):
if exp_error_codes:
if not rules:
msg = ("The `rules` list must be provided if using the "
"`expected_error_codes` list.")
raise ValueError(msg)
if len(rules) != len(exp_error_codes):
msg = ("The `expected_error_codes` list is not the same length "
"as the `rules` list.")
raise ValueError(msg)
if exp_error_code:
deprecation_msg = (
"The `exp_error_code` argument has been deprecated in favor "
"of `exp_error_codes` and will be removed in a future "
"version.")
versionutils.report_deprecated_feature(LOG, deprecation_msg)
LOG.debug("The `exp_error_codes` argument will be used instead of "
"`exp_error_code`.")
if not isinstance(exp_error_codes, (tuple, list)):
exp_error_codes = [exp_error_codes]
else:
exp_error_codes = []
if exp_error_code:
exp_error_codes.append(exp_error_code)
if rules is None:
rules = []
elif not isinstance(rules, (tuple, list)):
@ -216,7 +274,18 @@ def _prepare_rules(rule, rules):
LOG.debug("The `rules` argument will be used instead of `rule`.")
else:
rules.append(rule)
return rules
# Fill in the exp_error_codes if needed. This is needed for the scenarios
# where no exp_error_codes array is provided, so the error codes must be
# set to the default error code value and there must be the same number
# of error codes as rules.
num_ecs = len(exp_error_codes)
num_rules = len(rules)
if (num_ecs < num_rules):
for i in range(num_rules - num_ecs):
exp_error_codes.append(_DEFAULT_ERROR_CODE)
return rules, exp_error_codes
def _is_authorized(test_obj, service, rule, extra_target_data):

View File

@ -330,7 +330,8 @@ class RouterRbacTest(base.BaseNetworkRbacTest):
self.routers_client.delete_router(router['id'])
@rbac_rule_validation.action(service="neutron",
rule="add_router_interface")
rules=["get_router", "add_router_interface"],
expected_error_codes=[404, 403])
@decorators.idempotent_id('a0627778-d68d-4913-881b-e345360cca19')
def test_add_router_interface(self):
"""Add Router Interface

View File

@ -436,7 +436,8 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
rules = [mock.sentinel.action1, mock.sentinel.action2]
@rbac_rv.action(mock.sentinel.service, rules=rules)
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=[403, 403])
def test_policy(*args):
pass
@ -454,8 +455,10 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
rules = [
mock.sentinel.action1, mock.sentinel.action2, mock.sentinel.action3
]
exp_ecodes = [403, 403, 403]
@rbac_rv.action(mock.sentinel.service, rules=rules)
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
pass
@ -466,6 +469,9 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
error_re = ".*OverPermission: .* \[%s\]$" % fail_on_action
self.assertRaisesRegex(rbac_exceptions.RbacOverPermission,
error_re, test_policy, self.mock_test_args)
mock_log.debug.assert_any_call(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
mock_log.error.reset_mock()
self._assert_policy_authority_called_with(rules, mock_authority)
@ -485,21 +491,26 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
rules = [
mock.sentinel.action1, mock.sentinel.action2, mock.sentinel.action3
]
exp_ecodes = [403, 403, 403]
@rbac_rv.action(mock.sentinel.service, rules=rules)
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
raise exceptions.Forbidden()
def _do_test(allowed_list):
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.mock_test_args)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
mock_log.error.assert_not_called()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([True, True, False])
_do_test([False, True, True])
_do_test([True, False, True])
_do_test([True, True, False], mock.sentinel.action3)
_do_test([False, True, True], mock.sentinel.action1)
_do_test([True, False, True], mock.sentinel.action2)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@ -513,7 +524,8 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
# NOTE: Avoid mock.sentinel here due to weird sorting with them.
rules = ['action1', 'action2', 'action3']
@rbac_rv.action(mock.sentinel.service, rules=rules)
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=[403, 403, 403])
def test_policy(*args):
raise exceptions.Forbidden()
@ -528,3 +540,136 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_actions_forbidden(
self, mock_authority, mock_log):
"""Test that when the expected result is forbidden because
two of the actions fail and the first action specifies 403,
verify that the overall evaluation results in success.
"""
rules = [
mock.sentinel.action1, mock.sentinel.action2, mock.sentinel.action3
]
exp_ecodes = [403, 403, 404]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
raise exceptions.Forbidden()
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.mock_test_args)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
mock_log.error.assert_not_called()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([False, True, False], mock.sentinel.action1)
_do_test([False, False, True], mock.sentinel.action1)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_actions_notfound(
self, mock_authority, mock_log):
"""Test that when the expected result is not found because
two of the actions fail and the first action specifies 404,
verify that the overall evaluation results in success.
"""
rules = [
mock.sentinel.action1, mock.sentinel.action2,
mock.sentinel.action3, mock.sentinel.action4
]
exp_ecodes = [403, 404, 403, 403]
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=exp_ecodes)
def test_policy(*args):
raise exceptions.NotFound()
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.mock_test_args)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 404, fail_on_action)
mock_log.error.assert_not_called()
self._assert_policy_authority_called_with(rules, mock_authority)
_do_test([True, False, False, True], mock.sentinel.action2)
_do_test([True, False, True, False], mock.sentinel.action2)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
def test_prepare_multi_policy_allowed_usages(self, mock_log):
def _do_test(rule, rules, ecode, ecodes, exp_rules, exp_ecodes):
rule_list, ec_list = rbac_rv._prepare_multi_policy(rule, rules,
ecode, ecodes)
self.assertEqual(rule_list, exp_rules)
self.assertEqual(ec_list, exp_ecodes)
# Validate that using deprecated values: rule and expected_error_code
# are converted into rules = [rule] and expected_error_codes =
# [expected_error_code]
_do_test("rule1", None, 403, None, ["rule1"], [403])
# Validate that rules = [rule] and expected_error_codes defaults to
# 403 when no values are provided.
_do_test("rule1", None, None, None, ["rule1"], [403])
# Validate that `len(rules) == len(expected_error_codes)` works when
# both == 1.
_do_test(None, ["rule1"], None, [403], ["rule1"], [403])
# Validate that `len(rules) == len(expected_error_codes)` works when
# both are > 1.
_do_test(None, ["rule1", "rule2"], None, [403, 404],
["rule1", "rule2"], [403, 404])
# Validate that when only a default expected_error_code argument is
# provided, that default value and other default values (403) are
# filled into the expected_error_codes list.
# Example:
# @rbac_rv.action(service, rules=[<rule>, <rule>])
# def test_policy(*args):
# ...
_do_test(None, ["rule1", "rule2"], 403, None,
["rule1", "rule2"], [403, 403])
# Validate that the deprecated values are ignored when new values are
# provided.
_do_test("rule3", ["rule1", "rule2"], 404, [403, 403],
["rule1", "rule2"], [403, 403])
mock_log.debug.assert_any_call(
"The `rules` argument will be used instead of `rule`.")
mock_log.debug.assert_any_call(
"The `exp_error_codes` argument will be used instead of "
"`exp_error_code`.")
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
def test_prepare_multi_policy_disallowed_usages(self, mock_log):
def _do_test(rule, rules, ecode, ecodes):
rule_list, ec_list = rbac_rv._prepare_multi_policy(rule, rules,
ecode, ecodes)
error_re = ("The `expected_error_codes` list is not the same length"
" as the `rules` list.")
# When len(rules) > 1 then len(expected_error_codes) must be same len.
self.assertRaisesRegex(ValueError, error_re, _do_test,
None, ["rule1", "rule2"], None, [403])
# When len(expected_error_codes) > 1 len(rules) must be same len.
self.assertRaisesRegex(ValueError, error_re, _do_test,
None, ["rule1"], None, [403, 404])
error_re = ("The `rules` list must be provided if using the "
"`expected_error_codes` list.")
# When expected_error_codes is provided rules must be as well.
self.assertRaisesRegex(ValueError, error_re, _do_test,
None, None, None, [404])

View File

@ -7,7 +7,25 @@ features:
expected test result. This allows Patrole to more accurately determine
whether RBAC is configured correctly, since some API endpoints enforce
multiple policies.
Multiple policy support includes the capability to specify multiple
expected error codes, as some components may return different error codes
for different roles due to checking multiple policy rules. The
``expected_error_codes`` argument has been added to the
``rbac_rule_validation.action`` decorator, which is a list of error codes
expected when the corresponding rule in the ``rules`` list is disallowed
to perform the API action. For this reason, the error codes in the
``expected_error_codes`` list must appear in the same order as their
corresponding rules in the ``rules`` list. For example:
expected_error_codes[0] is the error code for the rules[0] rule.
expected_error_codes[1] is the error code for the rules[1] rule.
...
deprecations:
- |
The ``rule`` argument in the ``rbac_rule_validation.action`` decorator has
been deprecated in favor of ``rules``.
The ``expected_error_code`` argument in the ``rbac_rule_validation.action``
decorator has been deprecated in favor of ``expected_error_codes``.