Merge "Multi role RBAC validation"

This commit is contained in:
Zuul 2018-11-07 00:33:28 +00:00 committed by Gerrit Code Review
commit 415261e3d5
18 changed files with 470 additions and 163 deletions

View File

@ -75,7 +75,7 @@
description: Patrole job for admin role.
vars:
devstack_localrc:
RBAC_TEST_ROLE: admin
RBAC_TEST_ROLES: admin
- job:
name: patrole-member
@ -88,7 +88,7 @@
- stable/pike
vars:
devstack_localrc:
RBAC_TEST_ROLE: member
RBAC_TEST_ROLES: member
- job:
name: patrole-member-rocky
@ -111,7 +111,7 @@
voting: false
vars:
devstack_localrc:
RBAC_TEST_ROLE: admin
RBAC_TEST_ROLES: admin
- job:
name: patrole-multinode-member
@ -119,7 +119,7 @@
voting: false
vars:
devstack_localrc:
RBAC_TEST_ROLE: member
RBAC_TEST_ROLES: member
- job:
name: patrole-py35-member
@ -129,7 +129,7 @@
devstack_localrc:
# Use member for py35 because arguably negative testing is more
# important than admin, which is already covered by patrole-admin job.
RBAC_TEST_ROLE: member
RBAC_TEST_ROLES: member
USE_PYTHON3: true
devstack_services:
s-account: false
@ -168,7 +168,7 @@
voting: false
vars:
devstack_localrc:
RBAC_TEST_ROLE: member
RBAC_TEST_ROLES: member
tempest_test_regex: (?=.*PluginRbacTest)(^patrole_tempest_plugin\.tests\.api)
- job:
@ -177,7 +177,7 @@
voting: false
vars:
devstack_localrc:
RBAC_TEST_ROLE: admin
RBAC_TEST_ROLES: admin
tempest_test_regex: (?=.*PluginRbacTest)(^patrole_tempest_plugin\.tests\.api)
- project:

View File

@ -186,20 +186,20 @@ the steps outlined therein. Afterward, proceed with the steps below.
RBAC Tests
----------
To change the role that the patrole tests are being run as, edit
``rbac_test_role`` in the ``patrole`` section of tempest.conf: ::
To change the roles that the patrole tests are being run as, edit
``rbac_test_roles`` in the ``patrole`` section of tempest.conf: ::
[patrole]
rbac_test_role = member
rbac_test_role = member,reader
...
.. note::
The ``rbac_test_role`` is service-specific. member, for example,
The ``rbac_test_roles`` is service-specific. member, for example,
is an arbitrary role, but by convention is used to designate the default
non-admin role in the system. Most Patrole tests should be run with
**admin** and **member** roles. However, other services may use entirely
different roles.
different roles or role combinations.
For more information about the member role and its nomenclature,
please see: `<https://ask.openstack.org/en/question/4759/member-vs-_member_/>`__.

View File

