Make image update check policy at API layer

This implements the proposal at the PTG that makes the image update
API our first one to do all policy checks at the API layer instead
of the lower ones.

There are still some things that have to be resolved, like the image
repo will still check get_image even though we don't want it to.

However, this adds a new v2/policy module to encapsulate these checks
and adds them into the various places of the update call. It also
makes our test policy for modify_image match that of our actual
default, which is a major step in making sure our tests actually run
with the policy we expect at runtime.

NOTE: db.ImageRepo.save() was raising NotFound in cases where
Forbidden was raised by _check_mutate_authorization(). This means
that we could not tell the difference at the higher layers between
an actual NotFound and the 404 generated by an auth failure.

The ImageAPIPolicy module will raise Forbidden for cases where an
operation is forbidden, but the image would be otherwise visible to
the user, and NotFound otherwise to obscure the existence of images
that the user can not otherwise see.

Partially-Implements: blueprint policy-refactor

Change-Id: I43dbc88a9f3fd4c6b2a10c2534ccee9283663282
This commit is contained in:
Dan Smith 2021-05-05 08:26:31 -07:00
parent 8767489f7b
commit 3825d2111a
5 changed files with 322 additions and 6 deletions

View File

@ -35,6 +35,7 @@ import webob.exc
from glance.api import authorization
from glance.api import common
from glance.api import policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import location_strategy
from glance.common import store_utils
@ -562,14 +563,17 @@ class ImagesController(object):
@utils.mutating
def update(self, req, image_id, changes):
image_repo = self.gateway.get_repo(req.context)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
try:
image = image_repo.get(image_id)
api_pol = api_policy.ImageAPIPolicy(req.context, image,
self.policy)
for change in changes:
change_method_name = '_do_%s' % change['op']
change_method = getattr(self, change_method_name)
change_method(req, image, change)
change_method(req, image, api_pol, change)
if changes:
image_repo.save(image)
@ -595,7 +599,7 @@ class ImagesController(object):
return image
def _do_replace(self, req, image, change):
def _do_replace(self, req, image, api_pol, change):
path = change['path']
path_root = path[0]
value = change['value']
@ -603,11 +607,13 @@ class ImagesController(object):
msg = _("Cannot set locations to empty list.")
raise webob.exc.HTTPForbidden(msg)
elif path_root == 'locations' and value:
api_pol.update_locations()
self._do_replace_locations(image, value)
elif path_root == 'owner' and req.context.is_admin == False:
msg = _("Owner can't be updated by non admin.")
raise webob.exc.HTTPForbidden(msg)
else:
api_pol.update_property(path_root, value)
if hasattr(image, path_root):
setattr(image, path_root, value)
elif path_root in image.extra_properties:
@ -616,14 +622,16 @@ class ImagesController(object):
msg = _("Property %s does not exist.")
raise webob.exc.HTTPConflict(msg % path_root)
def _do_add(self, req, image, change):
def _do_add(self, req, image, api_pol, change):
path = change['path']
path_root = path[0]
value = change['value']
json_schema_version = change.get('json_schema_version', 10)
if path_root == 'locations':
api_pol.update_locations()
self._do_add_locations(image, path[1], value)
else:
api_pol.update_property(path_root, value)
if ((hasattr(image, path_root) or
path_root in image.extra_properties)
and json_schema_version == 4):
@ -634,15 +642,17 @@ class ImagesController(object):
else:
image.extra_properties[path_root] = value
def _do_remove(self, req, image, change):
def _do_remove(self, req, image, api_pol, change):
path = change['path']
path_root = path[0]
if path_root == 'locations':
api_pol.delete_locations()
try:
self._do_remove_locations(image, path[1])
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(e.msg)
else:
api_pol.update_property(path_root)
if hasattr(image, path_root):
msg = _("Property %s may not be removed.")
raise webob.exc.HTTPForbidden(msg % path_root)

View File

