Merge "Allow policy file to not exist"

This commit is contained in:
Jenkins 2016-08-03 09:49:49 +00:00 committed by Gerrit Code Review
commit 43587dec7e
2 changed files with 54 additions and 3 deletions

View File

@ -453,6 +453,7 @@ class Enforcer(object):
self._loaded_files = []
self._policy_dir_mtimes = {}
self._file_cache = {}
self._informed_no_policy_file = False
def set_rules(self, rules, overwrite=True, use_conf=False):
"""Create a new :class:`Rules` based on the provided dict of rules.
@ -486,6 +487,7 @@ class Enforcer(object):
self._file_cache.clear()
self.registered_rules = {}
self.file_rules = {}
self._informed_no_policy_file = False
def load_rules(self, force_reload=False):
"""Loads policy_path's rules.
@ -500,10 +502,17 @@ class Enforcer(object):
if self.use_conf:
if not self.policy_path:
self.policy_path = self._get_policy_path(self.policy_file)
try:
self.policy_path = self._get_policy_path(self.policy_file)
except cfg.ConfigFilesNotFoundError:
if not self._informed_no_policy_file:
LOG.debug('The policy file %s could not be found.',
self.policy_file)
self._informed_no_policy_file = True
self._load_policy_file(self.policy_path, force_reload,
overwrite=self.overwrite)
if self.policy_path:
self._load_policy_file(self.policy_path, force_reload,
overwrite=self.overwrite)
for path in self.conf.oslo_policy.policy_dirs:
try:
path = self._get_policy_path(path)

View File

@ -642,6 +642,48 @@ class EnforcerTest(base.PolicyBaseTestCase):
{'roles': ['test']})
class EnforcerNoPolicyFileTest(base.PolicyBaseTestCase):
def setUp(self):
super(EnforcerNoPolicyFileTest, self).setUp()
def check_loaded_files(self, filenames):
self.assertEqual(
[self.get_config_file_fullname(n)
for n in filenames],
self.enforcer._loaded_files
)
def test_load_rules(self):
# Check that loading rules with no policy file does not error
self.enforcer.load_rules(True)
self.assertIsNotNone(self.enforcer.rules)
self.assertEqual(0, len(self.enforcer.rules))
def test_opts_registered(self):
self.enforcer.register_default(policy.RuleDefault(name='admin',
check_str='is_admin:False'))
self.enforcer.register_default(policy.RuleDefault(name='owner',
check_str='role:owner'))
self.enforcer.load_rules(True)
self.assertEqual({}, self.enforcer.file_rules)
self.assertEqual('role:owner', str(self.enforcer.rules['owner']))
self.assertEqual('is_admin:False', str(self.enforcer.rules['admin']))
def test_load_directory(self):
self.create_config_file('policy.d/a.conf', POLICY_JSON_CONTENTS)
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
self.enforcer.load_rules(True)
self.assertIsNotNone(self.enforcer.rules)
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
self.assertEqual('role:fakeB', loaded_rules['default'])
self.assertEqual('is_admin:True', loaded_rules['admin'])
self.check_loaded_files([
'policy.d/a.conf',
'policy.d/b.conf',
])
class CheckFunctionTestCase(base.PolicyBaseTestCase):
def setUp(self):