@ -14,9 +14,14 @@ function install_patrole_tempest_plugin {
setup_package $PATROLE_DIR -e
if [[ ${DEVSTACK_SERIES} == 'pike' ]]; then
if [[ "$RBAC_TEST_ROLE" == "member" ]]; then
RBAC_TEST_ROLE="Member"
fi
IFS=',' read -ra roles_array <<< "$RBAC_TEST_ROLES"
RBAC_TEST_ROLES=""
for i in "${roles_array[@]}"; do
if [[ $i == "member" ]]; then
i="Member"
fi
RBAC_TEST_ROLES="$i,$RBAC_TEST_ROLES"
done
# Policies used by Patrole testing that were changed in a backwards-incompatible way.
# TODO(felipemonteiro): Remove these once stable/pike becomes EOL.
@ -35,9 +40,14 @@ function install_patrole_tempest_plugin {
fi
if [[ ${DEVSTACK_SERIES} == 'queens' ]]; then
if [[ "$RBAC_TEST_ROLE" == "member" ]]; then
RBAC_TEST_ROLE="Member"
fi
IFS=',' read -ra roles_array <<< "$RBAC_TEST_ROLES"
RBAC_TEST_ROLES=""
for i in "${roles_array[@]}"; do
if [[ $i == "member" ]]; then
i="Member"
fi
RBAC_TEST_ROLES="$i,$RBAC_TEST_ROLES"
done
# TODO(cl566n): Remove these once stable/queens becomes EOL.
# These policies were removed in Stein but are available in Queens.
@ -52,7 +62,7 @@ function install_patrole_tempest_plugin {
iniset $TEMPEST_CONFIG policy-feature-enabled removed_keystone_policies_stein False
fi
iniset $TEMPEST_CONFIG patrole rbac_test_role $RBAC_TEST_ROLE
iniset $TEMPEST_CONFIG patrole rbac_test_roles $RBAC_TEST_ROLES
}
if is_service_enabled tempest; then

View File

@ -7,34 +7,28 @@ Patrole can be customized by updating Tempest's ``tempest.conf`` configuration
file. All Patrole-specific configuration options should be included under
the ``patrole`` group.
RBAC Test Role
--------------
RBAC Test Roles
---------------
The RBAC test role governs which role is used when running Patrole tests. For
example, setting ``rbac_test_role`` to "admin" will execute all RBAC tests
using admin credentials. Changing the ``rbac_test_role`` value will `override`
Tempest's primary credentials to use that role.
The RBAC test roles govern the list of roles to be used when running Patrole
tests. For example, setting ``rbac_test_roles`` to "admin" will execute all
RBAC tests using admin credentials. Changing the ``rbac_test_roles`` value
will `override` Tempest's primary credentials to use that role.
This implies that, if ``rbac_test_role`` is "admin", regardless of the Tempest
This implies that, if ``rbac_test_roles`` is "admin", regardless of the Tempest
credentials used by a client, the client will be calling APIs using the admin
role. That is, ``self.os_primary.servers_client`` will run as though it were
``self.os_admin.servers_client``.
Similarly, setting ``rbac_test_role`` to a non-admin role results in Tempest's
primary credentials being overridden by the role specified by
``rbac_test_role``.
Similarly, setting ``rbac_test_roles`` with various roles, results in
Tempest's primary credentials being overridden by the roles specified by
``rbac_test_roles``.
.. note::
Only the role of the primary Tempest credentials ("os_primary") is
Only the roles of the primary Tempest credentials ("os_primary") are
modified. The ``user_id`` and ``project_id`` remain unchanged.
Enable RBAC
-----------
Given the value of ``enable_rbac``, enables or disables Patrole tests. If
``enable_rbac`` is ``False``, then Patrole tests are skipped.
Custom Policy Files
-------------------

View File

@ -10,10 +10,10 @@ Validation Workflow Overview
RBAC testing validation is broken up into 3 stages:
#. "Expected" stage. Determine whether the test should be able to succeed
or fail based on the test role defined by ``[patrole] rbac_test_role``)
or fail based on the test roles defined by ``[patrole] rbac_test_roles``)
and the policy action that the test enforces.
#. "Actual" stage. Run the test by calling the API endpoint that enforces
the expected policy action using the test role.
the expected policy action using the test roles.
#. Comparing the outputs from both stages for consistency. A "consistent"
result is treated as a pass and an "inconsistent" result is treated
as a failure. "Consistent" (or successful) cases include:
@ -63,7 +63,7 @@ The Policy Authority Module
---------------------------
Module called by :ref:`rbac-validation` to verify whether the test
role is allowed to execute a policy action by querying ``oslo.policy`` with
roles are allowed to execute a policy action by querying ``oslo.policy`` with
required test data. The result is used by :ref:`rbac-validation` as the
"Expected" result.

View File

@ -23,8 +23,8 @@ Role Overriding
Role overriding is the way Patrole is able to create resources and delete
resources -- including those that require admin credentials -- while still
being able to exercise the same set of Tempest credentials to perform the API
action that authorizes the policy under test, by manipulating the role of
the Tempest credentials.
action that authorizes the policy under test, by manipulating roles of the
Tempest credentials.
Patrole implicitly splits up each test into 3 stages: set up, test execution,
and teardown.
@ -33,10 +33,10 @@ The role workflow is as follows:
#. Setup: Admin role is used automatically. The primary credentials are
overridden with the admin role.
#. Test execution: ``[patrole] rbac_test_role`` is used manually via the
#. Test execution: ``[patrole] rbac_test_roles`` is used manually via the
call to ``with rbac_utils.override_role(self)``. Everything that
is executed within this contextmanager uses the primary
credentials overridden with the ``[patrole] rbac_test_role``.
credentials overridden with the ``[patrole] rbac_test_roles``.
#. Teardown: Admin role is used automatically. The primary credentials have
been overridden with the admin role.

View File

@ -7,12 +7,17 @@
# From patrole.config
#
# The current RBAC role against which to run Patrole
# tests. (string value)
#rbac_test_role = admin
# DEPRECATED: The current RBAC role against which to run
# Patrole tests. (string value)
# This option is deprecated for removal.
# Its value may be silently ignored in the future.
# Reason: This option is deprecated and being
# replaced with ``rbac_test_roles``.
#rbac_test_role =
# Enables RBAC tests. (boolean value)
#enable_rbac = true
# The current RBAC roles to be assigned to Keystone
# Group against which to run Patrole tests. (list value)
#rbac_test_roles = admin
# List of the paths to search for policy files. Each
# policy path assumes that the service name is included in the path
@ -20,9 +25,11 @@
# assumes Patrole is on the same host as the policy files. The paths
# should be
# ordered by precedence, with high-priority paths before low-priority
# paths. The
# first path that is found to contain the service's policy file will
# be used.
# paths. All
# the paths that are found to contain the service's policy file will
# be used and
# all policy files will be merged. Allowed ``json`` or ``yaml``
# formats.
# (list value)
#custom_policy_files = /etc/%s/policy.json
@ -150,3 +157,16 @@
# This policy
# was changed in a backwards-incompatible way. (boolean value)
#volume_extension_volume_actions_unreserve_policy = true
# Are the Nova API extension policies available in the
# cloud (e.g. os_compute_api:os-extended-availability-zone)? These
# policies were
# removed in Stein because Nova API extension concept was removed in
# Pike. (boolean value)
#removed_nova_policies_stein = true
# Are the Cinder API extension policies available in the
# cloud (e.g. [create|update|get|delete]_encryption_policy)? These
# policies are
# added in Stein. (boolean value)
#added_cinder_policies_stein = true

