Refactoring RbacUtils
Remove RbacUtils class and move all functionality to RbacUtilsMixin. Story: 2002604 Task: 22223 Change-Id: If476be8fd3df78b28669ca940ebeb288af534899
This commit is contained in:
parent
9866753ca8
commit
ace8ea37c8
|
@ -345,7 +345,7 @@ def _is_authorized(test_obj, service, rule, extra_target_data):
|
|||
roles.append(CONF.patrole.rbac_test_role)
|
||||
|
||||
# Adding implied roles
|
||||
roles = test_obj.rbac_utils.get_all_needed_roles(roles)
|
||||
roles = test_obj.get_all_needed_roles(roles)
|
||||
|
||||
# Test RBAC against custom requirements. Otherwise use oslo.policy.
|
||||
if CONF.patrole.test_custom_requirements:
|
||||
|
|
|
@ -102,8 +102,23 @@ class _ValidateListContext(object):
|
|||
self._validate_func()
|
||||
|
||||
|
||||
class RbacUtils(object):
|
||||
"""Utility class responsible for switching ``os_primary`` role.
|
||||
class RbacUtilsMixin(object):
|
||||
"""Utility mixin responsible for switching ``os_primary`` role.
|
||||
|
||||
Should be used as a mixin class alongside an instance of
|
||||
:py:class:`tempest.test.BaseTestCase` to perform Patrole class setup for a
|
||||
base RBAC class. Child classes should not use this mixin.
|
||||
|
||||
Example::
|
||||
|
||||
class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseRbacTest, cls).setup_clients()
|
||||
|
||||
cls.hosts_client = cls.os_primary.hosts_client
|
||||
...
|
||||
|
||||
This class is responsible for overriding the value of the primary Tempest
|
||||
credential's role (i.e. ``os_primary`` role). By doing so, it is possible
|
||||
|
@ -114,15 +129,29 @@ class RbacUtils(object):
|
|||
``CONF.patrole.rbac_test_roles``.
|
||||
"""
|
||||
|
||||
def __init__(self, test_obj):
|
||||
"""Constructor for ``RbacUtils``.
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RbacUtilsMixin, self).__init__(*args, **kwargs)
|
||||
|
||||
:param test_obj: An instance of `tempest.test.BaseTestCase`.
|
||||
"""
|
||||
self.admin_role_id = None
|
||||
self.rbac_role_ids = None
|
||||
self._role_map = None
|
||||
# Shows if override_role was called.
|
||||
self.__override_role_called = False
|
||||
# Shows if exception raised during override_role.
|
||||
self.__override_role_caught_exc = False
|
||||
|
||||
_admin_role_id = None
|
||||
_rbac_role_ids = None
|
||||
_project_id = None
|
||||
_user_id = None
|
||||
_role_map = None
|
||||
_role_inferences_mapping = None
|
||||
|
||||
admin_roles_client = None
|
||||
|
||||
@property
|
||||
def rbac_utils(self):
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
# Intialize the admin roles_client to perform role switching.
|
||||
admin_mgr = clients.Manager(
|
||||
credentials.get_configured_admin_credentials())
|
||||
|
@ -132,16 +161,20 @@ class RbacUtils(object):
|
|||
raise lib_exc.InvalidConfiguration(
|
||||
"Patrole role overriding only supports v3 identity API.")
|
||||
|
||||
self.admin_roles_client = admin_roles_client
|
||||
cls.admin_roles_client = admin_roles_client
|
||||
|
||||
self.user_id = test_obj.os_primary.credentials.user_id
|
||||
self.project_id = test_obj.os_primary.credentials.tenant_id
|
||||
self._role_inferences_mapping = self._prepare_role_inferences_mapping()
|
||||
cls._project_id = cls.os_primary.credentials.tenant_id
|
||||
cls._user_id = cls.os_primary.credentials.user_id
|
||||
cls._role_inferences_mapping = cls._prepare_role_inferences_mapping()
|
||||
|
||||
cls._init_roles()
|
||||
|
||||
# Change default role to admin
|
||||
self._override_role(test_obj, False)
|
||||
cls._override_role(False)
|
||||
super(RbacUtilsMixin, cls).setup_clients()
|
||||
|
||||
def _prepare_role_inferences_mapping(self):
|
||||
@classmethod
|
||||
def _prepare_role_inferences_mapping(cls):
|
||||
"""Preparing roles mapping to support role inferences
|
||||
|
||||
Making query to `list-all-role-inference-rules`_ keystone API
|
||||
|
@ -186,7 +219,7 @@ class RbacUtils(object):
|
|||
res[prior_role] = implies
|
||||
return res
|
||||
|
||||
raw_data = self.admin_roles_client.list_all_role_inference_rules()
|
||||
raw_data = cls.admin_roles_client.list_all_role_inference_rules()
|
||||
data = convert_data(raw_data['role_inferences'])
|
||||
res = {}
|
||||
for role_id in data:
|
||||
|
@ -207,15 +240,17 @@ class RbacUtils(object):
|
|||
"""
|
||||
res = set(r for r in roles)
|
||||
for role in res.copy():
|
||||
role_id = self._role_map.get(role)
|
||||
implied_roles = self._role_inferences_mapping.get(role_id, set())
|
||||
role_names = {self._role_map[rid] for rid in implied_roles}
|
||||
role_id = self.__class__._role_map.get(role)
|
||||
implied_roles = self.__class__._role_inferences_mapping.get(
|
||||
role_id, set())
|
||||
role_names = {self.__class__._role_map[rid]
|
||||
for rid in implied_roles}
|
||||
res.update(role_names)
|
||||
LOG.debug('All needed roles: %s; Base roles: %s', res, roles)
|
||||
return list(res)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def override_role(self, test_obj):
|
||||
def override_role(self, test_obj=None):
|
||||
"""Override the role used by ``os_primary`` Tempest credentials.
|
||||
|
||||
Temporarily change the role used by ``os_primary`` credentials to:
|
||||
|
@ -225,7 +260,6 @@ class RbacUtils(object):
|
|||
|
||||
Automatically switches to admin role after test execution.
|
||||
|
||||
:param test_obj: Instance of ``tempest.test.BaseTestCase``.
|
||||
:returns: None
|
||||
|
||||
.. warning::
|
||||
|
@ -248,8 +282,8 @@ class RbacUtils(object):
|
|||
# if the API call above threw an exception, any code below this
|
||||
# point in the test is not executed.
|
||||
"""
|
||||
test_obj._set_override_role_called()
|
||||
self._override_role(test_obj, True)
|
||||
self._set_override_role_called()
|
||||
self._override_role(True)
|
||||
try:
|
||||
# Execute the test.
|
||||
yield
|
||||
|
@ -258,16 +292,16 @@ class RbacUtils(object):
|
|||
# for future validation.
|
||||
exc = sys.exc_info()[0]
|
||||
if exc is not None:
|
||||
test_obj._set_override_role_caught_exc()
|
||||
self._set_override_role_caught_exc()
|
||||
# This code block is always executed, no matter the result of the
|
||||
# test. Automatically switch back to the admin role for test clean
|
||||
# up.
|
||||
self._override_role(test_obj, False)
|
||||
self._override_role(False)
|
||||
|
||||
def _override_role(self, test_obj, toggle_rbac_role=False):
|
||||
@classmethod
|
||||
def _override_role(cls, toggle_rbac_role=False):
|
||||
"""Private helper for overriding ``os_primary`` Tempest credentials.
|
||||
|
||||
:param test_obj: instance of :py:class:`tempest.test.BaseTestCase`
|
||||
:param toggle_rbac_role: Boolean value that controls the role that
|
||||
overrides default role of ``os_primary`` credentials.
|
||||
* If True: role is set to ``[patrole] rbac_test_role``
|
||||
|
@ -277,22 +311,19 @@ class RbacUtils(object):
|
|||
roles_already_present = False
|
||||
|
||||
try:
|
||||
if not all([self.admin_role_id, self.rbac_role_ids]):
|
||||
self._get_roles_by_name()
|
||||
|
||||
target_roles = (self.rbac_role_ids
|
||||
if toggle_rbac_role else [self.admin_role_id])
|
||||
roles_already_present = self._list_and_clear_user_roles_on_project(
|
||||
target_roles = (cls._rbac_role_ids
|
||||
if toggle_rbac_role else [cls._admin_role_id])
|
||||
roles_already_present = cls._list_and_clear_user_roles_on_project(
|
||||
target_roles)
|
||||
|
||||
# Do not override roles if `target_role` already exists.
|
||||
if not roles_already_present:
|
||||
self._create_user_role_on_project(target_roles)
|
||||
cls._create_user_role_on_project(target_roles)
|
||||
except Exception as exp:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(exp)
|
||||
finally:
|
||||
auth_providers = test_obj.get_auth_providers()
|
||||
auth_providers = cls.get_auth_providers()
|
||||
for provider in auth_providers:
|
||||
provider.clear_auth()
|
||||
# Fernet tokens are not subsecond aware so sleep to ensure we are
|
||||
|
@ -306,10 +337,11 @@ class RbacUtils(object):
|
|||
for provider in auth_providers:
|
||||
provider.set_auth()
|
||||
|
||||
def _get_roles_by_name(self):
|
||||
available_roles = self.admin_roles_client.list_roles()['roles']
|
||||
self._role_map = {r['name']: r['id'] for r in available_roles}
|
||||
LOG.debug('Available roles: %s', list(self._role_map.keys()))
|
||||
@classmethod
|
||||
def _init_roles(cls):
|
||||
available_roles = cls.admin_roles_client.list_roles()['roles']
|
||||
cls._role_map = {r['name']: r['id'] for r in available_roles}
|
||||
LOG.debug('Available roles: %s', cls._role_map.keys())
|
||||
|
||||
rbac_role_ids = []
|
||||
roles = CONF.patrole.rbac_test_roles
|
||||
|
@ -319,9 +351,9 @@ class RbacUtils(object):
|
|||
roles.append(CONF.patrole.rbac_test_role)
|
||||
|
||||
for role_name in roles:
|
||||
rbac_role_ids.append(self._role_map.get(role_name))
|
||||
rbac_role_ids.append(cls._role_map.get(role_name))
|
||||
|
||||
admin_role_id = self._role_map.get(CONF.identity.admin_role)
|
||||
admin_role_id = cls._role_map.get(CONF.identity.admin_role)
|
||||
|
||||
if not all([admin_role_id, all(rbac_role_ids)]):
|
||||
missing_roles = []
|
||||
|
@ -332,27 +364,28 @@ class RbacUtils(object):
|
|||
missing_roles.append(CONF.identity.admin_role)
|
||||
if not all(rbac_role_ids):
|
||||
missing_roles += [role_name for role_name in roles
|
||||
if not self._role_map.get(role_name)]
|
||||
if role_name not in cls._role_map]
|
||||
|
||||
msg += " Following roles were not found: %s." % (
|
||||
", ".join(missing_roles))
|
||||
msg += " Available roles: %s." % ", ".join(list(
|
||||
self._role_map.keys()))
|
||||
msg += " Available roles: %s." % ", ".join(cls._role_map)
|
||||
raise rbac_exceptions.RbacResourceSetupFailed(msg)
|
||||
|
||||
self.admin_role_id = admin_role_id
|
||||
self.rbac_role_ids = rbac_role_ids
|
||||
cls._admin_role_id = admin_role_id
|
||||
cls._rbac_role_ids = rbac_role_ids
|
||||
# Adding backward mapping
|
||||
self._role_map.update({v: k for k, v in self._role_map.items()})
|
||||
cls._role_map.update({v: k for k, v in cls._role_map.items()})
|
||||
|
||||
def _create_user_role_on_project(self, role_ids):
|
||||
@classmethod
|
||||
def _create_user_role_on_project(cls, role_ids):
|
||||
for role_id in role_ids:
|
||||
self.admin_roles_client.create_user_role_on_project(
|
||||
self.project_id, self.user_id, role_id)
|
||||
cls.admin_roles_client.create_user_role_on_project(
|
||||
cls._project_id, cls._user_id, role_id)
|
||||
|
||||
def _list_and_clear_user_roles_on_project(self, role_ids):
|
||||
roles = self.admin_roles_client.list_user_roles_on_project(
|
||||
self.project_id, self.user_id)['roles']
|
||||
@classmethod
|
||||
def _list_and_clear_user_roles_on_project(cls, role_ids):
|
||||
roles = cls.admin_roles_client.list_user_roles_on_project(
|
||||
cls._project_id, cls._user_id)['roles']
|
||||
all_role_ids = [role['id'] for role in roles]
|
||||
|
||||
# NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
|
||||
|
@ -364,13 +397,14 @@ class RbacUtils(object):
|
|||
return True
|
||||
|
||||
for role in roles:
|
||||
self.admin_roles_client.delete_role_from_user_on_project(
|
||||
self.project_id, self.user_id, role['id'])
|
||||
cls.admin_roles_client.delete_role_from_user_on_project(
|
||||
cls._project_id, cls._user_id, role['id'])
|
||||
|
||||
return False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def override_role_and_validate_list(self, test_obj, admin_resources=None,
|
||||
def override_role_and_validate_list(self, test_obj=None,
|
||||
admin_resources=None,
|
||||
admin_resource_id=None):
|
||||
"""Call ``override_role`` and validate RBAC for a list API action.
|
||||
|
||||
|
@ -400,38 +434,10 @@ class RbacUtils(object):
|
|||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
"""
|
||||
ctx = _ValidateListContext(admin_resources, admin_resource_id)
|
||||
with self.override_role(test_obj):
|
||||
with self.override_role():
|
||||
yield ctx
|
||||
ctx._validate()
|
||||
|
||||
|
||||
class RbacUtilsMixin(object):
|
||||
"""Mixin class to be used alongside an instance of
|
||||
:py:class:`tempest.test.BaseTestCase`.
|
||||
|
||||
Should be used to perform Patrole class setup for a base RBAC class. Child
|
||||
classes should not use this mixin.
|
||||
|
||||
Example::
|
||||
|
||||
class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
|
||||
|
||||
@classmethod
|
||||
def skip_checks(cls):
|
||||
super(BaseRbacTest, cls).skip_checks()
|
||||
cls.skip_rbac_checks()
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
"""
|
||||
|
||||
# Shows if override_role was called.
|
||||
__override_role_called = False
|
||||
# Shows if exception raised during override_role.
|
||||
__override_role_caught_exc = False
|
||||
|
||||
@classmethod
|
||||
def get_auth_providers(cls):
|
||||
"""Returns list of auth_providers used within test.
|
||||
|
@ -441,10 +447,6 @@ class RbacUtilsMixin(object):
|
|||
"""
|
||||
return [cls.os_primary.auth_provider]
|
||||
|
||||
@classmethod
|
||||
def setup_rbac_utils(cls):
|
||||
cls.rbac_utils = RbacUtils(cls)
|
||||
|
||||
def _set_override_role_called(self):
|
||||
"""Helper for tracking whether ``override_role`` was called."""
|
||||
self.__override_role_called = True
|
||||
|
|
|
@ -24,7 +24,6 @@ class BaseV2ComputeRbacTest(rbac_utils.RbacUtilsMixin,
|
|||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseV2ComputeRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
cls.hosts_client = cls.os_primary.hosts_client
|
||||
cls.tenant_usages_client = cls.os_primary.tenant_usages_client
|
||||
cls.networks_client = cls.os_primary.networks_client
|
||||
|
|
|
@ -27,11 +27,6 @@ LOG = logging.getLogger(__name__)
|
|||
class BaseIdentityRbacTest(rbac_utils.RbacUtilsMixin,
|
||||
base.BaseIdentityTest):
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseIdentityRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
|
||||
@classmethod
|
||||
def setup_test_endpoint(cls, service=None):
|
||||
"""Creates a service and an endpoint for test."""
|
||||
|
|
|
@ -18,8 +18,4 @@ from patrole_tempest_plugin import rbac_utils
|
|||
|
||||
class BaseV2ImageRbacTest(rbac_utils.RbacUtilsMixin,
|
||||
image_base.BaseV2ImageTest):
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseV2ImageRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
pass
|
||||
|
|
|
@ -23,11 +23,7 @@ from patrole_tempest_plugin import rbac_utils
|
|||
|
||||
class BaseNetworkRbacTest(rbac_utils.RbacUtilsMixin,
|
||||
network_base.BaseNetworkTest):
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseNetworkRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
pass
|
||||
|
||||
|
||||
class BaseNetworkExtRbacTest(BaseNetworkRbacTest):
|
||||
|
|
|
@ -31,7 +31,6 @@ class BaseVolumeRbacTest(rbac_utils.RbacUtilsMixin,
|
|||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(BaseVolumeRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
cls.volume_hosts_client = cls.os_primary.volume_hosts_client_latest
|
||||
cls.volume_types_client = cls.os_primary.volume_types_client_latest
|
||||
cls.groups_client = cls.os_primary.groups_client_latest
|
||||
|
|
|
@ -16,17 +16,15 @@
|
|||
|
||||
from tempest.tests import base
|
||||
|
||||
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
|
||||
|
||||
|
||||
class TestCase(base.TestCase):
|
||||
|
||||
"""Test case base class for all unit tests."""
|
||||
|
||||
def get_all_needed_roles(self, roles):
|
||||
role_inferences_mapping = {
|
||||
"admin": {"member", "reader"},
|
||||
"member": {"reader"}
|
||||
}
|
||||
res = set(r.lower() for r in roles)
|
||||
for role in res.copy():
|
||||
res.update(role_inferences_mapping.get(role, set()))
|
||||
return list(res)
|
||||
def setUp(self):
|
||||
super(TestCase, self).setUp()
|
||||
# Disable patrole log for unit tests.
|
||||
self.useFixture(
|
||||
patrole_fixtures.ConfPatcher(enable_reporting=False,
|
||||
group='patrole_log'))
|
||||
|
|
|
@ -16,12 +16,10 @@
|
|||
"""Fixtures for Patrole tests."""
|
||||
from __future__ import absolute_import
|
||||
|
||||
from contextlib import contextmanager
|
||||
import fixtures
|
||||
import mock
|
||||
import time
|
||||
|
||||
from tempest import clients
|
||||
from tempest.common import credentials_factory as credentials
|
||||
from tempest import config
|
||||
from tempest import test
|
||||
|
@ -57,81 +55,84 @@ class ConfPatcher(fixtures.Fixture):
|
|||
CONF.set_override(k, v, self.group)
|
||||
|
||||
|
||||
class RbacUtilsFixture(fixtures.Fixture):
|
||||
class FakeBaseRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
|
||||
os_primary = None
|
||||
|
||||
def runTest(self):
|
||||
pass
|
||||
|
||||
|
||||
class RbacUtilsMixinFixture(fixtures.Fixture):
|
||||
"""Fixture for `RbacUtils` class."""
|
||||
|
||||
USER_ID = mock.sentinel.user_id
|
||||
PROJECT_ID = mock.sentinel.project_id
|
||||
|
||||
def setUp(self):
|
||||
super(RbacUtilsFixture, self).setUp()
|
||||
def __init__(self, do_reset_mocks=True, rbac_test_roles=None):
|
||||
self._do_reset_mocks = do_reset_mocks
|
||||
self._rbac_test_roles = rbac_test_roles or ['member']
|
||||
|
||||
self.useFixture(ConfPatcher(rbac_test_roles=['member'],
|
||||
def patchobject(self, target, attribute, *args, **kwargs):
|
||||
p = mock.patch.object(target, attribute, *args, **kwargs)
|
||||
m = p.start()
|
||||
self.addCleanup(p.stop)
|
||||
return m
|
||||
|
||||
def setUp(self):
|
||||
super(RbacUtilsMixinFixture, self).setUp()
|
||||
|
||||
self.useFixture(ConfPatcher(rbac_test_roles=self._rbac_test_roles,
|
||||
group='patrole'))
|
||||
self.useFixture(ConfPatcher(
|
||||
admin_role='admin', auth_version='v3', group='identity'))
|
||||
self.useFixture(ConfPatcher(
|
||||
api_v3=True, group='identity-feature-enabled'))
|
||||
|
||||
test_obj_kwargs = {
|
||||
'os_primary.credentials.user_id': self.USER_ID,
|
||||
'os_primary.credentials.tenant_id': self.PROJECT_ID,
|
||||
'os_primary.credentials.project_id': self.PROJECT_ID,
|
||||
}
|
||||
self.mock_test_obj = mock.Mock(
|
||||
__name__='patrole_unit_test', spec=test.BaseTestCase,
|
||||
os_primary=mock.Mock(),
|
||||
get_auth_providers=mock.Mock(return_value=[mock.Mock()]),
|
||||
**test_obj_kwargs)
|
||||
|
||||
# Mock out functionality that can't be used by unit tests. Mocking out
|
||||
# time.sleep is a test optimization.
|
||||
self.mock_time = mock.patch.object(
|
||||
rbac_utils, 'time', __name__='mock_time', spec=time).start()
|
||||
mock.patch.object(credentials, 'get_configured_admin_credentials',
|
||||
spec=object).start()
|
||||
mock_admin_mgr = mock.patch.object(
|
||||
clients, 'Manager', spec=clients.Manager,
|
||||
roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
|
||||
self.mock_time = self.patchobject(rbac_utils, 'time',
|
||||
__name__='mock_time', spec=time)
|
||||
self.patchobject(credentials, 'get_configured_admin_credentials',
|
||||
spec=object)
|
||||
mock_admin_mgr = self.patchobject(rbac_utils.clients, 'Manager',
|
||||
spec=rbac_utils.clients.Manager,
|
||||
roles_v3_client=mock.Mock(),
|
||||
roles_client=mock.Mock())
|
||||
self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
|
||||
self.admin_roles_client.list_all_role_inference_rules.return_value = {
|
||||
"role_inferences": []}
|
||||
"role_inferences": [
|
||||
{
|
||||
"implies": [{"id": "reader_id", "name": "reader"}],
|
||||
"prior_role": {"id": "member_id", "name": "member"}
|
||||
},
|
||||
{
|
||||
"implies": [{"id": "member_id", "name": "member"}],
|
||||
"prior_role": {"id": "admin_id", "name": "admin"}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
self.set_roles(['admin', 'member'], [])
|
||||
default_roles = {'admin', 'member', 'reader'}.union(
|
||||
set(self._rbac_test_roles))
|
||||
self.set_roles(list(default_roles), [])
|
||||
|
||||
def override_role(self, *role_toggles):
|
||||
"""Instantiate `rbac_utils.RbacUtils` and call `override_role`.
|
||||
test_obj_kwargs = {
|
||||
'credentials.user_id': self.USER_ID,
|
||||
'credentials.tenant_id': self.PROJECT_ID,
|
||||
'credentials.project_id': self.PROJECT_ID,
|
||||
}
|
||||
|
||||
Create an instance of `rbac_utils.RbacUtils` and call `override_role`
|
||||
for each boolean value in `role_toggles`. The number of calls to
|
||||
`override_role` is always 1 + len(`role_toggles`) because the
|
||||
`rbac_utils.RbacUtils` constructor automatically calls `override_role`.
|
||||
class FakeRbacTest(FakeBaseRbacTest):
|
||||
os_primary = mock.Mock()
|
||||
|
||||
:param role_toggles: the list of boolean values iterated over and
|
||||
passed to `override_role`.
|
||||
"""
|
||||
_rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
|
||||
FakeRbacTest.os_primary.configure_mock(**test_obj_kwargs)
|
||||
|
||||
for role_toggle in role_toggles:
|
||||
_rbac_utils._override_role(self.mock_test_obj, role_toggle)
|
||||
# NOTE(felipemonteiro): Simulate that a role switch has occurred
|
||||
# by updating the user's current role to the new role. This means
|
||||
# that all API actions involved during a role switch -- listing,
|
||||
# deleting and adding roles -- are executed, making it easier to
|
||||
# assert that mock calls were called as expected.
|
||||
new_role = 'member' if role_toggle else 'admin'
|
||||
self.set_roles(['admin', 'member'], [new_role])
|
||||
|
||||
@contextmanager
|
||||
def real_override_role(self, test_obj):
|
||||
"""Actual call to ``override_role``.
|
||||
|
||||
Useful for ensuring all the necessary mocks are performed before
|
||||
the method in question is called.
|
||||
"""
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
with _rbac_utils.override_role(test_obj):
|
||||
yield
|
||||
FakeRbacTest.setUpClass()
|
||||
self.test_obj = FakeRbacTest()
|
||||
if self._do_reset_mocks:
|
||||
self.admin_roles_client.reset_mock()
|
||||
self.test_obj.os_primary.reset_mock()
|
||||
self.mock_time.reset_mock()
|
||||
|
||||
def set_roles(self, roles, roles_on_project=None):
|
||||
"""Set the list of available roles in the system.
|
||||
|
@ -159,28 +160,3 @@ class RbacUtilsFixture(fixtures.Fixture):
|
|||
self.admin_roles_client.list_roles.return_value = available_roles
|
||||
self.admin_roles_client.list_user_roles_on_project.return_value = (
|
||||
available_project_roles)
|
||||
|
||||
def get_all_needed_roles(self, roles):
|
||||
self.admin_roles_client.list_all_role_inference_rules.return_value = {
|
||||
"role_inferences": [
|
||||
{
|
||||
"implies": [{"id": "3", "name": "reader"}],
|
||||
"prior_role": {"id": "2", "name": "member"}
|
||||
},
|
||||
{
|
||||
"implies": [{"id": "2", "name": "member"}],
|
||||
"prior_role": {"id": "1", "name": "admin"}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Call real get_all_needed_roles function
|
||||
with mock.patch.object(rbac_utils.RbacUtils, '_override_role',
|
||||
autospec=True):
|
||||
obj = rbac_utils.RbacUtils(mock.Mock())
|
||||
obj._role_map = {
|
||||
"1": "admin", "admin": "1",
|
||||
"2": "member", "member": "2",
|
||||
"3": "reader", "reader": "3"
|
||||
}
|
||||
return obj.get_all_needed_roles(roles)
|
||||
|
|
|
@ -76,6 +76,9 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
if attr in dir(policy_authority.PolicyAuthority):
|
||||
delattr(policy_authority.PolicyAuthority, attr)
|
||||
|
||||
self.test_obj = self.useFixture(fixtures.RbacUtilsMixinFixture()).\
|
||||
test_obj
|
||||
|
||||
@staticmethod
|
||||
def _get_fake_policies(rules):
|
||||
fake_rules = []
|
||||
|
@ -96,7 +99,7 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, service)
|
||||
|
||||
roles = self.get_all_needed_roles(roles)
|
||||
roles = self.test_obj.get_all_needed_roles(roles)
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
|
@ -289,7 +292,7 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(
|
||||
rule, self.get_all_needed_roles(['member']))
|
||||
rule, self.test_obj.get_all_needed_roles(['member']))
|
||||
self.assertTrue(allowed)
|
||||
# for sure that roles are in same order
|
||||
mock_try_rule.call_args[0][2]["roles"] = sorted(
|
||||
|
|
|
@ -20,12 +20,9 @@ from oslo_config import cfg
|
|||
|
||||
import fixtures
|
||||
from tempest.lib import exceptions
|
||||
from tempest import manager
|
||||
from tempest import test
|
||||
|
||||
from patrole_tempest_plugin import rbac_exceptions
|
||||
from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
|
||||
from patrole_tempest_plugin import rbac_utils
|
||||
from patrole_tempest_plugin.tests.unit import base
|
||||
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
|
||||
|
||||
|
@ -38,25 +35,10 @@ class BaseRBACRuleValidationTest(base.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
super(BaseRBACRuleValidationTest, self).setUp()
|
||||
self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
|
||||
self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
|
||||
self.mock_test_args.rbac_utils = mock.Mock(
|
||||
spec_set=rbac_utils.RbacUtils)
|
||||
self.mock_test_args.rbac_utils.get_all_needed_roles.side_effect = \
|
||||
self.get_all_needed_roles
|
||||
|
||||
# Setup credentials for mock client manager.
|
||||
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
|
||||
project_id=mock.sentinel.project_id)
|
||||
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
|
||||
|
||||
self.useFixture(
|
||||
patrole_fixtures.ConfPatcher(rbac_test_roles=self.test_roles,
|
||||
group='patrole'))
|
||||
# Disable patrole log for unit tests.
|
||||
self.useFixture(
|
||||
patrole_fixtures.ConfPatcher(enable_reporting=False,
|
||||
group='patrole_log'))
|
||||
self.rbac_utils_fixture = self.useFixture(
|
||||
patrole_fixtures.RbacUtilsMixinFixture(
|
||||
rbac_test_roles=self.test_roles))
|
||||
self.test_obj = self.rbac_utils_fixture.test_obj
|
||||
|
||||
|
||||
class BaseRBACMultiRoleRuleValidationTest(BaseRBACRuleValidationTest):
|
||||
|
@ -89,7 +71,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
pass
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -107,7 +89,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
raise exceptions.Forbidden()
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -131,7 +113,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
"the following actions: \[%s\].*" % (mock.sentinel.action))
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
|
||||
self.mock_test_args)
|
||||
self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -188,7 +170,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
mock.sentinel.action))
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, test_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
|
||||
|
||||
_do_test(rbac_exceptions.RbacMissingAttributeResponseBody,
|
||||
|
@ -221,7 +203,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
return_value = allowed
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacExpectedWrongException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
self.assertTrue(mock_log.error.called)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -259,10 +241,10 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
if error_re:
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
else:
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
mock_log.warning.assert_called_with(
|
||||
|
@ -302,7 +284,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
|
||||
error_re = ".*OverPermission: .* \[%s\]$" % mock.sentinel.action
|
||||
self.assertRaisesRegex(rbac_exceptions.RbacOverPermissionException,
|
||||
error_re, test_policy, self.mock_test_args)
|
||||
error_re, test_policy, self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
mock_log.error.reset_mock()
|
||||
|
||||
|
@ -320,7 +302,7 @@ class RBACRuleValidationTest(BaseRBACRuleValidationTest):
|
|||
|
||||
error_re = 'Attempted to test an invalid policy file or action'
|
||||
self.assertRaisesRegex(rbac_exceptions.RbacParsingException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
|
||||
mock_authority.PolicyAuthority.assert_called_once_with(
|
||||
mock.sentinel.project_id, mock.sentinel.user_id,
|
||||
|
@ -403,7 +385,7 @@ class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
|
|||
(mock.sentinel.action))
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
|
||||
self.mock_test_args)
|
||||
self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -441,10 +423,10 @@ class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
|
|||
if error_re:
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
else:
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
mock_log.warning.assert_called_with(
|
||||
|
@ -486,7 +468,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
pass
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
self.assertFalse(mock_rbaclog.info.called)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'RBACLOG', autospec=True)
|
||||
|
@ -506,7 +488,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
pass
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_rbaclog.info.assert_called_once_with(
|
||||
"[Service]: %s, [Test]: %s, [Rules]: %s, "
|
||||
"[Expected]: %s, [Actual]: %s",
|
||||
|
@ -528,12 +510,12 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
pass
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
|
||||
policy_authority = mock_authority.PolicyAuthority.return_value
|
||||
policy_authority.allowed.assert_called_with(
|
||||
mock.sentinel.action,
|
||||
self.get_all_needed_roles(CONF.patrole.rbac_test_roles))
|
||||
self.test_obj.get_all_needed_roles(CONF.patrole.rbac_test_roles))
|
||||
|
||||
mock_log.error.assert_not_called()
|
||||
|
||||
|
@ -545,7 +527,7 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
evaluated correctly.
|
||||
"""
|
||||
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
|
||||
expected_roles = self.get_all_needed_roles(
|
||||
expected_roles = self.test_obj.get_all_needed_roles(
|
||||
CONF.patrole.rbac_test_roles)
|
||||
|
||||
def partial_func(x):
|
||||
|
@ -563,14 +545,14 @@ class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
|
|||
def test_bar_policy(*args):
|
||||
pass
|
||||
|
||||
test_foo_policy(self.mock_test_args)
|
||||
test_foo_policy(self.test_obj)
|
||||
policy_authority = mock_authority.PolicyAuthority.return_value
|
||||
policy_authority.allowed.assert_called_with(
|
||||
"foo",
|
||||
expected_roles)
|
||||
policy_authority.allowed.reset_mock()
|
||||
|
||||
test_bar_policy(self.mock_test_args)
|
||||
test_bar_policy(self.test_obj)
|
||||
policy_authority = mock_authority.PolicyAuthority.return_value
|
||||
policy_authority.allowed.assert_called_with(
|
||||
"qux",
|
||||
|
@ -603,7 +585,7 @@ class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
|
|||
pass
|
||||
|
||||
self.assertRaises(rbac_exceptions.RbacInvalidServiceException,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
|
||||
|
||||
class RBACMultiRoleRuleValidationNegativeTest(
|
||||
|
@ -627,7 +609,8 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
m_authority.allowed.assert_has_calls([
|
||||
mock.call(
|
||||
rule,
|
||||
self.get_all_needed_roles(CONF.patrole.rbac_test_roles)
|
||||
self.test_obj.get_all_needed_roles(
|
||||
CONF.patrole.rbac_test_roles)
|
||||
) for rule in rules
|
||||
])
|
||||
m_authority.allowed.reset_mock()
|
||||
|
@ -648,7 +631,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
pass
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
self._assert_policy_authority_called_with(rules, mock_authority)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
|
||||
|
@ -677,7 +660,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
error_re = ".*OverPermission: .* \[%s\]$" % fail_on_action
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacOverPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
mock_log.debug.assert_any_call(
|
||||
"%s: Expecting %d to be raised for policy name: %s",
|
||||
'test_policy', 403, fail_on_action)
|
||||
|
@ -710,7 +693,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def _do_test(allowed_list, fail_on_action):
|
||||
mock_authority.PolicyAuthority.return_value.allowed.\
|
||||
side_effect = allowed_list
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.debug.assert_called_with(
|
||||
"%s: Expecting %d to be raised for policy name: %s",
|
||||
'test_policy', 403, fail_on_action)
|
||||
|
@ -747,7 +730,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
(rules, rules)).replace('[', '\[').replace(']', '\]')
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
self._assert_policy_authority_called_with(rules, mock_authority)
|
||||
|
||||
|
@ -773,7 +756,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def _do_test(allowed_list, fail_on_action):
|
||||
mock_authority.PolicyAuthority.return_value.allowed.\
|
||||
side_effect = allowed_list
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.debug.assert_called_with(
|
||||
"%s: Expecting %d to be raised for policy name: %s",
|
||||
'test_policy', 403, fail_on_action)
|
||||
|
@ -806,7 +789,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def _do_test(allowed_list, fail_on_action):
|
||||
mock_authority.PolicyAuthority.return_value.allowed.\
|
||||
side_effect = allowed_list
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
mock_log.debug.assert_called_with(
|
||||
"%s: Expecting %d to be raised for policy name: %s",
|
||||
'test_policy', 404, fail_on_action)
|
||||
|
@ -833,7 +816,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
raise exceptions.Forbidden()
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
self._assert_policy_authority_called_with(rules, mock_authority)
|
||||
# Assert that 403 is expected.
|
||||
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
|
||||
|
@ -847,7 +830,7 @@ class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
|
|||
def test_policy(*args):
|
||||
raise exceptions.Forbidden()
|
||||
|
||||
test_policy(self.mock_test_args)
|
||||
test_policy(self.test_obj)
|
||||
self._assert_policy_authority_called_with(rules, mock_authority)
|
||||
# Assert that 403 is expected.
|
||||
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
|
||||
|
@ -932,7 +915,7 @@ class RBACMultiRoleRuleValidationTestMultiPolicy(
|
|||
(rules, rules)).replace('[', '\[').replace(']', '\]')
|
||||
self.assertRaisesRegex(
|
||||
rbac_exceptions.RbacUnderPermissionException, error_re,
|
||||
test_policy, self.mock_test_args)
|
||||
test_policy, self.test_obj)
|
||||
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
|
||||
self._assert_policy_authority_called_with(rules, mock_authority)
|
||||
|
||||
|
@ -951,21 +934,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
|||
def setUp(self):
|
||||
super(RBACOverrideRoleValidationTest, self).setUp()
|
||||
|
||||
# Mixin automatically initializes __override_role_called to False.
|
||||
class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
|
||||
def runTest(self):
|
||||
pass
|
||||
|
||||
# Stub out problematic function calls.
|
||||
FakeRbacTest.os_primary = mock.Mock(spec=manager.Manager)
|
||||
FakeRbacTest.rbac_utils = self.useFixture(
|
||||
patrole_fixtures.RbacUtilsFixture())
|
||||
mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
|
||||
project_id=mock.sentinel.project_id)
|
||||
setattr(FakeRbacTest.os_primary, 'credentials', mock_creds)
|
||||
setattr(FakeRbacTest.os_primary, 'auth_provider', mock.Mock())
|
||||
|
||||
self.parent_class = FakeRbacTest
|
||||
self.parent_class = self.test_obj.__class__
|
||||
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
def test_rule_validation_override_role_called_inside_ctx(self,
|
||||
|
@ -981,7 +950,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
|||
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
|
||||
expected_error_codes=[404])
|
||||
def test_called(self_):
|
||||
with self_.rbac_utils.real_override_role(self_):
|
||||
with self_.override_role():
|
||||
raise exceptions.NotFound()
|
||||
|
||||
child_test = ChildRbacTest()
|
||||
|
@ -991,8 +960,8 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
|||
def test_rule_validation_override_role_patrole_exception_ignored(
|
||||
self, mock_authority):
|
||||
"""Test success case where Patrole exception is raised (which is
|
||||
valid in case of e.g. RbacPartialResponseBody) after override_role
|
||||
passes.
|
||||
valid in case of e.g. BasePatroleResponseBodyException) after
|
||||
override_role passes.
|
||||
"""
|
||||
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
|
||||
True
|
||||
|
@ -1002,14 +971,14 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
|||
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
|
||||
expected_error_codes=[404])
|
||||
def test_called(self_):
|
||||
with self_.rbac_utils.real_override_role(self_):
|
||||
with self_.override_role():
|
||||
pass
|
||||
# Instances of BasePatroleException don't count as they are
|
||||
# part of the validation work flow.
|
||||
raise rbac_exceptions.BasePatroleException()
|
||||
# Instances of BasePatroleResponseBodyException don't count as
|
||||
# they are part of the validation work flow.
|
||||
raise rbac_exceptions.BasePatroleResponseBodyException()
|
||||
|
||||
child_test = ChildRbacTest()
|
||||
self.assertRaises(rbac_exceptions.BasePatroleException,
|
||||
self.assertRaises(rbac_exceptions.RbacUnderPermissionException,
|
||||
child_test.test_called)
|
||||
|
||||
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
|
||||
|
@ -1057,7 +1026,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
|||
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
|
||||
expected_error_codes=[404])
|
||||
def test_called_after(self_):
|
||||
with self_.rbac_utils.real_override_role(self_):
|
||||
with self_.override_role():
|
||||
pass
|
||||
# Simulates a test tearDown failure or some such.
|
||||
raise exception_type()
|
||||
|
@ -1098,7 +1067,7 @@ class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
|
|||
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule1"],
|
||||
expected_error_codes=[404])
|
||||
def test_called(self_):
|
||||
with self_.rbac_utils.real_override_role(self_):
|
||||
with self_.override_role():
|
||||
raise exceptions.NotFound()
|
||||
|
||||
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule2"],
|
||||
|
|
|
@ -17,7 +17,6 @@ import mock
|
|||
import testtools
|
||||
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
from tempest import test
|
||||
|
||||
from patrole_tempest_plugin import rbac_exceptions
|
||||
from patrole_tempest_plugin import rbac_utils
|
||||
|
@ -25,118 +24,132 @@ from patrole_tempest_plugin.tests.unit import base
|
|||
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
|
||||
|
||||
|
||||
class RBACUtilsTest(base.TestCase):
|
||||
class RBACUtilsMixinTest(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RBACUtilsTest, self).setUp()
|
||||
# Reset the role history after each test run to avoid validation
|
||||
# errors between tests.
|
||||
rbac_utils.RbacUtils.override_role_history = {}
|
||||
self.rbac_utils = self.useFixture(patrole_fixtures.RbacUtilsFixture())
|
||||
super(RBACUtilsMixinTest, self).setUp()
|
||||
self.rbac_utils_fixture = self.useFixture(
|
||||
patrole_fixtures.RbacUtilsMixinFixture())
|
||||
self.test_obj = self.rbac_utils_fixture.test_obj
|
||||
|
||||
def test_override_role_with_missing_admin_role(self):
|
||||
self.rbac_utils.set_roles('member')
|
||||
def test_init_roles_with_missing_admin_role(self):
|
||||
self.rbac_utils_fixture.set_roles('member')
|
||||
error_re = (".*Following roles were not found: admin. Available "
|
||||
"roles: member.")
|
||||
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
|
||||
error_re, self.rbac_utils.override_role)
|
||||
error_re, self.test_obj._init_roles)
|
||||
|
||||
def test_override_role_with_missing_rbac_role(self):
|
||||
self.rbac_utils.set_roles('admin')
|
||||
def test_init_roles_with_missing_rbac_role(self):
|
||||
self.rbac_utils_fixture.set_roles('admin')
|
||||
error_re = (".*Following roles were not found: member. Available "
|
||||
"roles: admin.")
|
||||
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
|
||||
error_re, self.rbac_utils.override_role)
|
||||
error_re, self.test_obj._init_roles)
|
||||
|
||||
def test_override_role_to_admin_role(self):
|
||||
self.rbac_utils.override_role()
|
||||
|
||||
mock_test_obj = self.rbac_utils.mock_test_obj
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
def test_override_role_to_admin_role_at_creating(self):
|
||||
rbac_utils_fixture = self.useFixture(
|
||||
patrole_fixtures.RbacUtilsMixinFixture(do_reset_mocks=False))
|
||||
test_obj = rbac_utils_fixture.test_obj
|
||||
roles_client = rbac_utils_fixture.admin_roles_client
|
||||
mock_time = rbac_utils_fixture.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_called_once_with(
|
||||
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID, 'admin_id')
|
||||
mock_test_obj.get_auth_providers()[0].clear_auth\
|
||||
rbac_utils_fixture.PROJECT_ID,
|
||||
rbac_utils_fixture.USER_ID,
|
||||
'admin_id')
|
||||
test_obj.get_auth_providers()[0].clear_auth.assert_called_once_with()
|
||||
test_obj.get_auth_providers()[0].set_auth.assert_called_once_with()
|
||||
mock_time.sleep.assert_called_once_with(1)
|
||||
|
||||
def test_override_role_to_admin_role(self):
|
||||
self.test_obj._override_role()
|
||||
|
||||
roles_client = self.rbac_utils_fixture.admin_roles_client
|
||||
mock_time = self.rbac_utils_fixture.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_called_once_with(
|
||||
self.rbac_utils_fixture.PROJECT_ID,
|
||||
self.rbac_utils_fixture.USER_ID,
|
||||
'admin_id')
|
||||
self.test_obj.get_auth_providers()[0].clear_auth\
|
||||
.assert_called_once_with()
|
||||
mock_test_obj.get_auth_providers()[0].set_auth\
|
||||
self.test_obj.get_auth_providers()[0].set_auth\
|
||||
.assert_called_once_with()
|
||||
mock_time.sleep.assert_called_once_with(1)
|
||||
|
||||
def test_override_role_to_admin_role_avoids_role_switch(self):
|
||||
self.rbac_utils.set_roles(['admin', 'member'], 'admin')
|
||||
self.rbac_utils.override_role()
|
||||
self.rbac_utils_fixture.set_roles(['admin', 'member'], 'admin')
|
||||
self.test_obj._override_role()
|
||||
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
roles_client = self.rbac_utils_fixture.admin_roles_client
|
||||
mock_time = self.rbac_utils_fixture.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_not_called()
|
||||
mock_time.sleep.assert_not_called()
|
||||
|
||||
def test_override_role_to_member_role(self):
|
||||
self.rbac_utils.override_role(True)
|
||||
self.test_obj._override_role(True)
|
||||
|
||||
mock_test_obj = self.rbac_utils.mock_test_obj
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
roles_client = self.rbac_utils_fixture.admin_roles_client
|
||||
mock_time = self.rbac_utils_fixture.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_has_calls([
|
||||
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
|
||||
'admin_id'),
|
||||
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
|
||||
mock.call(self.rbac_utils_fixture.PROJECT_ID,
|
||||
self.rbac_utils_fixture.USER_ID,
|
||||
'member_id')
|
||||
])
|
||||
mock_test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
|
||||
[mock.call()] * 2)
|
||||
mock_test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
|
||||
[mock.call()] * 2)
|
||||
mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
|
||||
self.test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
|
||||
[mock.call()])
|
||||
self.test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
|
||||
[mock.call()])
|
||||
mock_time.sleep.assert_has_calls([mock.call(1)])
|
||||
|
||||
def test_override_role_to_member_role_avoids_role_switch(self):
|
||||
self.rbac_utils.set_roles(['admin', 'member'], 'member')
|
||||
self.rbac_utils.override_role(True)
|
||||
self.rbac_utils_fixture.set_roles(['admin', 'member'], 'member')
|
||||
self.test_obj._override_role(True)
|
||||
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
roles_client = self.rbac_utils_fixture.admin_roles_client
|
||||
mock_time = self.rbac_utils_fixture.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_has_calls([
|
||||
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
|
||||
'admin_id')
|
||||
])
|
||||
mock_time.sleep.assert_called_once_with(1)
|
||||
self.assertEqual(0,
|
||||
roles_client.create_user_role_on_project.call_count)
|
||||
self.assertEqual(0,
|
||||
mock_time.sleep.call_count)
|
||||
|
||||
def test_override_role_to_member_role_then_admin_role(self):
|
||||
self.rbac_utils.override_role(True, False)
|
||||
self.test_obj._override_role(True)
|
||||
self.test_obj._override_role(False)
|
||||
|
||||
mock_test_obj = self.rbac_utils.mock_test_obj
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
mock_time = self.rbac_utils.mock_time
|
||||
roles_client = self.rbac_utils_fixture.admin_roles_client
|
||||
mock_time = self.rbac_utils_fixture.mock_time
|
||||
|
||||
roles_client.create_user_role_on_project.assert_has_calls([
|
||||
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
|
||||
'admin_id'),
|
||||
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
|
||||
mock.call(self.rbac_utils_fixture.PROJECT_ID,
|
||||
self.rbac_utils_fixture.USER_ID,
|
||||
'member_id'),
|
||||
mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
|
||||
mock.call(self.rbac_utils_fixture.PROJECT_ID,
|
||||
self.rbac_utils_fixture.USER_ID,
|
||||
'admin_id')
|
||||
])
|
||||
mock_test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
|
||||
[mock.call()] * 3)
|
||||
mock_test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
|
||||
[mock.call()] * 3)
|
||||
mock_time.sleep.assert_has_calls([mock.call(1)] * 3)
|
||||
self.test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
|
||||
[mock.call()] * 2)
|
||||
self.test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
|
||||
[mock.call()] * 2)
|
||||
mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
|
||||
|
||||
def test_clear_user_roles(self):
|
||||
# NOTE(felipemonteiro): Set the user's roles on the project to
|
||||
# include 'random' to coerce a role switch, or else it will be
|
||||
# skipped.
|
||||
self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
|
||||
self.rbac_utils.override_role()
|
||||
self.rbac_utils_fixture.set_roles(['admin', 'member'],
|
||||
['member', 'random'])
|
||||
self.test_obj._override_role()
|
||||
|
||||
roles_client = self.rbac_utils.admin_roles_client
|
||||
roles_client = self.rbac_utils_fixture.admin_roles_client
|
||||
|
||||
roles_client.list_user_roles_on_project.assert_called_once_with(
|
||||
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
|
||||
self.rbac_utils_fixture.PROJECT_ID,
|
||||
self.rbac_utils_fixture.USER_ID)
|
||||
roles_client.delete_role_from_user_on_project.\
|
||||
assert_has_calls([
|
||||
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
|
||||
|
@ -144,52 +157,32 @@ class RBACUtilsTest(base.TestCase):
|
|||
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
|
||||
'random_id')])
|
||||
|
||||
@mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
|
||||
def test_override_role_context_manager_simulate_pass(self,
|
||||
mock_override_role):
|
||||
def test_override_role_context_manager_simulate_pass(self):
|
||||
"""Validate that expected override_role calls are made when switching
|
||||
to admin role for success path.
|
||||
"""
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
|
||||
# Validate constructor called _override_role with False.
|
||||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
mock_override_role.reset_mock()
|
||||
|
||||
with _rbac_utils.override_role(test_obj):
|
||||
mock_override_role = self.patchobject(self.test_obj, '_override_role')
|
||||
with self.test_obj.override_role():
|
||||
# Validate `override_role` public method called private method
|
||||
# `_override_role` with True.
|
||||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
True)
|
||||
mock_override_role.assert_called_once_with(True)
|
||||
mock_override_role.reset_mock()
|
||||
# Validate that `override_role` switched back to admin role after
|
||||
# contextmanager.
|
||||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
mock_override_role.assert_called_once_with(False)
|
||||
|
||||
@mock.patch.object(rbac_utils.RbacUtils, '_override_role',
|
||||
autospec=True)
|
||||
def test_override_role_context_manager_simulate_fail(self,
|
||||
mock_override_role):
|
||||
def test_override_role_context_manager_simulate_fail(self):
|
||||
"""Validate that expected override_role calls are made when switching
|
||||
to admin role for failure path (i.e. when test raises exception).
|
||||
"""
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
|
||||
# Validate constructor called _override_role with False.
|
||||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
mock_override_role.reset_mock()
|
||||
mock_override_role = self.patchobject(self.test_obj, '_override_role')
|
||||
|
||||
def _do_test():
|
||||
with _rbac_utils.override_role(test_obj):
|
||||
with self.test_obj.override_role():
|
||||
# Validate `override_role` public method called private method
|
||||
# `_override_role` with True.
|
||||
mock_override_role.assert_called_once_with(
|
||||
_rbac_utils, test_obj, True)
|
||||
mock_override_role.assert_called_once_with(True)
|
||||
mock_override_role.reset_mock()
|
||||
# Raise exc to verify role switch works for negative case.
|
||||
raise lib_exc.Forbidden()
|
||||
|
@ -197,61 +190,51 @@ class RBACUtilsTest(base.TestCase):
|
|||
# Validate that role is switched back to admin, despite test failure.
|
||||
with testtools.ExpectedException(lib_exc.Forbidden):
|
||||
_do_test()
|
||||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
mock_override_role.assert_called_once_with(False)
|
||||
|
||||
def test_override_role_and_validate_list(self):
|
||||
self.patchobject(rbac_utils.RbacUtils, '_override_role')
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
m_override_role = self.patchobject(_rbac_utils, 'override_role')
|
||||
m_override_role = self.patchobject(self.test_obj, 'override_role')
|
||||
|
||||
with (_rbac_utils.override_role_and_validate_list(
|
||||
test_obj, 'foo')) as ctx:
|
||||
with (self.test_obj.override_role_and_validate_list(
|
||||
admin_resource_id='foo')) as ctx:
|
||||
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
|
||||
m_validate = self.patchobject(ctx, '_validate')
|
||||
m_override_role.assert_called_once_with(test_obj)
|
||||
m_override_role.assert_called_once_with()
|
||||
m_validate.assert_called_once()
|
||||
|
||||
def test_prepare_role_inferences_mapping(self):
|
||||
self.patchobject(rbac_utils.RbacUtils, '_override_role')
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
_rbac_utils.admin_roles_client.list_all_role_inference_rules.\
|
||||
self.test_obj.admin_roles_client.list_all_role_inference_rules.\
|
||||
return_value = {
|
||||
"role_inferences": [
|
||||
{
|
||||
"implies": [{"id": "3", "name": "reader"}],
|
||||
"prior_role": {"id": "2", "name": "member"}
|
||||
"implies": [{"id": "reader_id", "name": "reader"}],
|
||||
"prior_role": {"id": "member_id", "name": "member"}
|
||||
},
|
||||
{
|
||||
"implies": [{"id": "2", "name": "member"}],
|
||||
"prior_role": {"id": "1", "name": "admin"}
|
||||
"implies": [{"id": "member_id", "name": "member"}],
|
||||
"prior_role": {"id": "admin_id", "name": "admin"}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
expected_role_inferences_mapping = {
|
||||
"2": {"3"}, # "member": ["reader"],
|
||||
"1": {"2", "3"} # "admin": ["member", "reader"]
|
||||
"member_id": {"reader_id"},
|
||||
"admin_id": {"member_id", "reader_id"}
|
||||
}
|
||||
actual_role_inferences_mapping = _rbac_utils.\
|
||||
actual_role_inferences_mapping = self.test_obj.\
|
||||
_prepare_role_inferences_mapping()
|
||||
self.assertEqual(expected_role_inferences_mapping,
|
||||
actual_role_inferences_mapping)
|
||||
|
||||
def test_get_all_needed_roles(self):
|
||||
self.patchobject(rbac_utils.RbacUtils, '_override_role')
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
_rbac_utils._role_inferences_mapping = {
|
||||
"2": {"3"}, # "member": ["reader"],
|
||||
"1": {"2", "3"} # "admin": ["member", "reader"]
|
||||
self.test_obj.__class__._role_inferences_mapping = {
|
||||
"member_id": {"reader_id"},
|
||||
"admin_id": {"member_id", "reader_id"}
|
||||
}
|
||||
_rbac_utils._role_map = {
|
||||
"1": "admin", "admin": "1",
|
||||
"2": "member", "member": "2",
|
||||
"3": "reader", "reader": "3"
|
||||
self.test_obj.__class__._role_map = {
|
||||
"admin_id": "admin", "admin": "admin_id",
|
||||
"member_id": "member", "member": "member_id",
|
||||
"reader_id": "reader", "reader": "reader_id"
|
||||
}
|
||||
for roles, expected_roles in (
|
||||
(['admin'], ['admin', 'member', 'reader']),
|
||||
|
@ -262,44 +245,10 @@ class RBACUtilsTest(base.TestCase):
|
|||
(['admin', 'member'], ['admin', 'member', 'reader']),
|
||||
):
|
||||
expected_roles = sorted(expected_roles)
|
||||
actual_roles = sorted(_rbac_utils.get_all_needed_roles(roles))
|
||||
actual_roles = sorted(self.test_obj.get_all_needed_roles(roles))
|
||||
self.assertEqual(expected_roles, actual_roles)
|
||||
|
||||
|
||||
class RBACUtilsMixinTest(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RBACUtilsMixinTest, self).setUp()
|
||||
|
||||
class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
|
||||
|
||||
@classmethod
|
||||
def setup_clients(cls):
|
||||
super(FakeRbacTest, cls).setup_clients()
|
||||
cls.setup_rbac_utils()
|
||||
|
||||
def runTest(self):
|
||||
pass
|
||||
|
||||
self.parent_class = FakeRbacTest
|
||||
|
||||
def test_setup_rbac_utils(self):
|
||||
"""Validate that the child class has the `rbac_utils` attribute after
|
||||
running parent class's `cls.setup_rbac_utils`.
|
||||
"""
|
||||
class ChildRbacTest(self.parent_class):
|
||||
pass
|
||||
|
||||
child_test = ChildRbacTest()
|
||||
|
||||
with mock.patch.object(rbac_utils.RbacUtils, '__init__',
|
||||
lambda *args: None):
|
||||
child_test.setUpClass()
|
||||
|
||||
self.assertTrue(hasattr(child_test, 'rbac_utils'))
|
||||
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
|
||||
|
||||
|
||||
class ValidateListContextTest(base.TestCase):
|
||||
@staticmethod
|
||||
def _get_context(admin_resources=None, admin_resource_id=None):
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Merged ``RbacUtils`` and ``RbacUtilsMixin`` classes. Now there is only
|
||||
``RbacUtilsMixin`` class. The new class still provides all functionality of
|
||||
the original ``RbacUtils`` class. New implementation simplifies the usage
|
||||
of the rbac utils:
|
||||
|
||||
* there is no need in calling ``cls.setup_rbac_utils()`` function, because
|
||||
it happens automatically at the ``setup_clients`` step.
|
||||
|
||||
* there is no ``rbac_utils`` variable, so if you need to call a
|
||||
``override_role`` function, just do it using ``self``:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
with self.override_role():
|
||||
...
|
||||
|
||||
* there is no need in ``test_obj`` variable for ``override_role`` function,
|
||||
because it can use ``self``.
|
||||
|
||||
upgrade:
|
||||
- Remove usage of ``cls.setup_rbac_utils()`` function.
|
||||
- |
|
||||
Remove usage of ``self.rbac_utils`` variable:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
with self.rbac_utils.override_role(self):
|
||||
|
||||
convert to
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
with self.override_role():
|
||||
|
||||
- |
|
||||
Remove ``test_obj`` in usage of ``override_role`` context manager:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
with self.override_role(self):
|
||||
|
||||
convert to
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
with self.override_role():
|
Loading…
Reference in New Issue