Merge "Rules in policy directory files can be deleted."

This commit is contained in:
Zuul 2021-10-21 14:22:54 +00:00 committed by Gerrit Code Review
commit 8a3998af18
3 changed files with 77 additions and 62 deletions

View File

@ -545,7 +545,6 @@ class Enforcer(object):
self.use_conf = use_conf
self._need_check_rule = True
self.overwrite = overwrite
self._loaded_files = []
self._policy_dir_mtimes = {}
self._file_cache = {}
self._informed_no_policy_file = False
@ -586,7 +585,6 @@ class Enforcer(object):
self.set_rules({})
self.default_rule = None
self.policy_path = None
self._loaded_files = []
self._policy_dir_mtimes = {}
self._file_cache.clear()
self.registered_rules = {}
@ -627,22 +625,48 @@ class Enforcer(object):
overwrite=self.overwrite
)
force_reload_policy_dir = force_reload
force_reload_policy_dirs = force_reload
if policy_file_rules_changed:
force_reload_policy_dir = True
force_reload_policy_dirs = True
existing_policy_dirs = []
for path in self.conf.oslo_policy.policy_dirs:
try:
path = self._get_policy_path(path)
absolute_path = self._get_policy_path(path)
existing_policy_dirs.append(absolute_path)
except cfg.ConfigFilesNotFoundError:
continue
if (self._is_directory_updated(self._policy_dir_mtimes, path)
or force_reload_policy_dir):
# If change was made in any policy directory or main policy
# file then all policy directories and main file are
# re-calculated from scratch. We don't have separate rule sets
# for every policy folder, we only have the only rule set in
# RAM for all rule configs (self.rules). So it's the only way
# to be consistent.
if self._is_directory_updated(self._policy_dir_mtimes,
absolute_path):
force_reload_policy_dirs = True
if force_reload_policy_dirs and existing_policy_dirs:
# Here we realize that some policy folders or main policy file
# were changed and we need to recalculate all rules from
# scratch.
# If policy_file_rules_changed is True then we know:
# 1. all rules were already reset.
# 2. rules from main policy file were already applied.
# Otherwise main policy file was not changed and rules were not
# reset and. So we reset rules and force to re-calculate
# rules in main policy file. And after that we apply rules
# from every policy directory.
if self.policy_path:
if not policy_file_rules_changed:
self._load_policy_file(path=self.policy_path,
force_reload=True,
overwrite=self.overwrite
)
else:
self.rules = Rules(default_rule=self.default_rule)
for path in existing_policy_dirs:
self._walk_through_policy_directory(
path,
self._load_policy_file,
force_reload_policy_dir, False
)
path, self._load_policy_file, True, False)
for default in self.registered_rules.values():
if default.deprecated_for_removal:
@ -917,7 +941,6 @@ class Enforcer(object):
self.set_rules(rules, overwrite=overwrite, use_conf=True)
rules_changed = True
self._record_file_rules(data, overwrite)
self._loaded_files.append(path)
LOG.debug('Reloaded policy file: %(path)s', {'path': path})
return rules_changed

View File

