diff --git a/patrole_tempest_plugin/rbac_exceptions.py b/patrole_tempest_plugin/rbac_exceptions.py index ad697b05..c30961ba 100644 --- a/patrole_tempest_plugin/rbac_exceptions.py +++ b/patrole_tempest_plugin/rbac_exceptions.py @@ -47,7 +47,7 @@ class RbacEmptyResponseBody(BasePatroleResponseBodyException): """Raised when a list or show action is empty following RBAC authorization failure. """ - message = ("The response body is empty due to policy enforcement failure.") + message = "The response body is empty due to policy enforcement failure." class RbacResourceSetupFailed(BasePatroleException): @@ -104,3 +104,16 @@ class RbacOverrideRoleException(BasePatroleException): * an exception is raised after ``override_role`` context """ message = "Override role failure or incorrect usage" + + +class RbacValidateListException(BasePatroleException): + """Raised when override_role_and_validate_list is used incorrectly. + + Specifically, when: + + * Neither ``resource_id`` nor ``resources`` is initialized + * Both ``resource_id`` and ``resources`` are initialized + * The ``ctx.resources`` variable wasn't set in + override_role_and_validate_list context. + """ + message = "Incorrect usage of override_role_and_validate_list: %(reason)s" diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py index 33955c31..6aab4d72 100644 --- a/patrole_tempest_plugin/rbac_utils.py +++ b/patrole_tempest_plugin/rbac_utils.py @@ -13,7 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -from contextlib import contextmanager +import contextlib import sys import time @@ -31,6 +31,77 @@ CONF = config.CONF LOG = logging.getLogger(__name__) +class _ValidateListContext(object): + """Context class responsible for validation of the list functions. + + This class is used in ``override_role_and_validate_list`` function and + the result of a list function must be assigned to the ``ctx.resources`` + variable. + + Example:: + + with self.rbac_utils.override_role_and_validate_list(...) as ctx: + ctx.resources = list_function() + + """ + def __init__(self, admin_resources=None, admin_resource_id=None): + """Constructor for ``ValidateListContext``. + + Either ``admin_resources`` or ``admin_resource_id`` should be used, + not both. + + :param list admin_resources: The list of resources received before + calling the ``override_role_and_validate_list`` function. To + validate will be used the ``_validate_len`` function. + :param UUID admin_resource_id: An ID of a resource created before + calling the ``override_role_and_validate_list`` function. To + validate will be used the ``_validate_resource`` function. + :raises RbacValidateListException: if both ``admin_resources`` and + ``admin_resource_id`` are set or unset. + """ + self.resources = None + if admin_resources is not None and not admin_resource_id: + self._admin_len = len(admin_resources) + if not self._admin_len: + raise rbac_exceptions.RbacValidateListException( + reason="the list of admin resources cannot be empty") + self._validate_func = self._validate_len + elif admin_resource_id and admin_resources is None: + self._admin_resource_id = admin_resource_id + self._validate_func = self._validate_resource + else: + raise rbac_exceptions.RbacValidateListException( + reason="admin_resources and admin_resource_id are mutually " + "exclusive") + + def _validate_len(self): + """Validates that the number of resources is less than admin resources. + """ + if not len(self.resources): + raise rbac_exceptions.RbacEmptyResponseBody() + elif self._admin_len > len(self.resources): + raise rbac_exceptions.RbacPartialResponseBody(body=self.resources) + + def _validate_resource(self): + """Validates that the admin resource is present in the resources. + """ + for resource in self.resources: + if resource['id'] == self._admin_resource_id: + return + raise rbac_exceptions.RbacPartialResponseBody(body=self.resources) + + def _validate(self): + """Calls the proper validation function. + + :raises RbacValidateListException: if the ``ctx.resources`` variable is + not assigned. + """ + if self.resources is None: + raise rbac_exceptions.RbacValidateListException( + reason="ctx.resources is not assigned") + self._validate_func() + + class RbacUtils(object): """Utility class responsible for switching ``os_primary`` role. @@ -68,7 +139,7 @@ class RbacUtils(object): admin_role_id = None rbac_role_ids = None - @contextmanager + @contextlib.contextmanager def override_role(self, test_obj): """Override the role used by ``os_primary`` Tempest credentials. @@ -220,6 +291,41 @@ class RbacUtils(object): return False + @contextlib.contextmanager + def override_role_and_validate_list(self, test_obj, admin_resources=None, + admin_resource_id=None): + """Call ``override_role`` and validate RBAC for a list API action. + + List actions usually do soft authorization: partial or empty response + bodies are returned instead of exceptions. This helper validates + that unauthorized roles only return a subset of the available + resources. + Should only be used for validating list API actions. + + :param test_obj: Instance of ``tempest.test.BaseTestCase``. + :param list admin_resources: The list of resources received before + calling the ``override_role_and_validate_list`` function. + :param UUID admin_resource_id: An ID of a resource created before + calling the ``override_role_and_validate_list`` function. + :return: py:class:`_ValidateListContext` object. + + Example:: + + # the resource created by admin + admin_resource_id = ( + self.ntp_client.create_dscp_marking_rule() + ["dscp_marking_rule"]["id']) + with self.rbac_utils.override_role_and_validate_list( + self, admin_resource_id=admin_resource_id) as ctx: + # the list of resources available for member role + ctx.resources = self.ntp_client.list_dscp_marking_rules( + policy_id=self.policy_id)["dscp_marking_rules"] + """ + ctx = _ValidateListContext(admin_resources, admin_resource_id) + with self.override_role(test_obj): + yield ctx + ctx._validate() + class RbacUtilsMixin(object): """Mixin class to be used alongside an instance of diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py index bd13e345..9fe5ffad 100644 --- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py +++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py @@ -200,6 +200,19 @@ class RBACUtilsTest(base.TestCase): mock_override_role.assert_called_once_with(_rbac_utils, test_obj, False) + def test_override_role_and_validate_list(self): + self.patchobject(rbac_utils.RbacUtils, '_override_role') + test_obj = mock.MagicMock() + _rbac_utils = rbac_utils.RbacUtils(test_obj) + m_override_role = self.patchobject(_rbac_utils, 'override_role') + + with (_rbac_utils.override_role_and_validate_list( + test_obj, 'foo')) as ctx: + self.assertIsInstance(ctx, rbac_utils._ValidateListContext) + m_validate = self.patchobject(ctx, '_validate') + m_override_role.assert_called_once_with(test_obj) + m_validate.assert_called_once() + class RBACUtilsMixinTest(base.TestCase): @@ -233,3 +246,87 @@ class RBACUtilsMixinTest(base.TestCase): self.assertTrue(hasattr(child_test, 'rbac_utils')) self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils) + + +class ValidateListContextTest(base.TestCase): + @staticmethod + def _get_context(admin_resources=None, admin_resource_id=None): + return rbac_utils._ValidateListContext( + admin_resources=admin_resources, + admin_resource_id=admin_resource_id) + + def test_incorrect_usage(self): + # admin_resources and admin_resource_is are not assigned + self.assertRaises(rbac_exceptions.RbacValidateListException, + self._get_context) + + # both admin_resources and admin_resource_is are assigned + self.assertRaises(rbac_exceptions.RbacValidateListException, + self._get_context, + admin_resources='foo', admin_resource_id='bar') + # empty list assigned to admin_resources + self.assertRaises(rbac_exceptions.RbacValidateListException, + self._get_context, admin_resources=[]) + + # ctx.resources is not assigned + ctx = self._get_context(admin_resources='foo') + self.assertRaises(rbac_exceptions.RbacValidateListException, + ctx._validate) + + def test_validate_len_negative(self): + ctx = self._get_context(admin_resources=[1, 2, 3, 4]) + self.assertEqual(ctx._validate_len, ctx._validate_func) + self.assertEqual(4, ctx._admin_len) + self.assertFalse(hasattr(ctx, '_admin_resource_id')) + + # the number of resources is less than admin resources + ctx.resources = [1, 2, 3] + self.assertRaises(rbac_exceptions.RbacPartialResponseBody, + ctx._validate_len) + + # the resources is empty + ctx.resources = [] + self.assertRaises(rbac_exceptions.RbacEmptyResponseBody, + ctx._validate_len) + + def test_validate_len(self): + ctx = self._get_context(admin_resources=[1, 2, 3, 4]) + + # the number of resources and admin resources are same + ctx.resources = [1, 2, 3, 4] + self.assertIsNone(ctx._validate_len()) + + def test_validate_resource_negative(self): + ctx = self._get_context(admin_resource_id=1) + self.assertEqual(ctx._validate_resource, ctx._validate_func) + self.assertEqual(1, ctx._admin_resource_id) + self.assertFalse(hasattr(ctx, '_admin_len')) + + # there is no admin resource in the resources + ctx.resources = [{'id': 2}, {'id': 3}] + self.assertRaises(rbac_exceptions.RbacPartialResponseBody, + ctx._validate_resource) + + def test_validate_resource(self): + ctx = self._get_context(admin_resource_id=1) + + # there is admin resource in the resources + ctx.resources = [{'id': 1}, {'id': 2}] + self.assertIsNone(ctx._validate_resource()) + + def test_validate(self): + ctx = self._get_context(admin_resources='foo') + ctx.resources = 'bar' + with mock.patch.object(ctx, '_validate_func', + autospec=False) as m_validate_func: + m_validate_func.side_effect = ( + rbac_exceptions.RbacPartialResponseBody, + None + ) + self.assertRaises(rbac_exceptions.RbacPartialResponseBody, + ctx._validate) + m_validate_func.assert_called_once() + + m_validate_func.reset_mock() + ctx._validate() + m_validate_func.assert_called_once() diff --git a/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml new file mode 100644 index 00000000..de05b761 --- /dev/null +++ b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml @@ -0,0 +1,37 @@ +--- +features: + - | + In order to test the list actions which doesn't have its own policy, + implemented the ``override_role_and_validate_list`` function. + The function has two modes: + + * Validating the number of the resources in a ``ResponseBody`` before + calling the ``override_role`` and after. + + .. code-block:: python + + # make sure at least one resource is available + self.ntp_client.create_policy_dscp_marking_rule() + # the list of resources available for a user with admin role + admin_resources = self.ntp_client.list_dscp_marking_rules( + policy_id=self.policy_id)["dscp_marking_rules"] + with self.rbac_utils.override_role_and_validate_list( + self, admin_resources=admin_resources) as ctx: + # the list of resources available for a user with member role + ctx.resources = self.ntp_client.list_dscp_marking_rules( + policy_id=self.policy_id)["dscp_marking_rules"] + + * Validating that a resource, created before ``override_role``, is not + present in a ``ResponseBody``. + + .. code-block:: python + + # the resource created by a user with admin role + admin_resource_id = ( + self.ntp_client.create_dscp_marking_rule() + ["dscp_marking_rule"]["id']) + with self.rbac_utils.override_role_and_validate_list( + self, admin_resource_id=admin_resource_id) as ctx: + # the list of resources available for a user wirh member role + ctx.resources = self.ntp_client.list_dscp_marking_rules( + policy_id=self.policy_id)["dscp_marking_rules"]