Merge "Helper for validating RBAC list actions"
This commit is contained in:
commit
1d91ee1010
|
@ -47,7 +47,7 @@ class RbacEmptyResponseBody(BasePatroleResponseBodyException):
|
|||
"""Raised when a list or show action is empty following RBAC authorization
|
||||
failure.
|
||||
"""
|
||||
message = ("The response body is empty due to policy enforcement failure.")
|
||||
message = "The response body is empty due to policy enforcement failure."
|
||||
|
||||
|
||||
class RbacResourceSetupFailed(BasePatroleException):
|
||||
|
@ -104,3 +104,16 @@ class RbacOverrideRoleException(BasePatroleException):
|
|||
* an exception is raised after ``override_role`` context
|
||||
"""
|
||||
message = "Override role failure or incorrect usage"
|
||||
|
||||
|
||||
class RbacValidateListException(BasePatroleException):
|
||||
"""Raised when override_role_and_validate_list is used incorrectly.
|
||||
|
||||
Specifically, when:
|
||||
|
||||
* Neither ``resource_id`` nor ``resources`` is initialized
|
||||
* Both ``resource_id`` and ``resources`` are initialized
|
||||
* The ``ctx.resources`` variable wasn't set in
|
||||
override_role_and_validate_list context.
|
||||
"""
|
||||
message = "Incorrect usage of override_role_and_validate_list: %(reason)s"
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from contextlib import contextmanager
|
||||
import contextlib
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -31,6 +31,77 @@ CONF = config.CONF
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _ValidateListContext(object):
|
||||
"""Context class responsible for validation of the list functions.
|
||||
|
||||
This class is used in ``override_role_and_validate_list`` function and
|
||||
the result of a list function must be assigned to the ``ctx.resources``
|
||||
variable.
|
||||
|
||||
Example::
|
||||
|
||||
with self.rbac_utils.override_role_and_validate_list(...) as ctx:
|
||||
ctx.resources = list_function()
|
||||
|
||||
"""
|
||||
def __init__(self, admin_resources=None, admin_resource_id=None):
|
||||
"""Constructor for ``ValidateListContext``.
|
||||
|
||||
Either ``admin_resources`` or ``admin_resource_id`` should be used,
|
||||
not both.
|
||||
|
||||
:param list admin_resources: The list of resources received before
|
||||
calling the ``override_role_and_validate_list`` function. To
|
||||
validate will be used the ``_validate_len`` function.
|
||||
:param UUID admin_resource_id: An ID of a resource created before
|
||||
calling the ``override_role_and_validate_list`` function. To
|
||||
validate will be used the ``_validate_resource`` function.
|
||||
:raises RbacValidateListException: if both ``admin_resources`` and
|
||||
``admin_resource_id`` are set or unset.
|
||||
"""
|
||||
self.resources = None
|
||||
if admin_resources is not None and not admin_resource_id:
|
||||
self._admin_len = len(admin_resources)
|
||||
if not self._admin_len:
|
||||
raise rbac_exceptions.RbacValidateListException(
|
||||
reason="the list of admin resources cannot be empty")
|
||||
self._validate_func = self._validate_len
|
||||
elif admin_resource_id and admin_resources is None:
|
||||
self._admin_resource_id = admin_resource_id
|
||||
self._validate_func = self._validate_resource
|
||||
else:
|
||||
raise rbac_exceptions.RbacValidateListException(
|
||||
reason="admin_resources and admin_resource_id are mutually "
|
||||
"exclusive")
|
||||
|
||||
def _validate_len(self):
|
||||
"""Validates that the number of resources is less than admin resources.
|
||||
"""
|
||||
if not len(self.resources):
|
||||
raise rbac_exceptions.RbacEmptyResponseBody()
|
||||
elif self._admin_len > len(self.resources):
|
||||
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
|
||||
|
||||
def _validate_resource(self):
|
||||
"""Validates that the admin resource is present in the resources.
|
||||
"""
|
||||
for resource in self.resources:
|
||||
if resource['id'] == self._admin_resource_id:
|
||||
return
|
||||
raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
|
||||
|
||||
def _validate(self):
|
||||
"""Calls the proper validation function.
|
||||
|
||||
:raises RbacValidateListException: if the ``ctx.resources`` variable is
|
||||
not assigned.
|
||||
"""
|
||||
if self.resources is None:
|
||||
raise rbac_exceptions.RbacValidateListException(
|
||||
reason="ctx.resources is not assigned")
|
||||
self._validate_func()
|
||||
|
||||
|
||||
class RbacUtils(object):
|
||||
"""Utility class responsible for switching ``os_primary`` role.
|
||||
|
||||
|
@ -68,7 +139,7 @@ class RbacUtils(object):
|
|||
admin_role_id = None
|
||||
rbac_role_ids = None
|
||||
|
||||
@contextmanager
|
||||
@contextlib.contextmanager
|
||||
def override_role(self, test_obj):
|
||||
"""Override the role used by ``os_primary`` Tempest credentials.
|
||||
|
||||
|
@ -220,6 +291,41 @@ class RbacUtils(object):
|
|||
|
||||
return False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def override_role_and_validate_list(self, test_obj, admin_resources=None,
|
||||
admin_resource_id=None):
|
||||
"""Call ``override_role`` and validate RBAC for a list API action.
|
||||
|
||||
List actions usually do soft authorization: partial or empty response
|
||||
bodies are returned instead of exceptions. This helper validates
|
||||
that unauthorized roles only return a subset of the available
|
||||
resources.
|
||||
Should only be used for validating list API actions.
|
||||
|
||||
:param test_obj: Instance of ``tempest.test.BaseTestCase``.
|
||||
:param list admin_resources: The list of resources received before
|
||||
calling the ``override_role_and_validate_list`` function.
|
||||
:param UUID admin_resource_id: An ID of a resource created before
|
||||
calling the ``override_role_and_validate_list`` function.
|
||||
:return: py:class:`_ValidateListContext` object.
|
||||
|
||||
Example::
|
||||
|
||||
# the resource created by admin
|
||||
admin_resource_id = (
|
||||
self.ntp_client.create_dscp_marking_rule()
|
||||
["dscp_marking_rule"]["id'])
|
||||
with self.rbac_utils.override_role_and_validate_list(
|
||||
self, admin_resource_id=admin_resource_id) as ctx:
|
||||
# the list of resources available for member role
|
||||
ctx.resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
"""
|
||||
ctx = _ValidateListContext(admin_resources, admin_resource_id)
|
||||
with self.override_role(test_obj):
|
||||
yield ctx
|
||||
ctx._validate()
|
||||
|
||||
|
||||
class RbacUtilsMixin(object):
|
||||
"""Mixin class to be used alongside an instance of
|
||||
|
|
|
@ -200,6 +200,19 @@ class RBACUtilsTest(base.TestCase):
|
|||
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
|
||||
False)
|
||||
|
||||
def test_override_role_and_validate_list(self):
|
||||
self.patchobject(rbac_utils.RbacUtils, '_override_role')
|
||||
test_obj = mock.MagicMock()
|
||||
_rbac_utils = rbac_utils.RbacUtils(test_obj)
|
||||
m_override_role = self.patchobject(_rbac_utils, 'override_role')
|
||||
|
||||
with (_rbac_utils.override_role_and_validate_list(
|
||||
test_obj, 'foo')) as ctx:
|
||||
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
|
||||
m_validate = self.patchobject(ctx, '_validate')
|
||||
m_override_role.assert_called_once_with(test_obj)
|
||||
m_validate.assert_called_once()
|
||||
|
||||
|
||||
class RBACUtilsMixinTest(base.TestCase):
|
||||
|
||||
|
@ -233,3 +246,87 @@ class RBACUtilsMixinTest(base.TestCase):
|
|||
|
||||
self.assertTrue(hasattr(child_test, 'rbac_utils'))
|
||||
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
|
||||
|
||||
|
||||
class ValidateListContextTest(base.TestCase):
|
||||
@staticmethod
|
||||
def _get_context(admin_resources=None, admin_resource_id=None):
|
||||
return rbac_utils._ValidateListContext(
|
||||
admin_resources=admin_resources,
|
||||
admin_resource_id=admin_resource_id)
|
||||
|
||||
def test_incorrect_usage(self):
|
||||
# admin_resources and admin_resource_is are not assigned
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
self._get_context)
|
||||
|
||||
# both admin_resources and admin_resource_is are assigned
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
self._get_context,
|
||||
admin_resources='foo', admin_resource_id='bar')
|
||||
# empty list assigned to admin_resources
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
self._get_context, admin_resources=[])
|
||||
|
||||
# ctx.resources is not assigned
|
||||
ctx = self._get_context(admin_resources='foo')
|
||||
self.assertRaises(rbac_exceptions.RbacValidateListException,
|
||||
ctx._validate)
|
||||
|
||||
def test_validate_len_negative(self):
|
||||
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
|
||||
self.assertEqual(ctx._validate_len, ctx._validate_func)
|
||||
self.assertEqual(4, ctx._admin_len)
|
||||
self.assertFalse(hasattr(ctx, '_admin_resource_id'))
|
||||
|
||||
# the number of resources is less than admin resources
|
||||
ctx.resources = [1, 2, 3]
|
||||
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
|
||||
ctx._validate_len)
|
||||
|
||||
# the resources is empty
|
||||
ctx.resources = []
|
||||
self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
|
||||
ctx._validate_len)
|
||||
|
||||
def test_validate_len(self):
|
||||
ctx = self._get_context(admin_resources=[1, 2, 3, 4])
|
||||
|
||||
# the number of resources and admin resources are same
|
||||
ctx.resources = [1, 2, 3, 4]
|
||||
self.assertIsNone(ctx._validate_len())
|
||||
|
||||
def test_validate_resource_negative(self):
|
||||
ctx = self._get_context(admin_resource_id=1)
|
||||
self.assertEqual(ctx._validate_resource, ctx._validate_func)
|
||||
self.assertEqual(1, ctx._admin_resource_id)
|
||||
self.assertFalse(hasattr(ctx, '_admin_len'))
|
||||
|
||||
# there is no admin resource in the resources
|
||||
ctx.resources = [{'id': 2}, {'id': 3}]
|
||||
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
|
||||
ctx._validate_resource)
|
||||
|
||||
def test_validate_resource(self):
|
||||
ctx = self._get_context(admin_resource_id=1)
|
||||
|
||||
# there is admin resource in the resources
|
||||
ctx.resources = [{'id': 1}, {'id': 2}]
|
||||
self.assertIsNone(ctx._validate_resource())
|
||||
|
||||
def test_validate(self):
|
||||
ctx = self._get_context(admin_resources='foo')
|
||||
ctx.resources = 'bar'
|
||||
with mock.patch.object(ctx, '_validate_func',
|
||||
autospec=False) as m_validate_func:
|
||||
m_validate_func.side_effect = (
|
||||
rbac_exceptions.RbacPartialResponseBody,
|
||||
None
|
||||
)
|
||||
self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
|
||||
ctx._validate)
|
||||
m_validate_func.assert_called_once()
|
||||
|
||||
m_validate_func.reset_mock()
|
||||
ctx._validate()
|
||||
m_validate_func.assert_called_once()
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
In order to test the list actions which doesn't have its own policy,
|
||||
implemented the ``override_role_and_validate_list`` function.
|
||||
The function has two modes:
|
||||
|
||||
* Validating the number of the resources in a ``ResponseBody`` before
|
||||
calling the ``override_role`` and after.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# make sure at least one resource is available
|
||||
self.ntp_client.create_policy_dscp_marking_rule()
|
||||
# the list of resources available for a user with admin role
|
||||
admin_resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
with self.rbac_utils.override_role_and_validate_list(
|
||||
self, admin_resources=admin_resources) as ctx:
|
||||
# the list of resources available for a user with member role
|
||||
ctx.resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
||||
|
||||
* Validating that a resource, created before ``override_role``, is not
|
||||
present in a ``ResponseBody``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# the resource created by a user with admin role
|
||||
admin_resource_id = (
|
||||
self.ntp_client.create_dscp_marking_rule()
|
||||
["dscp_marking_rule"]["id'])
|
||||
with self.rbac_utils.override_role_and_validate_list(
|
||||
self, admin_resource_id=admin_resource_id) as ctx:
|
||||
# the list of resources available for a user wirh member role
|
||||
ctx.resources = self.ntp_client.list_dscp_marking_rules(
|
||||
policy_id=self.policy_id)["dscp_marking_rules"]
|
Loading…
Reference in New Issue