View File

@ -22,8 +22,16 @@ patrole_group = cfg.OptGroup(name='patrole', title='Patrole Testing Options')
PatroleGroup = [
cfg.StrOpt('rbac_test_role',
default='admin',
deprecated_for_removal=True,
deprecated_reason="""This option is deprecated and being
replaced with ``rbac_test_roles``.
""",
help="""The current RBAC role against which to run
Patrole tests."""),
cfg.ListOpt('rbac_test_roles',
help="""List of the RBAC roles against which to run
Patrole tests.""",
default=['admin']),
cfg.ListOpt('custom_policy_files',
default=['/etc/%s/policy.json'],
help="""List of the paths to search for policy files. Each

View File

@ -154,17 +154,17 @@ class PolicyAuthority(RbacAuthority):
if os.path.isfile(filename):
cls.policy_files[service].append(filename)
def allowed(self, rule_name, role):
def allowed(self, rule_name, roles):
"""Checks if a given rule in a policy is allowed with given role.
:param string rule_name: Policy name to pass to``oslo.policy``.
:param string role: Role to validate for authorization.
:param List[string] roles: List of roles to validate for authorization.
:raises RbacParsingException: If ``rule_name`` does not exist in the
cloud (in policy file or among registered in-code policy defaults).
"""
is_admin_context = self._is_admin_context(role)
is_admin_context = self._is_admin_context(roles)
is_allowed = self._allowed(
access=self._get_access_token(role),
access=self._get_access_token(roles),
apply_rule=rule_name,
is_admin=is_admin_context)
return is_allowed
@ -224,7 +224,7 @@ class PolicyAuthority(RbacAuthority):
return rules
def _is_admin_context(self, role):
def _is_admin_context(self, roles):
"""Checks whether a role has admin context.
If context_is_admin is contained in the policy file, then checks
@ -233,17 +233,17 @@ class PolicyAuthority(RbacAuthority):
"""
if 'context_is_admin' in self.rules.keys():
return self._allowed(
access=self._get_access_token(role),
access=self._get_access_token(roles),
apply_rule='context_is_admin')
return role == CONF.identity.admin_role
return CONF.identity.admin_role in roles
def _get_access_token(self, role):
def _get_access_token(self, roles):
access_token = {
"token": {
"roles": [
{
"name": role
}
} for role in roles
],
"project_id": self.project_id,
"tenant_id": self.project_id,

View File

@ -17,6 +17,7 @@ import functools
import logging
import sys
from oslo_log import versionutils
from oslo_utils import excutils
import six
@ -47,7 +48,7 @@ def action(service,
* an OpenStack service,
* a policy action (``rule``) enforced by that service, and
* the test role defined by ``[patrole] rbac_test_role``
* the test roles defined by ``[patrole] rbac_test_roles``
determines whether the test role has sufficient permissions to perform an
API call that enforces the ``rule``.
@ -142,7 +143,15 @@ def action(service,
expected_error_codes)
def decorator(test_func):
role = CONF.patrole.rbac_test_role
roles = CONF.patrole.rbac_test_roles
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
if CONF.patrole.rbac_test_role:
msg = ('CONF.patrole.rbac_test_role is deprecated in favor of '
'CONF.patrole.rbac_test_roles and will be removed in '
'future.')
versionutils.report_deprecated_feature(LOG, msg)
if not roles:
roles.append(CONF.patrole.rbac_test_role)
@functools.wraps(test_func)
def wrapper(*args, **kwargs):
@ -200,10 +209,10 @@ def action(service,
service)
if allowed:
msg = ("Role %s was not allowed to perform the following "
"actions: %s. Expected allowed actions: %s. "
"Expected disallowed actions: %s." % (
role, sorted(rules),
msg = ("User with roles %s was not allowed to perform the "
"following actions: %s. Expected allowed actions: "
"%s. Expected disallowed actions: %s." % (
roles, sorted(rules),
sorted(set(rules) - set(disallowed_rules)),
sorted(disallowed_rules)))
LOG.error(msg)
@ -236,7 +245,7 @@ def action(service,
msg = (
"OverPermission: Role %s was allowed to perform the "
"following disallowed actions: %s" % (
role, sorted(disallowed_rules)
roles, sorted(disallowed_rules)
)
)
LOG.error(msg)
@ -328,7 +337,12 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
LOG.error(msg)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
role = CONF.patrole.rbac_test_role
roles = CONF.patrole.rbac_test_roles
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
if CONF.patrole.rbac_test_role:
if not roles:
roles.append(CONF.patrole.rbac_test_role)
# Test RBAC against custom requirements. Otherwise use oslo.policy.
if CONF.patrole.test_custom_requirements:
authority = requirements_authority.RequirementsAuthority(
@ -339,14 +353,14 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
authority = policy_authority.PolicyAuthority(
project_id, user_id, service,
extra_target_data=formatted_target_data)
is_allowed = authority.allowed(rule, role)
is_allowed = authority.allowed(rule, roles)
if is_allowed:
LOG.debug("[Policy action]: %s, [Role]: %s is allowed!", rule,
role)
roles)
else:
LOG.debug("[Policy action]: %s, [Role]: %s is NOT allowed!",
rule, role)
rule, roles)
return is_allowed

View File

@ -40,7 +40,7 @@ class RbacUtils(object):
up, and primary credentials, needed to perform the API call which does
policy enforcement. The primary credentials always cycle between roles
defined by ``CONF.identity.admin_role`` and
``CONF.patrole.rbac_test_role``.
``CONF.patrole.rbac_test_roles``.
"""
def __init__(self, test_obj):
@ -58,10 +58,15 @@ class RbacUtils(object):
"Patrole role overriding only supports v3 identity API.")
self.admin_roles_client = admin_roles_client
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
# Change default role to admin
self._override_role(test_obj, False)
admin_role_id = None
rbac_role_id = None
rbac_role_ids = None
@contextmanager
def override_role(self, test_obj):
@ -69,7 +74,7 @@ class RbacUtils(object):
Temporarily change the role used by ``os_primary`` credentials to:
* ``[patrole] rbac_test_role`` before test execution
* ``[patrole] rbac_test_roles`` before test execution
* ``[identity] admin_role`` after test execution
Automatically switches to admin role after test execution.
@ -122,25 +127,21 @@ class RbacUtils(object):
* If True: role is set to ``[patrole] rbac_test_role``
* If False: role is set to ``[identity] admin_role``
"""
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self.token = test_obj.os_primary.auth_provider.get_token()
LOG.debug('Overriding role to: %s.', toggle_rbac_role)
role_already_present = False
roles_already_present = False
try:
if not all([self.admin_role_id, self.rbac_role_id]):
if not all([self.admin_role_id, self.rbac_role_ids]):
self._get_roles_by_name()
target_role = (
self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
role_already_present = self._list_and_clear_user_roles_on_project(
target_role)
target_roles = (self.rbac_role_ids
if toggle_rbac_role else [self.admin_role_id])
roles_already_present = self._list_and_clear_user_roles_on_project(
target_roles)
# Do not override roles if `target_role` already exists.
if not role_already_present:
self._create_user_role_on_project(target_role)
if not roles_already_present:
self._create_user_role_on_project(target_roles)
except Exception as exp:
with excutils.save_and_reraise_exception():
LOG.exception(exp)
@ -152,8 +153,8 @@ class RbacUtils(object):
# passing the second boundary before attempting to authenticate.
# Only sleep if a token revocation occurred as a result of role
# overriding. This will optimize test runtime in the case where
# ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
if not role_already_present:
# ``[identity] admin_role`` == ``[patrole] rbac_test_roles``.
if not roles_already_present:
time.sleep(1)
for provider in auth_providers:
@ -164,41 +165,53 @@ class RbacUtils(object):
role_map = {r['name']: r['id'] for r in available_roles}
LOG.debug('Available roles: %s', list(role_map.keys()))
admin_role_id = role_map.get(CONF.identity.admin_role)
rbac_role_id = role_map.get(CONF.patrole.rbac_test_role)
rbac_role_ids = []
roles = CONF.patrole.rbac_test_roles
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
if CONF.patrole.rbac_test_role:
if not roles:
roles.append(CONF.patrole.rbac_test_role)
if not all([admin_role_id, rbac_role_id]):
for role_name in roles:
rbac_role_ids.append(role_map.get(role_name))
admin_role_id = role_map.get(CONF.identity.admin_role)
if not all([admin_role_id, all(rbac_role_ids)]):
missing_roles = []
msg = ("Could not find `[patrole] rbac_test_role` or "
msg = ("Could not find `[patrole] rbac_test_roles` or "
"`[identity] admin_role`, both of which are required for "
"RBAC testing.")
if not admin_role_id:
missing_roles.append(CONF.identity.admin_role)
if not rbac_role_id:
missing_roles.append(CONF.patrole.rbac_test_role)
if not all(rbac_role_ids):
missing_roles += [role_name for role_name in roles
if not role_map.get(role_name)]
msg += " Following roles were not found: %s." % (
", ".join(missing_roles))
msg += " Available roles: %s." % ", ".join(list(role_map.keys()))
raise rbac_exceptions.RbacResourceSetupFailed(msg)
self.admin_role_id = admin_role_id
self.rbac_role_id = rbac_role_id
self.rbac_role_ids = rbac_role_ids
def _create_user_role_on_project(self, role_id):
self.admin_roles_client.create_user_role_on_project(
self.project_id, self.user_id, role_id)
def _create_user_role_on_project(self, role_ids):
for role_id in role_ids:
self.admin_roles_client.create_user_role_on_project(
self.project_id, self.user_id, role_id)
def _list_and_clear_user_roles_on_project(self, role_id):
def _list_and_clear_user_roles_on_project(self, role_ids):
roles = self.admin_roles_client.list_user_roles_on_project(
self.project_id, self.user_id)['roles']
role_ids = [role['id'] for role in roles]
all_role_ids = [role['id'] for role in roles]
# NOTE(felipemonteiro): We do not use ``role_id in role_ids`` here to
# avoid over-permission errors: if the current list of roles on the
# NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
# to avoid over-permission errors: if the current list of roles on the
# project includes "admin" and "Member", and we are switching to the
# "Member" role, then we must delete the "admin" role. Thus, we only
# return early if the user's roles on the project are an exact match.
if [role_id] == role_ids:
if set(role_ids) == set(all_role_ids):
return True
for role in roles:
@ -279,8 +292,14 @@ class RbacUtilsMixin(object):
def is_admin():
"""Verifies whether the current test role equals the admin role.
:returns: True if ``rbac_test_role`` is the admin role.
:returns: True if ``rbac_test_roles`` contain the admin role.
"""
roles = CONF.patrole.rbac_test_roles
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
if CONF.patrole.rbac_test_role:
roles.append(CONF.patrole.rbac_test_role)
roles = list(set(roles))
# TODO(felipemonteiro): Make this more robust via a context is admin
# lookup.
return CONF.patrole.rbac_test_role == CONF.identity.admin_role
return CONF.identity.admin_role in roles

View File

@ -95,13 +95,14 @@ class RequirementsAuthority(RbacAuthority):
else:
self.roles_dict = None
def allowed(self, rule_name, role):
def allowed(self, rule_name, roles):
"""Checks if a given rule in a policy is allowed with given role.
:param string rule_name: Rule to be checked using provided requirements
file specified by ``[patrole].custom_requirements_file``. Must be
a key present in this file, under the appropriate component.
:param string role: Role to validate against custom requirements file.
:param List[string] roles: Roles to validate against custom
requirements file.
:returns: True if ``role`` is allowed to perform ``rule_name``, else
False.
:rtype: bool
@ -115,8 +116,7 @@ class RequirementsAuthority(RbacAuthority):
"formatted.")
try:
_api = self.roles_dict[rule_name]
return role in _api
return all(role in _api for role in roles)
except KeyError:
raise KeyError("'%s' API is not defined in the requirements YAML "
"file" % rule_name)
return False

