diff --git a/oslo_policy/policy.py b/oslo_policy/policy.py index 2582212..46c2362 100644 --- a/oslo_policy/policy.py +++ b/oslo_policy/policy.py @@ -453,6 +453,7 @@ class Enforcer(object): self._loaded_files = [] self._policy_dir_mtimes = {} self._file_cache = {} + self._informed_no_policy_file = False def set_rules(self, rules, overwrite=True, use_conf=False): """Create a new :class:`Rules` based on the provided dict of rules. @@ -486,6 +487,7 @@ class Enforcer(object): self._file_cache.clear() self.registered_rules = {} self.file_rules = {} + self._informed_no_policy_file = False def load_rules(self, force_reload=False): """Loads policy_path's rules. @@ -500,10 +502,17 @@ class Enforcer(object): if self.use_conf: if not self.policy_path: - self.policy_path = self._get_policy_path(self.policy_file) + try: + self.policy_path = self._get_policy_path(self.policy_file) + except cfg.ConfigFilesNotFoundError: + if not self._informed_no_policy_file: + LOG.debug('The policy file %s could not be found.', + self.policy_file) + self._informed_no_policy_file = True - self._load_policy_file(self.policy_path, force_reload, - overwrite=self.overwrite) + if self.policy_path: + self._load_policy_file(self.policy_path, force_reload, + overwrite=self.overwrite) for path in self.conf.oslo_policy.policy_dirs: try: path = self._get_policy_path(path) diff --git a/oslo_policy/tests/test_policy.py b/oslo_policy/tests/test_policy.py index 7178b79..3fad5c9 100644 --- a/oslo_policy/tests/test_policy.py +++ b/oslo_policy/tests/test_policy.py @@ -642,6 +642,48 @@ class EnforcerTest(base.PolicyBaseTestCase): {'roles': ['test']}) +class EnforcerNoPolicyFileTest(base.PolicyBaseTestCase): + def setUp(self): + super(EnforcerNoPolicyFileTest, self).setUp() + + def check_loaded_files(self, filenames): + self.assertEqual( + [self.get_config_file_fullname(n) + for n in filenames], + self.enforcer._loaded_files + ) + + def test_load_rules(self): + # Check that loading rules with no policy file does not error + self.enforcer.load_rules(True) + self.assertIsNotNone(self.enforcer.rules) + self.assertEqual(0, len(self.enforcer.rules)) + + def test_opts_registered(self): + self.enforcer.register_default(policy.RuleDefault(name='admin', + check_str='is_admin:False')) + self.enforcer.register_default(policy.RuleDefault(name='owner', + check_str='role:owner')) + self.enforcer.load_rules(True) + + self.assertEqual({}, self.enforcer.file_rules) + self.assertEqual('role:owner', str(self.enforcer.rules['owner'])) + self.assertEqual('is_admin:False', str(self.enforcer.rules['admin'])) + + def test_load_directory(self): + self.create_config_file('policy.d/a.conf', POLICY_JSON_CONTENTS) + self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS) + self.enforcer.load_rules(True) + self.assertIsNotNone(self.enforcer.rules) + loaded_rules = jsonutils.loads(str(self.enforcer.rules)) + self.assertEqual('role:fakeB', loaded_rules['default']) + self.assertEqual('is_admin:True', loaded_rules['admin']) + self.check_loaded_files([ + 'policy.d/a.conf', + 'policy.d/b.conf', + ]) + + class CheckFunctionTestCase(base.PolicyBaseTestCase): def setUp(self):