Add ability to pass in target data for the oslopolicy-checker

This allows us to test the policy for other services which might have
different or unusual target data formats (such as Barbican). It would be
possible to pass it as a nested dictionary, e.g.:

{
    "target": {
        "secret": {
            "project_id": "my project id"
        }
    }
}

or as a key pair (as oslo.policy would expect):
{
    "target.secret.project_id": "my project id"
}

Both will work (note that this logic was taken from barbican).

This fixes around the limitation that the target is hardcoded to be
"project_id", and thus allows to test more scenarios (such as the
project ID not matching).

Change-Id: Ia9f7462072a8cb142251c8bb5ef19d9a25a98119
This commit is contained in:
Juan Antonio Osorio Robles 2018-10-25 15:48:25 +03:00
parent d746dfb5f4
commit f79650325f
3 changed files with 99 additions and 5 deletions

View File

@ -14,6 +14,7 @@
# limitations under the License.
import argparse
import collections
import sys
from oslo_serialization import jsonutils
@ -33,7 +34,25 @@ def _try_rule(key, rule, target, access_data, o):
print("exception: %s" % rule)
def tool(policy_file, access_file, apply_rule, is_admin=False):
def flatten(d, parent_key=''):
"""Flatten a nested dictionary
Converts a dictionary with nested values to a single level flat
dictionary, with dotted notation for each key.
"""
items = []
for k, v in d.items():
new_key = parent_key + '.' + k if parent_key else k
if isinstance(v, collections.MutableMapping):
items.extend(flatten(v, new_key).items())
else:
items.append((new_key, v))
return dict(items)
def tool(policy_file, access_file, apply_rule, is_admin=False,
target_file=None):
access = access_file.read()
access_data = jsonutils.loads(access)['token']
access_data['roles'] = [role['name'] for role in access_data['roles']]
@ -47,16 +66,20 @@ def tool(policy_file, access_file, apply_rule, is_admin=False):
o = Object()
o.rules = rules
target = {"project_id": access_data['project_id']}
if target_file:
target = target_file.read()
target_data = flatten(jsonutils.loads(target))
else:
target_data = {"project_id": access_data['project_id']}
if apply_rule:
key = apply_rule
rule = rules[apply_rule]
_try_rule(key, rule, target, access_data, o)
_try_rule(key, rule, target_data, access_data, o)
return
for key, rule in rules.items():
if ":" in key:
_try_rule(key, rule, target, access_data, o)
_try_rule(key, rule, target_data, access_data, o)
def main():
@ -72,6 +95,11 @@ def main():
type=argparse.FileType('rb', 0),
help='path to a file containing OpenStack Identity API' +
' access info in JSON format')
parser.add_argument(
'--target',
type=argparse.FileType('rb', 0),
help='path to a file containing custom target info in' +
' JSON format. This will be used to evaluate the policy with.')
parser.add_argument(
'--rule',
help='rule to test')
@ -85,7 +113,7 @@ def main():
is_admin = args.is_admin.lower() == "true"
except Exception:
is_admin = False
tool(args.policy, args.access, args.rule, is_admin)
tool(args.policy, args.access, args.rule, is_admin, args.target)
if __name__ == "__main__":

View File

@ -62,6 +62,39 @@ class CheckerTestCase(base.PolicyBaseTestCase):
'''
self.assertEqual(expected, stdout.getvalue())
@mock.patch("oslo_policy._checks.TrueCheck.__call__")
def test_pass_rule_parameters_with_custom_target(self, call_mock):
apply_rule = None
is_admin = False
access_data = token_fixture.SCOPED_TOKEN_FIXTURE["token"]
access_data['roles'] = [
role['name'] for role in access_data['roles']]
access_data['project_id'] = access_data['project']['id']
access_data['is_admin'] = is_admin
sample_target = {
"project_id": access_data["project"]["id"],
"domain_id": access_data["project"]["domain"]["id"]
}
self.create_config_file(
"target.json",
jsonutils.dumps(sample_target))
policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
access_file = open(self.get_config_file_fullname('access.json'), 'r')
target_file = open(self.get_config_file_fullname('target.json'), 'r')
stdout = self._capture_stdout()
shell.tool(policy_file, access_file, apply_rule, is_admin,
target_file)
call_mock.assert_called_once_with(
sample_target, access_data, mock.ANY,
current_rule="sampleservice:sample_rule")
expected = '''passed: sampleservice:sample_rule
'''
self.assertEqual(expected, stdout.getvalue())
def test_all_nonadmin(self):
policy_file = open(self.get_config_file_fullname('policy.yaml'), 'r')
@ -75,3 +108,30 @@ class CheckerTestCase(base.PolicyBaseTestCase):
expected = '''passed: sampleservice:sample_rule
'''
self.assertEqual(expected, stdout.getvalue())
def test_flatten_from_dict(self):
target = {
"target": {
"secret": {
"project_id": "1234"
}
}
}
result = shell.flatten(target)
self.assertEqual(result, {"target.secret.project_id": "1234"})
def test_flatten_from_file(self):
target = {
"target": {
"secret": {
"project_id": "1234"
}
}
}
self.create_config_file(
"target.json",
jsonutils.dumps(target))
target_file = open(self.get_config_file_fullname('target.json'), 'r')
target_from_file = target_file.read()
result = shell.flatten(jsonutils.loads(target_from_file))
self.assertEqual(result, {"target.secret.project_id": "1234"})

View File

@ -0,0 +1,6 @@
---
features:
- |
oslopolicy-checker was added the ability to accept a file containing a hash
that represents the target. This makes it possible to check policies that
have non-conventional targets such as barbican.