Add base policy check module
This adds new v2/policy module which includes abstract kind of base class which can be subclassed as per need to enforce policies for different glance resources. Related blueprint policy-refactor Change-Id: I3cc6bab69af001ad0a2c3cf274c0f327953eec95
This commit is contained in:
parent
fa55888550
commit
485677a3f3
|
@ -0,0 +1,51 @@
|
|||
# Copyright 2021 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import webob.exc
|
||||
|
||||
from glance.api import policy
|
||||
from glance.common import exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class APIPolicyBase(object):
|
||||
def __init__(self, context, target=None, enforcer=None):
|
||||
self._context = context
|
||||
self._target = target or {}
|
||||
self.enforcer = enforcer or policy.Enforcer()
|
||||
|
||||
def _enforce(self, rule_name):
|
||||
try:
|
||||
self.enforcer.enforce(self._context, rule_name, self._target)
|
||||
except exception.Forbidden as e:
|
||||
raise webob.exc.HTTPForbidden(explanation=str(e))
|
||||
|
||||
def check(self, name, *args):
|
||||
"""Perform a soft check of a named policy.
|
||||
|
||||
This is used when you need to check if a policy is allowed for the
|
||||
given resource, without needing to catch an exception. If the policy
|
||||
check requires args, those are accepted here as well.
|
||||
|
||||
:param name: Policy name to check
|
||||
:returns: bool indicating if the policy is allowed.
|
||||
"""
|
||||
try:
|
||||
getattr(self, name)(*args)
|
||||
return True
|
||||
except webob.exc.HTTPForbidden:
|
||||
return False
|
|
@ -0,0 +1,56 @@
|
|||
# Copyright 2021 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
import webob.exc
|
||||
|
||||
from glance.api.v2 import policy
|
||||
from glance.common import exception
|
||||
from glance.tests import utils
|
||||
|
||||
|
||||
class APIPolicyBase(utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(APIPolicyBase, self).setUp()
|
||||
self.enforcer = mock.MagicMock()
|
||||
self.context = mock.MagicMock()
|
||||
self.policy = policy.APIPolicyBase(self.context,
|
||||
enforcer=self.enforcer)
|
||||
|
||||
def test_enforce(self):
|
||||
# Enforce passes
|
||||
self.policy._enforce('fake_rule')
|
||||
self.enforcer.enforce.assert_called_once_with(
|
||||
self.context,
|
||||
'fake_rule',
|
||||
mock.ANY)
|
||||
|
||||
# Make sure that Forbidden gets caught and translated
|
||||
self.enforcer.enforce.side_effect = exception.Forbidden
|
||||
self.assertRaises(webob.exc.HTTPForbidden,
|
||||
self.policy._enforce, 'fake_rule')
|
||||
|
||||
# Any other exception comes straight through
|
||||
self.enforcer.enforce.side_effect = exception.ImageNotFound
|
||||
self.assertRaises(exception.ImageNotFound,
|
||||
self.policy._enforce, 'fake_rule')
|
||||
|
||||
def test_check(self):
|
||||
# Check passes
|
||||
self.assertTrue(self.policy.check('_enforce', 'fake_rule'))
|
||||
|
||||
# Check fails
|
||||
self.enforcer.enforce.side_effect = exception.Forbidden
|
||||
self.assertFalse(self.policy.check('_enforce', 'fake_rule'))
|
Loading…
Reference in New Issue