@ -49,3 +49,61 @@ class APIPolicyBase(object):
return True
except webob.exc.HTTPForbidden:
return False
class ImageAPIPolicy(APIPolicyBase):
def __init__(self, context, image, enforcer=None):
super(ImageAPIPolicy, self).__init__(context,
policy.ImageTarget(image),
enforcer)
self._image = image
def _enforce(self, rule_name):
"""Translate Forbidden->NotFound for images."""
try:
super(ImageAPIPolicy, self)._enforce(rule_name)
except webob.exc.HTTPForbidden:
# If we are checking get_image, then Forbidden means the
# user cannot see this image, so raise NotFound. If we are
# checking anything else and get Forbidden, then raise
# NotFound in that case as well to avoid exposing images
# the user can not see, while preserving the Forbidden
# behavior for the ones they can see.
if rule_name == 'get_image' or not self.check('get_image'):
raise webob.exc.HTTPNotFound()
raise
def check(self, name, *args):
try:
return super(ImageAPIPolicy, self).check(name, *args)
except webob.exc.HTTPNotFound:
# NOTE(danms): Since our _enforce can raise NotFound, that
# too means a False check response.
return False
def _enforce_visibility(self, visibility):
# NOTE(danms): Use the existing enforcement routine for now,
# which shows that we're enforcing the same behavior. In the
# future, that should probably be moved here.
try:
policy._enforce_image_visibility(self.enforcer, self._context,
visibility, self._target)
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=str(e))
def update_property(self, name, value=None):
if name == 'visibility':
# NOTE(danms): Visibility changes have their own policy,
# so check that first, followed by the general
# modify_image policy below.
self._enforce_visibility(value)
self._enforce('modify_image')
def update_locations(self):
self._enforce('set_image_location')
def delete_locations(self):
self._enforce('delete_image_location')
def get_image(self):
self._enforce('get_image')

View File

@ -0,0 +1,162 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslo_policy.policy
from glance.api import policy
from glance.tests import functional
class TestImagesPolicy(functional.SynchronousAPIBase):
def setUp(self):
super(TestImagesPolicy, self).setUp()
self.policy = policy.Enforcer()
def set_policy_rules(self, rules):
self.policy.set_rules(
oslo_policy.policy.Rules.from_dict(rules),
overwrite=True)
def start_server(self):
with mock.patch.object(policy, 'Enforcer') as mock_enf:
mock_enf.return_value = self.policy
super(TestImagesPolicy, self).start_server()
def test_image_update_basic(self):
self.start_server()
image_id = self._create_and_upload()
# First make sure image update works with the default policy
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'add',
'path': '/mykey1',
'value': 'foo'})
self.assertEqual(200, resp.status_code, resp.text)
self.assertEqual(
'foo',
self.api_get('/v2/images/%s' % image_id).json['mykey1'])
# Now disable modify_image permissions and make sure any other
# attempts fail
self.set_policy_rules({'get_image': '',
'modify_image': '!'})
# Add should fail
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'add',
'path': '/mykey2',
'value': 'foo'})
self.assertEqual(403, resp.status_code)
self.assertNotIn(
'mykey2',
self.api_get('/v2/images/%s' % image_id).json)
# Replace should fail, old value should persist
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'replace',
'path': '/mykey1',
'value': 'bar'})
self.assertEqual(403, resp.status_code)
self.assertEqual(
'foo',
self.api_get('/v2/images/%s' % image_id).json['mykey1'])
# Remove should fail, old value should persist
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'remove',
'path': '/mykey1'})
self.assertEqual(403, resp.status_code)
self.assertEqual(
'foo',
self.api_get('/v2/images/%s' % image_id).json['mykey1'])
# Now disable get_image permissions and we should get a 404
# instead of a 403 when trying to do the same operation as above.
# Remove should fail, old value should persist
self.set_policy_rules({'get_image': '!',
'modify_image': '!'})
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'remove',
'path': '/mykey1'})
self.assertEqual(404, resp.status_code)
@mock.patch('glance.location._check_image_location', new=lambda *a: 0)
@mock.patch('glance.location.ImageRepoProxy._set_acls', new=lambda *a: 0)
def test_image_update_locations(self):
self.config(show_multiple_locations=True)
self.start_server()
image_id = self._create_and_upload()
# First make sure we can add and delete locations
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'add',
'path': '/locations/0',
'value': {'url': 'http://foo.bar',
'metadata': {}}})
self.assertEqual(200, resp.status_code, resp.text)
self.assertEqual(2,
len(self.api_get(
'/v2/images/%s' % image_id).json['locations']))
self.assertEqual(
'http://foo.bar',
self.api_get(
'/v2/images/%s' % image_id).json['locations'][1]['url'])
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'remove',
'path': '/locations/0'})
self.assertEqual(200, resp.status_code, resp.text)
self.assertEqual(1,
len(self.api_get(
'/v2/images/%s' % image_id).json['locations']))
# Add another while we still can so we can try to delete it below
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'add',
'path': '/locations/0',
'value': {'url': 'http://foo.baz',
'metadata': {}}})
self.assertEqual(200, resp.status_code, resp.text)
self.assertEqual(2,
len(self.api_get(
'/v2/images/%s' % image_id).json['locations']))
# Now disable set/delete_image_location permissions and make
# sure any other attempts fail
self.set_policy_rules({'get_image': '',
'get_image_location': '',
'set_image_location': '!',
'delete_image_location': '!'})
# Make sure we cannot delete the above or add another
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'remove',
'path': '/locations/0'})
self.assertEqual(403, resp.status_code, resp.text)
self.assertEqual(2,
len(self.api_get(
'/v2/images/%s' % image_id).json['locations']))
resp = self.api_patch('/v2/images/%s' % image_id,
{'op': 'add',
'path': '/locations/0',
'value': {'url': 'http://foo.baz',
'metadata': {}}})
self.assertEqual(403, resp.status_code, resp.text)
self.assertEqual(2,
len(self.api_get(
'/v2/images/%s' % image_id).json['locations']))