View File

@ -66,7 +66,8 @@ class RbacUtilsFixture(fixtures.Fixture):
def setUp(self):
super(RbacUtilsFixture, self).setUp()
self.useFixture(ConfPatcher(rbac_test_role='member', group='patrole'))
self.useFixture(ConfPatcher(rbac_test_roles=['member'],
group='patrole'))
self.useFixture(ConfPatcher(
admin_role='admin', auth_version='v3', group='identity'))
self.useFixture(ConfPatcher(
@ -92,7 +93,7 @@ class RbacUtilsFixture(fixtures.Fixture):
mock_admin_mgr = mock.patch.object(
clients, 'Manager', spec=clients.Manager,
roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
self.roles_v3_client = mock_admin_mgr.return_value.roles_v3_client
self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
self.set_roles(['admin', 'member'], [])
@ -153,6 +154,6 @@ class RbacUtilsFixture(fixtures.Fixture):
for role in roles_on_project]
}
self.roles_v3_client.list_roles.return_value = available_roles
self.roles_v3_client.list_user_roles_on_project.return_value = (
self.admin_roles_client.list_roles.return_value = available_roles
self.admin_roles_client.list_user_roles_on_project.return_value = (
available_project_roles)

View File

@ -108,9 +108,60 @@ class PolicyAuthorityTest(base.TestCase):
for rule, role_list in expected.items():
for role in role_list:
self.assertTrue(authority.allowed(rule, role))
self.assertTrue(authority.allowed(rule, [role]))
for role in set(default_roles) - set(role_list):
self.assertFalse(authority.allowed(rule, role))
self.assertFalse(authority.allowed(rule, [role]))
@mock.patch.object(policy_authority, 'LOG', autospec=True)
def _test_custom_multi_roles_policy(self, *args):
default_roles = ['zero', 'one', 'two', 'three', 'four',
'five', 'six', 'seven', 'eight', 'nine']
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "custom_rbac_policy")
expected = {
'policy_action_1': ['two', 'four', 'six', 'eight'],
'policy_action_2': ['one', 'three', 'five', 'seven', 'nine'],
'policy_action_4': ['one', 'two', 'three', 'five', 'seven'],
'policy_action_5': ['zero', 'one', 'two', 'three', 'four', 'five',
'six', 'seven', 'eight', 'nine'],
}
for rule, role_list in expected.items():
allowed_roles_lists = [roles for roles in [
role_list[len(role_list) // 2:],
role_list[:len(role_list) // 2]] if roles]
for test_roles in allowed_roles_lists:
self.assertTrue(authority.allowed(rule, test_roles))
disallowed_roles = list(set(default_roles) - set(role_list))
disallowed_roles_lists = [roles for roles in [
disallowed_roles[len(disallowed_roles) // 2:],
disallowed_roles[:len(disallowed_roles) // 2]] if roles]
for test_roles in disallowed_roles_lists:
self.assertFalse(authority.allowed(rule, test_roles))
def test_empty_rbac_test_roles(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "custom_rbac_policy")
disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
'policy_action_3', 'policy_action_4',
'policy_action_6']
# Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
allowed_for_empty_roles = ['policy_action_5']
for rule in disallowed_for_empty_roles:
self.assertFalse(authority.allowed(rule, []))
for rule in allowed_for_empty_roles:
self.assertTrue(authority.allowed(rule, []))
def test_custom_policy_json(self):
# The CONF.patrole.custom_policy_files has a path to JSON file by
@ -122,24 +173,34 @@ class PolicyAuthorityTest(base.TestCase):
custom_policy_files=[self.conf_policy_path_yaml], group='patrole'))
self._test_custom_policy()
def test_custom_multi_roles_policy_json(self):
# The CONF.patrole.custom_policy_files has a path to JSON file by
# default, so we don't need to use ConfPatcher here.
self._test_custom_multi_roles_policy()
def test_custom_multi_roles_policy_yaml(self):
self.useFixture(fixtures.ConfPatcher(
custom_policy_files=[self.conf_policy_path_yaml], group='patrole'))
self._test_custom_multi_roles_policy()
def test_admin_policy_file_with_admin_role(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "admin_rbac_policy")
role = 'admin'
roles = ['admin']
allowed_rules = [
'admin_rule', 'is_admin_rule', 'alt_admin_rule'
]
disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_admin_policy_file_with_member_role(self):
@ -148,7 +209,7 @@ class PolicyAuthorityTest(base.TestCase):
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "admin_rbac_policy")
role = 'Member'
roles = ['Member']
allowed_rules = [
'non_admin_rule'
]
@ -156,11 +217,11 @@ class PolicyAuthorityTest(base.TestCase):
'admin_rule', 'is_admin_rule', 'alt_admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_alt_admin_policy_file_with_context_is_admin(self):
@ -169,28 +230,28 @@ class PolicyAuthorityTest(base.TestCase):
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "alt_admin_rbac_policy")
role = 'fake_admin'
roles = ['fake_admin']
allowed_rules = ['non_admin_rule']
disallowed_rules = ['admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
role = 'super_admin'
roles = ['super_admin']
allowed_rules = ['admin_rule']
disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, role)
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_tenant_user_policy(self):
@ -208,17 +269,17 @@ class PolicyAuthorityTest(base.TestCase):
# Check whether Member role can perform expected actions.
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
for rule in allowed_rules:
allowed = authority.allowed(rule, 'Member')
allowed = authority.allowed(rule, ['Member'])
self.assertTrue(allowed)
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
for disallowed_rule in disallowed_rules:
self.assertFalse(authority.allowed(disallowed_rule, 'Member'))
self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
# Check whether admin role can perform expected actions.
allowed_rules.extend(disallowed_rules)
for rule in allowed_rules:
allowed = authority.allowed(rule, 'admin')
allowed = authority.allowed(rule, ['admin'])
self.assertTrue(allowed)
# Check whether _try_rule is called with the correct target dictionary.
@ -243,7 +304,7 @@ class PolicyAuthorityTest(base.TestCase):
}
for rule in allowed_rules:
allowed = authority.allowed(rule, 'Member')
allowed = authority.allowed(rule, ['Member'])
self.assertTrue(allowed)
mock_try_rule.assert_called_once_with(
rule, expected_target, expected_access_data, mock.ANY)
@ -292,7 +353,7 @@ class PolicyAuthorityTest(base.TestCase):
fake_rule, [self.custom_policy_file], "custom_rbac_policy")
e = self.assertRaises(rbac_exceptions.RbacParsingException,
authority.allowed, fake_rule, None)
authority.allowed, fake_rule, [None])
self.assertIn(expected_message, str(e))
m_log.debug.assert_called_once_with(expected_message)
@ -309,13 +370,13 @@ class PolicyAuthorityTest(base.TestCase):
mock.sentinel.error)})
expected_message = (
'Policy action "{0}" not found in policy files: {1} or among '
'Policy action "[{0}]" not found in policy files: {1} or among '
'registered policy in code defaults for {2} service.').format(
mock.sentinel.rule, [self.custom_policy_file],
"custom_rbac_policy")
e = self.assertRaises(rbac_exceptions.RbacParsingException,
authority.allowed, mock.sentinel.rule, None)
authority.allowed, [mock.sentinel.rule], [None])
self.assertIn(expected_message, str(e))
m_log.debug.assert_called_once_with(expected_message)