@ -238,13 +238,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
super(EnforcerTest, self).setUp()
self.create_config_file('policy.json', POLICY_JSON_CONTENTS)
def check_loaded_files(self, filenames):
self.assertEqual(
[self.get_config_file_fullname(n)
for n in filenames],
self.enforcer._loaded_files
)
def _test_scenario_with_opts_registered(self, scenario, *args, **kwargs):
# This test registers some rules, calls the scenario and then checks
# the registered rules. The scenario should be a method which loads
@ -291,11 +284,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeB', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
os.path.join('policy.d', 'b.conf'),
])
def test_load_directory_after_file_update(self):
self.create_config_file(
@ -305,10 +293,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeA', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
new_policy_json_contents = jsonutils.dumps({
"default": "rule:admin",
"admin": "is_admin:True",
@ -332,12 +316,41 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertEqual('role:fakeA', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.assertEqual('rule:bar', loaded_rules['foo'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
def test_load_directory_after_file_is_emptied(self):
def dict_rules(enforcer_rules):
"""Converts enforcer rules to dictionary.
:param enforcer_rules: enforcer rules represented as a class Rules
:return: enforcer rules represented as a dictionary
"""
return jsonutils.loads(str(enforcer_rules))
self.assertEqual(self.enforcer.rules, {})
self.enforcer.load_rules()
main_policy_file_rules = jsonutils.loads(POLICY_JSON_CONTENTS)
self.assertEqual(main_policy_file_rules,
dict_rules(self.enforcer.rules))
folder_policy_file = os.path.join('policy.d', 'a.conf')
self.create_config_file(folder_policy_file, POLICY_A_CONTENTS)
self.enforcer.load_rules()
expected_rules = main_policy_file_rules.copy()
expected_rules.update(jsonutils.loads(POLICY_A_CONTENTS))
self.assertEqual(expected_rules, dict_rules(self.enforcer.rules))
self.create_config_file(folder_policy_file, '{}')
# Force the mtime change since the unit test may write to this file
# too fast for mtime to actually change.
absolute_folder_policy_file_path = self.get_config_file_fullname(
folder_policy_file)
stinfo = os.stat(absolute_folder_policy_file_path)
os.utime(absolute_folder_policy_file_path,
(stinfo.st_atime + 42, stinfo.st_mtime + 42))
self.enforcer.load_rules()
self.assertEqual(main_policy_file_rules,
dict_rules(self.enforcer.rules))
def test_load_directory_opts_registered(self):
self._test_scenario_with_opts_registered(self.test_load_directory)
@ -364,11 +377,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
os.path.join('policy.d', 'a.conf'),
])
def test_load_directory_caching_with_files_updated_opts_registered(self):
self._test_scenario_with_opts_registered(
@ -392,10 +400,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
])
def test_load_directory_caching_with_files_same_but_overwrite_false(self):
self.test_load_directory_caching_with_files_same(overwrite=False)
@ -453,12 +457,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeC', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.json',
os.path.join('policy.d', 'a.conf'),
os.path.join('policy.d', 'b.conf'),
os.path.join('policy.2.d', 'fake.conf'),
])
def test_load_multiple_directories_opts_registered(self):
self._test_scenario_with_opts_registered(
@ -474,8 +472,6 @@ class EnforcerTest(base.PolicyBaseTestCase):
self.assertIsNotNone(self.enforcer.rules)
self.assertIn('default', self.enforcer.rules)
self.assertIn('admin', self.enforcer.rules)
self.check_loaded_files(
['policy.json', os.path.join('policy.d', 'a.conf')])
def test_load_non_existed_directory_opts_registered(self):
self._test_scenario_with_opts_registered(
@ -1008,13 +1004,6 @@ class EnforcerNoPolicyFileTest(base.PolicyBaseTestCase):
def setUp(self):
super(EnforcerNoPolicyFileTest, self).setUp()
def check_loaded_files(self, filenames):
self.assertEqual(
[self.get_config_file_fullname(n)
for n in filenames],
self.enforcer._loaded_files
)
def test_load_rules(self):
# Check that loading rules with no policy file does not error
self.enforcer.load_rules(True)
@ -1040,10 +1029,6 @@ class EnforcerNoPolicyFileTest(base.PolicyBaseTestCase):
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeB', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.d/a.conf',
'policy.d/b.conf',
])
class CheckFunctionTestCase(base.PolicyBaseTestCase):

View File

@ -0,0 +1,7 @@
---
fixes:
- |
[`bug 1943584 <https://bugs.launchpad.net/oslo.policy/+bug/1943584>`_]
If file in policy directory was emptied, rules were not re-calculated. The
only workaround was to restart an application. Now rules are re-calculated
"on the fly", without app restart.