517 lines
21 KiB
Python
517 lines
21 KiB
Python
# Copyright 2017 AT&T Corporation.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import functools
|
|
import logging
|
|
import sys
|
|
|
|
from oslo_log import versionutils
|
|
from oslo_utils import excutils
|
|
import six
|
|
|
|
from tempest import config
|
|
from tempest.lib import exceptions as lib_exc
|
|
from tempest import test
|
|
|
|
from patrole_tempest_plugin import policy_authority
|
|
from patrole_tempest_plugin import rbac_exceptions
|
|
from patrole_tempest_plugin import requirements_authority
|
|
import testtools
|
|
|
|
|
|
CONF = config.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
_SUPPORTED_ERROR_CODES = [403, 404]
|
|
_DEFAULT_ERROR_CODE = 403
|
|
|
|
RBACLOG = logging.getLogger('rbac_reporting')
|
|
|
|
|
|
def action(service,
|
|
rules=None,
|
|
expected_error_codes=None,
|
|
extra_target_data=None):
|
|
"""A decorator for verifying OpenStack policy enforcement.
|
|
|
|
A decorator which allows for positive and negative RBAC testing. Given:
|
|
|
|
* an OpenStack service,
|
|
* a policy action (``rule``) enforced by that service, and
|
|
* the test roles defined by ``[patrole] rbac_test_roles``
|
|
|
|
determines whether the test role has sufficient permissions to perform an
|
|
API call that enforces the ``rule``.
|
|
|
|
This decorator should only be applied to an instance or subclass of
|
|
``tempest.test.BaseTestCase``.
|
|
|
|
The result from ``_is_authorized`` is used to determine the *expected*
|
|
test result. The *actual* test result is determined by running the
|
|
Tempest test this decorator applies to.
|
|
|
|
Below are the following possibilities from comparing the *expected* and
|
|
*actual* results:
|
|
|
|
1) If *expected* is True and the test passes (*actual*), this is a success.
|
|
2) If *expected* is True and the test fails (*actual*), this results in a
|
|
``RbacUnderPermissionException`` exception failure.
|
|
3) If *expected* is False and the test passes (*actual*), this results in
|
|
an ``RbacOverPermissionException`` exception failure.
|
|
4) If *expected* is False and the test fails (*actual*), this is a success.
|
|
|
|
As such, negative and positive testing can be applied using this decorator.
|
|
|
|
:param str service: An OpenStack service. Examples: "nova" or "neutron".
|
|
:param list rules: A list of policy actions defined in a policy file or in
|
|
code. The rules are logical-ANDed together to derive the expected
|
|
result. Also accepts list of callables that return a policy action.
|
|
|
|
.. note::
|
|
|
|
Patrole currently only supports custom JSON policy files.
|
|
|
|
:type rules: list[str] or list[callable]
|
|
:param list expected_error_codes: When the ``rules`` list parameter is
|
|
used, then this list indicates the expected error code to use if one
|
|
of the rules does not allow the role being tested. This list must
|
|
coincide with and its elements remain in the same order as the rules
|
|
in the rules list.
|
|
|
|
Example::
|
|
|
|
rules=["api_action1", "api_action2"]
|
|
expected_error_codes=[404, 403]
|
|
|
|
a) If api_action1 fails and api_action2 passes, then the expected
|
|
error code is 404.
|
|
b) if api_action2 fails and api_action1 passes, then the expected
|
|
error code is 403.
|
|
c) if both api_action1 and api_action2 fail, then the expected error
|
|
code is the first error seen (404).
|
|
|
|
If it is not passed, then it is defaulted to 403.
|
|
|
|
.. warning::
|
|
|
|
A 404 should not be provided *unless* the endpoint masks a
|
|
``Forbidden`` exception as a ``NotFound`` exception.
|
|
|
|
:type expected_error_codes: list[int]
|
|
:param dict extra_target_data: Dictionary, keyed with ``oslo.policy``
|
|
generic check names, whose values are string literals that reference
|
|
nested ``tempest.test.BaseTestCase`` attributes. Used by
|
|
``oslo.policy`` for performing matching against attributes that are
|
|
sent along with the API calls. Example::
|
|
|
|
extra_target_data={
|
|
"target.token.user_id":
|
|
"os_alt.auth_provider.credentials.user_id"
|
|
})
|
|
|
|
:raises RbacInvalidServiceException: If ``service`` is invalid.
|
|
:raises RbacUnderPermissionException: For item (2) above.
|
|
:raises RbacOverPermissionException: For item (3) above.
|
|
:raises RbacExpectedWrongException: When a 403 is expected but a 404
|
|
is raised instead or vice versa.
|
|
|
|
Examples::
|
|
|
|
@rbac_rule_validation.action(
|
|
service="nova",
|
|
rules=["os_compute_api:os-agents"])
|
|
def test_list_agents_rbac(self):
|
|
# The call to `override_role` is mandatory.
|
|
with self.override_role():
|
|
self.agents_client.list_agents()
|
|
"""
|
|
|
|
if extra_target_data is None:
|
|
extra_target_data = {}
|
|
|
|
rules, expected_error_codes = _prepare_multi_policy(rules,
|
|
expected_error_codes)
|
|
|
|
def decorator(test_func):
|
|
roles = CONF.patrole.rbac_test_roles
|
|
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
|
|
if CONF.patrole.rbac_test_role:
|
|
msg = ('CONF.patrole.rbac_test_role is deprecated in favor of '
|
|
'CONF.patrole.rbac_test_roles and will be removed in '
|
|
'future.')
|
|
versionutils.report_deprecated_feature(LOG, msg)
|
|
if not roles:
|
|
roles.append(CONF.patrole.rbac_test_role)
|
|
|
|
@functools.wraps(test_func)
|
|
def wrapper(*args, **kwargs):
|
|
if args and isinstance(args[0], test.BaseTestCase):
|
|
test_obj = args[0]
|
|
else:
|
|
raise rbac_exceptions.RbacResourceSetupFailed(
|
|
'`rbac_rule_validation` decorator can only be applied to '
|
|
'an instance of `tempest.test.BaseTestCase`.')
|
|
|
|
allowed = True
|
|
disallowed_rules = []
|
|
for rule in rules:
|
|
_allowed = _is_authorized(
|
|
test_obj, service, rule, extra_target_data)
|
|
if not _allowed:
|
|
disallowed_rules.append(rule)
|
|
allowed = allowed and _allowed
|
|
|
|
if disallowed_rules:
|
|
# Choose the first disallowed rule and expect the error
|
|
# code corresponding to it.
|
|
first_error_index = rules.index(disallowed_rules[0])
|
|
exp_error_code = expected_error_codes[first_error_index]
|
|
LOG.debug("%s: Expecting %d to be raised for policy name: %s",
|
|
test_func.__name__, exp_error_code,
|
|
disallowed_rules[0])
|
|
else:
|
|
exp_error_code = expected_error_codes[0]
|
|
|
|
expected_exception, irregular_msg = _get_exception_type(
|
|
exp_error_code)
|
|
|
|
caught_exception = None
|
|
test_status = 'Allowed'
|
|
|
|
try:
|
|
test_func(*args, **kwargs)
|
|
except rbac_exceptions.RbacInvalidServiceException:
|
|
with excutils.save_and_reraise_exception():
|
|
msg = ("%s is not a valid service." % service)
|
|
# FIXME(felipemonteiro): This test_status is logged too
|
|
# late. Need a function to log it before re-raising.
|
|
test_status = ('Error, %s' % (msg))
|
|
LOG.error(msg)
|
|
except (expected_exception,
|
|
rbac_exceptions.BasePatroleResponseBodyException) \
|
|
as actual_exception:
|
|
caught_exception = actual_exception
|
|
test_status = 'Denied'
|
|
|
|
if irregular_msg:
|
|
LOG.warning(irregular_msg,
|
|
test_func.__name__,
|
|
', '.join(rules),
|
|
service)
|
|
|
|
if allowed:
|
|
msg = ("User with roles %s was not allowed to perform the "
|
|
"following actions: %s. Expected allowed actions: "
|
|
"%s. Expected disallowed actions: %s." % (
|
|
roles, sorted(rules),
|
|
sorted(set(rules) - set(disallowed_rules)),
|
|
sorted(disallowed_rules)))
|
|
LOG.error(msg)
|
|
raise rbac_exceptions.RbacUnderPermissionException(
|
|
"%s Exception was: %s" % (msg, actual_exception))
|
|
except Exception as actual_exception:
|
|
caught_exception = actual_exception
|
|
|
|
if _check_for_expected_mismatch_exception(expected_exception,
|
|
actual_exception):
|
|
LOG.error('Expected and actual exceptions do not match. '
|
|
'Expected: %s. Actual: %s.',
|
|
expected_exception,
|
|
actual_exception.__class__)
|
|
raise rbac_exceptions.RbacExpectedWrongException(
|
|
expected=expected_exception,
|
|
actual=actual_exception.__class__,
|
|
exception=actual_exception)
|
|
else:
|
|
with excutils.save_and_reraise_exception():
|
|
exc_info = sys.exc_info()
|
|
error_details = six.text_type(exc_info[1])
|
|
msg = ("An unexpected exception has occurred during "
|
|
"test: %s. Exception was: %s" % (
|
|
test_func.__name__, error_details))
|
|
test_status = 'Error, %s' % (error_details)
|
|
LOG.error(msg)
|
|
else:
|
|
if not allowed:
|
|
msg = (
|
|
"OverPermission: Role %s was allowed to perform the "
|
|
"following disallowed actions: %s" % (
|
|
roles, sorted(disallowed_rules)
|
|
)
|
|
)
|
|
LOG.error(msg)
|
|
raise rbac_exceptions.RbacOverPermissionException(msg)
|
|
finally:
|
|
if CONF.patrole_log.enable_reporting:
|
|
RBACLOG.info(
|
|
"[Service]: %s, [Test]: %s, [Rules]: %s, "
|
|
"[Expected]: %s, [Actual]: %s",
|
|
service, test_func.__name__, ', '.join(rules),
|
|
"Allowed" if allowed else "Denied",
|
|
test_status)
|
|
|
|
# Sanity-check that ``override_role`` was called to eliminate
|
|
# false-positives and bad test flows resulting from exceptions
|
|
# getting raised too early, too late or not at all, within
|
|
# the scope of an RBAC test.
|
|
_validate_override_role_called(
|
|
test_obj,
|
|
actual_exception=caught_exception)
|
|
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def _prepare_multi_policy(rules, exp_error_codes):
|
|
if exp_error_codes:
|
|
if not rules:
|
|
msg = ("The `rules` list must be provided if using the "
|
|
"`expected_error_codes` list.")
|
|
raise ValueError(msg)
|
|
if len(rules) != len(exp_error_codes):
|
|
msg = ("The `expected_error_codes` list is not the same length "
|
|
"as the `rules` list.")
|
|
raise ValueError(msg)
|
|
if not isinstance(exp_error_codes, (tuple, list)):
|
|
exp_error_codes = [exp_error_codes]
|
|
else:
|
|
exp_error_codes = []
|
|
|
|
if rules is None:
|
|
rules = []
|
|
elif not isinstance(rules, (tuple, list)):
|
|
rules = [rules]
|
|
|
|
# Fill in the exp_error_codes if needed. This is needed for the scenarios
|
|
# where no exp_error_codes array is provided, so the error codes must be
|
|
# set to the default error code value and there must be the same number
|
|
# of error codes as rules.
|
|
num_ecs = len(exp_error_codes)
|
|
num_rules = len(rules)
|
|
if (num_ecs < num_rules):
|
|
for i in range(num_rules - num_ecs):
|
|
exp_error_codes.append(_DEFAULT_ERROR_CODE)
|
|
|
|
evaluated_rules = [
|
|
r() if callable(r) else r for r in rules
|
|
]
|
|
|
|
return evaluated_rules, exp_error_codes
|
|
|
|
|
|
def _is_authorized(test_obj, service, rule, extra_target_data):
|
|
"""Validates whether current RBAC role has permission to do policy action.
|
|
|
|
:param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``.
|
|
:param service: The OpenStack service that enforces ``rule``.
|
|
:param rule: The name of the policy action. Examples include
|
|
"identity:create_user" or "os_compute_api:os-agents".
|
|
:param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic
|
|
check names, whose values are string literals that reference nested
|
|
``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for
|
|
performing matching against attributes that are sent along with the API
|
|
calls.
|
|
|
|
:returns: True if the current RBAC role can perform the policy action,
|
|
else False.
|
|
|
|
:raises RbacResourceSetupFailed: If `project_id` or `user_id` are missing
|
|
from the `auth_provider` attribute in `test_obj`.
|
|
"""
|
|
|
|
try:
|
|
project_id = test_obj.os_primary.credentials.project_id
|
|
user_id = test_obj.os_primary.credentials.user_id
|
|
except AttributeError as e:
|
|
msg = ("{0}: project_id or user_id not found in os_primary.credentials"
|
|
.format(e))
|
|
LOG.error(msg)
|
|
raise rbac_exceptions.RbacResourceSetupFailed(msg)
|
|
|
|
roles = CONF.patrole.rbac_test_roles
|
|
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
|
|
if CONF.patrole.rbac_test_role:
|
|
if not roles:
|
|
roles.append(CONF.patrole.rbac_test_role)
|
|
|
|
# Adding implied roles
|
|
roles = test_obj.get_all_needed_roles(roles)
|
|
|
|
# Test RBAC against custom requirements. Otherwise use oslo.policy.
|
|
if CONF.patrole.test_custom_requirements:
|
|
authority = requirements_authority.RequirementsAuthority(
|
|
CONF.patrole.custom_requirements_file, service)
|
|
else:
|
|
formatted_target_data = _format_extra_target_data(
|
|
test_obj, extra_target_data)
|
|
policy_authority.PolicyAuthority.os_admin = test_obj.os_admin
|
|
authority = policy_authority.PolicyAuthority(
|
|
project_id, user_id, service,
|
|
extra_target_data=formatted_target_data)
|
|
|
|
is_allowed = authority.allowed(rule, roles)
|
|
|
|
if is_allowed:
|
|
LOG.debug("[Policy action]: %s, [Role]: %s is allowed!", rule,
|
|
roles)
|
|
else:
|
|
LOG.debug("[Policy action]: %s, [Role]: %s is NOT allowed!",
|
|
rule, roles)
|
|
|
|
return is_allowed
|
|
|
|
|
|
def _get_exception_type(expected_error_code=_DEFAULT_ERROR_CODE):
|
|
"""Dynamically calculate the expected exception to be caught.
|
|
|
|
Dynamically calculate the expected exception to be caught by the test case.
|
|
Only ``Forbidden`` and ``NotFound`` exceptions are permitted. ``NotFound``
|
|
is supported because Neutron, for security reasons, masks ``Forbidden``
|
|
exceptions as ``NotFound`` exceptions.
|
|
|
|
:param expected_error_code: the integer representation of the expected
|
|
exception to be caught. Must be contained in
|
|
``_SUPPORTED_ERROR_CODES``.
|
|
:returns: tuple of the exception type corresponding to
|
|
``expected_error_code`` and a message explaining that a non-Forbidden
|
|
exception was expected, if applicable.
|
|
"""
|
|
expected_exception = None
|
|
irregular_msg = None
|
|
|
|
if not isinstance(expected_error_code, six.integer_types) \
|
|
or expected_error_code not in _SUPPORTED_ERROR_CODES:
|
|
msg = ("Please pass an expected error code. Currently "
|
|
"supported codes: {0}".format(_SUPPORTED_ERROR_CODES))
|
|
LOG.error(msg)
|
|
raise rbac_exceptions.RbacInvalidErrorCode(msg)
|
|
|
|
if expected_error_code == 403:
|
|
expected_exception = lib_exc.Forbidden
|
|
elif expected_error_code == 404:
|
|
expected_exception = lib_exc.NotFound
|
|
irregular_msg = ("NotFound exception was caught for test %s. Expected "
|
|
"policies which may have caused the error: %s. The "
|
|
"service %s throws a 404 instead of a 403, which is "
|
|
"irregular")
|
|
return expected_exception, irregular_msg
|
|
|
|
|
|
def _format_extra_target_data(test_obj, extra_target_data):
|
|
"""Formats the "extra_target_data" dictionary with correct test data.
|
|
|
|
Before being formatted, "extra_target_data" is a dictionary that maps a
|
|
policy string like "trust.trustor_user_id" to a nested list of
|
|
``tempest.test.BaseTestCase`` attributes. For example, the attribute list
|
|
in::
|
|
|
|
"trust.trustor_user_id": "os.auth_provider.credentials.user_id"
|
|
|
|
is parsed by iteratively calling ``getattr`` until the value of "user_id"
|
|
is resolved. The resulting dictionary returns::
|
|
|
|
"trust.trustor_user_id": "the user_id of the `os_primary` credential"
|
|
|
|
:param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``.
|
|
:param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic
|
|
check names, whose values are string literals that reference nested
|
|
``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for
|
|
performing matching against attributes that are sent along with the API
|
|
calls.
|
|
:returns: Dictionary containing additional object data needed by
|
|
``oslo.policy`` to validate generic checks.
|
|
"""
|
|
attr_value = test_obj
|
|
formatted_target_data = {}
|
|
|
|
for user_attribute, attr_string in extra_target_data.items():
|
|
attrs = attr_string.split('.')
|
|
for attr in attrs:
|
|
attr_value = getattr(attr_value, attr)
|
|
formatted_target_data[user_attribute] = attr_value
|
|
|
|
return formatted_target_data
|
|
|
|
|
|
def _check_for_expected_mismatch_exception(expected_exception,
|
|
actual_exception):
|
|
"""Checks that ``expected_exception`` matches ``actual_exception``.
|
|
|
|
Since Patrole must handle 403/404 it is important that the expected and
|
|
actual error codes match.
|
|
|
|
:param excepted_exception: Expected exception for test.
|
|
:param actual_exception: Actual exception raised by test.
|
|
:returns: True if match, else False.
|
|
:rtype: boolean
|
|
"""
|
|
permission_exceptions = (lib_exc.Forbidden, lib_exc.NotFound)
|
|
if isinstance(actual_exception, permission_exceptions):
|
|
if not isinstance(actual_exception, expected_exception.__class__):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _validate_override_role_called(test_obj, actual_exception):
|
|
"""Validates that :func:`rbac_utils.RbacUtilsMixin.override_role` is called
|
|
during each Patrole test.
|
|
|
|
Useful for validating that the expected exception isn't raised too early
|
|
(before ``override_role`` call) or too late (after ``override_call``) or
|
|
at all (which is a bad test).
|
|
|
|
:param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``.
|
|
:param actual_exception: Actual exception raised by test.
|
|
:raises RbacOverrideRoleException: If ``override_role`` isn't called, is
|
|
called too early, or is called too late.
|
|
"""
|
|
called = test_obj._validate_override_role_called()
|
|
base_msg = ('This error is unrelated to RBAC and is due to either '
|
|
'an API or override role failure. Exception: %s' %
|
|
actual_exception)
|
|
|
|
if not called:
|
|
if actual_exception is not None:
|
|
# Use testtools skipException in base TestCase
|
|
# to support different skip exceptions used.
|
|
# Just return so the skip exception will go up
|
|
# the stack and be handled by the unit testing framework
|
|
if isinstance(actual_exception,
|
|
testtools.testcase.TestCase.skipException):
|
|
return
|
|
msg = ('Caught exception (%s) but it was raised before the '
|
|
'`override_role` context. ' % actual_exception.__class__)
|
|
else:
|
|
msg = 'Test missing required `override_role` call. '
|
|
msg += base_msg
|
|
LOG.error(msg)
|
|
raise rbac_exceptions.RbacOverrideRoleException(msg)
|
|
else:
|
|
exc_caught_in_ctx = test_obj._validate_override_role_caught_exc()
|
|
# This block is only executed if ``override_role`` is called. If
|
|
# an exception is raised and the exception wasn't raised in the
|
|
# ``override_role`` context and if the exception isn't a valid
|
|
# exception type (instance of ``BasePatroleException``), then this is
|
|
# a legitimate error.
|
|
if (not exc_caught_in_ctx and
|
|
actual_exception is not None and
|
|
not isinstance(actual_exception,
|
|
rbac_exceptions.BasePatroleException)):
|
|
msg = ('Caught exception (%s) but it was raised after the '
|
|
'`override_role` context. ' % actual_exception.__class__)
|
|
msg += base_msg
|
|
LOG.error(msg)
|
|
raise rbac_exceptions.RbacOverrideRoleException(msg)
|