requirements authority: Use better exception/return code
This patch set makes some minor modifications to the requirements authority module: * return {} instead of None in RequirementsParser.parse since its return type is dict * raise RbacParsingException if an invalid rule is passed to RequirementsAuthority.allowed since KeyError is a builtin type and is not specific enough * change the exception message raise for the above case -- "API" is not the right word; the word should be "rule name" as that is what is being keyed into the roles_dict Change-Id: Ia4408c0745d2b5ddb1c73c1eb9a6316ae0c1f646
This commit is contained in:
parent
c81c1fb189
commit
0464e81c98
|
@ -18,9 +18,10 @@ import yaml
|
|||
from oslo_log import log as logging
|
||||
|
||||
from tempest import config
|
||||
from tempest.lib import exceptions
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
|
||||
from patrole_tempest_plugin.rbac_authority import RbacAuthority
|
||||
from patrole_tempest_plugin import rbac_exceptions
|
||||
|
||||
CONF = config.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
@ -81,7 +82,7 @@ class RequirementsParser(object):
|
|||
except yaml.parser.ParserError:
|
||||
LOG.error("Error while parsing the requirements YAML file. Did "
|
||||
"you pass a valid component name from the test case?")
|
||||
return None
|
||||
return {}
|
||||
|
||||
|
||||
class RequirementsAuthority(RbacAuthority):
|
||||
|
@ -98,10 +99,10 @@ class RequirementsAuthority(RbacAuthority):
|
|||
Defaults to ``[patrole].custom_requirements_file``.
|
||||
:param str component: Name of the OpenStack service to be validated.
|
||||
"""
|
||||
filepath = filepath or CONF.patrole.custom_requirements_file
|
||||
|
||||
self.filepath = filepath or CONF.patrole.custom_requirements_file
|
||||
if component is not None:
|
||||
self.roles_dict = RequirementsParser(filepath).parse(component)
|
||||
self.roles_dict = RequirementsParser(self.filepath).parse(
|
||||
component)
|
||||
else:
|
||||
self.roles_dict = None
|
||||
|
||||
|
@ -116,33 +117,34 @@ class RequirementsAuthority(RbacAuthority):
|
|||
:returns: True if ``role`` is allowed to perform ``rule_name``, else
|
||||
False.
|
||||
:rtype: bool
|
||||
:raises KeyError: If ``rule_name`` does not exist among the keyed
|
||||
policy names in the custom requirements file.
|
||||
:raises RbacParsingException: If ``rule_name`` does not exist among the
|
||||
keyed policy names in the custom requirements file.
|
||||
"""
|
||||
if self.roles_dict is None:
|
||||
raise exceptions.InvalidConfiguration(
|
||||
if not self.roles_dict:
|
||||
raise lib_exc.InvalidConfiguration(
|
||||
"Roles dictionary parsed from requirements YAML file is "
|
||||
"empty. Ensure the requirements YAML file is correctly "
|
||||
"formatted.")
|
||||
try:
|
||||
requirement_roles = self.roles_dict[rule_name]
|
||||
|
||||
for role_reqs in requirement_roles:
|
||||
required_roles = [
|
||||
role for role in role_reqs if not role.startswith("!")]
|
||||
forbidden_roles = [
|
||||
role[1:] for role in role_reqs if role.startswith("!")]
|
||||
|
||||
# User must have all required roles
|
||||
required_passed = all([r in roles for r in required_roles])
|
||||
# User must not have any forbidden roles
|
||||
forbidden_passed = all([r not in forbidden_roles
|
||||
for r in roles])
|
||||
|
||||
if required_passed and forbidden_passed:
|
||||
return True
|
||||
|
||||
return False
|
||||
except KeyError:
|
||||
raise KeyError("'%s' API is not defined in the requirements YAML "
|
||||
"file" % rule_name)
|
||||
raise rbac_exceptions.RbacParsingException(
|
||||
"'%s' rule name is not defined in the requirements YAML file: "
|
||||
"%s" % (rule_name, self.filepath))
|
||||
|
||||
for role_reqs in requirement_roles:
|
||||
required_roles = [
|
||||
role for role in role_reqs if not role.startswith("!")]
|
||||
forbidden_roles = [
|
||||
role[1:] for role in role_reqs if role.startswith("!")]
|
||||
|
||||
# User must have all required roles
|
||||
required_passed = all([r in roles for r in required_roles])
|
||||
# User must not have any forbidden roles
|
||||
forbidden_passed = all([r not in forbidden_roles
|
||||
for r in roles])
|
||||
|
||||
if required_passed and forbidden_passed:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
|
|
@ -17,6 +17,7 @@ import os
|
|||
from tempest.lib import exceptions
|
||||
from tempest.tests import base
|
||||
|
||||
from patrole_tempest_plugin import rbac_exceptions
|
||||
from patrole_tempest_plugin import requirements_authority as req_auth
|
||||
|
||||
|
||||
|
@ -50,17 +51,17 @@ class RequirementsAuthorityTest(BaseRequirementsAuthorityTest):
|
|||
self.rbac_auth.allowed, "", [""])
|
||||
|
||||
def test_auth_allowed_role_in_api(self):
|
||||
self.rbac_auth.roles_dict = {'api': [['_member_']]}
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["_member_"]))
|
||||
self.rbac_auth.roles_dict = {'rule': [['_member_']]}
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["_member_"]))
|
||||
|
||||
def test_auth_allowed_role_not_in_api(self):
|
||||
self.rbac_auth.roles_dict = {'api': [['_member_']]}
|
||||
self.assertFalse(self.rbac_auth.allowed("api", "support_member"))
|
||||
self.rbac_auth.roles_dict = {'rule': [['_member_']]}
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", "support_member"))
|
||||
|
||||
def test_parser_get_allowed_except_keyerror(self):
|
||||
self.rbac_auth.roles_dict = {}
|
||||
self.assertRaises(KeyError, self.rbac_auth.allowed,
|
||||
"api", "support_member")
|
||||
def test_parser_get_allowed_invalid_rule_raises_parsing_exception(self):
|
||||
self.rbac_auth.roles_dict = {"foo": "bar"}
|
||||
self.assertRaises(rbac_exceptions.RbacParsingException,
|
||||
self.rbac_auth.allowed, "baz", "support_member")
|
||||
|
||||
def test_parser_init(self):
|
||||
req_auth.RequirementsParser(self.yaml_test_file)
|
||||
|
@ -90,7 +91,7 @@ class RequirementsAuthorityTest(BaseRequirementsAuthorityTest):
|
|||
self.rbac_auth.roles_dict = \
|
||||
req_auth.RequirementsParser.parse("Failure")
|
||||
|
||||
self.assertIsNone(self.rbac_auth.roles_dict)
|
||||
self.assertFalse(self.rbac_auth.roles_dict)
|
||||
self.assertRaises(exceptions.InvalidConfiguration,
|
||||
self.rbac_auth.allowed, "", [""])
|
||||
|
||||
|
@ -99,10 +100,10 @@ class RequirementsAuthorityTest(BaseRequirementsAuthorityTest):
|
|||
considered as part of role itself.
|
||||
"""
|
||||
|
||||
self.rbac_auth.roles_dict = {'api': [['!admin']]}
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["!admin"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["admin"]))
|
||||
self.rbac_auth.roles_dict = {'rule': [['!admin']]}
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["!admin"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["admin"]))
|
||||
|
||||
|
||||
class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
|
||||
|
@ -112,39 +113,39 @@ class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
|
|||
considered as part of role itself.
|
||||
"""
|
||||
|
||||
self.rbac_auth.roles_dict = {'api': [['member', '!admin']]}
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["member", "admin"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member", "!admin"]))
|
||||
self.rbac_auth.roles_dict = {'rule': [['member', '!admin']]}
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["member", "admin"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member", "!admin"]))
|
||||
|
||||
def test_auth_allowed_single_rule_scenario(self):
|
||||
# member and support and not admin and not manager
|
||||
self.rbac_auth.roles_dict = {'api': [['member', 'support',
|
||||
'!admin', '!manager']]}
|
||||
self.rbac_auth.roles_dict = {'rule': [['member', 'support',
|
||||
'!admin', '!manager']]}
|
||||
|
||||
# User is member and support and not manager or admin
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member",
|
||||
"support"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
|
||||
"support"]))
|
||||
|
||||
# User is member and not manager or admin, but not support
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["member"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["member"]))
|
||||
|
||||
# User is support and not manager or admin, but not member
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["support"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["support"]))
|
||||
|
||||
# User is member and support and not manager, but have admin role
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["member",
|
||||
"support",
|
||||
"admin"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["member",
|
||||
"support",
|
||||
"admin"]))
|
||||
|
||||
# User is member and not manager, but have admin role and not support
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["member",
|
||||
"admin"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["member",
|
||||
"admin"]))
|
||||
|
||||
# User is member and support, but have manager and admin roles
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["member",
|
||||
"support",
|
||||
"admin",
|
||||
"manager"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["member",
|
||||
"support",
|
||||
"admin",
|
||||
"manager"]))
|
||||
|
||||
def test_auth_allowed_multi_rule_scenario(self):
|
||||
rules = [
|
||||
|
@ -152,36 +153,36 @@ class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
|
|||
['member', 'admin'],
|
||||
["manager"]
|
||||
]
|
||||
self.rbac_auth.roles_dict = {'api': rules}
|
||||
self.rbac_auth.roles_dict = {'rule': rules}
|
||||
|
||||
# Not a single role allows viewer
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["viewer"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["viewer"]))
|
||||
# We have no rule that allows support and admin
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["support",
|
||||
"admin"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["support",
|
||||
"admin"]))
|
||||
# There is no rule that requires member without additional requirements
|
||||
self.assertFalse(self.rbac_auth.allowed("api", ["member"]))
|
||||
self.assertFalse(self.rbac_auth.allowed("rule", ["member"]))
|
||||
|
||||
# Pass with rules[2]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["manager"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["manager"]))
|
||||
# Pass with rules[0]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member",
|
||||
"support"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
|
||||
"support"]))
|
||||
# Pass with rules[1]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member",
|
||||
"admin"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
|
||||
"admin"]))
|
||||
# Pass with rules[2]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["manager",
|
||||
"admin"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["manager",
|
||||
"admin"]))
|
||||
# Pass with rules[1]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member",
|
||||
"support",
|
||||
"admin"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
|
||||
"support",
|
||||
"admin"]))
|
||||
# Pass with rules[1]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["member",
|
||||
"support",
|
||||
"admin",
|
||||
"manager"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
|
||||
"support",
|
||||
"admin",
|
||||
"manager"]))
|
||||
# Pass with rules[2]
|
||||
self.assertTrue(self.rbac_auth.allowed("api", ["admin",
|
||||
"manager"]))
|
||||
self.assertTrue(self.rbac_auth.allowed("rule", ["admin",
|
||||
"manager"]))
|
||||
|
|
Loading…
Reference in New Issue