Add scope_types to RuleDefault objects

This change will help oslo.policy consume different levels of scope
and enforce proper admin-ness across OpenStack. The idea is that once
keystone has the ability to issue system-scoped tokens, we can start
enforcing partial scope checks in `Enforcer.enforce()`.

bp add-scope-to-policy

Change-Id: I7fa171d859d82939511f8279e4e9464f792ed2cd
This commit is contained in:
Lance Bragstad 2017-10-06 19:30:14 +00:00
parent a9931f3708
commit 52c82ff9ab
4 changed files with 138 additions and 7 deletions

View File

@ -109,6 +109,16 @@ interact with the resource the policy protects. The `method` should be the HTTP
verb corresponding to the `path`. The list of `operations` can be supplied with verb corresponding to the `path`. The list of `operations` can be supplied with
multiple dictionaries if the policy is used to protect multiple paths. multiple dictionaries if the policy is used to protect multiple paths.
Setting scope
-------------
The `RuleDefault` and `DocumentedRuleDefault` objects have an attribute
dedicated to the intended scope of the operation called `scope_types`. This
attribute can only be set at rule definition and never overridden via a policy
file. This variable is designed to save the scope at which a policy should
operate. During enforcement, the information in `scope_types` is compared to
the scope of the token used in the request.
Sample file generation Sample file generation
---------------------- ----------------------

View File

@ -121,10 +121,16 @@ def _format_rule_default_yaml(default, include_help=True):
op += ('# %(method)s %(path)s\n' % op += ('# %(method)s %(path)s\n' %
{'method': operation['method'], {'method': operation['method'],
'path': operation['path']}) 'path': operation['path']})
intended_scope = ""
if getattr(default, 'scope_types', None) is not None:
intended_scope = (
'# Intended scope(s): ' + ', '.join(default.scope_types) + '\n'
)
text = ('%(help)s\n%(op)s#%(text)s\n' % text = ('%(help)s\n%(op)s%(scope)s#%(text)s\n' %
{'help': _format_help_text(default.description), {'help': _format_help_text(default.description),
'op': op, 'op': op,
'scope': intended_scope,
'text': text}) 'text': text})
if default.deprecated_for_removal: if default.deprecated_for_removal:

View File