View File

@ -1409,7 +1409,8 @@ class TestImagesController(base.IsolatedUnitTest):
def test_update_format_properties(self):
statuses_for_immutability = ['active', 'saving', 'killed']
request = unit_test_utils.get_fake_request(is_admin=True)
request = unit_test_utils.get_fake_request(roles=['admin'],
is_admin=True)
for status in statuses_for_immutability:
image = {
'id': str(uuid.uuid4()),

View File

@ -54,3 +54,88 @@ class APIPolicyBase(utils.BaseTestCase):
# Check fails
self.enforcer.enforce.side_effect = exception.Forbidden
self.assertFalse(self.policy.check('_enforce', 'fake_rule'))
class APIImagePolicy(APIPolicyBase):
def setUp(self):
super(APIImagePolicy, self).setUp()
self.image = mock.MagicMock()
self.policy = policy.ImageAPIPolicy(self.context, self.image,
enforcer=self.enforcer)
def test_enforce(self):
self.assertRaises(webob.exc.HTTPNotFound,
super(APIImagePolicy, self).test_enforce)
@mock.patch('glance.api.policy._enforce_image_visibility')
def test_enforce_visibility(self, mock_enf):
# Visibility passes
self.policy._enforce_visibility('something')
mock_enf.assert_called_once_with(self.enforcer,
self.context,
'something',
mock.ANY)
# Make sure that Forbidden gets caught and translated
mock_enf.side_effect = exception.Forbidden
self.assertRaises(webob.exc.HTTPForbidden,
self.policy._enforce_visibility, 'something')
# Any other exception comes straight through
mock_enf.side_effect = exception.ImageNotFound
self.assertRaises(exception.ImageNotFound,
self.policy._enforce_visibility, 'something')
def test_update_property(self):
with mock.patch.object(self.policy, '_enforce') as mock_enf:
self.policy.update_property('foo', None)
mock_enf.assert_called_once_with('modify_image')
with mock.patch.object(self.policy, '_enforce_visibility') as mock_enf:
self.policy.update_property('visibility', 'foo')
mock_enf.assert_called_once_with('foo')
def test_update_locations(self):
self.policy.update_locations()
self.enforcer.enforce.assert_called_once_with(self.context,
'set_image_location',
mock.ANY)
def test_delete_locations(self):
self.policy.delete_locations()
self.enforcer.enforce.assert_called_once_with(self.context,
'delete_image_location',
mock.ANY)
def test_enforce_exception_behavior(self):
with mock.patch.object(self.policy.enforcer, 'enforce') as mock_enf:
# First make sure we can update if allowed
self.policy.update_property('foo', None)
self.assertTrue(mock_enf.called)
# Make sure that if modify_image and get_image both return
# Forbidden then we should get NotFound. This is because
# we are not allowed to delete the image, nor see that it
# even exists.
mock_enf.reset_mock()
mock_enf.side_effect = exception.Forbidden
self.assertRaises(webob.exc.HTTPNotFound,
self.policy.update_property, 'foo', None)
# Make sure we checked modify_image, and then get_image.
mock_enf.assert_has_calls([
mock.call(mock.ANY, 'modify_image', mock.ANY),
mock.call(mock.ANY, 'get_image', mock.ANY)])
# Make sure that if modify_image is disallowed, but
# get_image is allowed, that we get Forbidden. This is
# because we are allowed to see the image, but not modify
# it, so 403 indicates that without confusing the user and
# returning "not found" for an image they are able to GET.
mock_enf.reset_mock()
mock_enf.side_effect = [exception.Forbidden, lambda *a: None]
self.assertRaises(webob.exc.HTTPForbidden,
self.policy.update_property, 'foo', None)
# Make sure we checked modify_image, and then get_image.
mock_enf.assert_has_calls([
mock.call(mock.ANY, 'modify_image', mock.ANY),
mock.call(mock.ANY, 'get_image', mock.ANY)])