Fix OverPermission exception for keystone tests

Extend the roles in `access_token` according to the implementation[0]
of the bp basic-default-roles:
    `admin` implies `member` implies `reader`
Support deprecated rules.

[0] Ie18a269e3d1075d955fe494acaf634a393c6bd7b

Change-Id: I0d0de2a20b03548a7e5dab1ee7af7b72651abcb6
Story: 2004709
Task: 28740
This commit is contained in:
Sergey Vilgelm 2019-01-07 11:59:41 -06:00
parent cd2c5fdbb9
commit 55e5dfe640
No known key found for this signature in database
GPG Key ID: 08D0E2FF778887E6
5 changed files with 132 additions and 103 deletions

View File

@ -96,6 +96,11 @@ api_action = the policy action that is being tested. Examples:
* add_image
allowed_role = the ``oslo.policy`` role that is allowed to perform the API.
"""),
cfg.BoolOpt('validate_deprecated_rules', default=True,
help="""Some of the policy rules have deprecated version,
Patrole should be able to run check against default and deprecated rules,
otherwise the result of the tests may not be correct.
""")
]

View File

@ -169,6 +169,27 @@ class PolicyAuthority(RbacAuthority):
is_admin=is_admin_context)
return is_allowed
def _handle_deprecated_rule(self, default):
deprecated_rule = default.deprecated_rule
deprecated_msg = (
'Policy "%(old_name)s":"%(old_check_str)s" was deprecated in '
'%(release)s in favor of "%(name)s":"%(check_str)s". Reason: '
'%(reason)s. Either ensure your deployment is ready for the new '
'default or copy/paste the deprecated policy into your policy '
'file and maintain it manually.' % {
'old_name': deprecated_rule.name,
'old_check_str': deprecated_rule.check_str,
'release': default.deprecated_since,
'name': default.name,
'check_str': default.check_str,
'reason': default.deprecated_reason
}
)
LOG.warn(deprecated_msg)
check_str = '(%s) or (%s)' % (default.check_str,
deprecated_rule.check_str)
return policy.RuleDefault(default.name, check_str)
def get_rules(self):
rules = policy.Rules()
# Check whether policy file exists and attempt to read it.
@ -203,6 +224,12 @@ class PolicyAuthority(RbacAuthority):
if self.service in policy_generator:
for rule in policy_generator[self.service]:
if rule.name not in rules:
if CONF.patrole.validate_deprecated_rules:
# NOTE (sergey.vilgelm):
# The `DocumentedRuleDefault` object has no
# `deprecated_rule` attribute in Pike
if getattr(rule, 'deprecated_rule', False):
rule = self._handle_deprecated_rule(rule)
rules[rule.name] = rule.check
elif str(rule.check) != str(rules[rule.name]):
msg = ("The same policy name: %s was found in the "
@ -238,13 +265,17 @@ class PolicyAuthority(RbacAuthority):
return CONF.identity.admin_role in roles
def _get_access_token(self, roles):
roles = {r.lower() for r in roles if r}
# Extend roles for an user with admin or member role
if 'admin' in roles:
roles.add('member')
if 'member' in roles:
roles.add('reader')
access_token = {
"token": {
"roles": [
{
"name": role
} for role in roles
],
"roles": [{'name': r} for r in roles],
"project_id": self.project_id,
"tenant_id": self.project_id,
"user_id": self.user_id

View File

@ -2,5 +2,6 @@
"admin_rule": "role:admin",
"is_admin_rule": "is_admin:True",
"alt_admin_rule": "is_admin:True or (role:admin and is_project_admin:True)",
"non_admin_rule": "role:Member"
"member_rule": "role:member",
"reader_rule": "role:reader"
}

View File

@ -83,9 +83,29 @@ class PolicyAuthorityTest(base.TestCase):
for name, check in rules.items():
fake_rule = mock.Mock(check=check, __name__='foo')
fake_rule.name = name
fake_rule.deprecated_rule = False
fake_rules.append(fake_rule)
return fake_rules
def _test_policy_file(self, roles, allowed_rules,
disallowed_rules, authority=None, service=None):
if authority is None:
self.assertIsNotNone(service)
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, service)
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
return authority
@mock.patch.object(policy_authority, 'LOG', autospec=True)
def _test_custom_policy(self, *args):
default_roles = ['zero', 'one', 'two', 'three', 'four',
@ -145,23 +165,14 @@ class PolicyAuthorityTest(base.TestCase):
self.assertFalse(authority.allowed(rule, test_roles))
def test_empty_rbac_test_roles(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "custom_rbac_policy")
disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
'policy_action_3', 'policy_action_4',
'policy_action_6']
# Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
allowed_for_empty_roles = ['policy_action_5']
for rule in disallowed_for_empty_roles:
self.assertFalse(authority.allowed(rule, []))
for rule in allowed_for_empty_roles:
self.assertTrue(authority.allowed(rule, []))
self._test_policy_file(
roles=[],
allowed_rules=['policy_action_5'],
disallowed_rules=['policy_action_1', 'policy_action_2',
'policy_action_3', 'policy_action_4',
'policy_action_6'],
service="custom_rbac_policy"
)
def test_custom_policy_json(self):
# The CONF.patrole.custom_policy_files has a path to JSON file by
@ -184,75 +195,47 @@ class PolicyAuthorityTest(base.TestCase):
self._test_custom_multi_roles_policy()
def test_admin_policy_file_with_admin_role(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "admin_rbac_policy")
roles = ['admin']
allowed_rules = [
'admin_rule', 'is_admin_rule', 'alt_admin_rule'
]
disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
# admin role implies member and reader roles
self._test_policy_file(
roles=['admin'],
allowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
'member_rule', 'reader_rule'],
disallowed_rules=[],
service="admin_rbac_policy"
)
def test_admin_policy_file_with_member_role(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "admin_rbac_policy")
# member role implies reader role
self._test_policy_file(
roles=['member'],
allowed_rules=['member_rule', 'reader_rule'],
disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule'],
service="admin_rbac_policy"
)
roles = ['Member']
allowed_rules = [
'non_admin_rule'
]
disallowed_rules = [
'admin_rule', 'is_admin_rule', 'alt_admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_admin_policy_file_with_reader_role(self):
self._test_policy_file(
roles=['reader'],
allowed_rules=['reader_rule'],
disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
'member_rule'],
service="admin_rbac_policy"
)
def test_alt_admin_policy_file_with_context_is_admin(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "alt_admin_rbac_policy")
authority = self._test_policy_file(
roles=['fake_admin'],
allowed_rules=['non_admin_rule'],
disallowed_rules=['admin_rule'],
service="alt_admin_rbac_policy"
)
roles = ['fake_admin']
allowed_rules = ['non_admin_rule']
disallowed_rules = ['admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
roles = ['super_admin']
allowed_rules = ['admin_rule']
disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
self._test_policy_file(
roles=['super_admin'],
allowed_rules=['admin_rule'],
disallowed_rules=['non_admin_rule'],
authority=authority
)
def test_tenant_user_policy(self):
"""Test whether rules with format tenant_id/user_id formatting work.
@ -261,26 +244,25 @@ class PolicyAuthorityTest(base.TestCase):
network:tenant_id pass. And test whether Nova rules that contain
user_id pass.
"""
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "tenant_rbac_policy")
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
# Check whether Member role can perform expected actions.
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
for rule in allowed_rules:
allowed = authority.allowed(rule, ['Member'])
self.assertTrue(allowed)
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
for disallowed_rule in disallowed_rules:
self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
authority = self._test_policy_file(
roles=['member'],
allowed_rules=allowed_rules,
disallowed_rules=disallowed_rules,
service="tenant_rbac_policy",
)
# Check whether admin role can perform expected actions.
allowed_rules.extend(disallowed_rules)
for rule in allowed_rules:
allowed = authority.allowed(rule, ['admin'])
self.assertTrue(allowed)
self._test_policy_file(
roles=['admin'],
allowed_rules=allowed_rules,
disallowed_rules=[],
authority=authority
)
# Check whether _try_rule is called with the correct target dictionary.
with mock.patch.object(
@ -295,7 +277,7 @@ class PolicyAuthorityTest(base.TestCase):
}
expected_access_data = {
"roles": ['Member'],
"roles": sorted(['member', 'reader']),
"is_admin": False,
"is_admin_project": True,
"user_id": mock.sentinel.user_id,
@ -304,8 +286,11 @@ class PolicyAuthorityTest(base.TestCase):
}
for rule in allowed_rules:
allowed = authority.allowed(rule, ['Member'])
allowed = authority.allowed(rule, ['member'])
self.assertTrue(allowed)
# for sure that roles are in same order
mock_try_rule.call_args[0][2]["roles"] = sorted(
mock_try_rule.call_args[0][2]["roles"])
mock_try_rule.assert_called_once_with(
rule, expected_target, expected_access_data, mock.ANY)
mock_try_rule.reset_mock()

View File

@ -0,0 +1,7 @@
---
features:
- |
Patrole will validate the deprecated policy rules (if applicable) alongside
the current policy rule.
Add ``[patrole] validate_deprecated_rules`` enabled by default to validate
the deprecated rules.