diff --git a/patrole_tempest_plugin/config.py b/patrole_tempest_plugin/config.py index 62337f73..63f0a8a9 100644 --- a/patrole_tempest_plugin/config.py +++ b/patrole_tempest_plugin/config.py @@ -96,6 +96,11 @@ api_action = the policy action that is being tested. Examples: * add_image allowed_role = the ``oslo.policy`` role that is allowed to perform the API. +"""), + cfg.BoolOpt('validate_deprecated_rules', default=True, + help="""Some of the policy rules have deprecated version, +Patrole should be able to run check against default and deprecated rules, +otherwise the result of the tests may not be correct. """) ] diff --git a/patrole_tempest_plugin/policy_authority.py b/patrole_tempest_plugin/policy_authority.py index e0a26a3f..fd7b9f79 100644 --- a/patrole_tempest_plugin/policy_authority.py +++ b/patrole_tempest_plugin/policy_authority.py @@ -169,6 +169,27 @@ class PolicyAuthority(RbacAuthority): is_admin=is_admin_context) return is_allowed + def _handle_deprecated_rule(self, default): + deprecated_rule = default.deprecated_rule + deprecated_msg = ( + 'Policy "%(old_name)s":"%(old_check_str)s" was deprecated in ' + '%(release)s in favor of "%(name)s":"%(check_str)s". Reason: ' + '%(reason)s. Either ensure your deployment is ready for the new ' + 'default or copy/paste the deprecated policy into your policy ' + 'file and maintain it manually.' % { + 'old_name': deprecated_rule.name, + 'old_check_str': deprecated_rule.check_str, + 'release': default.deprecated_since, + 'name': default.name, + 'check_str': default.check_str, + 'reason': default.deprecated_reason + } + ) + LOG.warn(deprecated_msg) + check_str = '(%s) or (%s)' % (default.check_str, + deprecated_rule.check_str) + return policy.RuleDefault(default.name, check_str) + def get_rules(self): rules = policy.Rules() # Check whether policy file exists and attempt to read it. @@ -203,6 +224,12 @@ class PolicyAuthority(RbacAuthority): if self.service in policy_generator: for rule in policy_generator[self.service]: if rule.name not in rules: + if CONF.patrole.validate_deprecated_rules: + # NOTE (sergey.vilgelm): + # The `DocumentedRuleDefault` object has no + # `deprecated_rule` attribute in Pike + if getattr(rule, 'deprecated_rule', False): + rule = self._handle_deprecated_rule(rule) rules[rule.name] = rule.check elif str(rule.check) != str(rules[rule.name]): msg = ("The same policy name: %s was found in the " @@ -238,13 +265,17 @@ class PolicyAuthority(RbacAuthority): return CONF.identity.admin_role in roles def _get_access_token(self, roles): + roles = {r.lower() for r in roles if r} + + # Extend roles for an user with admin or member role + if 'admin' in roles: + roles.add('member') + if 'member' in roles: + roles.add('reader') + access_token = { "token": { - "roles": [ - { - "name": role - } for role in roles - ], + "roles": [{'name': r} for r in roles], "project_id": self.project_id, "tenant_id": self.project_id, "user_id": self.user_id diff --git a/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json b/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json index 78289210..d6d9605c 100644 --- a/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json +++ b/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json @@ -2,5 +2,6 @@ "admin_rule": "role:admin", "is_admin_rule": "is_admin:True", "alt_admin_rule": "is_admin:True or (role:admin and is_project_admin:True)", - "non_admin_rule": "role:Member" + "member_rule": "role:member", + "reader_rule": "role:reader" } diff --git a/patrole_tempest_plugin/tests/unit/test_policy_authority.py b/patrole_tempest_plugin/tests/unit/test_policy_authority.py index 6a4d219b..90e45f99 100644 --- a/patrole_tempest_plugin/tests/unit/test_policy_authority.py +++ b/patrole_tempest_plugin/tests/unit/test_policy_authority.py @@ -83,9 +83,29 @@ class PolicyAuthorityTest(base.TestCase): for name, check in rules.items(): fake_rule = mock.Mock(check=check, __name__='foo') fake_rule.name = name + fake_rule.deprecated_rule = False fake_rules.append(fake_rule) return fake_rules + def _test_policy_file(self, roles, allowed_rules, + disallowed_rules, authority=None, service=None): + if authority is None: + self.assertIsNotNone(service) + test_tenant_id = mock.sentinel.tenant_id + test_user_id = mock.sentinel.user_id + authority = policy_authority.PolicyAuthority( + test_tenant_id, test_user_id, service) + + for rule in allowed_rules: + allowed = authority.allowed(rule, roles) + self.assertTrue(allowed) + + for rule in disallowed_rules: + allowed = authority.allowed(rule, roles) + self.assertFalse(allowed) + + return authority + @mock.patch.object(policy_authority, 'LOG', autospec=True) def _test_custom_policy(self, *args): default_roles = ['zero', 'one', 'two', 'three', 'four', @@ -145,23 +165,14 @@ class PolicyAuthorityTest(base.TestCase): self.assertFalse(authority.allowed(rule, test_roles)) def test_empty_rbac_test_roles(self): - test_tenant_id = mock.sentinel.tenant_id - test_user_id = mock.sentinel.user_id - authority = policy_authority.PolicyAuthority( - test_tenant_id, test_user_id, "custom_rbac_policy") - - disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2', - 'policy_action_3', 'policy_action_4', - 'policy_action_6'] - - # Due to "policy_action_5": "rule:all_rule" / "all_rule": "" - allowed_for_empty_roles = ['policy_action_5'] - - for rule in disallowed_for_empty_roles: - self.assertFalse(authority.allowed(rule, [])) - - for rule in allowed_for_empty_roles: - self.assertTrue(authority.allowed(rule, [])) + self._test_policy_file( + roles=[], + allowed_rules=['policy_action_5'], + disallowed_rules=['policy_action_1', 'policy_action_2', + 'policy_action_3', 'policy_action_4', + 'policy_action_6'], + service="custom_rbac_policy" + ) def test_custom_policy_json(self): # The CONF.patrole.custom_policy_files has a path to JSON file by @@ -184,75 +195,47 @@ class PolicyAuthorityTest(base.TestCase): self._test_custom_multi_roles_policy() def test_admin_policy_file_with_admin_role(self): - test_tenant_id = mock.sentinel.tenant_id - test_user_id = mock.sentinel.user_id - authority = policy_authority.PolicyAuthority( - test_tenant_id, test_user_id, "admin_rbac_policy") - - roles = ['admin'] - allowed_rules = [ - 'admin_rule', 'is_admin_rule', 'alt_admin_rule' - ] - disallowed_rules = ['non_admin_rule'] - - for rule in allowed_rules: - allowed = authority.allowed(rule, roles) - self.assertTrue(allowed) - - for rule in disallowed_rules: - allowed = authority.allowed(rule, roles) - self.assertFalse(allowed) + # admin role implies member and reader roles + self._test_policy_file( + roles=['admin'], + allowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule', + 'member_rule', 'reader_rule'], + disallowed_rules=[], + service="admin_rbac_policy" + ) def test_admin_policy_file_with_member_role(self): - test_tenant_id = mock.sentinel.tenant_id - test_user_id = mock.sentinel.user_id - authority = policy_authority.PolicyAuthority( - test_tenant_id, test_user_id, "admin_rbac_policy") + # member role implies reader role + self._test_policy_file( + roles=['member'], + allowed_rules=['member_rule', 'reader_rule'], + disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule'], + service="admin_rbac_policy" + ) - roles = ['Member'] - allowed_rules = [ - 'non_admin_rule' - ] - disallowed_rules = [ - 'admin_rule', 'is_admin_rule', 'alt_admin_rule'] - - for rule in allowed_rules: - allowed = authority.allowed(rule, roles) - self.assertTrue(allowed) - - for rule in disallowed_rules: - allowed = authority.allowed(rule, roles) - self.assertFalse(allowed) + def test_admin_policy_file_with_reader_role(self): + self._test_policy_file( + roles=['reader'], + allowed_rules=['reader_rule'], + disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule', + 'member_rule'], + service="admin_rbac_policy" + ) def test_alt_admin_policy_file_with_context_is_admin(self): - test_tenant_id = mock.sentinel.tenant_id - test_user_id = mock.sentinel.user_id - authority = policy_authority.PolicyAuthority( - test_tenant_id, test_user_id, "alt_admin_rbac_policy") + authority = self._test_policy_file( + roles=['fake_admin'], + allowed_rules=['non_admin_rule'], + disallowed_rules=['admin_rule'], + service="alt_admin_rbac_policy" + ) - roles = ['fake_admin'] - allowed_rules = ['non_admin_rule'] - disallowed_rules = ['admin_rule'] - - for rule in allowed_rules: - allowed = authority.allowed(rule, roles) - self.assertTrue(allowed) - - for rule in disallowed_rules: - allowed = authority.allowed(rule, roles) - self.assertFalse(allowed) - - roles = ['super_admin'] - allowed_rules = ['admin_rule'] - disallowed_rules = ['non_admin_rule'] - - for rule in allowed_rules: - allowed = authority.allowed(rule, roles) - self.assertTrue(allowed) - - for rule in disallowed_rules: - allowed = authority.allowed(rule, roles) - self.assertFalse(allowed) + self._test_policy_file( + roles=['super_admin'], + allowed_rules=['admin_rule'], + disallowed_rules=['non_admin_rule'], + authority=authority + ) def test_tenant_user_policy(self): """Test whether rules with format tenant_id/user_id formatting work. @@ -261,26 +244,25 @@ class PolicyAuthorityTest(base.TestCase): network:tenant_id pass. And test whether Nova rules that contain user_id pass. """ - test_tenant_id = mock.sentinel.tenant_id - test_user_id = mock.sentinel.user_id - authority = policy_authority.PolicyAuthority( - test_tenant_id, test_user_id, "tenant_rbac_policy") + allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4'] + disallowed_rules = ['admin_tenant_rule', 'admin_user_rule'] # Check whether Member role can perform expected actions. - allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4'] - for rule in allowed_rules: - allowed = authority.allowed(rule, ['Member']) - self.assertTrue(allowed) - - disallowed_rules = ['admin_tenant_rule', 'admin_user_rule'] - for disallowed_rule in disallowed_rules: - self.assertFalse(authority.allowed(disallowed_rule, ['Member'])) + authority = self._test_policy_file( + roles=['member'], + allowed_rules=allowed_rules, + disallowed_rules=disallowed_rules, + service="tenant_rbac_policy", + ) # Check whether admin role can perform expected actions. allowed_rules.extend(disallowed_rules) - for rule in allowed_rules: - allowed = authority.allowed(rule, ['admin']) - self.assertTrue(allowed) + self._test_policy_file( + roles=['admin'], + allowed_rules=allowed_rules, + disallowed_rules=[], + authority=authority + ) # Check whether _try_rule is called with the correct target dictionary. with mock.patch.object( @@ -295,7 +277,7 @@ class PolicyAuthorityTest(base.TestCase): } expected_access_data = { - "roles": ['Member'], + "roles": sorted(['member', 'reader']), "is_admin": False, "is_admin_project": True, "user_id": mock.sentinel.user_id, @@ -304,8 +286,11 @@ class PolicyAuthorityTest(base.TestCase): } for rule in allowed_rules: - allowed = authority.allowed(rule, ['Member']) + allowed = authority.allowed(rule, ['member']) self.assertTrue(allowed) + # for sure that roles are in same order + mock_try_rule.call_args[0][2]["roles"] = sorted( + mock_try_rule.call_args[0][2]["roles"]) mock_try_rule.assert_called_once_with( rule, expected_target, expected_access_data, mock.ANY) mock_try_rule.reset_mock() diff --git a/releasenotes/notes/support-deprecated-roles-eae9dc742cb4fa33.yaml b/releasenotes/notes/support-deprecated-roles-eae9dc742cb4fa33.yaml new file mode 100644 index 00000000..0d581a0d --- /dev/null +++ b/releasenotes/notes/support-deprecated-roles-eae9dc742cb4fa33.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Patrole will validate the deprecated policy rules (if applicable) alongside + the current policy rule. + Add ``[patrole] validate_deprecated_rules`` enabled by default to validate + the deprecated rules.