Merge "Helper for validating RBAC list actions"

This commit is contained in:
Zuul 2018-11-28 17:11:37 +00:00 committed by Gerrit Code Review
commit 1d91ee1010
4 changed files with 256 additions and 3 deletions

View File

@ -47,7 +47,7 @@ class RbacEmptyResponseBody(BasePatroleResponseBodyException):
"""Raised when a list or show action is empty following RBAC authorization
failure.
"""
message = ("The response body is empty due to policy enforcement failure.")
message = "The response body is empty due to policy enforcement failure."
class RbacResourceSetupFailed(BasePatroleException):
@ -104,3 +104,16 @@ class RbacOverrideRoleException(BasePatroleException):
* an exception is raised after ``override_role`` context
"""
message = "Override role failure or incorrect usage"
class RbacValidateListException(BasePatroleException):
"""Raised when override_role_and_validate_list is used incorrectly.
Specifically, when:
* Neither ``resource_id`` nor ``resources`` is initialized
* Both ``resource_id`` and ``resources`` are initialized
* The ``ctx.resources`` variable wasn't set in
override_role_and_validate_list context.
"""
message = "Incorrect usage of override_role_and_validate_list: %(reason)s"

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
import contextlib
import sys
import time
@ -31,6 +31,77 @@ CONF = config.CONF
LOG = logging.getLogger(__name__)
class _ValidateListContext(object):
"""Context class responsible for validation of the list functions.
This class is used in ``override_role_and_validate_list`` function and
the result of a list function must be assigned to the ``ctx.resources``
variable.
Example::
with self.rbac_utils.override_role_and_validate_list(...) as ctx:
ctx.resources = list_function()
"""
def __init__(self, admin_resources=None, admin_resource_id=None):
"""Constructor for ``ValidateListContext``.
Either ``admin_resources`` or ``admin_resource_id`` should be used,
not both.
:param list admin_resources: The list of resources received before
calling the ``override_role_and_validate_list`` function. To
validate will be used the ``_validate_len`` function.
:param UUID admin_resource_id: An ID of a resource created before
calling the ``override_role_and_validate_list`` function. To
validate will be used the ``_validate_resource`` function.
:raises RbacValidateListException: if both ``admin_resources`` and
``admin_resource_id`` are set or unset.
"""
self.resources = None
if admin_resources is not None and not admin_resource_id:
self._admin_len = len(admin_resources)
if not self._admin_len:
raise rbac_exceptions.RbacValidateListException(
reason="the list of admin resources cannot be empty")
self._validate_func = self._validate_len
elif admin_resource_id and admin_resources is None:
self._admin_resource_id = admin_resource_id
self._validate_func = self._validate_resource
else:
raise rbac_exceptions.RbacValidateListException(
reason="admin_resources and admin_resource_id are mutually "
"exclusive")
def _validate_len(self):
"""Validates that the number of resources is less than admin resources.
"""
if not len(self.resources):
raise rbac_exceptions.RbacEmptyResponseBody()
elif self._admin_len > len(self.resources):
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
def _validate_resource(self):
"""Validates that the admin resource is present in the resources.
"""
for resource in self.resources:
if resource['id'] == self._admin_resource_id:
return
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
def _validate(self):
"""Calls the proper validation function.
:raises RbacValidateListException: if the ``ctx.resources`` variable is
not assigned.
"""
if self.resources is None:
raise rbac_exceptions.RbacValidateListException(
reason="ctx.resources is not assigned")
self._validate_func()
class RbacUtils(object):
"""Utility class responsible for switching ``os_primary`` role.
@ -68,7 +139,7 @@ class RbacUtils(object):
admin_role_id = None
rbac_role_ids = None
@contextmanager
@contextlib.contextmanager
def override_role(self, test_obj):
"""Override the role used by ``os_primary`` Tempest credentials.
@ -220,6 +291,41 @@ class RbacUtils(object):
return False
@contextlib.contextmanager
def override_role_and_validate_list(self, test_obj, admin_resources=None,
admin_resource_id=None):
"""Call ``override_role`` and validate RBAC for a list API action.
List actions usually do soft authorization: partial or empty response
bodies are returned instead of exceptions. This helper validates
that unauthorized roles only return a subset of the available
resources.
Should only be used for validating list API actions.
:param test_obj: Instance of ``tempest.test.BaseTestCase``.
:param list admin_resources: The list of resources received before
calling the ``override_role_and_validate_list`` function.
:param UUID admin_resource_id: An ID of a resource created before
calling the ``override_role_and_validate_list`` function.
:return: py:class:`_ValidateListContext` object.
Example::
# the resource created by admin
admin_resource_id = (
self.ntp_client.create_dscp_marking_rule()
["dscp_marking_rule"]["id'])
with self.rbac_utils.override_role_and_validate_list(
self, admin_resource_id=admin_resource_id) as ctx:
# the list of resources available for member role
ctx.resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]
"""
ctx = _ValidateListContext(admin_resources, admin_resource_id)
with self.override_role(test_obj):
yield ctx
ctx._validate()
class RbacUtilsMixin(object):
"""Mixin class to be used alongside an instance of

View File

@ -200,6 +200,19 @@ class RBACUtilsTest(base.TestCase):
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
def test_override_role_and_validate_list(self):
self.patchobject(rbac_utils.RbacUtils, '_override_role')
test_obj = mock.MagicMock()
_rbac_utils = rbac_utils.RbacUtils(test_obj)
m_override_role = self.patchobject(_rbac_utils, 'override_role')
with (_rbac_utils.override_role_and_validate_list(
test_obj, 'foo')) as ctx:
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
m_validate = self.patchobject(ctx, '_validate')
m_override_role.assert_called_once_with(test_obj)
m_validate.assert_called_once()
class RBACUtilsMixinTest(base.TestCase):
@ -233,3 +246,87 @@ class RBACUtilsMixinTest(base.TestCase):
self.assertTrue(hasattr(child_test, 'rbac_utils'))
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
class ValidateListContextTest(base.TestCase):
@staticmethod
def _get_context(admin_resources=None, admin_resource_id=None):
return rbac_utils._ValidateListContext(
admin_resources=admin_resources,
admin_resource_id=admin_resource_id)
def test_incorrect_usage(self):
# admin_resources and admin_resource_is are not assigned
self.assertRaises(rbac_exceptions.RbacValidateListException,
self._get_context)
# both admin_resources and admin_resource_is are assigned
self.assertRaises(rbac_exceptions.RbacValidateListException,
self._get_context,
admin_resources='foo', admin_resource_id='bar')
# empty list assigned to admin_resources
self.assertRaises(rbac_exceptions.RbacValidateListException,
self._get_context, admin_resources=[])
# ctx.resources is not assigned
ctx = self._get_context(admin_resources='foo')
self.assertRaises(rbac_exceptions.RbacValidateListException,
ctx._validate)
def test_validate_len_negative(self):
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
self.assertEqual(ctx._validate_len, ctx._validate_func)
self.assertEqual(4, ctx._admin_len)
self.assertFalse(hasattr(ctx, '_admin_resource_id'))
# the number of resources is less than admin resources
ctx.resources = [1, 2, 3]
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
ctx._validate_len)
# the resources is empty
ctx.resources = []
self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
ctx._validate_len)
def test_validate_len(self):
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
# the number of resources and admin resources are same
ctx.resources = [1, 2, 3, 4]
self.assertIsNone(ctx._validate_len())
def test_validate_resource_negative(self):
ctx = self._get_context(admin_resource_id=1)
self.assertEqual(ctx._validate_resource, ctx._validate_func)
self.assertEqual(1, ctx._admin_resource_id)
self.assertFalse(hasattr(ctx, '_admin_len'))
# there is no admin resource in the resources
ctx.resources = [{'id': 2}, {'id': 3}]
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
ctx._validate_resource)
def test_validate_resource(self):
ctx = self._get_context(admin_resource_id=1)
# there is admin resource in the resources
ctx.resources = [{'id': 1}, {'id': 2}]
self.assertIsNone(ctx._validate_resource())
def test_validate(self):
ctx = self._get_context(admin_resources='foo')
ctx.resources = 'bar'
with mock.patch.object(ctx, '_validate_func',
autospec=False) as m_validate_func:
m_validate_func.side_effect = (
rbac_exceptions.RbacPartialResponseBody,
None
)
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
ctx._validate)
m_validate_func.assert_called_once()
m_validate_func.reset_mock()
ctx._validate()
m_validate_func.assert_called_once()

View File

@ -0,0 +1,37 @@
---
features:
- |
In order to test the list actions which doesn't have its own policy,
implemented the ``override_role_and_validate_list`` function.
The function has two modes:
* Validating the number of the resources in a ``ResponseBody`` before
calling the ``override_role`` and after.
.. code-block:: python
# make sure at least one resource is available
self.ntp_client.create_policy_dscp_marking_rule()
# the list of resources available for a user with admin role
admin_resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]
with self.rbac_utils.override_role_and_validate_list(
self, admin_resources=admin_resources) as ctx:
# the list of resources available for a user with member role
ctx.resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]
* Validating that a resource, created before ``override_role``, is not
present in a ``ResponseBody``.
.. code-block:: python
# the resource created by a user with admin role
admin_resource_id = (
self.ntp_client.create_dscp_marking_rule()
["dscp_marking_rule"]["id'])
with self.rbac_utils.override_role_and_validate_list(
self, admin_resource_id=admin_resource_id) as ctx:
# the list of resources available for a user wirh member role
ctx.resources = self.ntp_client.list_dscp_marking_rules(
policy_id=self.policy_id)["dscp_marking_rules"]