@ -301,6 +301,21 @@ class PolicyNotAuthorized(Exception):
super(PolicyNotAuthorized, self).__init__(msg) super(PolicyNotAuthorized, self).__init__(msg)
class InvalidScope(Exception):
"""Raised when the scope of the request mismatches the policy scope."""
def __init__(self, rule, operation_scopes, token_scope):
msg = (
"(rule)s requires a scope of %(operation_scopes)s, request "
"was made with %(token_scope)s scope." % {
'rule': rule,
'operation_scopes': operation_scopes,
'token_scope': token_scope
}
)
super(InvalidScope, self).__init__(msg)
class DuplicatePolicyError(Exception): class DuplicatePolicyError(Exception):
def __init__(self, name): def __init__(self, name):
msg = _('Policy %(name)s is already registered') % {'name': name} msg = _('Policy %(name)s is already registered') % {'name': name}
@ -761,8 +776,8 @@ class Enforcer(object):
raise cfg.ConfigFilesNotFoundError((path,)) raise cfg.ConfigFilesNotFoundError((path,))
def enforce(self, rule, target, creds, do_raise=False, def enforce(self, rule, target, creds, do_raise=False, exc=None,
exc=None, *args, **kwargs): enforce_scope=True, *args, **kwargs):
"""Checks authorization of a rule against the target and credentials. """Checks authorization of a rule against the target and credentials.
:param rule: The rule to evaluate. :param rule: The rule to evaluate.
@ -778,6 +793,12 @@ class Enforcer(object):
positional and keyword arguments) will be passed to positional and keyword arguments) will be passed to
the exception class. If not specified, the exception class. If not specified,
:class:`PolicyNotAuthorized` will be used. :class:`PolicyNotAuthorized` will be used.
:param enforce_scope: A boolean value denoting if an exception should
be raised in the event the operation requires a
different scope from the one in the request (e.g.
using a project-scope token to do something
system-wide). If False, a warning will be logged
with details of the scope failure.
:return: ``False`` if the policy does not allow the action and `exc` is :return: ``False`` if the policy does not allow the action and `exc` is
not provided; otherwise, returns a value that evaluates to not provided; otherwise, returns a value that evaluates to
@ -810,6 +831,41 @@ class Enforcer(object):
# If the rule doesn't exist, fail closed # If the rule doesn't exist, fail closed
result = False result = False
else: else:
# Check the scope of the operation against the possible scope
# attributes provided in `creds`.
if creds.get('system'):
token_scope = 'system'
else:
# If the token isn't system-scoped then we're dealing with
# either a domain-scoped token or a project-scoped token.
# From a policy perspective, both are "project" operations.
# Whether or not the project is a domain depends on where
# it sits in the hierarchy.
token_scope = 'project'
registered_rule = self.registered_rules.get(rule)
if registered_rule and registered_rule.scope_types:
if token_scope not in registered_rule.scope_types:
if enforce_scope:
raise InvalidScope(
rule, registered_rule.scope_types, token_scope
)
# If we don't raise an exception we should at least
# inform operators about policies that are being used
# with improper scopes.
msg = (
'Policy %(rule)s failed scope check. The token '
'used to make the request was %(token_scope)s '
'scoped but the policy requires %(policy_scope)s '
'scope. This behavior may change in the future '
'where using the intended scope is required' % {
'rule': rule,
'token_scope': token_scope,
'policy_scope': registered_rule.scope_types
}
)
warnings.warn(msg)
result = _checks._check( result = _checks._check(
rule=to_check, rule=to_check,
target=target, target=target,
@ -893,6 +949,8 @@ class RuleDefault(object):
in. Accepts any string, though valid version in. Accepts any string, though valid version
strings are encouraged. Silently ignored if strings are encouraged. Silently ignored if
deprecated_for_removal is False. deprecated_for_removal is False.
:param scope_types: A list containing the intended scopes of the operation
being done.
.. versionchanged 1.29 .. versionchanged 1.29
Added *deprecated_rule* parameter. Added *deprecated_rule* parameter.
@ -905,10 +963,15 @@ class RuleDefault(object):
.. versionchanged 1.29 .. versionchanged 1.29
Added *deprecated_since* parameter. Added *deprecated_since* parameter.
.. versionchanged 1.31
Added *scope_types* parameter.
""" """
def __init__(self, name, check_str, description=None, def __init__(self, name, check_str, description=None,
deprecated_rule=None, deprecated_for_removal=False, deprecated_rule=None, deprecated_for_removal=False,
deprecated_reason=None, deprecated_since=None): deprecated_reason=None, deprecated_since=None,
scope_types=None):
self.name = name self.name = name
self.check_str = check_str self.check_str = check_str
self.check = _parser.parse_rule(check_str) self.check = _parser.parse_rule(check_str)
@ -932,6 +995,19 @@ class RuleDefault(object):
'policy' % {'name': self.name} 'policy' % {'name': self.name}
) )
if scope_types:
msg = 'scope_types must be a list of strings.'
if not isinstance(scope_types, list):
raise ValueError(msg)
for scope_type in scope_types:
if not isinstance(scope_type, six.string_types):
raise ValueError(msg)
if scope_types.count(scope_type) > 1:
raise ValueError(
'scope_types must be a list of unique strings.'
)
self.scope_types = scope_types
def __str__(self): def __str__(self):
return '"%(name)s": "%(check_str)s"' % {'name': self.name, return '"%(name)s": "%(check_str)s"' % {'name': self.name,
'check_str': self.check_str} 'check_str': self.check_str}
@ -976,13 +1052,15 @@ class DocumentedRuleDefault(RuleDefault):
""" """
def __init__(self, name, check_str, description, operations, def __init__(self, name, check_str, description, operations,
deprecated_rule=None, deprecated_for_removal=False, deprecated_rule=None, deprecated_for_removal=False,
deprecated_reason=None, deprecated_since=None): deprecated_reason=None, deprecated_since=None,
scope_types=None):
super(DocumentedRuleDefault, self).__init__( super(DocumentedRuleDefault, self).__init__(
name, check_str, description, name, check_str, description,
deprecated_rule=deprecated_rule, deprecated_rule=deprecated_rule,
deprecated_for_removal=deprecated_for_removal, deprecated_for_removal=deprecated_for_removal,
deprecated_reason=deprecated_reason, deprecated_reason=deprecated_reason,
deprecated_since=deprecated_since deprecated_since=deprecated_since,
scope_types=scope_types
) )
self.operations = operations self.operations = operations

View File

@ -742,7 +742,8 @@ class CheckFunctionTestCase(base.PolicyBaseTestCase):
creds = {} creds = {}
exc = self.assertRaises( exc = self.assertRaises(
MyException, self.enforcer.enforce, 'rule', 'target', creds, MyException, self.enforcer.enforce, 'rule', 'target', creds,
True, MyException, 'arg1', 'arg2', kw1='kwarg1', kw2='kwarg2') True, MyException, False, 'arg1', 'arg2', kw1='kwarg1',
kw2='kwarg2')
self.assertEqual(('arg1', 'arg2'), exc.args) self.assertEqual(('arg1', 'arg2'), exc.args)
self.assertEqual(dict(kw1='kwarg1', kw2='kwarg2'), exc.kwargs) self.assertEqual(dict(kw1='kwarg1', kw2='kwarg2'), exc.kwargs)
@ -843,6 +844,42 @@ class RuleDefaultTestCase(base.PolicyBaseTestCase):
opt2 = RuleDefaultSub(name='bar', check_str='rule:foo') opt2 = RuleDefaultSub(name='bar', check_str='rule:foo')
self.assertNotEqual(opt1, opt2) self.assertNotEqual(opt1, opt2)
def test_create_opt_with_scope_types(self):
scope_types = ['project']
opt = policy.RuleDefault(
name='foo',
check_str='role:bar',
scope_types=scope_types
)
self.assertEqual(opt.scope_types, scope_types)
def test_create_opt_with_scope_type_strings_fails(self):
self.assertRaises(
ValueError,
policy.RuleDefault,
name='foo',
check_str='role:bar',
scope_types='project'
)
def test_create_opt_with_multiple_scope_types(self):
opt = policy.RuleDefault(
name='foo',
check_str='role:bar',
scope_types=['project', 'system']
)
self.assertEqual(opt.scope_types, ['project', 'system'])
def test_ensure_scope_types_are_unique(self):
self.assertRaises(
ValueError,
policy.RuleDefault,
name='foo',
check_str='role:bar',
scope_types=['project', 'project']
)
class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase): class DocumentedRuleDefaultDeprecationTestCase(base.PolicyBaseTestCase):