Fix OverPermission exception for keystone tests
Extend the roles in `access_token` according to the implementation[0] of the bp basic-default-roles: `admin` implies `member` implies `reader` Support deprecated rules. [0] Ie18a269e3d1075d955fe494acaf634a393c6bd7b Change-Id: I0d0de2a20b03548a7e5dab1ee7af7b72651abcb6 Story: 2004709 Task: 28740
This commit is contained in:
parent
cd2c5fdbb9
commit
55e5dfe640
|
@ -96,6 +96,11 @@ api_action = the policy action that is being tested. Examples:
|
|||
* add_image
|
||||
|
||||
allowed_role = the ``oslo.policy`` role that is allowed to perform the API.
|
||||
"""),
|
||||
cfg.BoolOpt('validate_deprecated_rules', default=True,
|
||||
help="""Some of the policy rules have deprecated version,
|
||||
Patrole should be able to run check against default and deprecated rules,
|
||||
otherwise the result of the tests may not be correct.
|
||||
""")
|
||||
]
|
||||
|
||||
|
|
|
@ -169,6 +169,27 @@ class PolicyAuthority(RbacAuthority):
|
|||
is_admin=is_admin_context)
|
||||
return is_allowed
|
||||
|
||||
def _handle_deprecated_rule(self, default):
|
||||
deprecated_rule = default.deprecated_rule
|
||||
deprecated_msg = (
|
||||
'Policy "%(old_name)s":"%(old_check_str)s" was deprecated in '
|
||||
'%(release)s in favor of "%(name)s":"%(check_str)s". Reason: '
|
||||
'%(reason)s. Either ensure your deployment is ready for the new '
|
||||
'default or copy/paste the deprecated policy into your policy '
|
||||
'file and maintain it manually.' % {
|
||||
'old_name': deprecated_rule.name,
|
||||
'old_check_str': deprecated_rule.check_str,
|
||||
'release': default.deprecated_since,
|
||||
'name': default.name,
|
||||
'check_str': default.check_str,
|
||||
'reason': default.deprecated_reason
|
||||
}
|
||||
)
|
||||
LOG.warn(deprecated_msg)
|
||||
check_str = '(%s) or (%s)' % (default.check_str,
|
||||
deprecated_rule.check_str)
|
||||
return policy.RuleDefault(default.name, check_str)
|
||||
|
||||
def get_rules(self):
|
||||
rules = policy.Rules()
|
||||
# Check whether policy file exists and attempt to read it.
|
||||
|
@ -203,6 +224,12 @@ class PolicyAuthority(RbacAuthority):
|
|||
if self.service in policy_generator:
|
||||
for rule in policy_generator[self.service]:
|
||||
if rule.name not in rules:
|
||||
if CONF.patrole.validate_deprecated_rules:
|
||||
# NOTE (sergey.vilgelm):
|
||||
# The `DocumentedRuleDefault` object has no
|
||||
# `deprecated_rule` attribute in Pike
|
||||
if getattr(rule, 'deprecated_rule', False):
|
||||
rule = self._handle_deprecated_rule(rule)
|
||||
rules[rule.name] = rule.check
|
||||
elif str(rule.check) != str(rules[rule.name]):
|
||||
msg = ("The same policy name: %s was found in the "
|
||||
|
@ -238,13 +265,17 @@ class PolicyAuthority(RbacAuthority):
|
|||
return CONF.identity.admin_role in roles
|
||||
|
||||
def _get_access_token(self, roles):
|
||||
roles = {r.lower() for r in roles if r}
|
||||
|
||||
# Extend roles for an user with admin or member role
|
||||
if 'admin' in roles:
|
||||
roles.add('member')
|
||||
if 'member' in roles:
|
||||
roles.add('reader')
|
||||
|
||||
access_token = {
|
||||
"token": {
|
||||
"roles": [
|
||||
{
|
||||
"name": role
|
||||
} for role in roles
|
||||
],
|
||||
"roles": [{'name': r} for r in roles],
|
||||
"project_id": self.project_id,
|
||||
"tenant_id": self.project_id,
|
||||
"user_id": self.user_id
|
||||
|
|
|
@ -2,5 +2,6 @@
|
|||
"admin_rule": "role:admin",
|
||||
"is_admin_rule": "is_admin:True",
|
||||
"alt_admin_rule": "is_admin:True or (role:admin and is_project_admin:True)",
|
||||
"non_admin_rule": "role:Member"
|
||||
"member_rule": "role:member",
|
||||
"reader_rule": "role:reader"
|
||||
}
|
||||
|
|
|
@ -83,9 +83,29 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
for name, check in rules.items():
|
||||
fake_rule = mock.Mock(check=check, __name__='foo')
|
||||
fake_rule.name = name
|
||||
fake_rule.deprecated_rule = False
|
||||
fake_rules.append(fake_rule)
|
||||
return fake_rules
|
||||
|
||||
def _test_policy_file(self, roles, allowed_rules,
|
||||
disallowed_rules, authority=None, service=None):
|
||||
if authority is None:
|
||||
self.assertIsNotNone(service)
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, service)
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
|
||||
return authority
|
||||
|
||||
@mock.patch.object(policy_authority, 'LOG', autospec=True)
|
||||
def _test_custom_policy(self, *args):
|
||||
default_roles = ['zero', 'one', 'two', 'three', 'four',
|
||||
|
@ -145,23 +165,14 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
self.assertFalse(authority.allowed(rule, test_roles))
|
||||
|
||||
def test_empty_rbac_test_roles(self):
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "custom_rbac_policy")
|
||||
|
||||
disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
|
||||
'policy_action_3', 'policy_action_4',
|
||||
'policy_action_6']
|
||||
|
||||
# Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
|
||||
allowed_for_empty_roles = ['policy_action_5']
|
||||
|
||||
for rule in disallowed_for_empty_roles:
|
||||
self.assertFalse(authority.allowed(rule, []))
|
||||
|
||||
for rule in allowed_for_empty_roles:
|
||||
self.assertTrue(authority.allowed(rule, []))
|
||||
self._test_policy_file(
|
||||
roles=[],
|
||||
allowed_rules=['policy_action_5'],
|
||||
disallowed_rules=['policy_action_1', 'policy_action_2',
|
||||
'policy_action_3', 'policy_action_4',
|
||||
'policy_action_6'],
|
||||
service="custom_rbac_policy"
|
||||
)
|
||||
|
||||
def test_custom_policy_json(self):
|
||||
# The CONF.patrole.custom_policy_files has a path to JSON file by
|
||||
|
@ -184,75 +195,47 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
self._test_custom_multi_roles_policy()
|
||||
|
||||
def test_admin_policy_file_with_admin_role(self):
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "admin_rbac_policy")
|
||||
|
||||
roles = ['admin']
|
||||
allowed_rules = [
|
||||
'admin_rule', 'is_admin_rule', 'alt_admin_rule'
|
||||
]
|
||||
disallowed_rules = ['non_admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
# admin role implies member and reader roles
|
||||
self._test_policy_file(
|
||||
roles=['admin'],
|
||||
allowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
|
||||
'member_rule', 'reader_rule'],
|
||||
disallowed_rules=[],
|
||||
service="admin_rbac_policy"
|
||||
)
|
||||
|
||||
def test_admin_policy_file_with_member_role(self):
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "admin_rbac_policy")
|
||||
# member role implies reader role
|
||||
self._test_policy_file(
|
||||
roles=['member'],
|
||||
allowed_rules=['member_rule', 'reader_rule'],
|
||||
disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule'],
|
||||
service="admin_rbac_policy"
|
||||
)
|
||||
|
||||
roles = ['Member']
|
||||
allowed_rules = [
|
||||
'non_admin_rule'
|
||||
]
|
||||
disallowed_rules = [
|
||||
'admin_rule', 'is_admin_rule', 'alt_admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
def test_admin_policy_file_with_reader_role(self):
|
||||
self._test_policy_file(
|
||||
roles=['reader'],
|
||||
allowed_rules=['reader_rule'],
|
||||
disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
|
||||
'member_rule'],
|
||||
service="admin_rbac_policy"
|
||||
)
|
||||
|
||||
def test_alt_admin_policy_file_with_context_is_admin(self):
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "alt_admin_rbac_policy")
|
||||
authority = self._test_policy_file(
|
||||
roles=['fake_admin'],
|
||||
allowed_rules=['non_admin_rule'],
|
||||
disallowed_rules=['admin_rule'],
|
||||
service="alt_admin_rbac_policy"
|
||||
)
|
||||
|
||||
roles = ['fake_admin']
|
||||
allowed_rules = ['non_admin_rule']
|
||||
disallowed_rules = ['admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
|
||||
roles = ['super_admin']
|
||||
allowed_rules = ['admin_rule']
|
||||
disallowed_rules = ['non_admin_rule']
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertTrue(allowed)
|
||||
|
||||
for rule in disallowed_rules:
|
||||
allowed = authority.allowed(rule, roles)
|
||||
self.assertFalse(allowed)
|
||||
self._test_policy_file(
|
||||
roles=['super_admin'],
|
||||
allowed_rules=['admin_rule'],
|
||||
disallowed_rules=['non_admin_rule'],
|
||||
authority=authority
|
||||
)
|
||||
|
||||
def test_tenant_user_policy(self):
|
||||
"""Test whether rules with format tenant_id/user_id formatting work.
|
||||
|
@ -261,26 +244,25 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
network:tenant_id pass. And test whether Nova rules that contain
|
||||
user_id pass.
|
||||
"""
|
||||
test_tenant_id = mock.sentinel.tenant_id
|
||||
test_user_id = mock.sentinel.user_id
|
||||
authority = policy_authority.PolicyAuthority(
|
||||
test_tenant_id, test_user_id, "tenant_rbac_policy")
|
||||
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
|
||||
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
|
||||
|
||||
# Check whether Member role can perform expected actions.
|
||||
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, ['Member'])
|
||||
self.assertTrue(allowed)
|
||||
|
||||
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
|
||||
for disallowed_rule in disallowed_rules:
|
||||
self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
|
||||
authority = self._test_policy_file(
|
||||
roles=['member'],
|
||||
allowed_rules=allowed_rules,
|
||||
disallowed_rules=disallowed_rules,
|
||||
service="tenant_rbac_policy",
|
||||
)
|
||||
|
||||
# Check whether admin role can perform expected actions.
|
||||
allowed_rules.extend(disallowed_rules)
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, ['admin'])
|
||||
self.assertTrue(allowed)
|
||||
self._test_policy_file(
|
||||
roles=['admin'],
|
||||
allowed_rules=allowed_rules,
|
||||
disallowed_rules=[],
|
||||
authority=authority
|
||||
)
|
||||
|
||||
# Check whether _try_rule is called with the correct target dictionary.
|
||||
with mock.patch.object(
|
||||
|
@ -295,7 +277,7 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
}
|
||||
|
||||
expected_access_data = {
|
||||
"roles": ['Member'],
|
||||
"roles": sorted(['member', 'reader']),
|
||||
"is_admin": False,
|
||||
"is_admin_project": True,
|
||||
"user_id": mock.sentinel.user_id,
|
||||
|
@ -304,8 +286,11 @@ class PolicyAuthorityTest(base.TestCase):
|
|||
}
|
||||
|
||||
for rule in allowed_rules:
|
||||
allowed = authority.allowed(rule, ['Member'])
|
||||
allowed = authority.allowed(rule, ['member'])
|
||||
self.assertTrue(allowed)
|
||||
# for sure that roles are in same order
|
||||
mock_try_rule.call_args[0][2]["roles"] = sorted(
|
||||
mock_try_rule.call_args[0][2]["roles"])
|
||||
mock_try_rule.assert_called_once_with(
|
||||
rule, expected_target, expected_access_data, mock.ANY)
|
||||
mock_try_rule.reset_mock()
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Patrole will validate the deprecated policy rules (if applicable) alongside
|
||||
the current policy rule.
|
||||
Add ``[patrole] validate_deprecated_rules`` enabled by default to validate
|
||||
the deprecated rules.
|
Loading…
Reference in New Issue