Helper for validating RBAC list actions
List RBAC actions typically perform soft authorization checks meaning that the response bodies omit resources that the user isn't authorized to see. For example, if an admin user creates a user, member role might not be able to see that user when listing all the users in a tenant, depending on the RBAC rule. This patch set adds override_role_and_validate_list function to RbacUtils to validate RBAC flows for API list actions. Change-Id: I5f39efc8aa0004d4ad435cbd6b8fb037c33832d6
This commit is contained in:
parent
0464e81c98
commit
bab9e9467c
|
@ -47,7 +47,7 @@ class RbacEmptyResponseBody(BasePatroleResponseBodyException):
|
|||
"""Raised when a list or show action is empty following RBAC authorization
|
||||
failure.
|
||||
"""
|
||||
message = ("The response body is empty due to policy enforcement failure.")
|
||||
message = "The response body is empty due to policy enforcement failure."
|
||||
|
||||
|
||||
class RbacResourceSetupFailed(BasePatroleException):
|
||||
|
@ -104,3 +104,16 @@ class RbacOverrideRoleException(BasePatroleException):
|
|||
* an exception is raised after ``override_role`` context
|
||||
"""
|
||||
message = "Override role failure or incorrect usage"
|
||||
|
||||
|
||||
class RbacValidateListException(BasePatroleException):
|
||||
"""Raised when override_role_and_validate_list is used incorrectly.
|
||||
|
||||
Specifically, when:
|
||||
|
||||
* Neither ``resource_id`` nor ``resources`` is initialized
|
||||
* Both ``resource_id`` and ``resources`` are initialized
|
||||
* The ``ctx.resources`` variable wasn't set in
|
||||
override_role_and_validate_list context.
|
||||
"""
|
||||
message = "Incorrect usage of override_role_and_validate_list: %(reason)s"
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from contextlib import contextmanager
|
||||
import contextlib
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -31,6 +31,77 @@ CONF = config.CONF
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _ValidateListContext(object):
|
||||
"""Context class responsible for validation of the list functions.
|
||||
|
||||
This class is used in ``override_role_and_validate_list`` function and
|
||||
the result of a list function must be assigned to the ``ctx.resources``
|
||||
variable.
|
||||
|
||||
Example::
|
||||
|
||||
with self.rbac_utils.override_role_and_validate_list(...) as ctx:
|
||||
ctx.resources = list_function()
|
||||
|
||||
"""
|
||||
def __init__(self, admin_resources=None, admin_resource_id=None):
|
||||
"""Constructor for ``ValidateListContext``.
|
||||
|
||||
Either ``admin_resources`` or ``admin_resource_id`` should be used,
|
||||
not both.
|
||||
|
||||
:param list admin_resources: The list of resources received before
|
||||
calling the ``override_role_and_validate_list`` function. To
|
||||
validate will be used the ``_validate_len`` function.
|
||||
:param UUID admin_resource_id: An ID of a resource created before
|
||||
calling the ``override_role_and_validate_list`` function. To
|
||||
validate will be used the ``_validate_resource`` function.
|
||||
:raises RbacValidateListException: if both ``admin_resources`` and
|
||||
``admin_resource_id`` are set or unset.
|
||||
"""
|
||||
self.resources = None
|
||||
if admin_resources is not None and not admin_resource_id:
|
||||
self._admin_len = len(admin_resources)
|
||||
if not self._admin_len:
|
||||
raise rbac_exceptions.RbacValidateListException(
|
||||
reason="the list of admin resources cannot be empty")
|
||||
self._validate_func = self._validate_len
|
||||
elif admin_resource_id and admin_resources is None:
|
||||
self._admin_resource_id = admin_resource_id
|
||||
self._validate_func = self._validate_resource
|
||||
else:
|
||||
raise rbac_exceptions.RbacValidateListException(
|
||||
reason="admin_resources and admin_resource_id are mutually "
|
||||
"exclusive")
|
||||
|
||||
def _validate_len(self):
|
||||
"""Validates that the number of resources is less than admin resources.
|
||||
"""
|
||||
if not len(self.resources):
|
||||
raise rbac_exceptions.RbacEmptyResponseBody()
|
||||
elif self._admin_len > len(self.resources):
|
||||
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
|
||||
|
||||
def _validate_resource(self):
|
||||
"""Validates that the admin resource is present in the resources.
|
||||
"""
|
||||
for resource in self.resources:
|
||||
if resource['id'] == self._admin_resource_id:
|
||||
return
|
||||
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
|
||||
|
||||
def _validate(self):
|
||||
"""Calls the proper validation function.
|
||||
|
||||
:raises RbacValidateListException: if the ``ctx.resources`` variable is
|
||||
not assigned.
|
||||
"""
|
||||
if self.resources is None:
|
||||
raise rbac_exceptions.RbacValidateListException(
|
||||
reason="ctx.resources is not assigned")
|
||||
self._validate_func()
|
||||
|
||||
|
||||
class RbacUtils(object):
|
||||
"""Utility class responsible for switching ``os_primary`` role.
|
||||
|
||||
|
@ -68,7 +139,7 @@ class RbacUtils(object):
|
|||
admin_role_id = None
|
||||
rbac_role_ids = None
|
||||
|
||||
@contextmanager
|
||||
@contextlib.contextmanager
|
||||
def override_role(self, test_obj):
|
||||
"""Override the role used by ``os_primary`` Tempest credentials.
|
||||
|
||||
|
@ -220,6 +291,41 @@ class RbacUtils(object):
|
|||
|
||||
return False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def override_role_and_validate_list(self, test_obj, admin_resources=None,
|
||||
admin_resource_id=None):
|
||||
"""Call ``override_role`` and validate RBAC for a list API action.
|
||||
|
||||
List actions usually do soft authorization: partial or empty response
|
||||
bodies are returned instead of exceptions. This helper validates
|
||||
that unauthorized roles only return a subset of the available
|
||||
resources.
|
||||
Should only be used for validating list API actions.
|
||||
|
||||
:param test_obj: Instance of ``tempest.test.BaseTestCase``.
|
||||
:param list admin_resources: The list of resources received before
|
||||
calling the ``override_role_and_validate_list`` function.
|
||||
:param UUID admin_resource_id: An ID of a resource created before
|
||||
calling the ``override_role_and_validate_list`` function.
|
||||
:return: py:class:`_ValidateListContext` object.
|
||||
|
||||
Example::
|
||||
|
||||
# the resource created by admin
|
||||
admin_resource_id = (
|
||||
self.ntp_client.create_dscp_marking_rule()
|
||||
["dscp_marking_rule"]["id'])
|
||||
with self.rbac_utils.override_role_and_validate_list(
|
||||
self, admin_resource_id=admin_resource_id) as ctx:
|
||||
# the list of resources available for member role
|
||||
ctx.resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
"""
|
||||
ctx = _ValidateListContext(admin_resources, admin_resource_id)
|
||||
with self.override_role(test_obj):
|
||||
yield ctx
|
||||
ctx._validate()
|
||||
|
||||
|
||||
class RbacUtilsMixin(object):
|
||||
"""Mixin class to be used alongside an instance of
|
||||
|
|
|
@ -200,6 +200,19 @@ class RBACUtilsTest(base.TestCase):
|
|||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
|
||||
def test_override_role_and_validate_list(self):
|
||||
self.patchobject(rbac_utils.RbacUtils, '_override_role')
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
m_override_role = self.patchobject(_rbac_utils, 'override_role')
|
||||
|
||||
with (_rbac_utils.override_role_and_validate_list(
|
||||
test_obj, 'foo')) as ctx:
|
||||
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
|
||||
m_validate = self.patchobject(ctx, '_validate')
|
||||
m_override_role.assert_called_once_with(test_obj)
|
||||
m_validate.assert_called_once()
|
||||
|
||||
|
||||
class RBACUtilsMixinTest(base.TestCase):
|
||||
|
||||
|
@ -233,3 +246,87 @@ class RBACUtilsMixinTest(base.TestCase):
|
|||
|
||||
self.assertTrue(hasattr(child_test, 'rbac_utils'))
|
||||
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
|
||||
|
||||
|
||||
class ValidateListContextTest(base.TestCase):
|
||||
@staticmethod
|
||||
def _get_context(admin_resources=None, admin_resource_id=None):
|
||||
return rbac_utils._ValidateListContext(
|
||||
admin_resources=admin_resources,
|
||||
admin_resource_id=admin_resource_id)
|
||||
|
||||
def test_incorrect_usage(self):
|
||||
# admin_resources and admin_resource_is are not assigned
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
self._get_context)
|
||||
|
||||
# both admin_resources and admin_resource_is are assigned
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
self._get_context,
|
||||
admin_resources='foo', admin_resource_id='bar')
|
||||
# empty list assigned to admin_resources
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
self._get_context, admin_resources=[])
|
||||
|
||||
# ctx.resources is not assigned
|
||||
ctx = self._get_context(admin_resources='foo')
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
ctx._validate)
|
||||
|
||||
def test_validate_len_negative(self):
|
||||
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
|
||||
self.assertEqual(ctx._validate_len, ctx._validate_func)
|
||||
self.assertEqual(4, ctx._admin_len)
|
||||
self.assertFalse(hasattr(ctx, '_admin_resource_id'))
|
||||
|
||||
# the number of resources is less than admin resources
|
||||
ctx.resources = [1, 2, 3]
|
||||
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
|
||||
ctx._validate_len)
|
||||
|
||||
# the resources is empty
|
||||
ctx.resources = []
|
||||
self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
|
||||
ctx._validate_len)
|
||||
|
||||
def test_validate_len(self):
|
||||
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
|
||||
|
||||
# the number of resources and admin resources are same
|
||||
ctx.resources = [1, 2, 3, 4]
|
||||
self.assertIsNone(ctx._validate_len())
|
||||
|
||||
def test_validate_resource_negative(self):
|
||||
ctx = self._get_context(admin_resource_id=1)
|
||||
self.assertEqual(ctx._validate_resource, ctx._validate_func)
|
||||
self.assertEqual(1, ctx._admin_resource_id)
|
||||
self.assertFalse(hasattr(ctx, '_admin_len'))
|
||||
|
||||
# there is no admin resource in the resources
|
||||
ctx.resources = [{'id': 2}, {'id': 3}]
|
||||
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
|
||||
ctx._validate_resource)
|
||||
|
||||
def test_validate_resource(self):
|
||||
ctx = self._get_context(admin_resource_id=1)
|
||||
|
||||
# there is admin resource in the resources
|
||||
ctx.resources = [{'id': 1}, {'id': 2}]
|
||||
self.assertIsNone(ctx._validate_resource())
|
||||
|
||||
def test_validate(self):
|
||||
ctx = self._get_context(admin_resources='foo')
|
||||
ctx.resources = 'bar'
|
||||
with mock.patch.object(ctx, '_validate_func',
|
||||
autospec=False) as m_validate_func:
|
||||
m_validate_func.side_effect = (
|
||||
rbac_exceptions.RbacPartialResponseBody,
|
||||
None
|
||||
)
|
||||
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
|
||||
ctx._validate)
|
||||
m_validate_func.assert_called_once()
|
||||
|
||||
m_validate_func.reset_mock()
|
||||
ctx._validate()
|
||||
m_validate_func.assert_called_once()
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
In order to test the list actions which doesn't have its own policy,
|
||||
implemented the ``override_role_and_validate_list`` function.
|
||||
The function has two modes:
|
||||
|
||||
* Validating the number of the resources in a ``ResponseBody`` before
|
||||
calling the ``override_role`` and after.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# make sure at least one resource is available
|
||||
self.ntp_client.create_policy_dscp_marking_rule()
|
||||
# the list of resources available for a user with admin role
|
||||
admin_resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
with self.rbac_utils.override_role_and_validate_list(
|
||||
self, admin_resources=admin_resources) as ctx:
|
||||
# the list of resources available for a user with member role
|
||||
ctx.resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
|
||||
* Validating that a resource, created before ``override_role``, is not
|
||||
present in a ``ResponseBody``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# the resource created by a user with admin role
|
||||
admin_resource_id = (
|
||||
self.ntp_client.create_dscp_marking_rule()
|
||||
["dscp_marking_rule"]["id'])
|
||||
with self.rbac_utils.override_role_and_validate_list(
|
||||
self, admin_resource_id=admin_resource_id) as ctx:
|
||||
# the list of resources available for a user wirh member role
|
||||
ctx.resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
Loading…
Reference in New Issue