View File

@ -47,7 +47,7 @@ class BaseRBACRuleValidationTest(base.TestCase):
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.useFixture(
patrole_fixtures.ConfPatcher(rbac_test_role='Member',
patrole_fixtures.ConfPatcher(rbac_test_roles=['member'],
group='patrole'))
# Disable patrole log for unit tests.
self.useFixture(
@ -55,6 +55,29 @@ class BaseRBACRuleValidationTest(base.TestCase):
group='patrole_log'))
class BaseRBACMultiRoleRuleValidationTest(base.TestCase):
def setUp(self):
super(BaseRBACMultiRoleRuleValidationTest, self).setUp()
self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
self.mock_test_args.rbac_utils = mock.Mock(
spec_set=rbac_utils.RbacUtils)
# Setup credentials for mock client manager.
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
project_id=mock.sentinel.project_id)
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.useFixture(
patrole_fixtures.ConfPatcher(
rbac_test_roles=['member', 'anotherrole'], group='patrole'))
# Disable patrole log for unit tests.
self.useFixture(
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))
class RBACRuleValidationTest(BaseRBACRuleValidationTest):
"""Test suite for validating fundamental functionality for the
``rbac_rule_validation`` decorator.
@ -118,8 +141,8 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
def test_policy(*args):
raise exceptions.Forbidden()
test_re = ("Role Member was not allowed to perform the following "
"actions: \[%s\].*" % (mock.sentinel.action))
test_re = ("User with roles \['member'\] was not allowed to perform "
"the following actions: \[%s\].*" % (mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.mock_test_args)
@ -159,11 +182,10 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
def test_policy(*args):
raise rbac_exceptions.RbacMalformedResponse()
test_re = ("Role Member was not allowed to perform the following "
"actions: \[%s\].*" % (mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.mock_test_args)
test_re = ("User with roles \['member'\] was not allowed to perform "
"the following actions: \[%s\]. " % (mock.sentinel.action))
self.assertRaisesRegex(rbac_exceptions.RbacUnderPermissionException,
test_re, test_policy, self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -214,8 +236,8 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
raise exceptions.NotFound()
expected_errors = [
("Role Member was not allowed to perform the following "
"actions: \['%s'\].*" % policy_names[0]),
("User with roles \['member'\] was not allowed to perform the "
"following actions: \['%s'\].*" % policy_names[0]),
None
]
@ -348,6 +370,108 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
mock_log.error.reset_mock()
class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
RBACRuleValidationTest):
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_forbidden_negative(self, mock_authority,
mock_log):
"""Test RbacUnderPermissionException error is thrown and have
permission fails.
Negative test case: if Forbidden is thrown and the user should be
allowed to perform the action, then the RbacUnderPermissionException
exception should be raised.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
raise exceptions.Forbidden()
test_re = ("User with roles \['member', 'anotherrole'\] was not "
"allowed to perform the following actions: \[%s\].*" %
(mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_rbac_malformed_response_negative(
self, mock_authority, mock_log):
"""Test RbacMalformedResponse error is thrown with permission fails.
Negative test case: if RbacMalformedResponse is thrown and the user is
allowed to perform the action, then this is an expected failure.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
def test_policy(*args):
raise rbac_exceptions.RbacMalformedResponse()
test_re = ("User with roles \['member', 'anotherrole'\] was not "
"allowed to perform the following actions: \[%s\]. " %
(mock.sentinel.action))
self.assertRaisesRegex(rbac_exceptions.RbacUnderPermissionException,
test_re, test_policy, self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_expect_not_found_and_raise_not_found(self, mock_authority,
mock_log):
"""Test that expecting 404 and getting 404 works for all scenarios.
Tests the following scenarios:
1) Test no permission and 404 is expected and 404 is thrown succeeds.
2) Test have permission and 404 is expected and 404 is thrown fails.
In both cases, a LOG.warning is called with the "irregular message"
that signals to user that a 404 was expected and caught.
"""
policy_names = ['foo:bar']
@rbac_rv.action(mock.sentinel.service, rules=policy_names,
expected_error_codes=[404])
def test_policy(*args):
raise exceptions.NotFound()
expected_errors = [
("User with roles \['member', 'anotherrole'\] was not allowed to "
"perform the following actions: \['%s'\].*" % policy_names[0]),
None
]
for pos, allowed in enumerate([True, False]):
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = allowed
error_re = expected_errors[pos]
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
test_policy(self.mock_test_args)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
"NotFound exception was caught for test %s. Expected policies "
"which may have caused the error: %s. The service %s throws a "
"404 instead of a 403, which is irregular",
test_policy.__name__,
', '.join(policy_names),
mock.sentinel.service)
mock_log.warning.reset_mock()
mock_log.error.reset_mock()
class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
"""Test class for validating the RBAC log, dedicated to just logging
Patrole RBAC validation work flows.
@ -422,7 +546,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
mock.sentinel.action,
CONF.patrole.rbac_test_role)
CONF.patrole.rbac_test_roles)
mock_log.error.assert_not_called()
@ -454,18 +578,23 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"foo",
CONF.patrole.rbac_test_role)
CONF.patrole.rbac_test_roles)
policy_authority.allowed.reset_mock()
test_bar_policy(self.mock_test_args)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"qux",
CONF.patrole.rbac_test_role)
CONF.patrole.rbac_test_roles)
mock_log.error.assert_not_called()
class RBACMultiRoleRuleValidationLoggingTest(
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationLoggingTest):
pass
class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
def setUp(self):
@ -488,6 +617,11 @@ class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
test_policy, self.mock_test_args)
class RBACMultiRoleRuleValidationNegativeTest(
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationNegativeTest):
pass
class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
"""Test suite for validating multi-policy support for the
``rbac_rule_validation`` decorator.
@ -502,7 +636,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def _assert_policy_authority_called_with(self, rules, mock_authority):
m_authority = mock_authority.PolicyAuthority.return_value
m_authority.allowed.assert_has_calls([
mock.call(rule, CONF.patrole.rbac_test_role) for rule in rules
mock.call(rule, CONF.patrole.rbac_test_roles) for rule in rules
])
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@ -614,10 +748,10 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = True
error_re = ("Role Member was not allowed to perform the following "
"actions: %s. Expected allowed actions: %s. Expected "
"disallowed actions: []." % (rules, rules)).replace(
'[', '\[').replace(']', '\]')
error_re = ("User with roles ['member'] was not allowed to perform "
"the following actions: %s. Expected allowed actions: %s. "
"Expected disallowed actions: []." %
(rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
@ -739,6 +873,39 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
self.assertRaisesRegex(ValueError, error_re, _do_test, None, [404])
class RBACMultiRoleRuleValidationTestMultiPolicy(
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationTestMultiPolicy):
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_multi_policy_forbidden_failure(
self, mock_authority, mock_log):
"""Test that when the expected result is authorized and the test
fails (with a Forbidden error code) that the overall evaluation
results in a RbacUnderPermissionException getting raised.
"""
# NOTE: Avoid mock.sentinel here due to weird sorting with them.
rules = ['action1', 'action2', 'action3']
@rbac_rv.action(mock.sentinel.service, rules=rules,
expected_error_codes=[403, 403, 403])
def test_policy(*args):
raise exceptions.Forbidden()
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = True
error_re = ("User with roles ['member', 'anotherrole'] was not "
"allowed to perform the following actions: %s. Expected "
"allowed actions: %s. Expected disallowed actions: []." %
(rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
"""Class for validating that untimely exceptions (outside
``override_role`` is called) result in test failures.

