Refactoring RbacUtils

Remove RbacUtils class and move all functionality to RbacUtilsMixin.

Story: 2002604
Task: 22223

Change-Id: If476be8fd3df78b28669ca940ebeb288af534899
This commit is contained in:
Sergey Vilgelm 2018-11-19 16:25:10 -06:00
parent 9866753ca8
commit ace8ea37c8
No known key found for this signature in database
GPG Key ID: 08D0E2FF778887E6
13 changed files with 366 additions and 435 deletions

View File

@ -345,7 +345,7 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
roles.append(CONF.patrole.rbac_test_role)
# Adding implied roles
roles = test_obj.rbac_utils.get_all_needed_roles(roles)
roles = test_obj.get_all_needed_roles(roles)
# Test RBAC against custom requirements. Otherwise use oslo.policy.
if CONF.patrole.test_custom_requirements:

View File

@ -102,8 +102,23 @@ class _ValidateListContext(object):
self._validate_func()
class RbacUtils(object):
"""Utility class responsible for switching ``os_primary`` role.
class RbacUtilsMixin(object):
"""Utility mixin responsible for switching ``os_primary`` role.
Should be used as a mixin class alongside an instance of
:py:class:`tempest.test.BaseTestCase` to perform Patrole class setup for a
base RBAC class. Child classes should not use this mixin.
Example::
class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
@classmethod
def setup_clients(cls):
super(BaseRbacTest, cls).setup_clients()
cls.hosts_client = cls.os_primary.hosts_client
...
This class is responsible for overriding the value of the primary Tempest
credential's role (i.e. ``os_primary`` role). By doing so, it is possible
@ -114,15 +129,29 @@ class RbacUtils(object):
``CONF.patrole.rbac_test_roles``.
"""
def __init__(self, test_obj):
"""Constructor for ``RbacUtils``.
def __init__(self, *args, **kwargs):
super(RbacUtilsMixin, self).__init__(*args, **kwargs)
:param test_obj: An instance of `tempest.test.BaseTestCase`.
"""
self.admin_role_id = None
self.rbac_role_ids = None
self._role_map = None
# Shows if override_role was called.
self.__override_role_called = False
# Shows if exception raised during override_role.
self.__override_role_caught_exc = False
_admin_role_id = None
_rbac_role_ids = None
_project_id = None
_user_id = None
_role_map = None
_role_inferences_mapping = None
admin_roles_client = None
@property
def rbac_utils(self):
return self
@classmethod
def setup_clients(cls):
# Intialize the admin roles_client to perform role switching.
admin_mgr = clients.Manager(
credentials.get_configured_admin_credentials())
@ -132,16 +161,20 @@ class RbacUtils(object):
raise lib_exc.InvalidConfiguration(
"Patrole role overriding only supports v3 identity API.")
self.admin_roles_client = admin_roles_client
cls.admin_roles_client = admin_roles_client
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self._role_inferences_mapping = self._prepare_role_inferences_mapping()
cls._project_id = cls.os_primary.credentials.tenant_id
cls._user_id = cls.os_primary.credentials.user_id
cls._role_inferences_mapping = cls._prepare_role_inferences_mapping()
cls._init_roles()
# Change default role to admin
self._override_role(test_obj, False)
cls._override_role(False)
super(RbacUtilsMixin, cls).setup_clients()
def _prepare_role_inferences_mapping(self):
@classmethod
def _prepare_role_inferences_mapping(cls):
"""Preparing roles mapping to support role inferences
Making query to `list-all-role-inference-rules`_ keystone API
@ -186,7 +219,7 @@ class RbacUtils(object):
res[prior_role] = implies
return res
raw_data = self.admin_roles_client.list_all_role_inference_rules()
raw_data = cls.admin_roles_client.list_all_role_inference_rules()
data = convert_data(raw_data['role_inferences'])
res = {}
for role_id in data:
@ -207,15 +240,17 @@ class RbacUtils(object):
"""
res = set(r for r in roles)
for role in res.copy():
role_id = self._role_map.get(role)
implied_roles = self._role_inferences_mapping.get(role_id, set())
role_names = {self._role_map[rid] for rid in implied_roles}
role_id = self.__class__._role_map.get(role)
implied_roles = self.__class__._role_inferences_mapping.get(
role_id, set())
role_names = {self.__class__._role_map[rid]
for rid in implied_roles}
res.update(role_names)
LOG.debug('All needed roles: %s; Base roles: %s', res, roles)
return list(res)
@contextlib.contextmanager
def override_role(self, test_obj):
def override_role(self, test_obj=None):
"""Override the role used by ``os_primary`` Tempest credentials.
Temporarily change the role used by ``os_primary`` credentials to:
@ -225,7 +260,6 @@ class RbacUtils(object):
Automatically switches to admin role after test execution.
:param test_obj: Instance of ``tempest.test.BaseTestCase``.
:returns: None
.. warning::
@ -248,8 +282,8 @@ class RbacUtils(object):
# if the API call above threw an exception, any code below this
# point in the test is not executed.
"""
test_obj._set_override_role_called()
self._override_role(test_obj, True)
self._set_override_role_called()
self._override_role(True)
try:
# Execute the test.
yield
@ -258,16 +292,16 @@ class RbacUtils(object):
# for future validation.
exc = sys.exc_info()[0]
if exc is not None:
test_obj._set_override_role_caught_exc()
self._set_override_role_caught_exc()
# This code block is always executed, no matter the result of the
# test. Automatically switch back to the admin role for test clean
# up.
self._override_role(test_obj, False)
self._override_role(False)
def _override_role(self, test_obj, toggle_rbac_role=False):
@classmethod
def _override_role(cls, toggle_rbac_role=False):
"""Private helper for overriding ``os_primary`` Tempest credentials.
:param test_obj: instance of :py:class:`tempest.test.BaseTestCase`
:param toggle_rbac_role: Boolean value that controls the role that
overrides default role of ``os_primary`` credentials.
* If True: role is set to ``[patrole] rbac_test_role``
@ -277,22 +311,19 @@ class RbacUtils(object):
roles_already_present = False
try:
if not all([self.admin_role_id, self.rbac_role_ids]):
self._get_roles_by_name()
target_roles = (self.rbac_role_ids
if toggle_rbac_role else [self.admin_role_id])
roles_already_present = self._list_and_clear_user_roles_on_project(
target_roles = (cls._rbac_role_ids
if toggle_rbac_role else [cls._admin_role_id])
roles_already_present = cls._list_and_clear_user_roles_on_project(
target_roles)
# Do not override roles if `target_role` already exists.
if not roles_already_present:
self._create_user_role_on_project(target_roles)
cls._create_user_role_on_project(target_roles)
except Exception as exp:
with excutils.save_and_reraise_exception():
LOG.exception(exp)
finally:
auth_providers = test_obj.get_auth_providers()
auth_providers = cls.get_auth_providers()
for provider in auth_providers:
provider.clear_auth()
# Fernet tokens are not subsecond aware so sleep to ensure we are
@ -306,10 +337,11 @@ class RbacUtils(object):
for provider in auth_providers:
provider.set_auth()
def _get_roles_by_name(self):
available_roles = self.admin_roles_client.list_roles()['roles']
self._role_map = {r['name']: r['id'] for r in available_roles}
LOG.debug('Available roles: %s', list(self._role_map.keys()))
@classmethod
def _init_roles(cls):
available_roles = cls.admin_roles_client.list_roles()['roles']
cls._role_map = {r['name']: r['id'] for r in available_roles}
LOG.debug('Available roles: %s', cls._role_map.keys())
rbac_role_ids = []
roles = CONF.patrole.rbac_test_roles
@ -319,9 +351,9 @@ class RbacUtils(object):
roles.append(CONF.patrole.rbac_test_role)
for role_name in roles:
rbac_role_ids.append(self._role_map.get(role_name))
rbac_role_ids.append(cls._role_map.get(role_name))
admin_role_id = self._role_map.get(CONF.identity.admin_role)
admin_role_id = cls._role_map.get(CONF.identity.admin_role)
if not all([admin_role_id, all(rbac_role_ids)]):
missing_roles = []
@ -332,27 +364,28 @@ class RbacUtils(object):
missing_roles.append(CONF.identity.admin_role)
if not all(rbac_role_ids):
missing_roles += [role_name for role_name in roles
if not self._role_map.get(role_name)]
if role_name not in cls._role_map]
msg += " Following roles were not found: %s." % (
", ".join(missing_roles))
msg += " Available roles: %s." % ", ".join(list(
self._role_map.keys()))
msg += " Available roles: %s." % ", ".join(cls._role_map)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
self.admin_role_id = admin_role_id
self.rbac_role_ids = rbac_role_ids
cls._admin_role_id = admin_role_id
cls._rbac_role_ids = rbac_role_ids
# Adding backward mapping
self._role_map.update({v: k for k, v in self._role_map.items()})
cls._role_map.update({v: k for k, v in cls._role_map.items()})
def _create_user_role_on_project(self, role_ids):
@classmethod
def _create_user_role_on_project(cls, role_ids):
for role_id in role_ids:
self.admin_roles_client.create_user_role_on_project(
self.project_id, self.user_id, role_id)
cls.admin_roles_client.create_user_role_on_project(
cls._project_id, cls._user_id, role_id)
def _list_and_clear_user_roles_on_project(self, role_ids):
roles = self.admin_roles_client.list_user_roles_on_project(
self.project_id, self.user_id)['roles']
@classmethod
def _list_and_clear_user_roles_on_project(cls, role_ids):
roles = cls.admin_roles_client.list_user_roles_on_project(
cls._project_id, cls._user_id)['roles']
all_role_ids = [role['id'] for role in roles]
# NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
@ -364,13 +397,14 @@ class RbacUtils(object):
return True
for role in roles:
self.admin_roles_client.delete_role_from_user_on_project(
self.project_id, self.user_id, role['id'])
cls.admin_roles_client.delete_role_from_user_on_project(
cls._project_id, cls._user_id, role['id'])
return False
@contextlib.contextmanager
def override_role_and_validate_list(self, test_obj, admin_resources=None,
def override_role_and_validate_list(self, test_obj=None,
admin_resources=None,
admin_resource_id=None):
"""Call ``override_role`` and validate RBAC for a list API action.
@ -400,38 +434,10 @@ class RbacUtils(object):
policy_id=self.policy_id)["dscp_marking_rules"]
"""
ctx = _ValidateListContext(admin_resources, admin_resource_id)
with self.override_role(test_obj):
with self.override_role():
yield ctx
ctx._validate()
class RbacUtilsMixin(object):
"""Mixin class to be used alongside an instance of
:py:class:`tempest.test.BaseTestCase`.
Should be used to perform Patrole class setup for a base RBAC class. Child
classes should not use this mixin.
Example::
class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
@classmethod
def skip_checks(cls):
super(BaseRbacTest, cls).skip_checks()
cls.skip_rbac_checks()
@classmethod
def setup_clients(cls):
super(BaseRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
"""
# Shows if override_role was called.
__override_role_called = False
# Shows if exception raised during override_role.
__override_role_caught_exc = False
@classmethod
def get_auth_providers(cls):
"""Returns list of auth_providers used within test.
@ -441,10 +447,6 @@ class RbacUtilsMixin(object):
"""
return [cls.os_primary.auth_provider]
@classmethod
def setup_rbac_utils(cls):
cls.rbac_utils = RbacUtils(cls)
def _set_override_role_called(self):
"""Helper for tracking whether ``override_role`` was called."""
self.__override_role_called = True

View File

@ -24,7 +24,6 @@ class BaseV2ComputeRbacTest(rbac_utils.RbacUtilsMixin,
@classmethod
def setup_clients(cls):
super(BaseV2ComputeRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
cls.hosts_client = cls.os_primary.hosts_client
cls.tenant_usages_client = cls.os_primary.tenant_usages_client
cls.networks_client = cls.os_primary.networks_client

View File

@ -27,11 +27,6 @@ LOG = logging.getLogger(__name__)
class BaseIdentityRbacTest(rbac_utils.RbacUtilsMixin,
base.BaseIdentityTest):
@classmethod
def setup_clients(cls):
super(BaseIdentityRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
@classmethod
def setup_test_endpoint(cls, service=None):
"""Creates a service and an endpoint for test."""

View File

@ -18,8 +18,4 @@ from patrole_tempest_plugin import rbac_utils
class BaseV2ImageRbacTest(rbac_utils.RbacUtilsMixin,
image_base.BaseV2ImageTest):
@classmethod
def setup_clients(cls):
super(BaseV2ImageRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
pass

View File

@ -23,11 +23,7 @@ from patrole_tempest_plugin import rbac_utils
class BaseNetworkRbacTest(rbac_utils.RbacUtilsMixin,
network_base.BaseNetworkTest):
@classmethod
def setup_clients(cls):
super(BaseNetworkRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
pass
class BaseNetworkExtRbacTest(BaseNetworkRbacTest):

View File

@ -31,7 +31,6 @@ class BaseVolumeRbacTest(rbac_utils.RbacUtilsMixin,
@classmethod
def setup_clients(cls):
super(BaseVolumeRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
cls.volume_hosts_client = cls.os_primary.volume_hosts_client_latest
cls.volume_types_client = cls.os_primary.volume_types_client_latest
cls.groups_client = cls.os_primary.groups_client_latest

View File

@ -16,17 +16,15 @@
from tempest.tests import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
class TestCase(base.TestCase):
"""Test case base class for all unit tests."""
def get_all_needed_roles(self, roles):
role_inferences_mapping = {
"admin": {"member", "reader"},
"member": {"reader"}
}
res = set(r.lower() for r in roles)
for role in res.copy():
res.update(role_inferences_mapping.get(role, set()))
return list(res)
def setUp(self):
super(TestCase, self).setUp()
# Disable patrole log for unit tests.
self.useFixture(
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))

View File

@ -16,12 +16,10 @@
"""Fixtures for Patrole tests."""
from __future__ import absolute_import
from contextlib import contextmanager
import fixtures
import mock
import time
from tempest import clients
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest import test
@ -57,81 +55,84 @@ class ConfPatcher(fixtures.Fixture):
CONF.set_override(k, v, self.group)
class RbacUtilsFixture(fixtures.Fixture):
class FakeBaseRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
os_primary = None
def runTest(self):
pass
class RbacUtilsMixinFixture(fixtures.Fixture):
"""Fixture for `RbacUtils` class."""
USER_ID = mock.sentinel.user_id
PROJECT_ID = mock.sentinel.project_id
def setUp(self):
super(RbacUtilsFixture, self).setUp()
def __init__(self, do_reset_mocks=True, rbac_test_roles=None):
self._do_reset_mocks = do_reset_mocks
self._rbac_test_roles = rbac_test_roles or ['member']
self.useFixture(ConfPatcher(rbac_test_roles=['member'],
def patchobject(self, target, attribute, *args, **kwargs):
p = mock.patch.object(target, attribute, *args, **kwargs)
m = p.start()
self.addCleanup(p.stop)
return m
def setUp(self):
super(RbacUtilsMixinFixture, self).setUp()
self.useFixture(ConfPatcher(rbac_test_roles=self._rbac_test_roles,
group='patrole'))
self.useFixture(ConfPatcher(
admin_role='admin', auth_version='v3', group='identity'))
self.useFixture(ConfPatcher(
api_v3=True, group='identity-feature-enabled'))
test_obj_kwargs = {
'os_primary.credentials.user_id': self.USER_ID,
'os_primary.credentials.tenant_id': self.PROJECT_ID,
'os_primary.credentials.project_id': self.PROJECT_ID,
}
self.mock_test_obj = mock.Mock(
__name__='patrole_unit_test', spec=test.BaseTestCase,
os_primary=mock.Mock(),
get_auth_providers=mock.Mock(return_value=[mock.Mock()]),
**test_obj_kwargs)
# Mock out functionality that can't be used by unit tests. Mocking out
# time.sleep is a test optimization.
self.mock_time = mock.patch.object(
rbac_utils, 'time', __name__='mock_time', spec=time).start()
mock.patch.object(credentials, 'get_configured_admin_credentials',
spec=object).start()
mock_admin_mgr = mock.patch.object(
clients, 'Manager', spec=clients.Manager,
roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
self.mock_time = self.patchobject(rbac_utils, 'time',
__name__='mock_time', spec=time)
self.patchobject(credentials, 'get_configured_admin_credentials',
spec=object)
mock_admin_mgr = self.patchobject(rbac_utils.clients, 'Manager',
spec=rbac_utils.clients.Manager,
roles_v3_client=mock.Mock(),
roles_client=mock.Mock())
self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
self.admin_roles_client.list_all_role_inference_rules.return_value = {
"role_inferences": []}
"role_inferences": [
{
"implies": [{"id": "reader_id", "name": "reader"}],
"prior_role": {"id": "member_id", "name": "member"}
},
{
"implies": [{"id": "member_id", "name": "member"}],
"prior_role": {"id": "admin_id", "name": "admin"}
}
]
}
self.set_roles(['admin', 'member'], [])
default_roles = {'admin', 'member', 'reader'}.union(
set(self._rbac_test_roles))
self.set_roles(list(default_roles), [])
def override_role(self, *role_toggles):
"""Instantiate `rbac_utils.RbacUtils` and call `override_role`.
test_obj_kwargs = {
'credentials.user_id': self.USER_ID,
'credentials.tenant_id': self.PROJECT_ID,
'credentials.project_id': self.PROJECT_ID,
}
Create an instance of `rbac_utils.RbacUtils` and call `override_role`
for each boolean value in `role_toggles`. The number of calls to
`override_role` is always 1 + len(`role_toggles`) because the
`rbac_utils.RbacUtils` constructor automatically calls `override_role`.
class FakeRbacTest(FakeBaseRbacTest):
os_primary = mock.Mock()
:param role_toggles: the list of boolean values iterated over and
passed to `override_role`.
"""
_rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
FakeRbacTest.os_primary.configure_mock(**test_obj_kwargs)
for role_toggle in role_toggles:
_rbac_utils._override_role(self.mock_test_obj, role_toggle)
# NOTE(felipemonteiro): Simulate that a role switch has occurred
# by updating the user's current role to the new role. This means
# that all API actions involved during a role switch -- listing,
# deleting and adding roles -- are executed, making it easier to
# assert that mock calls were called as expected.
new_role = 'member' if role_toggle else 'admin'
self.set_roles(['admin', 'member'], [new_role])
@contextmanager
def real_override_role(self, test_obj):
"""Actual call to ``override_role``.
Useful for ensuring all the necessary mocks are performed before
the method in question is called.
"""
_rbac_utils = rbac_utils.RbacUtils(test_obj)
with _rbac_utils.override_role(test_obj):
yield
FakeRbacTest.setUpClass()
self.test_obj = FakeRbacTest()
if self._do_reset_mocks:
self.admin_roles_client.reset_mock()
self.test_obj.os_primary.reset_mock()
self.mock_time.reset_mock()
def set_roles(self, roles, roles_on_project=None):
"""Set the list of available roles in the system.
@ -159,28 +160,3 @@ class RbacUtilsFixture(fixtures.Fixture):
self.admin_roles_client.list_roles.return_value = available_roles
self.admin_roles_client.list_user_roles_on_project.return_value = (
available_project_roles)
def get_all_needed_roles(self, roles):
self.admin_roles_client.list_all_role_inference_rules.return_value = {
"role_inferences": [
{
"implies": [{"id": "3", "name": "reader"}],
"prior_role": {"id": "2", "name": "member"}
},
{
"implies": [{"id": "2", "name": "member"}],
"prior_role": {"id": "1", "name": "admin"}
}
]
}
# Call real get_all_needed_roles function
with mock.patch.object(rbac_utils.RbacUtils, '_override_role',
autospec=True):
obj = rbac_utils.RbacUtils(mock.Mock())
obj._role_map = {
"1": "admin", "admin": "1",
"2": "member", "member": "2",
"3": "reader", "reader": "3"
}
return obj.get_all_needed_roles(roles)

View File

@ -76,6 +76,9 @@ class PolicyAuthorityTest(base.TestCase):
if attr in dir(policy_authority.PolicyAuthority):
delattr(policy_authority.PolicyAuthority, attr)
self.test_obj = self.useFixture(fixtures.RbacUtilsMixinFixture()).\
test_obj
@staticmethod
def _get_fake_policies(rules):
fake_rules = []
@ -96,7 +99,7 @@ class PolicyAuthorityTest(base.TestCase):
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, service)
roles = self.get_all_needed_roles(roles)
roles = self.test_obj.get_all_needed_roles(roles)
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
@ -289,7 +292,7 @@ class PolicyAuthorityTest(base.TestCase):
for rule in allowed_rules:
allowed = authority.allowed(
rule, self.get_all_needed_roles(['member']))
rule, self.test_obj.get_all_needed_roles(['member']))
self.assertTrue(allowed)
# for sure that roles are in same order
mock_try_rule.call_args[0][2]["roles"] = sorted(

View File

@ -20,12 +20,9 @@ from oslo_config import cfg
import fixtures
from tempest.lib import exceptions
from tempest import manager
from tempest import test
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
from patrole_tempest_plugin import rbac_utils
from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
@ -38,25 +35,10 @@ class BaseRBACRuleValidationTest(base.TestCase):
def setUp(self):
super(BaseRBACRuleValidationTest, self).setUp()
self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
self.mock_test_args.rbac_utils = mock.Mock(
spec_set=rbac_utils.RbacUtils)
self.mock_test_args.rbac_utils.get_all_needed_roles.side_effect = \
self.get_all_needed_roles
# Setup credentials for mock client manager.
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
project_id=mock.sentinel.project_id)
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.useFixture(
patrole_fixtures.ConfPatcher(rbac_test_roles=self.test_roles,
group='patrole'))
# Disable patrole log for unit tests.
self.useFixture(
patrole_fixtures.ConfPatcher(enable_reporting=False,
group='patrole_log'))
self.rbac_utils_fixture = self.useFixture(
patrole_fixtures.RbacUtilsMixinFixture(
rbac_test_roles=self.test_roles))
self.test_obj = self.rbac_utils_fixture.test_obj
class BaseRBACMultiRoleRuleValidationTest(BaseRBACRuleValidationTest):
@ -89,7 +71,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
def test_policy(*args):
pass
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -107,7 +89,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
def test_policy(*args):
raise exceptions.Forbidden()
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -131,7 +113,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
"the following actions: \[%s\].*" % (mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.mock_test_args)
self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -188,7 +170,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
_do_test(rbac_exceptions.RbacMissingAttributeResponseBody,
@ -221,7 +203,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
return_value = allowed
self.assertRaisesRegex(
rbac_exceptions.RbacExpectedWrongException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
self.assertTrue(mock_log.error.called)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -259,10 +241,10 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
@ -302,7 +284,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
error_re = ".*OverPermission: .* \[%s\]$" % mock.sentinel.action
self.assertRaisesRegex(rbac_exceptions.RbacOverPermissionException,
error_re, test_policy, self.mock_test_args)
error_re, test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
mock_log.error.reset_mock()
@ -320,7 +302,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
error_re = 'Attempted to test an invalid policy file or action'
self.assertRaisesRegex(rbac_exceptions.RbacParsingException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
mock_authority.PolicyAuthority.assert_called_once_with(
mock.sentinel.project_id, mock.sentinel.user_id,
@ -403,7 +385,7 @@ class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
(mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.mock_test_args)
self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -441,10 +423,10 @@ class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
@ -486,7 +468,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
def test_policy(*args):
pass
test_policy(self.mock_test_args)
test_policy(self.test_obj)
self.assertFalse(mock_rbaclog.info.called)
@mock.patch.object(rbac_rv, 'RBACLOG', autospec=True)
@ -506,7 +488,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
def test_policy(*args):
pass
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_rbaclog.info.assert_called_once_with(
"[Service]: %s, [Test]: %s, [Rules]: %s, "
"[Expected]: %s, [Actual]: %s",
@ -528,12 +510,12 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
def test_policy(*args):
pass
test_policy(self.mock_test_args)
test_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
mock.sentinel.action,
self.get_all_needed_roles(CONF.patrole.rbac_test_roles))
self.test_obj.get_all_needed_roles(CONF.patrole.rbac_test_roles))
mock_log.error.assert_not_called()
@ -545,7 +527,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
evaluated correctly.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
expected_roles = self.get_all_needed_roles(
expected_roles = self.test_obj.get_all_needed_roles(
CONF.patrole.rbac_test_roles)
def partial_func(x):
@ -563,14 +545,14 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
def test_bar_policy(*args):
pass
test_foo_policy(self.mock_test_args)
test_foo_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"foo",
expected_roles)
policy_authority.allowed.reset_mock()
test_bar_policy(self.mock_test_args)
test_bar_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"qux",
@ -603,7 +585,7 @@ class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
pass
self.assertRaises(rbac_exceptions.RbacInvalidServiceException,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
class RBACMultiRoleRuleValidationNegativeTest(
@ -627,7 +609,8 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
m_authority.allowed.assert_has_calls([
mock.call(
rule,
self.get_all_needed_roles(CONF.patrole.rbac_test_roles)
self.test_obj.get_all_needed_roles(
CONF.patrole.rbac_test_roles)
) for rule in rules
])
m_authority.allowed.reset_mock()
@ -648,7 +631,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def test_policy(*args):
pass
test_policy(self.mock_test_args)
test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@ -677,7 +660,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
error_re = ".*OverPermission: .* \[%s\]$" % fail_on_action
self.assertRaisesRegex(
rbac_exceptions.RbacOverPermissionException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
mock_log.debug.assert_any_call(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
@ -710,7 +693,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
@ -747,7 +730,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
(rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
@ -773,7 +756,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
@ -806,7 +789,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
test_policy(self.mock_test_args)
test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 404, fail_on_action)
@ -833,7 +816,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def test_policy(*args):
raise exceptions.Forbidden()
test_policy(self.mock_test_args)
test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
# Assert that 403 is expected.
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
@ -847,7 +830,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
def test_policy(*args):
raise exceptions.Forbidden()
test_policy(self.mock_test_args)
test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
# Assert that 403 is expected.
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
@ -932,7 +915,7 @@ class RBACMultiRoleRuleValidationTestMultiPolicy(
(rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
@ -951,21 +934,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
def setUp(self):
super(RBACOverrideRoleValidationTest, self).setUp()
# Mixin automatically initializes __override_role_called to False.
class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
def runTest(self):
pass
# Stub out problematic function calls.
FakeRbacTest.os_primary = mock.Mock(spec=manager.Manager)
FakeRbacTest.rbac_utils = self.useFixture(
patrole_fixtures.RbacUtilsFixture())
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
project_id=mock.sentinel.project_id)
setattr(FakeRbacTest.os_primary, 'credentials', mock_creds)
setattr(FakeRbacTest.os_primary, 'auth_provider', mock.Mock())
self.parent_class = FakeRbacTest
self.parent_class = self.test_obj.__class__
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_inside_ctx(self,
@ -981,7 +950,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
with self_.rbac_utils.real_override_role(self_):
with self_.override_role():
raise exceptions.NotFound()
child_test = ChildRbacTest()
@ -991,8 +960,8 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
def test_rule_validation_override_role_patrole_exception_ignored(
self, mock_authority):
"""Test success case where Patrole exception is raised (which is
valid in case of e.g. RbacPartialResponseBody) after override_role
passes.
valid in case of e.g. BasePatroleResponseBodyException) after
override_role passes.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
True
@ -1002,14 +971,14 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
with self_.rbac_utils.real_override_role(self_):
with self_.override_role():
pass
# Instances of BasePatroleException don't count as they are
# part of the validation work flow.
raise rbac_exceptions.BasePatroleException()
# Instances of BasePatroleResponseBodyException don't count as
# they are part of the validation work flow.
raise rbac_exceptions.BasePatroleResponseBodyException()
child_test = ChildRbacTest()
self.assertRaises(rbac_exceptions.BasePatroleException,
self.assertRaises(rbac_exceptions.RbacUnderPermissionException,
child_test.test_called)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@ -1057,7 +1026,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called_after(self_):
with self_.rbac_utils.real_override_role(self_):
with self_.override_role():
pass
# Simulates a test tearDown failure or some such.
raise exception_type()
@ -1098,7 +1067,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule1"],
expected_error_codes=[404])
def test_called(self_):
with self_.rbac_utils.real_override_role(self_):
with self_.override_role():
raise exceptions.NotFound()
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule2"],

View File

@ -17,7 +17,6 @@ import mock
import testtools
from tempest.lib import exceptions as lib_exc
from tempest import test
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_utils
@ -25,118 +24,132 @@ from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
class RBACUtilsTest(base.TestCase):
class RBACUtilsMixinTest(base.TestCase):
def setUp(self):
super(RBACUtilsTest, self).setUp()
# Reset the role history after each test run to avoid validation
# errors between tests.
rbac_utils.RbacUtils.override_role_history = {}
self.rbac_utils = self.useFixture(patrole_fixtures.RbacUtilsFixture())
super(RBACUtilsMixinTest, self).setUp()
self.rbac_utils_fixture = self.useFixture(
patrole_fixtures.RbacUtilsMixinFixture())
self.test_obj = self.rbac_utils_fixture.test_obj
def test_override_role_with_missing_admin_role(self):
self.rbac_utils.set_roles('member')
def test_init_roles_with_missing_admin_role(self):
self.rbac_utils_fixture.set_roles('member')
error_re = (".*Following roles were not found: admin. Available "
"roles: member.")
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
error_re, self.rbac_utils.override_role)
error_re, self.test_obj._init_roles)
def test_override_role_with_missing_rbac_role(self):
self.rbac_utils.set_roles('admin')
def test_init_roles_with_missing_rbac_role(self):
self.rbac_utils_fixture.set_roles('admin')
error_re = (".*Following roles were not found: member. Available "
"roles: admin.")
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
error_re, self.rbac_utils.override_role)
error_re, self.test_obj._init_roles)
def test_override_role_to_admin_role(self):
self.rbac_utils.override_role()
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
def test_override_role_to_admin_role_at_creating(self):
rbac_utils_fixture = self.useFixture(
patrole_fixtures.RbacUtilsMixinFixture(do_reset_mocks=False))
test_obj = rbac_utils_fixture.test_obj
roles_client = rbac_utils_fixture.admin_roles_client
mock_time = rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_called_once_with(
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID, 'admin_id')
mock_test_obj.get_auth_providers()[0].clear_auth\
rbac_utils_fixture.PROJECT_ID,
rbac_utils_fixture.USER_ID,
'admin_id')
test_obj.get_auth_providers()[0].clear_auth.assert_called_once_with()
test_obj.get_auth_providers()[0].set_auth.assert_called_once_with()
mock_time.sleep.assert_called_once_with(1)
def test_override_role_to_admin_role(self):
self.test_obj._override_role()
roles_client = self.rbac_utils_fixture.admin_roles_client
mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_called_once_with(
self.rbac_utils_fixture.PROJECT_ID,
self.rbac_utils_fixture.USER_ID,
'admin_id')
self.test_obj.get_auth_providers()[0].clear_auth\
.assert_called_once_with()
mock_test_obj.get_auth_providers()[0].set_auth\
self.test_obj.get_auth_providers()[0].set_auth\
.assert_called_once_with()
mock_time.sleep.assert_called_once_with(1)
def test_override_role_to_admin_role_avoids_role_switch(self):
self.rbac_utils.set_roles(['admin', 'member'], 'admin')
self.rbac_utils.override_role()
self.rbac_utils_fixture.set_roles(['admin', 'member'], 'admin')
self.test_obj._override_role()
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client = self.rbac_utils_fixture.admin_roles_client
mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_not_called()
mock_time.sleep.assert_not_called()
def test_override_role_to_member_role(self):
self.rbac_utils.override_role(True)
self.test_obj._override_role(True)
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client = self.rbac_utils_fixture.admin_roles_client
mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
'admin_id'),
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
mock.call(self.rbac_utils_fixture.PROJECT_ID,
self.rbac_utils_fixture.USER_ID,
'member_id')
])
mock_test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
[mock.call()] * 2)
mock_test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
[mock.call()] * 2)
mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
self.test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
[mock.call()])
self.test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
[mock.call()])
mock_time.sleep.assert_has_calls([mock.call(1)])
def test_override_role_to_member_role_avoids_role_switch(self):
self.rbac_utils.set_roles(['admin', 'member'], 'member')
self.rbac_utils.override_role(True)
self.rbac_utils_fixture.set_roles(['admin', 'member'], 'member')
self.test_obj._override_role(True)
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client = self.rbac_utils_fixture.admin_roles_client
mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
'admin_id')
])
mock_time.sleep.assert_called_once_with(1)
self.assertEqual(0,
roles_client.create_user_role_on_project.call_count)
self.assertEqual(0,
mock_time.sleep.call_count)
def test_override_role_to_member_role_then_admin_role(self):
self.rbac_utils.override_role(True, False)
self.test_obj._override_role(True)
self.test_obj._override_role(False)
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client = self.rbac_utils_fixture.admin_roles_client
mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
'admin_id'),
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
mock.call(self.rbac_utils_fixture.PROJECT_ID,
self.rbac_utils_fixture.USER_ID,
'member_id'),
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
mock.call(self.rbac_utils_fixture.PROJECT_ID,
self.rbac_utils_fixture.USER_ID,
'admin_id')
])
mock_test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
[mock.call()] * 3)
mock_test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
[mock.call()] * 3)
mock_time.sleep.assert_has_calls([mock.call(1)] * 3)
self.test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
[mock.call()] * 2)
self.test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
[mock.call()] * 2)
mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
def test_clear_user_roles(self):
# NOTE(felipemonteiro): Set the user's roles on the project to
# include 'random' to coerce a role switch, or else it will be
# skipped.
self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
self.rbac_utils.override_role()
self.rbac_utils_fixture.set_roles(['admin', 'member'],
['member', 'random'])
self.test_obj._override_role()
roles_client = self.rbac_utils.admin_roles_client
roles_client = self.rbac_utils_fixture.admin_roles_client
roles_client.list_user_roles_on_project.assert_called_once_with(
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
self.rbac_utils_fixture.PROJECT_ID,
self.rbac_utils_fixture.USER_ID)
roles_client.delete_role_from_user_on_project.\
assert_has_calls([
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
@ -144,52 +157,32 @@ class RBACUtilsTest(base.TestCase):
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
'random_id')])
@mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
def test_override_role_context_manager_simulate_pass(self,
mock_override_role):
def test_override_role_context_manager_simulate_pass(self):
"""Validate that expected override_role calls are made when switching
to admin role for success path.
"""
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
# Validate constructor called _override_role with False.
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
mock_override_role.reset_mock()
with _rbac_utils.override_role(test_obj):
mock_override_role = self.patchobject(self.test_obj, '_override_role')
with self.test_obj.override_role():
# Validate `override_role` public method called private method
# `_override_role` with True.
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
True)
mock_override_role.assert_called_once_with(True)
mock_override_role.reset_mock()
# Validate that `override_role` switched back to admin role after
# contextmanager.
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
mock_override_role.assert_called_once_with(False)
@mock.patch.object(rbac_utils.RbacUtils, '_override_role',
autospec=True)
def test_override_role_context_manager_simulate_fail(self,
mock_override_role):
def test_override_role_context_manager_simulate_fail(self):
"""Validate that expected override_role calls are made when switching
to admin role for failure path (i.e. when test raises exception).
"""
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
# Validate constructor called _override_role with False.
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
mock_override_role.reset_mock()
mock_override_role = self.patchobject(self.test_obj, '_override_role')
def _do_test():
with _rbac_utils.override_role(test_obj):
with self.test_obj.override_role():
# Validate `override_role` public method called private method
# `_override_role` with True.
mock_override_role.assert_called_once_with(
_rbac_utils, test_obj, True)
mock_override_role.assert_called_once_with(True)
mock_override_role.reset_mock()
# Raise exc to verify role switch works for negative case.
raise lib_exc.Forbidden()
@ -197,61 +190,51 @@ class RBACUtilsTest(base.TestCase):
# Validate that role is switched back to admin, despite test failure.
with testtools.ExpectedException(lib_exc.Forbidden):
_do_test()
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
mock_override_role.assert_called_once_with(False)
def test_override_role_and_validate_list(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
m_override_role = self.patchobject(_rbac_utils, 'override_role')
m_override_role = self.patchobject(self.test_obj, 'override_role')
with (_rbac_utils.override_role_and_validate_list(
test_obj, 'foo')) as ctx:
with (self.test_obj.override_role_and_validate_list(
admin_resource_id='foo')) as ctx:
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
m_validate = self.patchobject(ctx, '_validate')
m_override_role.assert_called_once_with(test_obj)
m_override_role.assert_called_once_with()
m_validate.assert_called_once()
def test_prepare_role_inferences_mapping(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
_rbac_utils.admin_roles_client.list_all_role_inference_rules.\
self.test_obj.admin_roles_client.list_all_role_inference_rules.\
return_value = {
"role_inferences": [
{
"implies": [{"id": "3", "name": "reader"}],
"prior_role": {"id": "2", "name": "member"}
"implies": [{"id": "reader_id", "name": "reader"}],
"prior_role": {"id": "member_id", "name": "member"}
},
{
"implies": [{"id": "2", "name": "member"}],
"prior_role": {"id": "1", "name": "admin"}
"implies": [{"id": "member_id", "name": "member"}],
"prior_role": {"id": "admin_id", "name": "admin"}
}
]
}
expected_role_inferences_mapping = {
"2": {"3"}, # "member": ["reader"],
"1": {"2", "3"} # "admin": ["member", "reader"]
"member_id": {"reader_id"},
"admin_id": {"member_id", "reader_id"}
}
actual_role_inferences_mapping = _rbac_utils.\
actual_role_inferences_mapping = self.test_obj.\
_prepare_role_inferences_mapping()
self.assertEqual(expected_role_inferences_mapping,
actual_role_inferences_mapping)
def test_get_all_needed_roles(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
_rbac_utils._role_inferences_mapping = {
"2": {"3"}, # "member": ["reader"],
"1": {"2", "3"} # "admin": ["member", "reader"]
self.test_obj.__class__._role_inferences_mapping = {
"member_id": {"reader_id"},
"admin_id": {"member_id", "reader_id"}
}
_rbac_utils._role_map = {
"1": "admin", "admin": "1",
"2": "member", "member": "2",
"3": "reader", "reader": "3"
self.test_obj.__class__._role_map = {
"admin_id": "admin", "admin": "admin_id",
"member_id": "member", "member": "member_id",
"reader_id": "reader", "reader": "reader_id"
}
for roles, expected_roles in (
(['admin'], ['admin', 'member', 'reader']),
@ -262,44 +245,10 @@ class RBACUtilsTest(base.TestCase):
(['admin', 'member'], ['admin', 'member', 'reader']),
):
expected_roles = sorted(expected_roles)
actual_roles = sorted(_rbac_utils.get_all_needed_roles(roles))
actual_roles = sorted(self.test_obj.get_all_needed_roles(roles))
self.assertEqual(expected_roles, actual_roles)
class RBACUtilsMixinTest(base.TestCase):
def setUp(self):
super(RBACUtilsMixinTest, self).setUp()
class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
@classmethod
def setup_clients(cls):
super(FakeRbacTest, cls).setup_clients()
cls.setup_rbac_utils()
def runTest(self):
pass
self.parent_class = FakeRbacTest
def test_setup_rbac_utils(self):
"""Validate that the child class has the `rbac_utils` attribute after
running parent class's `cls.setup_rbac_utils`.
"""
class ChildRbacTest(self.parent_class):
pass
child_test = ChildRbacTest()
with mock.patch.object(rbac_utils.RbacUtils, '__init__',
lambda *args: None):
child_test.setUpClass()
self.assertTrue(hasattr(child_test, 'rbac_utils'))
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
class ValidateListContextTest(base.TestCase):
@staticmethod
def _get_context(admin_resources=None, admin_resource_id=None):

View File

@ -0,0 +1,49 @@
---
features:
- |
Merged ``RbacUtils`` and ``RbacUtilsMixin`` classes. Now there is only
``RbacUtilsMixin`` class. The new class still provides all functionality of
the original ``RbacUtils`` class. New implementation simplifies the usage
of the rbac utils:
* there is no need in calling ``cls.setup_rbac_utils()`` function, because
it happens automatically at the ``setup_clients`` step.
* there is no ``rbac_utils`` variable, so if you need to call a
``override_role`` function, just do it using ``self``:
.. code-block:: python
with self.override_role():
...
* there is no need in ``test_obj`` variable for ``override_role`` function,
because it can use ``self``.
upgrade:
- Remove usage of ``cls.setup_rbac_utils()`` function.
- |
Remove usage of ``self.rbac_utils`` variable:
.. code-block:: python
with self.rbac_utils.override_role(self):
convert to
.. code-block:: python
with self.override_role():
- |
Remove ``test_obj`` in usage of ``override_role`` context manager:
.. code-block:: python
with self.override_role(self):
convert to
.. code-block:: python
with self.override_role():