Helper for validating RBAC list actions

List RBAC actions typically perform soft authorization checks meaning
that the response bodies omit resources that the user isn't authorized
to see.
For example, if an admin user creates a user, member role might not be
able to see that user when listing all the users in a tenant,
depending on the RBAC rule.
This patch set adds override_role_and_validate_list function to
RbacUtils to validate RBAC flows for API list actions.

Change-Id: I5f39efc8aa0004d4ad435cbd6b8fb037c33832d6
This commit is contained in:
Sergey Vilgelm 2018-10-11 14:04:48 -05:00
parent 0464e81c98
commit bab9e9467c
No known key found for this signature in database
GPG Key ID: 08D0E2FF778887E6
4 changed files with 256 additions and 3 deletions

View File

@ -47,7 +47,7 @@ class RbacEmptyResponseBody(BasePatroleResponseBodyException):
"""Raised when a list or show action is empty following RBAC authorization
failure.
"""
message = ("The response body is empty due to policy enforcement failure.")
message = "The response body is empty due to policy enforcement failure."
class RbacResourceSetupFailed(BasePatroleException):
@ -104,3 +104,16 @@ class RbacOverrideRoleException(BasePatroleException):
* an exception is raised after ``override_role`` context
"""
message = "Override role failure or incorrect usage"
class RbacValidateListException(BasePatroleException):
"""Raised when override_role_and_validate_list is used incorrectly.
Specifically, when:
* Neither ``resource_id`` nor ``resources`` is initialized
* Both ``resource_id`` and ``resources`` are initialized
* The ``ctx.resources`` variable wasn't set in
override_role_and_validate_list context.
"""
message = "Incorrect usage of override_role_and_validate_list: %(reason)s"

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
import contextlib
import sys
import time
@ -31,6 +31,77 @@ CONF = config.CONF
LOG = logging.getLogger(__name__)
class _ValidateListContext(object):
"""Context class responsible for validation of the list functions.
This class is used in ``override_role_and_validate_list`` function and
the result of a list function must be assigned to the ``ctx.resources``
variable.
Example::
with self.rbac_utils.override_role_and_validate_list(...) as ctx:
ctx.resources = list_function()
"""
def __init__(self, admin_resources=None, admin_resource_id=None):
"""Constructor for ``ValidateListContext``.
Either ``admin_resources`` or ``admin_resource_id`` should be used,
not both.
:param list admin_resources: The list of resources received before
calling the ``override_role_and_validate_list`` function. To
validate will be used the ``_validate_len`` function.
:param UUID admin_resource_id: An ID of a resource created before
calling the ``override_role_and_validate_list`` function. To
validate will be used the ``_validate_resource`` function.
:raises RbacValidateListException: if both ``admin_resources`` and
``admin_resource_id`` are set or unset.
"""
self.resources = None
if admin_resources is not None and not admin_resource_id:
self._admin_len = len(admin_resources)
if not self._admin_len:
raise rbac_exceptions.RbacValidateListException(
reason="the list of admin resources cannot be empty")
self._validate_func = self._validate_len
elif admin_resource_id and admin_resources is None:
self._admin_resource_id = admin_resource_id
self._validate_func = self._validate_resource
else:
raise rbac_exceptions.RbacValidateListException(
reason="admin_resources and admin_resource_id are mutually "
"exclusive")
def _validate_len(self):
"""Validates that the number of resources is less than admin resources.
"""
if not len(self.resources):
raise rbac_exceptions.RbacEmptyResponseBody()
elif self._admin_len > len(self.resources):
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
def _validate_resource(self):
"""Validates that the admin resource is present in the resources.
"""
for resource in self.resources:
if resource['id'] == self._admin_resource_id:
return
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
def _validate(self):
"""Calls the proper validation function.
:raises RbacValidateListException: if the ``ctx.resources`` variable is
not assigned.
"""
if self.resources is None:
raise rbac_exceptions.RbacValidateListException(
reason="ctx.resources is not assigned")
self._validate_func()
class RbacUtils(object):
"""Utility class responsible for switching ``os_primary`` role.
@ -68,7 +139,7 @@ class RbacUtils(object):
admin_role_id = None
rbac_role_ids = None
@contextmanager
@contextlib.contextmanager
def override_role(self, test_obj):
"""Override the role used by ``os_primary`` Tempest credentials.
@ -220,6 +291,41 @@ class RbacUtils(object):
return False
@contextlib.contextmanager
def override_role_and_validate_list(self, test_obj, admin_resources=None,
admin_resource_id=None):
"""Call ``override_role`` and validate RBAC for a list API action.
List actions usually do soft authorization: partial or empty response
bodies are returned instead of exceptions. This helper validates
that unauthorized roles only return a subset of the available
resources.
Should only be used for validating list API actions.
:param test_obj: Instance of ``tempest.test.BaseTestCase``.
:param list admin_resources: The list of resources received before
calling the ``override_role_and_validate_list`` function.
:param UUID admin_resource_id: An ID of a resource created before
calling the ``override_role_and_validate_list`` function.
:return: py:class:`_ValidateListContext` object.
Example::
# the resource created by admin
admin_resource_id = (
self.ntp_client.create_dscp_marking_rule()
["dscp_marking_rule"]["id'])
with self.rbac_utils.override_role_and_validate_list(
self, admin_resource_id=admin_resource_id) as ctx:
# the list of resources available for member role
ctx.resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]
"""
ctx = _ValidateListContext(admin_resources, admin_resource_id)
with self.override_role(test_obj):
yield ctx
ctx._validate()
class RbacUtilsMixin(object):
"""Mixin class to be used alongside an instance of

View File

@ -200,6 +200,19 @@ class RBACUtilsTest(base.TestCase):
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
def test_override_role_and_validate_list(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
m_override_role = self.patchobject(_rbac_utils, 'override_role')
with (_rbac_utils.override_role_and_validate_list(
test_obj, 'foo')) as ctx:
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
m_validate = self.patchobject(ctx, '_validate')
m_override_role.assert_called_once_with(test_obj)
m_validate.assert_called_once()
class RBACUtilsMixinTest(base.TestCase):
@ -233,3 +246,87 @@ class RBACUtilsMixinTest(base.TestCase):
self.assertTrue(hasattr(child_test, 'rbac_utils'))
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
class ValidateListContextTest(base.TestCase):
@staticmethod
def _get_context(admin_resources=None, admin_resource_id=None):
return rbac_utils._ValidateListContext(
admin_resources=admin_resources,
admin_resource_id=admin_resource_id)
def test_incorrect_usage(self):
# admin_resources and admin_resource_is are not assigned
self.assertRaises(rbac_exceptions.RbacValidateListException,
self._get_context)
# both admin_resources and admin_resource_is are assigned
self.assertRaises(rbac_exceptions.RbacValidateListException,
self._get_context,
admin_resources='foo', admin_resource_id='bar')
# empty list assigned to admin_resources
self.assertRaises(rbac_exceptions.RbacValidateListException,
self._get_context, admin_resources=[])
# ctx.resources is not assigned
ctx = self._get_context(admin_resources='foo')
self.assertRaises(rbac_exceptions.RbacValidateListException,
ctx._validate)
def test_validate_len_negative(self):
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
self.assertEqual(ctx._validate_len, ctx._validate_func)
self.assertEqual(4, ctx._admin_len)
self.assertFalse(hasattr(ctx, '_admin_resource_id'))
# the number of resources is less than admin resources
ctx.resources = [1, 2, 3]
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
ctx._validate_len)
# the resources is empty
ctx.resources = []
self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
ctx._validate_len)
def test_validate_len(self):
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
# the number of resources and admin resources are same
ctx.resources = [1, 2, 3, 4]
self.assertIsNone(ctx._validate_len())
def test_validate_resource_negative(self):
ctx = self._get_context(admin_resource_id=1)
self.assertEqual(ctx._validate_resource, ctx._validate_func)
self.assertEqual(1, ctx._admin_resource_id)
self.assertFalse(hasattr(ctx, '_admin_len'))
# there is no admin resource in the resources
ctx.resources = [{'id': 2}, {'id': 3}]
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
ctx._validate_resource)
def test_validate_resource(self):
ctx = self._get_context(admin_resource_id=1)
# there is admin resource in the resources
ctx.resources = [{'id': 1}, {'id': 2}]
self.assertIsNone(ctx._validate_resource())
def test_validate(self):
ctx = self._get_context(admin_resources='foo')
ctx.resources = 'bar'
with mock.patch.object(ctx, '_validate_func',
autospec=False) as m_validate_func:
m_validate_func.side_effect = (
rbac_exceptions.RbacPartialResponseBody,
None
)
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
ctx._validate)
m_validate_func.assert_called_once()
m_validate_func.reset_mock()
ctx._validate()
m_validate_func.assert_called_once()

View File

@ -0,0 +1,37 @@
---
features:
- |
In order to test the list actions which doesn't have its own policy,
implemented the ``override_role_and_validate_list`` function.
The function has two modes:
* Validating the number of the resources in a ``ResponseBody`` before
calling the ``override_role`` and after.
.. code-block:: python
# make sure at least one resource is available
self.ntp_client.create_policy_dscp_marking_rule()
# the list of resources available for a user with admin role
admin_resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]
with self.rbac_utils.override_role_and_validate_list(
self, admin_resources=admin_resources) as ctx:
# the list of resources available for a user with member role
ctx.resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]
* Validating that a resource, created before ``override_role``, is not
present in a ``ResponseBody``.
.. code-block:: python
# the resource created by a user with admin role
admin_resource_id = (
self.ntp_client.create_dscp_marking_rule()
["dscp_marking_rule"]["id'])
with self.rbac_utils.override_role_and_validate_list(
self, admin_resource_id=admin_resource_id) as ctx:
# the list of resources available for a user wirh member role
ctx.resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]