View File

@ -52,7 +52,7 @@ class RBACUtilsTest(base.TestCase):
self.rbac_utils.override_role()
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.roles_v3_client
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_called_once_with(
@ -67,7 +67,7 @@ class RBACUtilsTest(base.TestCase):
self.rbac_utils.set_roles(['admin', 'member'], 'admin')
self.rbac_utils.override_role()
roles_client = self.rbac_utils.roles_v3_client
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_not_called()
@ -77,7 +77,7 @@ class RBACUtilsTest(base.TestCase):
self.rbac_utils.override_role(True)
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.roles_v3_client
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
@ -96,7 +96,7 @@ class RBACUtilsTest(base.TestCase):
self.rbac_utils.set_roles(['admin', 'member'], 'member')
self.rbac_utils.override_role(True)
roles_client = self.rbac_utils.roles_v3_client
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
@ -109,7 +109,7 @@ class RBACUtilsTest(base.TestCase):
self.rbac_utils.override_role(True, False)
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.roles_v3_client
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
@ -133,7 +133,7 @@ class RBACUtilsTest(base.TestCase):
self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
self.rbac_utils.override_role()
roles_client = self.rbac_utils.roles_v3_client
roles_client = self.rbac_utils.admin_roles_client
roles_client.list_user_roles_on_project.assert_called_once_with(
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
@ -169,7 +169,8 @@ class RBACUtilsTest(base.TestCase):
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
@mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
@mock.patch.object(rbac_utils.RbacUtils, '_override_role',
autospec=True)
def test_override_role_context_manager_simulate_fail(self,
mock_override_role):
"""Validate that expected override_role calls are made when switching

View File

@ -38,11 +38,11 @@ class RequirementsAuthorityTest(base.TestCase):
def test_auth_allowed_empty_roles(self):
self.rbac_auth.roles_dict = None
self.assertRaises(exceptions.InvalidConfiguration,
self.rbac_auth.allowed, "", "")
self.rbac_auth.allowed, "", [""])
def test_auth_allowed_role_in_api(self):
self.rbac_auth.roles_dict = {'api': ['_member_']}
self.assertTrue(self.rbac_auth.allowed("api", "_member_"))
self.assertTrue(self.rbac_auth.allowed("api", ["_member_"]))
def test_auth_allowed_role_not_in_api(self):
self.rbac_auth.roles_dict = {'api': ['_member_']}
@ -64,7 +64,8 @@ class RequirementsAuthorityTest(base.TestCase):
self.rbac_auth.roles_dict = req_auth.RequirementsParser.parse("Test")
self.assertEqual(self.expected_result, self.rbac_auth.roles_dict)
self.assertTrue(self.rbac_auth.allowed("test:create2", "test_member"))
self.assertTrue(
self.rbac_auth.allowed("test:create2", ["test_member"]))
def test_parser_role_not_in_api(self):
req_auth.RequirementsParser.Inner._rbac_map = \
@ -82,4 +83,4 @@ class RequirementsAuthorityTest(base.TestCase):
self.assertIsNone(self.rbac_auth.roles_dict)
self.assertRaises(exceptions.InvalidConfiguration,
self.rbac_auth.allowed, "", "")
self.rbac_auth.allowed, "", [""])

View File

@ -0,0 +1,11 @@
---
features:
- |
We have replaced CONF.patrole.rbac_test_role with
CONF.patrole.rbac_test_roles, where instead of single role we can specify
list of roles to be assigned to test user. This way we may run rbac tests
for scenarios that requires user to have more that a single role.
deprecations:
- |
Config parameter CONF.rbac_test_role is deprecated in favor of
CONF.rbac_test_roles that implements a list of roles instead of single role.