requirements authority: Use better exception/return code

This patch set makes some minor modifications to the requirements
authority module:

* return {} instead of None in RequirementsParser.parse since
  its return type is dict
* raise RbacParsingException if an invalid rule is passed to
  RequirementsAuthority.allowed since KeyError is a builtin type
  and is not specific enough
* change the exception message raise for the above case -- "API"
  is not the right word; the word should be "rule name" as that
  is what is being keyed into the roles_dict

Change-Id: Ia4408c0745d2b5ddb1c73c1eb9a6316ae0c1f646
This commit is contained in:
Felipe Monteiro 2018-11-08 01:08:07 -05:00
parent c81c1fb189
commit 0464e81c98
2 changed files with 83 additions and 80 deletions

View File

@ -18,9 +18,10 @@ import yaml
from oslo_log import log as logging
from tempest import config
from tempest.lib import exceptions
from tempest.lib import exceptions as lib_exc
from patrole_tempest_plugin.rbac_authority import RbacAuthority
from patrole_tempest_plugin import rbac_exceptions
CONF = config.CONF
LOG = logging.getLogger(__name__)
@ -81,7 +82,7 @@ class RequirementsParser(object):
except yaml.parser.ParserError:
LOG.error("Error while parsing the requirements YAML file. Did "
"you pass a valid component name from the test case?")
return None
return {}
class RequirementsAuthority(RbacAuthority):
@ -98,10 +99,10 @@ class RequirementsAuthority(RbacAuthority):
Defaults to ``[patrole].custom_requirements_file``.
:param str component: Name of the OpenStack service to be validated.
"""
filepath = filepath or CONF.patrole.custom_requirements_file
self.filepath = filepath or CONF.patrole.custom_requirements_file
if component is not None:
self.roles_dict = RequirementsParser(filepath).parse(component)
self.roles_dict = RequirementsParser(self.filepath).parse(
component)
else:
self.roles_dict = None
@ -116,33 +117,34 @@ class RequirementsAuthority(RbacAuthority):
:returns: True if ``role`` is allowed to perform ``rule_name``, else
False.
:rtype: bool
:raises KeyError: If ``rule_name`` does not exist among the keyed
policy names in the custom requirements file.
:raises RbacParsingException: If ``rule_name`` does not exist among the
keyed policy names in the custom requirements file.
"""
if self.roles_dict is None:
raise exceptions.InvalidConfiguration(
if not self.roles_dict:
raise lib_exc.InvalidConfiguration(
"Roles dictionary parsed from requirements YAML file is "
"empty. Ensure the requirements YAML file is correctly "
"formatted.")
try:
requirement_roles = self.roles_dict[rule_name]
for role_reqs in requirement_roles:
required_roles = [
role for role in role_reqs if not role.startswith("!")]
forbidden_roles = [
role[1:] for role in role_reqs if role.startswith("!")]
# User must have all required roles
required_passed = all([r in roles for r in required_roles])
# User must not have any forbidden roles
forbidden_passed = all([r not in forbidden_roles
for r in roles])
if required_passed and forbidden_passed:
return True
return False
except KeyError:
raise KeyError("'%s' API is not defined in the requirements YAML "
"file" % rule_name)
raise rbac_exceptions.RbacParsingException(
"'%s' rule name is not defined in the requirements YAML file: "
"%s" % (rule_name, self.filepath))
for role_reqs in requirement_roles:
required_roles = [
role for role in role_reqs if not role.startswith("!")]
forbidden_roles = [
role[1:] for role in role_reqs if role.startswith("!")]
# User must have all required roles
required_passed = all([r in roles for r in required_roles])
# User must not have any forbidden roles
forbidden_passed = all([r not in forbidden_roles
for r in roles])
if required_passed and forbidden_passed:
return True
return False

View File

@ -17,6 +17,7 @@ import os
from tempest.lib import exceptions
from tempest.tests import base
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import requirements_authority as req_auth
@ -50,17 +51,17 @@ class RequirementsAuthorityTest(BaseRequirementsAuthorityTest):
self.rbac_auth.allowed, "", [""])
def test_auth_allowed_role_in_api(self):
self.rbac_auth.roles_dict = {'api': [['_member_']]}
self.assertTrue(self.rbac_auth.allowed("api", ["_member_"]))
self.rbac_auth.roles_dict = {'rule': [['_member_']]}
self.assertTrue(self.rbac_auth.allowed("rule", ["_member_"]))
def test_auth_allowed_role_not_in_api(self):
self.rbac_auth.roles_dict = {'api': [['_member_']]}
self.assertFalse(self.rbac_auth.allowed("api", "support_member"))
self.rbac_auth.roles_dict = {'rule': [['_member_']]}
self.assertFalse(self.rbac_auth.allowed("rule", "support_member"))
def test_parser_get_allowed_except_keyerror(self):
self.rbac_auth.roles_dict = {}
self.assertRaises(KeyError, self.rbac_auth.allowed,
"api", "support_member")
def test_parser_get_allowed_invalid_rule_raises_parsing_exception(self):
self.rbac_auth.roles_dict = {"foo": "bar"}
self.assertRaises(rbac_exceptions.RbacParsingException,
self.rbac_auth.allowed, "baz", "support_member")
def test_parser_init(self):
req_auth.RequirementsParser(self.yaml_test_file)
@ -90,7 +91,7 @@ class RequirementsAuthorityTest(BaseRequirementsAuthorityTest):
self.rbac_auth.roles_dict = \
req_auth.RequirementsParser.parse("Failure")
self.assertIsNone(self.rbac_auth.roles_dict)
self.assertFalse(self.rbac_auth.roles_dict)
self.assertRaises(exceptions.InvalidConfiguration,
self.rbac_auth.allowed, "", [""])
@ -99,10 +100,10 @@ class RequirementsAuthorityTest(BaseRequirementsAuthorityTest):
considered as part of role itself.
"""
self.rbac_auth.roles_dict = {'api': [['!admin']]}
self.assertTrue(self.rbac_auth.allowed("api", ["member"]))
self.assertTrue(self.rbac_auth.allowed("api", ["!admin"]))
self.assertFalse(self.rbac_auth.allowed("api", ["admin"]))
self.rbac_auth.roles_dict = {'rule': [['!admin']]}
self.assertTrue(self.rbac_auth.allowed("rule", ["member"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["!admin"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["admin"]))
class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
@ -112,39 +113,39 @@ class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
considered as part of role itself.
"""
self.rbac_auth.roles_dict = {'api': [['member', '!admin']]}
self.assertFalse(self.rbac_auth.allowed("api", ["member", "admin"]))
self.assertTrue(self.rbac_auth.allowed("api", ["member", "!admin"]))
self.rbac_auth.roles_dict = {'rule': [['member', '!admin']]}
self.assertFalse(self.rbac_auth.allowed("rule", ["member", "admin"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["member", "!admin"]))
def test_auth_allowed_single_rule_scenario(self):
# member and support and not admin and not manager
self.rbac_auth.roles_dict = {'api': [['member', 'support',
'!admin', '!manager']]}
self.rbac_auth.roles_dict = {'rule': [['member', 'support',
'!admin', '!manager']]}
# User is member and support and not manager or admin
self.assertTrue(self.rbac_auth.allowed("api", ["member",
"support"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
"support"]))
# User is member and not manager or admin, but not support
self.assertFalse(self.rbac_auth.allowed("api", ["member"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["member"]))
# User is support and not manager or admin, but not member
self.assertFalse(self.rbac_auth.allowed("api", ["support"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["support"]))
# User is member and support and not manager, but have admin role
self.assertFalse(self.rbac_auth.allowed("api", ["member",
"support",
"admin"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["member",
"support",
"admin"]))
# User is member and not manager, but have admin role and not support
self.assertFalse(self.rbac_auth.allowed("api", ["member",
"admin"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["member",
"admin"]))
# User is member and support, but have manager and admin roles
self.assertFalse(self.rbac_auth.allowed("api", ["member",
"support",
"admin",
"manager"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["member",
"support",
"admin",
"manager"]))
def test_auth_allowed_multi_rule_scenario(self):
rules = [
@ -152,36 +153,36 @@ class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
['member', 'admin'],
["manager"]
]
self.rbac_auth.roles_dict = {'api': rules}
self.rbac_auth.roles_dict = {'rule': rules}
# Not a single role allows viewer
self.assertFalse(self.rbac_auth.allowed("api", ["viewer"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["viewer"]))
# We have no rule that allows support and admin
self.assertFalse(self.rbac_auth.allowed("api", ["support",
"admin"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["support",
"admin"]))
# There is no rule that requires member without additional requirements
self.assertFalse(self.rbac_auth.allowed("api", ["member"]))
self.assertFalse(self.rbac_auth.allowed("rule", ["member"]))
# Pass with rules[2]
self.assertTrue(self.rbac_auth.allowed("api", ["manager"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["manager"]))
# Pass with rules[0]
self.assertTrue(self.rbac_auth.allowed("api", ["member",
"support"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
"support"]))
# Pass with rules[1]
self.assertTrue(self.rbac_auth.allowed("api", ["member",
"admin"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
"admin"]))
# Pass with rules[2]
self.assertTrue(self.rbac_auth.allowed("api", ["manager",
"admin"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["manager",
"admin"]))
# Pass with rules[1]
self.assertTrue(self.rbac_auth.allowed("api", ["member",
"support",
"admin"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
"support",
"admin"]))
# Pass with rules[1]
self.assertTrue(self.rbac_auth.allowed("api", ["member",
"support",
"admin",
"manager"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["member",
"support",
"admin",
"manager"]))
# Pass with rules[2]
self.assertTrue(self.rbac_auth.allowed("api", ["admin",
"manager"]))
self.assertTrue(self.rbac_auth.allowed("rule", ["admin",
"manager"]))