Multi role RBAC validation
This patchset replaces ``CONF.patrole.rbac_test_role`` with ``CONF.patrole.rbac_test_roles``, where instead of single role we can specify list of roles to be assigned to test user. Change-Id: Ia68bcbdbb523dfe7c4abd6107fb4c426a566ae9d
This commit is contained in:
parent
c38aca7587
commit
e0f35503c9
14
.zuul.yaml
14
.zuul.yaml
|
@ -71,7 +71,7 @@
|
|||
description: Patrole job for admin role.
|
||||
vars:
|
||||
devstack_localrc:
|
||||
RBAC_TEST_ROLE: admin
|
||||
RBAC_TEST_ROLES: admin
|
||||
|
||||
- job:
|
||||
name: patrole-member
|
||||
|
@ -84,7 +84,7 @@
|
|||
- stable/pike
|
||||
vars:
|
||||
devstack_localrc:
|
||||
RBAC_TEST_ROLE: member
|
||||
RBAC_TEST_ROLES: member
|
||||
|
||||
- job:
|
||||
name: patrole-member-rocky
|
||||
|
@ -107,7 +107,7 @@
|
|||
voting: false
|
||||
vars:
|
||||
devstack_localrc:
|
||||
RBAC_TEST_ROLE: admin
|
||||
RBAC_TEST_ROLES: admin
|
||||
|
||||
- job:
|
||||
name: patrole-multinode-member
|
||||
|
@ -115,7 +115,7 @@
|
|||
voting: false
|
||||
vars:
|
||||
devstack_localrc:
|
||||
RBAC_TEST_ROLE: member
|
||||
RBAC_TEST_ROLES: member
|
||||
|
||||
- job:
|
||||
name: patrole-py35-member
|
||||
|
@ -125,7 +125,7 @@
|
|||
devstack_localrc:
|
||||
# Use member for py35 because arguably negative testing is more
|
||||
# important than admin, which is already covered by patrole-admin job.
|
||||
RBAC_TEST_ROLE: member
|
||||
RBAC_TEST_ROLES: member
|
||||
USE_PYTHON3: true
|
||||
devstack_services:
|
||||
s-account: false
|
||||
|
@ -164,7 +164,7 @@
|
|||
voting: false
|
||||
vars:
|
||||
devstack_localrc:
|
||||
RBAC_TEST_ROLE: member
|
||||
RBAC_TEST_ROLES: member
|
||||
tempest_test_regex: (?=.*PluginRbacTest)(^patrole_tempest_plugin\.tests\.api)
|
||||
|
||||
- job:
|
||||
|
@ -173,7 +173,7 @@
|
|||
voting: false
|
||||
vars:
|
||||
devstack_localrc:
|
||||
RBAC_TEST_ROLE: admin
|
||||
RBAC_TEST_ROLES: admin
|
||||
tempest_test_regex: (?=.*PluginRbacTest)(^patrole_tempest_plugin\.tests\.api)
|
||||
|
||||
- project:
|
||||
|
|
10
README.rst
10
README.rst
|
@ -186,20 +186,20 @@ the steps outlined therein. Afterward, proceed with the steps below.
|
|||
RBAC Tests
|
||||
----------
|
||||
|
||||
To change the role that the patrole tests are being run as, edit
|
||||
``rbac_test_role`` in the ``patrole`` section of tempest.conf: ::
|
||||
To change the roles that the patrole tests are being run as, edit
|
||||
``rbac_test_roles`` in the ``patrole`` section of tempest.conf: ::
|
||||
|
||||
[patrole]
|
||||
rbac_test_role = member
|
||||
rbac_test_role = member,reader
|
||||
...
|
||||
|
||||
.. note::
|
||||
|
||||
The ``rbac_test_role`` is service-specific. member, for example,
|
||||
The ``rbac_test_roles`` is service-specific. member, for example,
|
||||
is an arbitrary role, but by convention is used to designate the default
|
||||
non-admin role in the system. Most Patrole tests should be run with
|
||||
**admin** and **member** roles. However, other services may use entirely
|
||||
different roles.
|
||||
different roles or role combinations.
|
||||
|
||||
For more information about the member role and its nomenclature,
|
||||
please see: `<https://ask.openstack.org/en/question/4759/member-vs-_member_/>`__.
|
||||
|
|
|
@ -14,9 +14,14 @@ function install_patrole_tempest_plugin {
|
|||
setup_package $PATROLE_DIR -e
|
||||
|
||||
if [[ ${DEVSTACK_SERIES} == 'pike' ]]; then
|
||||
if [[ "$RBAC_TEST_ROLE" == "member" ]]; then
|
||||
RBAC_TEST_ROLE="Member"
|
||||
fi
|
||||
IFS=',' read -ra roles_array <<< "$RBAC_TEST_ROLES"
|
||||
RBAC_TEST_ROLES=""
|
||||
for i in "${roles_array[@]}"; do
|
||||
if [[ $i == "member" ]]; then
|
||||
i="Member"
|
||||
fi
|
||||
RBAC_TEST_ROLES="$i,$RBAC_TEST_ROLES"
|
||||
done
|
||||
|
||||
# Policies used by Patrole testing that were changed in a backwards-incompatible way.
|
||||
# TODO(felipemonteiro): Remove these once stable/pike becomes EOL.
|
||||
|
@ -35,9 +40,14 @@ function install_patrole_tempest_plugin {
|
|||
fi
|
||||
|
||||
if [[ ${DEVSTACK_SERIES} == 'queens' ]]; then
|
||||
if [[ "$RBAC_TEST_ROLE" == "member" ]]; then
|
||||
RBAC_TEST_ROLE="Member"
|
||||
fi
|
||||
IFS=',' read -ra roles_array <<< "$RBAC_TEST_ROLES"
|
||||
RBAC_TEST_ROLES=""
|
||||
for i in "${roles_array[@]}"; do
|
||||
if [[ $i == "member" ]]; then
|
||||
i="Member"
|
||||
fi
|
||||
RBAC_TEST_ROLES="$i,$RBAC_TEST_ROLES"
|
||||
done
|
||||
|
||||
# TODO(cl566n): Remove these once stable/queens becomes EOL.
|
||||
# These policies were removed in Stein but are available in Queens.
|
||||
|
@ -52,7 +62,7 @@ function install_patrole_tempest_plugin {
|
|||
iniset $TEMPEST_CONFIG policy-feature-enabled removed_keystone_policies_stein False
|
||||
fi
|
||||
|
||||
iniset $TEMPEST_CONFIG patrole rbac_test_role $RBAC_TEST_ROLE
|
||||
iniset $TEMPEST_CONFIG patrole rbac_test_roles $RBAC_TEST_ROLES
|
||||
}
|
||||
|
||||
if is_service_enabled tempest; then
|
||||
|
|
|
@ -7,34 +7,28 @@ Patrole can be customized by updating Tempest's ``tempest.conf`` configuration
|
|||
file. All Patrole-specific configuration options should be included under
|
||||
the ``patrole`` group.
|
||||
|
||||
RBAC Test Role
|
||||
--------------
|
||||
RBAC Test Roles
|
||||
---------------
|
||||
|
||||
The RBAC test role governs which role is used when running Patrole tests. For
|
||||
example, setting ``rbac_test_role`` to "admin" will execute all RBAC tests
|
||||
using admin credentials. Changing the ``rbac_test_role`` value will `override`
|
||||
Tempest's primary credentials to use that role.
|
||||
The RBAC test roles govern the list of roles to be used when running Patrole
|
||||
tests. For example, setting ``rbac_test_roles`` to "admin" will execute all
|
||||
RBAC tests using admin credentials. Changing the ``rbac_test_roles`` value
|
||||
will `override` Tempest's primary credentials to use that role.
|
||||
|
||||
This implies that, if ``rbac_test_role`` is "admin", regardless of the Tempest
|
||||
This implies that, if ``rbac_test_roles`` is "admin", regardless of the Tempest
|
||||
credentials used by a client, the client will be calling APIs using the admin
|
||||
role. That is, ``self.os_primary.servers_client`` will run as though it were
|
||||
``self.os_admin.servers_client``.
|
||||
|
||||
Similarly, setting ``rbac_test_role`` to a non-admin role results in Tempest's
|
||||
primary credentials being overridden by the role specified by
|
||||
``rbac_test_role``.
|
||||
Similarly, setting ``rbac_test_roles`` with various roles, results in
|
||||
Tempest's primary credentials being overridden by the roles specified by
|
||||
``rbac_test_roles``.
|
||||
|
||||
.. note::
|
||||
|
||||
Only the role of the primary Tempest credentials ("os_primary") is
|
||||
Only the roles of the primary Tempest credentials ("os_primary") are
|
||||
modified. The ``user_id`` and ``project_id`` remain unchanged.
|
||||
|
||||
Enable RBAC
|
||||
-----------
|
||||
|
||||
Given the value of ``enable_rbac``, enables or disables Patrole tests. If
|
||||
``enable_rbac`` is ``False``, then Patrole tests are skipped.
|
||||
|
||||
Custom Policy Files
|
||||
-------------------
|
||||
|
||||
|
|
|
@ -10,10 +10,10 @@ Validation Workflow Overview
|
|||
RBAC testing validation is broken up into 3 stages:
|
||||
|
||||
#. "Expected" stage. Determine whether the test should be able to succeed
|
||||
or fail based on the test role defined by ``[patrole] rbac_test_role``)
|
||||
or fail based on the test roles defined by ``[patrole] rbac_test_roles``)
|
||||
and the policy action that the test enforces.
|
||||
#. "Actual" stage. Run the test by calling the API endpoint that enforces
|
||||
the expected policy action using the test role.
|
||||
the expected policy action using the test roles.
|
||||
#. Comparing the outputs from both stages for consistency. A "consistent"
|
||||
result is treated as a pass and an "inconsistent" result is treated
|
||||
as a failure. "Consistent" (or successful) cases include:
|
||||
|
@ -63,7 +63,7 @@ The Policy Authority Module
|
|||
---------------------------
|
||||
|
||||
Module called by :ref:`rbac-validation` to verify whether the test
|
||||
role is allowed to execute a policy action by querying ``oslo.policy`` with
|
||||
roles are allowed to execute a policy action by querying ``oslo.policy`` with
|
||||
required test data. The result is used by :ref:`rbac-validation` as the
|
||||
"Expected" result.
|
||||
|
||||
|
|
|
@ -23,8 +23,8 @@ Role Overriding
|
|||
Role overriding is the way Patrole is able to create resources and delete
|
||||
resources -- including those that require admin credentials -- while still
|
||||
being able to exercise the same set of Tempest credentials to perform the API
|
||||
action that authorizes the policy under test, by manipulating the role of
|
||||
the Tempest credentials.
|
||||
action that authorizes the policy under test, by manipulating roles of the
|
||||
Tempest credentials.
|
||||
|
||||
Patrole implicitly splits up each test into 3 stages: set up, test execution,
|
||||
and teardown.
|
||||
|
@ -33,10 +33,10 @@ The role workflow is as follows:
|
|||
|
||||
#. Setup: Admin role is used automatically. The primary credentials are
|
||||
overridden with the admin role.
|
||||
#. Test execution: ``[patrole] rbac_test_role`` is used manually via the
|
||||
#. Test execution: ``[patrole] rbac_test_roles`` is used manually via the
|
||||
call to ``with rbac_utils.override_role(self)``. Everything that
|
||||
is executed within this contextmanager uses the primary
|
||||
credentials overridden with the ``[patrole] rbac_test_role``.
|
||||
credentials overridden with the ``[patrole] rbac_test_roles``.
|
||||
#. Teardown: Admin role is used automatically. The primary credentials have
|
||||
been overridden with the admin role.
|
||||
|
||||
|
|
|
@ -7,12 +7,17 @@
|
|||
# From patrole.config
|
||||
#
|
||||
|
||||
# The current RBAC role against which to run Patrole
|
||||
# tests. (string value)
|
||||
#rbac_test_role = admin
|
||||
# DEPRECATED: The current RBAC role against which to run
|
||||
# Patrole tests. (string value)
|
||||
# This option is deprecated for removal.
|
||||
# Its value may be silently ignored in the future.
|
||||
# Reason: This option is deprecated and being
|
||||
# replaced with ``rbac_test_roles``.
|
||||
#rbac_test_role =
|
||||
|
||||
# Enables RBAC tests. (boolean value)
|
||||
#enable_rbac = true
|
||||
# The current RBAC roles to be assigned to Keystone
|
||||
# Group against which to run Patrole tests. (list value)
|
||||
#rbac_test_roles = admin
|
||||
|
||||
# List of the paths to search for policy files. Each
|
||||
# policy path assumes that the service name is included in the path
|
||||
|
@ -20,9 +25,11 @@
|
|||
# assumes Patrole is on the same host as the policy files. The paths
|
||||
# should be
|
||||
# ordered by precedence, with high-priority paths before low-priority
|
||||
# paths. The
|
||||
# first path that is found to contain the service's policy file will
|
||||
# be used.
|
||||
# paths. All
|
||||
# the paths that are found to contain the service's policy file will
|
||||
# be used and
|
||||
# all policy files will be merged. Allowed ``json`` or ``yaml``
|
||||
# formats.
|
||||
# (list value)
|
||||
#custom_policy_files = /etc/%s/policy.json
|
||||
|
||||
|
@ -150,3 +157,16 @@
|
|||
# This policy
|
||||
# was changed in a backwards-incompatible way. (boolean value)
|
||||
#volume_extension_volume_actions_unreserve_policy = true
|
||||
|
||||
# Are the Nova API extension policies available in the
|
||||
# cloud (e.g. os_compute_api:os-extended-availability-zone)? These
|
||||
# policies were
|
||||
# removed in Stein because Nova API extension concept was removed in
|
||||
# Pike. (boolean value)
|
||||
#removed_nova_policies_stein = true
|
||||
|
||||
# Are the Cinder API extension policies available in the
|
||||
# cloud (e.g. [create|update|get|delete]_encryption_policy)? These
|
||||
# policies are
|
||||
# added in Stein. (boolean value)
|
||||
#added_cinder_policies_stein = true
|
||||
|
|
|
@ -22,8 +22,16 @@ patrole_group = cfg.OptGroup(name='patrole', title='Patrole Testing Options')
|
|||
PatroleGroup = [
|
||||
cfg.StrOpt('rbac_test_role',
|
||||
default='admin',
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason="""This option is deprecated and being
|
||||
replaced with ``rbac_test_roles``.
|
||||
""",
|
||||
help="""The current RBAC role against which to run
|
||||
Patrole tests."""),
|
||||
cfg.ListOpt('rbac_test_roles',
|
||||
help="""List of the RBAC roles against which to run
|
||||
Patrole tests.""",
|
||||
default=['admin']),
|
||||
cfg.ListOpt('custom_policy_files',
|
||||
default=['/etc/%s/policy.json'],
|
||||
help="""List of the paths to search for policy files. Each
|
||||
|
|
|
@ -154,17 +154,17 @@ class PolicyAuthority(RbacAuthority):
|
|||
if os.path.isfile(filename):
|
||||
cls.policy_files[service].append(filename)
|
||||
|
||||
def allowed(self, rule_name, role):
|
||||
def allowed(self, rule_name, roles):
|
||||
"""Checks if a given rule in a policy is allowed with given role.
|
||||
|
||||
:param string rule_name: Policy name to pass to``oslo.policy``.
|
||||
:param string role: Role to validate for authorization.
|
||||
:param List[string] roles: List of roles to validate for authorization.
|
||||
:raises RbacParsingException: If ``rule_name`` does not exist in the
|
||||
cloud (in policy file or among registered in-code policy defaults).
|
||||
"""
|
||||
is_admin_context = self._is_admin_context(role)
|
||||
is_admin_context = self._is_admin_context(roles)
|
||||
is_allowed = self._allowed(
|
||||
access=self._get_access_token(role),
|
||||
access=self._get_access_token(roles),
|
||||
apply_rule=rule_name,
|
||||
is_admin=is_admin_context)
|
||||
return is_allowed
|
||||
|
@ -224,7 +224,7 @@ class PolicyAuthority(RbacAuthority):
|
|||
|
||||
return rules
|
||||
|
||||
def _is_admin_context(self, role):
|
||||
def _is_admin_context(self, roles):
|
||||
"""Checks whether a role has admin context.
|
||||
|
||||
If context_is_admin is contained in the policy file, then checks
|
||||
|
@ -233,17 +233,17 @@ class PolicyAuthority(RbacAuthority):
|
|||
"""
|
||||
if 'context_is_admin' in self.rules.keys():
|
||||
return self._allowed(
|
||||
access=self._get_access_token(role),
|
||||
access=self._get_access_token(roles),
|
||||
apply_rule='context_is_admin')
|
||||
return role == CONF.identity.admin_role
|
||||
return CONF.identity.admin_role in roles
|
||||
|
||||
def _get_access_token(self, role):
|
||||
def _get_access_token(self, roles):
|
||||
access_token = {
|
||||
"token": {
|
||||
"roles": [
|
||||
{
|
||||
"name": role
|
||||
}
|
||||
} for role in roles
|
||||
],
|
||||
"project_id": self.project_id,
|
||||
"tenant_id": self.project_id,
|
||||
|
|
|
@ -17,6 +17,7 @@ import functools
|
|||
import logging
|
||||
import sys
|
||||
|
||||
from oslo_log import versionutils
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
|
@ -47,7 +48,7 @@ def action(service,
|
|||
|
||||
* an OpenStack service,
|
||||
* a policy action (``rule``) enforced by that service, and
|
||||
* the test role defined by ``[patrole] rbac_test_role``
|
||||
* the test roles defined by ``[patrole] rbac_test_roles``
|
||||
|
||||
determines whether the test role has sufficient permissions to perform an
|
||||
API call that enforces the ``rule``.
|
||||
|
@ -142,7 +143,15 @@ def action(service,
|
|||
expected_error_codes)
|
||||
|
||||
def decorator(test_func):
|
||||
role = CONF.patrole.rbac_test_role
|
||||
roles = CONF.patrole.rbac_test_roles
|
||||
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
|
||||
if CONF.patrole.rbac_test_role:
|
||||
msg = ('CONF.patrole.rbac_test_role is deprecated in favor of '
|
||||
'CONF.patrole.rbac_test_roles and will be removed in '
|
||||
'future.')
|
||||
versionutils.report_deprecated_feature(LOG, msg)
|
||||
if not roles:
|
||||
roles.append(CONF.patrole.rbac_test_role)
|
||||
|
||||
@functools.wraps(test_func)
|
||||
def wrapper(*args, **kwargs):
|
||||
|
@ -200,10 +209,10 @@ def action(service,
|
|||
service)
|
||||
|
||||
if allowed:
|
||||
msg = ("Role %s was not allowed to perform the following "
|
||||
"actions: %s. Expected allowed actions: %s. "
|
||||
"Expected disallowed actions: %s." % (
|
||||
role, sorted(rules),
|
||||
msg = ("User with roles %s was not allowed to perform the "
|
||||
"following actions: %s. Expected allowed actions: "
|
||||
"%s. Expected disallowed actions: %s." % (
|
||||
roles, sorted(rules),
|
||||
sorted(set(rules) - set(disallowed_rules)),
|
||||
sorted(disallowed_rules)))
|
||||
LOG.error(msg)
|
||||
|
@ -236,7 +245,7 @@ def action(service,
|
|||
msg = (
|
||||
"OverPermission: Role %s was allowed to perform the "
|
||||
"following disallowed actions: %s" % (
|
||||
role, sorted(disallowed_rules)
|
||||
roles, sorted(disallowed_rules)
|
||||
)
|
||||
)
|
||||
LOG.error(msg)
|
||||
|
@ -328,7 +337,12 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
|
|||
LOG.error(msg)
|
||||
raise rbac_exceptions.RbacResourceSetupFailed(msg)
|
||||
|
||||
role = CONF.patrole.rbac_test_role
|
||||
roles = CONF.patrole.rbac_test_roles
|
||||
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
|
||||
if CONF.patrole.rbac_test_role:
|
||||
if not roles:
|
||||
roles.append(CONF.patrole.rbac_test_role)
|
||||
|
||||
# Test RBAC against custom requirements. Otherwise use oslo.policy.
|
||||
if CONF.patrole.test_custom_requirements:
|
||||
authority = requirements_authority.RequirementsAuthority(
|
||||
|
@ -339,14 +353,14 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
|
|||
authority = policy_authority.PolicyAuthority(
|
||||
project_id, user_id, service,
|
||||
extra_target_data=formatted_target_data)
|
||||
is_allowed = authority.allowed(rule, role)
|
||||
is_allowed = authority.allowed(rule, roles)
|
||||
|
||||
if is_allowed:
|
||||
LOG.debug("[Policy action]: %s, [Role]: %s is allowed!", rule,
|
||||
role)
|
||||
roles)
|
||||
else:
|
||||
LOG.debug("[Policy action]: %s, [Role]: %s is NOT allowed!",
|
||||
rule, role)
|
||||
rule, roles)
|
||||
|
||||
return is_allowed
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ class RbacUtils(object):
|
|||
up, and primary credentials, needed to perform the API call which does
|
||||
policy enforcement. The primary credentials always cycle between roles
|
||||
defined by ``CONF.identity.admin_role`` and
|
||||
``CONF.patrole.rbac_test_role``.
|
||||
``CONF.patrole.rbac_test_roles``.
|
||||
"""
|
||||
|
||||
def __init__(self, test_obj):
|
||||
|
@ -58,10 +58,15 @@ class RbacUtils(object):
|
|||
"Patrole role overriding only supports v3 identity API.")
|
||||
|
||||
self.admin_roles_client = admin_roles_client
|
||||
|
||||
self.user_id = test_obj.os_primary.credentials.user_id
|
||||
self.project_id = test_obj.os_primary.credentials.tenant_id
|
||||
|
||||
# Change default role to admin
|
||||
self._override_role(test_obj, False)
|
||||
|
||||
admin_role_id = None
|
||||
rbac_role_id = None
|
||||
rbac_role_ids = None
|
||||
|
||||
@contextmanager
|
||||
def override_role(self, test_obj):
|
||||
|
@ -69,7 +74,7 @@ class RbacUtils(object):
|
|||
|
||||
Temporarily change the role used by ``os_primary`` credentials to:
|
||||
|
||||
* ``[patrole] rbac_test_role`` before test execution
|
||||
* ``[patrole] rbac_test_roles`` before test execution
|
||||
* ``[identity] admin_role`` after test execution
|
||||
|
||||
Automatically switches to admin role after test execution.
|
||||
|
@ -122,25 +127,21 @@ class RbacUtils(object):
|
|||
* If True: role is set to ``[patrole] rbac_test_role``
|
||||
* If False: role is set to ``[identity] admin_role``
|
||||
"""
|
||||
self.user_id = test_obj.os_primary.credentials.user_id
|
||||
self.project_id = test_obj.os_primary.credentials.tenant_id
|
||||
self.token = test_obj.os_primary.auth_provider.get_token()
|
||||
|
||||
LOG.debug('Overriding role to: %s.', toggle_rbac_role)
|
||||
role_already_present = False
|
||||
roles_already_present = False
|
||||
|
||||
try:
|
||||
if not all([self.admin_role_id, self.rbac_role_id]):
|
||||
if not all([self.admin_role_id, self.rbac_role_ids]):
|
||||
self._get_roles_by_name()
|
||||
|
||||
target_role = (
|
||||
self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
|
||||
role_already_present = self._list_and_clear_user_roles_on_project(
|
||||
target_role)
|
||||
target_roles = (self.rbac_role_ids
|
||||
if toggle_rbac_role else [self.admin_role_id])
|
||||
roles_already_present = self._list_and_clear_user_roles_on_project(
|
||||
target_roles)
|
||||
|
||||
# Do not override roles if `target_role` already exists.
|
||||
if not role_already_present:
|
||||
self._create_user_role_on_project(target_role)
|
||||
if not roles_already_present:
|
||||
self._create_user_role_on_project(target_roles)
|
||||
except Exception as exp:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(exp)
|
||||
|
@ -152,8 +153,8 @@ class RbacUtils(object):
|
|||
# passing the second boundary before attempting to authenticate.
|
||||
# Only sleep if a token revocation occurred as a result of role
|
||||
# overriding. This will optimize test runtime in the case where
|
||||
# ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
|
||||
if not role_already_present:
|
||||
# ``[identity] admin_role`` == ``[patrole] rbac_test_roles``.
|
||||
if not roles_already_present:
|
||||
time.sleep(1)
|
||||
|
||||
for provider in auth_providers:
|
||||
|
@ -164,41 +165,53 @@ class RbacUtils(object):
|
|||
role_map = {r['name']: r['id'] for r in available_roles}
|
||||
LOG.debug('Available roles: %s', list(role_map.keys()))
|
||||
|
||||
admin_role_id = role_map.get(CONF.identity.admin_role)
|
||||
rbac_role_id = role_map.get(CONF.patrole.rbac_test_role)
|
||||
rbac_role_ids = []
|
||||
roles = CONF.patrole.rbac_test_roles
|
||||
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
|
||||
if CONF.patrole.rbac_test_role:
|
||||
if not roles:
|
||||
roles.append(CONF.patrole.rbac_test_role)
|
||||
|
||||
if not all([admin_role_id, rbac_role_id]):
|
||||
for role_name in roles:
|
||||
rbac_role_ids.append(role_map.get(role_name))
|
||||
|
||||
admin_role_id = role_map.get(CONF.identity.admin_role)
|
||||
|
||||
if not all([admin_role_id, all(rbac_role_ids)]):
|
||||
missing_roles = []
|
||||
msg = ("Could not find `[patrole] rbac_test_role` or "
|
||||
msg = ("Could not find `[patrole] rbac_test_roles` or "
|
||||
"`[identity] admin_role`, both of which are required for "
|
||||
"RBAC testing.")
|
||||
if not admin_role_id:
|
||||
missing_roles.append(CONF.identity.admin_role)
|
||||
if not rbac_role_id:
|
||||
missing_roles.append(CONF.patrole.rbac_test_role)
|
||||
if not all(rbac_role_ids):
|
||||
missing_roles += [role_name for role_name in roles
|
||||
if not role_map.get(role_name)]
|
||||
|
||||
msg += " Following roles were not found: %s." % (
|
||||
", ".join(missing_roles))
|
||||
msg += " Available roles: %s." % ", ".join(list(role_map.keys()))
|
||||
raise rbac_exceptions.RbacResourceSetupFailed(msg)
|
||||
|
||||
self.admin_role_id = admin_role_id
|
||||
self.rbac_role_id = rbac_role_id
|
||||
self.rbac_role_ids = rbac_role_ids
|
||||
|
||||
def _create_user_role_on_project(self, role_id):
|
||||
self.admin_roles_client.create_user_role_on_project(
|
||||
self.project_id, self.user_id, role_id)
|
||||
def _create_user_role_on_project(self, role_ids):
|
||||
for role_id in role_ids:
|
||||
self.admin_roles_client.create_user_role_on_project(
|
||||
self.project_id, self.user_id, role_id)
|
||||
|
||||
def _list_and_clear_user_roles_on_project(self, role_id):
|
||||
def _list_and_clear_user_roles_on_project(self, role_ids):
|
||||
roles = self.admin_roles_client.list_user_roles_on_project(
|
||||
self.project_id, self.user_id)['roles']
|
||||
role_ids = [role['id'] for role in roles]
|
||||
all_role_ids = [role['id'] for role in roles]
|
||||
|
||||
# NOTE(felipemonteiro): We do not use ``role_id in role_ids`` here to
|
||||
# avoid over-permission errors: if the current list of roles on the
|
||||
# NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
|
||||
# to avoid over-permission errors: if the current list of roles on the
|
||||
# project includes "admin" and "Member", and we are switching to the
|
||||
# "Member" role, then we must delete the "admin" role. Thus, we only
|
||||
# return early if the user's roles on the project are an exact match.
|
||||
if [role_id] == role_ids:
|
||||
if set(role_ids) == set(all_role_ids):
|
||||
return True
|
||||
|
||||
for role in roles:
|
||||
|
@ -279,8 +292,14 @@ class RbacUtilsMixin(object):
|
|||
def is_admin():
|
||||
"""Verifies whether the current test role equals the admin role.
|
||||
|
||||
:returns: True if ``rbac_test_role`` is the admin role.
|
||||
:returns: True if ``rbac_test_roles`` contain the admin role.
|
||||
"""
|
||||
roles = CONF.patrole.rbac_test_roles
|
||||
# TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
|
||||
if CONF.patrole.rbac_test_role:
|
||||
roles.append(CONF.patrole.rbac_test_role)
|
||||
roles = list(set(roles))
|
||||
|
||||
# TODO(felipemonteiro): Make this more robust via a context is admin
|
||||
# lookup.
|
||||
return CONF.patrole.rbac_test_role == CONF.identity.admin_role
|
||||
return CONF.identity.admin_role in roles
|
||||
|
|
|
@ -95,13 +95,14 @@ class RequirementsAuthority(RbacAuthority):
|
|||
else:
|
||||
self.roles_dict = None
|
||||
|
||||
def allowed(self, rule_name, role):
|
||||
def allowed(self, rule_name, roles):
|
||||
"""Checks if a given rule in a policy is allowed with given role.
|
||||
|
||||
:param string rule_name: Rule to be checked using provided requirements
|
||||
file specified by ``[patrole].custom_requirements_file``. Must be
|
||||
a key present in this file, under the appropriate component.
|
||||
:param string role: Role to validate against custom requirements file.
|
||||
:param List[string] roles: Roles to validate against custom
|
||||
requirements file.
|
||||
:returns: True if ``role`` is allowed to perform ``rule_name``, else
|
||||
False.
|
||||
:rtype: bool
|
||||
|
@ -115,8 +116,7 @@ class RequirementsAuthority(RbacAuthority):
|
|||
"formatted.")
|
||||
try:
|
||||
_api = self.roles_dict[rule_name]
|
||||
return role in _api
|
||||
return all(role in _api for role in roles)
|
||||
except KeyError:
|
||||
raise KeyError("'%s' API is not defined in the requirements YAML "
|
||||
"file" % rule_name)
|
||||
return False
|
||||
|
|
|
@ -66,7 +66,8 @@ class RbacUtilsFixture(fixtures.Fixture):
|
|||
def setUp(self):
|
||||
super(RbacUtilsFixture, self).setUp()
|
||||
|
||||
self.useFixture(ConfPatcher(rbac_test_role='member', group='patrole'))
|
||||
self.useFixture(ConfPatcher(rbac_test_roles=['member'],
|
||||
group='patrole'))
|
||||
self.useFixture(ConfPatcher(
|
||||
admin_role='admin', auth_version='v3', group='identity'))
|
||||
self.useFixture(ConfPatcher(
|
||||
|
@ -92,7 +93,7 @@ class RbacUtilsFixture(fixtures.Fixture):
|
|||
mock_admin_mgr = mock.patch.object(
|
||||
clients, 'Manager', spec=clients.Manager,
|
||||
roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
|
||||
self.roles_v3_client = mock_admin_mgr.return_value.roles_v3_client
|
||||
self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
|
||||
|
||||
self.set_roles(['admin', 'member'], [])
|
||||
|
||||
|
@ -153,6 +154,6 @@ class RbacUtilsFixture(fixtures.Fixture):
|
|||
for role in roles_on_project]
|
||||
}
|
||||
|
||||
self.roles_v3_client.list_roles.return_value = available_roles
|
||||
self.roles_v3_client.list_user_roles_on_project.return_value = (
|
||||
self.admin_roles_client.list_roles.return_value = available_roles
|
||||
self.admin_roles_client.list_user_roles_on_project.return_value = (
|
||||
available_project_roles)
|
||||
|
|
|
@ -108,9 +108,60 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
|
||||
for rule, role_list in expected.items():
|
||||
for role in role_list:
|
||||
self.assertTrue(authority.allowed(rule, role))
|
||||
self.assertTrue(authority.allowed(rule, [role]))
|
||||
for role in set(default_roles) - set(role_list):
|
||||
self.assertFalse(authority.allowed(rule, role))
|
||||
self.assertFalse(authority.allowed(rule, [role]))
|
||||
|
||||
@mock.patch.object(policy_authority, 'LOG', autospec=True)
|
||||
def _test_custom_multi_roles_policy(self, *args):
|
||||
default_roles = ['zero', 'one', 'two', 'three', 'four',
|
||||
'five', 'six', 'seven', 'eight', 'nine']
|
||||
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "custom_rbac_policy")
|
||||
|
||||
expected = {
|
||||
'policy_action_1': ['two', 'four', 'six', 'eight'],
|
||||
'policy_action_2': ['one', 'three', 'five', 'seven', 'nine'],
|
||||
'policy_action_4': ['one', 'two', 'three', 'five', 'seven'],
|
||||
'policy_action_5': ['zero', 'one', 'two', 'three', 'four', 'five',
|
||||
'six', 'seven', 'eight', 'nine'],
|
||||
}
|
||||
|
||||
for rule, role_list in expected.items():
|
||||
allowed_roles_lists = [roles for roles in [
|
||||
role_list[len(role_list) // 2:],
|
||||
role_list[:len(role_list) // 2]] if roles]
|
||||
for test_roles in allowed_roles_lists:
|
||||
self.assertTrue(authority.allowed(rule, test_roles))
|
||||
|
||||
disallowed_roles = list(set(default_roles) - set(role_list))
|
||||
disallowed_roles_lists = [roles for roles in [
|
||||
disallowed_roles[len(disallowed_roles) // 2:],
|
||||
disallowed_roles[:len(disallowed_roles) // 2]] if roles]
|
||||
for test_roles in disallowed_roles_lists:
|
||||
self.assertFalse(authority.allowed(rule, test_roles))
|
||||
|
||||
def test_empty_rbac_test_roles(self):
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "custom_rbac_policy")
|
||||
|
||||
disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
|
||||
'policy_action_3', 'policy_action_4',
|
||||
'policy_action_6']
|
||||
|
||||
# Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
|
||||
allowed_for_empty_roles = ['policy_action_5']
|
||||
|
||||
for rule in disallowed_for_empty_roles:
|
||||
self.assertFalse(authority.allowed(rule, []))
|
||||
|
||||
for rule in allowed_for_empty_roles:
|
||||
self.assertTrue(authority.allowed(rule, []))
|
||||
|
||||
def test_custom_policy_json(self):
|
||||
# The CONF.patrole.custom_policy_files has a path to JSON file by
|
||||
|
@ -122,24 +173,34 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
custom_policy_files=[self.conf_policy_path_yaml], group='patrole'))
|
||||
self._test_custom_policy()
|
||||
|
||||
def test_custom_multi_roles_policy_json(self):
|
||||
# The CONF.patrole.custom_policy_files has a path to JSON file by
|
||||
# default, so we don't need to use ConfPatcher here.
|
||||
self._test_custom_multi_roles_policy()
|
||||
|
||||
def test_custom_multi_roles_policy_yaml(self):
|
||||
self.useFixture(fixtures.ConfPatcher(
|
||||
custom_policy_files=[self.conf_policy_path_yaml], group='patrole'))
|
||||
self._test_custom_multi_roles_policy()
|
||||
|
||||
def test_admin_policy_file_with_admin_role(self):
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "admin_rbac_policy")
|
||||
|
||||
role = 'admin'
|
||||
roles = ['admin']
|
||||
allowed_rules = [
|
||||
'admin_rule', 'is_admin_rule', 'alt_admin_rule'
|
||||
]
|
||||
disallowed_rules = ['non_admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
|
||||
def test_admin_policy_file_with_member_role(self):
|
||||
|
@ -148,7 +209,7 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "admin_rbac_policy")
|
||||
|
||||
role = 'Member'
|
||||
roles = ['Member']
|
||||
allowed_rules = [
|
||||
'non_admin_rule'
|
||||
]
|
||||
|
@ -156,11 +217,11 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
'admin_rule', 'is_admin_rule', 'alt_admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
|
||||
def test_alt_admin_policy_file_with_context_is_admin(self):
|
||||
|
@ -169,28 +230,28 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "alt_admin_rbac_policy")
|
||||
|
||||
role = 'fake_admin'
|
||||
roles = ['fake_admin']
|
||||
allowed_rules = ['non_admin_rule']
|
||||
disallowed_rules = ['admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
|
||||
role = 'super_admin'
|
||||
roles = ['super_admin']
|
||||
allowed_rules = ['admin_rule']
|
||||
disallowed_rules = ['non_admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, role)
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
|
||||
def test_tenant_user_policy(self):
|
||||
|
@ -208,17 +269,17 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
# Check whether Member role can perform expected actions.
|
||||
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, 'Member')
|
||||
allowed = authority.allowed(rule, ['Member'])
|
||||
self.assertTrue(allowed)
|
||||
|
||||
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
|
||||
for disallowed_rule in disallowed_rules:
|
||||
self.assertFalse(authority.allowed(disallowed_rule, 'Member'))
|
||||
self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
|
||||
|
||||
# Check whether admin role can perform expected actions.
|
||||
allowed_rules.extend(disallowed_rules)
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, 'admin')
|
||||
allowed = authority.allowed(rule, ['admin'])
|
||||
self.assertTrue(allowed)
|
||||
|
||||
# Check whether _try_rule is called with the correct target dictionary.
|
||||
|
@ -243,7 +304,7 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
}
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, 'Member')
|
||||
allowed = authority.allowed(rule, ['Member'])
|
||||
self.assertTrue(allowed)
|
||||
mock_try_rule.assert_called_once_with(
|
||||
rule, expected_target, expected_access_data, mock.ANY)
|
||||
|
@ -292,7 +353,7 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
fake_rule, [self.custom_policy_file], "custom_rbac_policy")
|
||||
|
||||
e = self.assertRaises(rbac_exceptions.RbacParsingException,
|
||||
authority.allowed, fake_rule, None)
|
||||
authority.allowed, fake_rule, [None])
|
||||
self.assertIn(expected_message, str(e))
|
||||
m_log.debug.assert_called_once_with(expected_message)
|
||||
|
||||
|
@ -309,13 +370,13 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
mock.sentinel.error)})
|
||||
|
||||
expected_message = (
|
||||
'Policy action "{0}" not found in policy files: {1} or among '
|
||||
'Policy action "[{0}]" not found in policy files: {1} or among '
|
||||
'registered policy in code defaults for {2} service.').format(
|
||||
mock.sentinel.rule, [self.custom_policy_file],
|
||||
"custom_rbac_policy")
|
||||
|
||||
e = self.assertRaises(rbac_exceptions.RbacParsingException,
|
||||
authority.allowed, mock.sentinel.rule, None)
|
||||
authority.allowed, [mock.sentinel.rule], [None])
|
||||
self.assertIn(expected_message, str(e))
|
||||
m_log.debug.assert_called_once_with(expected_message)
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ class BaseRBACRuleValidationTest(base.TestCase):
|
|||
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
|
||||
|
||||
self.useFixture(
|
||||
patrole_fixtures.ConfPatcher(rbac_test_role='Member',
|
||||
patrole_fixtures.ConfPatcher(rbac_test_roles=['member'],
|
||||
group='patrole'))
|
||||
# Disable patrole log for unit tests.
|
||||
self.useFixture(
|
||||
|
@ -55,6 +55,29 @@ class BaseRBACRuleValidationTest(base.TestCase):
|
|||
group='patrole_log'))
|
||||
|
||||
|
||||
class BaseRBACMultiRoleRuleValidationTest(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(BaseRBACMultiRoleRuleValidationTest, self).setUp()
|
||||
self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
|
||||
self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
|
||||
self.mock_test_args.rbac_utils = mock.Mock(
|
||||
spec_set=rbac_utils.RbacUtils)
|
||||
|
||||
# Setup credentials for mock client manager.
|
||||
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
|
||||
project_id=mock.sentinel.project_id)
|
||||
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
|
||||
|
||||
self.useFixture(
|
||||
patrole_fixtures.ConfPatcher(
|
||||
rbac_test_roles=['member', 'anotherrole'], group='patrole'))
|
||||
# Disable patrole log for unit tests.
|
||||
self.useFixture(
|
||||
patrole_fixtures.ConfPatcher(enable_reporting=False,
|
||||
group='patrole_log'))
|
||||
|
||||
|
||||
class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
||||
"""Test suite for validating fundamental functionality for the
|
||||
``rbac_rule_validation`` decorator.
|
||||
|
@ -118,8 +141,8 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
raise exceptions.Forbidden()
|
||||
|
||||
test_re = ("Role Member was not allowed to perform the following "
|
||||
"actions: \[%s\].*" % (mock.sentinel.action))
|
||||
test_re = ("User with roles \['member'\] was not allowed to perform "
|
||||
"the following actions: \[%s\].*" % (mock.sentinel.action))
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
|
||||
self.mock_test_args)
|
||||
|
@ -159,11 +182,10 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
raise rbac_exceptions.RbacMalformedResponse()
|
||||
|
||||
test_re = ("Role Member was not allowed to perform the following "
|
||||
"actions: \[%s\].*" % (mock.sentinel.action))
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
|
||||
self.mock_test_args)
|
||||
test_re = ("User with roles \['member'\] was not allowed to perform "
|
||||
"the following actions: \[%s\]. " % (mock.sentinel.action))
|
||||
self.assertRaisesRegex(rbac_exceptions.RbacUnderPermissionException,
|
||||
test_re, test_policy, self.mock_test_args)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -214,8 +236,8 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
raise exceptions.NotFound()
|
||||
|
||||
expected_errors = [
|
||||
("Role Member was not allowed to perform the following "
|
||||
"actions: \['%s'\].*" % policy_names[0]),
|
||||
("User with roles \['member'\] was not allowed to perform the "
|
||||
"following actions: \['%s'\].*" % policy_names[0]),
|
||||
None
|
||||
]
|
||||
|
||||
|
@ -348,6 +370,108 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
mock_log.error.reset_mock()
|
||||
|
||||
|
||||
class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
|
||||
RBACRuleValidationTest):
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
def test_rule_validation_forbidden_negative(self, mock_authority,
|
||||
mock_log):
|
||||
"""Test RbacUnderPermissionException error is thrown and have
|
||||
permission fails.
|
||||
|
||||
Negative test case: if Forbidden is thrown and the user should be
|
||||
allowed to perform the action, then the RbacUnderPermissionException
|
||||
exception should be raised.
|
||||
"""
|
||||
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
|
||||
|
||||
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
|
||||
def test_policy(*args):
|
||||
raise exceptions.Forbidden()
|
||||
|
||||
test_re = ("User with roles \['member', 'anotherrole'\] was not "
|
||||
"allowed to perform the following actions: \[%s\].*" %
|
||||
(mock.sentinel.action))
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
|
||||
self.mock_test_args)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
def test_rule_validation_rbac_malformed_response_negative(
|
||||
self, mock_authority, mock_log):
|
||||
"""Test RbacMalformedResponse error is thrown with permission fails.
|
||||
|
||||
Negative test case: if RbacMalformedResponse is thrown and the user is
|
||||
allowed to perform the action, then this is an expected failure.
|
||||
"""
|
||||
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
|
||||
|
||||
@rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
|
||||
def test_policy(*args):
|
||||
raise rbac_exceptions.RbacMalformedResponse()
|
||||
|
||||
test_re = ("User with roles \['member', 'anotherrole'\] was not "
|
||||
"allowed to perform the following actions: \[%s\]. " %
|
||||
(mock.sentinel.action))
|
||||
self.assertRaisesRegex(rbac_exceptions.RbacUnderPermissionException,
|
||||
test_re, test_policy, self.mock_test_args)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
def test_expect_not_found_and_raise_not_found(self, mock_authority,
|
||||
mock_log):
|
||||
"""Test that expecting 404 and getting 404 works for all scenarios.
|
||||
|
||||
Tests the following scenarios:
|
||||
1) Test no permission and 404 is expected and 404 is thrown succeeds.
|
||||
2) Test have permission and 404 is expected and 404 is thrown fails.
|
||||
|
||||
In both cases, a LOG.warning is called with the "irregular message"
|
||||
that signals to user that a 404 was expected and caught.
|
||||
"""
|
||||
policy_names = ['foo:bar']
|
||||
|
||||
@rbac_rv.action(mock.sentinel.service, rules=policy_names,
|
||||
expected_error_codes=[404])
|
||||
def test_policy(*args):
|
||||
raise exceptions.NotFound()
|
||||
|
||||
expected_errors = [
|
||||
("User with roles \['member', 'anotherrole'\] was not allowed to "
|
||||
"perform the following actions: \['%s'\].*" % policy_names[0]),
|
||||
None
|
||||
]
|
||||
|
||||
for pos, allowed in enumerate([True, False]):
|
||||
mock_authority.PolicyAuthority.return_value.allowed\
|
||||
.return_value = allowed
|
||||
|
||||
error_re = expected_errors[pos]
|
||||
|
||||
if error_re:
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
else:
|
||||
test_policy(self.mock_test_args)
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
mock_log.warning.assert_called_with(
|
||||
"NotFound exception was caught for test %s. Expected policies "
|
||||
"which may have caused the error: %s. The service %s throws a "
|
||||
"404 instead of a 403, which is irregular",
|
||||
test_policy.__name__,
|
||||
', '.join(policy_names),
|
||||
mock.sentinel.service)
|
||||
|
||||
mock_log.warning.reset_mock()
|
||||
mock_log.error.reset_mock()
|
||||
|
||||
|
||||
class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
||||
"""Test class for validating the RBAC log, dedicated to just logging
|
||||
Patrole RBAC validation work flows.
|
||||
|
@ -422,7 +546,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
policy_authority = mock_authority.PolicyAuthority.return_value
|
||||
policy_authority.allowed.assert_called_with(
|
||||
mock.sentinel.action,
|
||||
CONF.patrole.rbac_test_role)
|
||||
CONF.patrole.rbac_test_roles)
|
||||
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
|
@ -454,18 +578,23 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
policy_authority = mock_authority.PolicyAuthority.return_value
|
||||
policy_authority.allowed.assert_called_with(
|
||||
"foo",
|
||||
CONF.patrole.rbac_test_role)
|
||||
CONF.patrole.rbac_test_roles)
|
||||
policy_authority.allowed.reset_mock()
|
||||
|
||||
test_bar_policy(self.mock_test_args)
|
||||
policy_authority = mock_authority.PolicyAuthority.return_value
|
||||
policy_authority.allowed.assert_called_with(
|
||||
"qux",
|
||||
CONF.patrole.rbac_test_role)
|
||||
CONF.patrole.rbac_test_roles)
|
||||
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
|
||||
class RBACMultiRoleRuleValidationLoggingTest(
|
||||
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationLoggingTest):
|
||||
pass
|
||||
|
||||
|
||||
class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -488,6 +617,11 @@ class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
|
|||
test_policy, self.mock_test_args)
|
||||
|
||||
|
||||
class RBACMultiRoleRuleValidationNegativeTest(
|
||||
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationNegativeTest):
|
||||
pass
|
||||
|
||||
|
||||
class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
||||
"""Test suite for validating multi-policy support for the
|
||||
``rbac_rule_validation`` decorator.
|
||||
|
@ -502,7 +636,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def _assert_policy_authority_called_with(self, rules, mock_authority):
|
||||
m_authority = mock_authority.PolicyAuthority.return_value
|
||||
m_authority.allowed.assert_has_calls([
|
||||
mock.call(rule, CONF.patrole.rbac_test_role) for rule in rules
|
||||
mock.call(rule, CONF.patrole.rbac_test_roles) for rule in rules
|
||||
])
|
||||
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
|
@ -614,10 +748,10 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
mock_authority.PolicyAuthority.return_value.allowed\
|
||||
.return_value = True
|
||||
|
||||
error_re = ("Role Member was not allowed to perform the following "
|
||||
"actions: %s. Expected allowed actions: %s. Expected "
|
||||
"disallowed actions: []." % (rules, rules)).replace(
|
||||
'[', '\[').replace(']', '\]')
|
||||
error_re = ("User with roles ['member'] was not allowed to perform "
|
||||
"the following actions: %s. Expected allowed actions: %s. "
|
||||
"Expected disallowed actions: []." %
|
||||
(rules, rules)).replace('[', '\[').replace(']', '\]')
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
|
@ -739,6 +873,39 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
self.assertRaisesRegex(ValueError, error_re, _do_test, None, [404])
|
||||
|
||||
|
||||
class RBACMultiRoleRuleValidationTestMultiPolicy(
|
||||
BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationTestMultiPolicy):
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
def test_rule_validation_multi_policy_forbidden_failure(
|
||||
self, mock_authority, mock_log):
|
||||
"""Test that when the expected result is authorized and the test
|
||||
fails (with a Forbidden error code) that the overall evaluation
|
||||
results in a RbacUnderPermissionException getting raised.
|
||||
"""
|
||||
|
||||
# NOTE: Avoid mock.sentinel here due to weird sorting with them.
|
||||
rules = ['action1', 'action2', 'action3']
|
||||
|
||||
@rbac_rv.action(mock.sentinel.service, rules=rules,
|
||||
expected_error_codes=[403, 403, 403])
|
||||
def test_policy(*args):
|
||||
raise exceptions.Forbidden()
|
||||
|
||||
mock_authority.PolicyAuthority.return_value.allowed\
|
||||
.return_value = True
|
||||
|
||||
error_re = ("User with roles ['member', 'anotherrole'] was not "
|
||||
"allowed to perform the following actions: %s. Expected "
|
||||
"allowed actions: %s. Expected disallowed actions: []." %
|
||||
(rules, rules)).replace('[', '\[').replace(']', '\]')
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
self._assert_policy_authority_called_with(rules, mock_authority)
|
||||
|
||||
|
||||
class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
||||
"""Class for validating that untimely exceptions (outside
|
||||
``override_role`` is called) result in test failures.
|
||||
|
|
|
@ -52,7 +52,7 @@ class RBACUtilsTest(base.TestCase):
|
|||
self.rbac_utils.override_role()
|
||||
|
||||
mock_test_obj = self.rbac_utils.mock_test_obj
|
||||
roles_client = self.rbac_utils.roles_v3_client
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_called_once_with(
|
||||
|
@ -67,7 +67,7 @@ class RBACUtilsTest(base.TestCase):
|
|||
self.rbac_utils.set_roles(['admin', 'member'], 'admin')
|
||||
self.rbac_utils.override_role()
|
||||
|
||||
roles_client = self.rbac_utils.roles_v3_client
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_not_called()
|
||||
|
@ -77,7 +77,7 @@ class RBACUtilsTest(base.TestCase):
|
|||
self.rbac_utils.override_role(True)
|
||||
|
||||
mock_test_obj = self.rbac_utils.mock_test_obj
|
||||
roles_client = self.rbac_utils.roles_v3_client
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_has_calls([
|
||||
|
@ -96,7 +96,7 @@ class RBACUtilsTest(base.TestCase):
|
|||
self.rbac_utils.set_roles(['admin', 'member'], 'member')
|
||||
self.rbac_utils.override_role(True)
|
||||
|
||||
roles_client = self.rbac_utils.roles_v3_client
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_has_calls([
|
||||
|
@ -109,7 +109,7 @@ class RBACUtilsTest(base.TestCase):
|
|||
self.rbac_utils.override_role(True, False)
|
||||
|
||||
mock_test_obj = self.rbac_utils.mock_test_obj
|
||||
roles_client = self.rbac_utils.roles_v3_client
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_has_calls([
|
||||
|
@ -133,7 +133,7 @@ class RBACUtilsTest(base.TestCase):
|
|||
self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
|
||||
self.rbac_utils.override_role()
|
||||
|
||||
roles_client = self.rbac_utils.roles_v3_client
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
|
||||
roles_client.list_user_roles_on_project.assert_called_once_with(
|
||||
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
|
||||
|
@ -169,7 +169,8 @@ class RBACUtilsTest(base.TestCase):
|
|||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
|
||||
@mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
|
||||
@mock.patch.object(rbac_utils.RbacUtils, '_override_role',
|
||||
autospec=True)
|
||||
def test_override_role_context_manager_simulate_fail(self,
|
||||
mock_override_role):
|
||||
"""Validate that expected override_role calls are made when switching
|
||||
|
|
|
@ -38,11 +38,11 @@ class RequirementsAuthorityTest(base.TestCase):
|
|||
def test_auth_allowed_empty_roles(self):
|
||||
self.rbac_auth.roles_dict = None
|
||||
self.assertRaises(exceptions.InvalidConfiguration,
|
||||
self.rbac_auth.allowed, "", "")
|
||||
self.rbac_auth.allowed, "", [""])
|
||||
|
||||
def test_auth_allowed_role_in_api(self):
|
||||
self.rbac_auth.roles_dict = {'api': ['_member_']}
|
||||
self.assertTrue(self.rbac_auth.allowed("api", "_member_"))
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["_member_"]))
|
||||
|
||||
def test_auth_allowed_role_not_in_api(self):
|
||||
self.rbac_auth.roles_dict = {'api': ['_member_']}
|
||||
|
@ -64,7 +64,8 @@ class RequirementsAuthorityTest(base.TestCase):
|
|||
self.rbac_auth.roles_dict = req_auth.RequirementsParser.parse("Test")
|
||||
|
||||
self.assertEqual(self.expected_result, self.rbac_auth.roles_dict)
|
||||
self.assertTrue(self.rbac_auth.allowed("test:create2", "test_member"))
|
||||
self.assertTrue(
|
||||
self.rbac_auth.allowed("test:create2", ["test_member"]))
|
||||
|
||||
def test_parser_role_not_in_api(self):
|
||||
req_auth.RequirementsParser.Inner._rbac_map = \
|
||||
|
@ -82,4 +83,4 @@ class RequirementsAuthorityTest(base.TestCase):
|
|||
|
||||
self.assertIsNone(self.rbac_auth.roles_dict)
|
||||
self.assertRaises(exceptions.InvalidConfiguration,
|
||||
self.rbac_auth.allowed, "", "")
|
||||
self.rbac_auth.allowed, "", [""])
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
We have replaced CONF.patrole.rbac_test_role with
|
||||
CONF.patrole.rbac_test_roles, where instead of single role we can specify
|
||||
list of roles to be assigned to test user. This way we may run rbac tests
|
||||
for scenarios that requires user to have more that a single role.
|
||||
deprecations:
|
||||
- |
|
||||
Config parameter CONF.rbac_test_role is deprecated in favor of
|
||||
CONF.rbac_test_roles that implements a list of roles instead of single role.
|
Loading…
Reference in New Issue