Adds a flag to determine whether to reload the rules in policy
Initing a Enforcer class with rules can not set rules, because the policy file is modified , the load_rules always try to load the rules from the cache or config file when checks the policy. This patch adds a flag to determine whether to reload the rules from policy file. Closes-Bug: #1279198 Change-Id: Ife84189be4b86a3ee90da4539ff2dbed125be23d
This commit is contained in:
parent
a2c7385b5e
commit
15722f1702
|
@ -182,27 +182,31 @@ class Enforcer(object):
|
|||
is called this will be overwritten.
|
||||
:param default_rule: Default rule to use, CONF.default_rule will
|
||||
be used if none is specified.
|
||||
:param use_conf: Whether to load rules from cache or config file.
|
||||
"""
|
||||
|
||||
def __init__(self, policy_file=None, rules=None, default_rule=None):
|
||||
def __init__(self, policy_file=None, rules=None,
|
||||
default_rule=None, use_conf=True):
|
||||
self.rules = Rules(rules, default_rule)
|
||||
self.default_rule = default_rule or CONF.policy_default_rule
|
||||
|
||||
self.policy_path = None
|
||||
self.policy_file = policy_file or CONF.policy_file
|
||||
self.use_conf = use_conf
|
||||
|
||||
def set_rules(self, rules, overwrite=True):
|
||||
def set_rules(self, rules, overwrite=True, use_conf=False):
|
||||
"""Create a new Rules object based on the provided dict of rules.
|
||||
|
||||
:param rules: New rules to use. It should be an instance of dict.
|
||||
:param overwrite: Whether to overwrite current rules or update them
|
||||
with the new rules.
|
||||
:param use_conf: Whether to reload rules from cache or config file.
|
||||
"""
|
||||
|
||||
if not isinstance(rules, dict):
|
||||
raise TypeError(_("Rules must be an instance of dict or Rules, "
|
||||
"got %s instead") % type(rules))
|
||||
|
||||
self.use_conf = use_conf
|
||||
if overwrite:
|
||||
self.rules = Rules(rules, self.default_rule)
|
||||
else:
|
||||
|
@ -222,15 +226,19 @@ class Enforcer(object):
|
|||
:param force_reload: Whether to overwrite current rules.
|
||||
"""
|
||||
|
||||
if not self.policy_path:
|
||||
self.policy_path = self._get_policy_path()
|
||||
if force_reload:
|
||||
self.use_conf = force_reload
|
||||
|
||||
reloaded, data = fileutils.read_cached_file(self.policy_path,
|
||||
force_reload=force_reload)
|
||||
if reloaded or not self.rules:
|
||||
rules = Rules.load_json(data, self.default_rule)
|
||||
self.set_rules(rules)
|
||||
LOG.debug("Rules successfully reloaded")
|
||||
if self.use_conf:
|
||||
if not self.policy_path:
|
||||
self.policy_path = self._get_policy_path()
|
||||
|
||||
reloaded, data = fileutils.read_cached_file(
|
||||
self.policy_path, force_reload=force_reload)
|
||||
if reloaded or not self.rules:
|
||||
rules = Rules.load_json(data, self.default_rule)
|
||||
self.set_rules(rules)
|
||||
LOG.debug("Rules successfully reloaded")
|
||||
|
||||
def _get_policy_path(self):
|
||||
"""Locate the policy json data file.
|
||||
|
|
|
@ -151,8 +151,6 @@ class EnforcerTest(PolicyBaseTestCase):
|
|||
"cloudwatch:PutMetricData": ""
|
||||
}"""
|
||||
rules = policy.Rules.load_json(rules_json)
|
||||
# Make sure enforce won't try to reload the rules on us
|
||||
self.enforcer.load_rules()
|
||||
self.enforcer.set_rules(rules)
|
||||
action = "cloudwatch:PutMetricData"
|
||||
creds = {'roles': ''}
|
||||
|
@ -179,9 +177,6 @@ class EnforcerTest(PolicyBaseTestCase):
|
|||
self.assertIn('admin', self.enforcer.rules)
|
||||
|
||||
def test_enforcer_force_reload_false(self):
|
||||
# Make sure we have the current policy file loaded, or load_rules may
|
||||
# reload even with force_reload False
|
||||
self.enforcer.load_rules()
|
||||
self.enforcer.set_rules({'test': 'test'})
|
||||
self.enforcer.load_rules(force_reload=False)
|
||||
self.assertIn('test', self.enforcer.rules)
|
||||
|
@ -213,6 +208,12 @@ class EnforcerTest(PolicyBaseTestCase):
|
|||
enforcer._get_policy_path)
|
||||
self.assertEqual(('raise_error.json', ), e.config_files)
|
||||
|
||||
def test_enforcer_set_rules(self):
|
||||
self.enforcer.load_rules()
|
||||
self.enforcer.set_rules({'test': 'test1'})
|
||||
self.enforcer.load_rules()
|
||||
self.assertEqual(self.enforcer.rules, {'test': 'test1'})
|
||||
|
||||
|
||||
class FakeCheck(policy.BaseCheck):
|
||||
def __init__(self, result=None):
|
||||
|
@ -242,17 +243,13 @@ class CheckFunctionTestCase(PolicyBaseTestCase):
|
|||
self.assertEqual(result, False)
|
||||
|
||||
def test_check_with_rule(self):
|
||||
# Make sure the load_rules call in enforce won't overwrite our test
|
||||
# rules.
|
||||
self.enforcer.load_rules()
|
||||
self.enforcer.rules = dict(default=FakeCheck())
|
||||
self.enforcer.set_rules(dict(default=FakeCheck()))
|
||||
result = self.enforcer.enforce("default", "target", "creds")
|
||||
|
||||
self.assertEqual(result, ("target", "creds", self.enforcer))
|
||||
|
||||
def test_check_raises(self):
|
||||
self.enforcer.load_rules()
|
||||
self.enforcer.rules = dict(default=policy.FalseCheck())
|
||||
self.enforcer.set_rules(dict(default=policy.FalseCheck()))
|
||||
|
||||
try:
|
||||
self.enforcer.enforce('rule', 'target', 'creds',
|
||||
|
|
Loading…
Reference in New Issue