diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 68ff7193bc..a65c8c5207 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -35,6 +35,7 @@ import webob.exc from glance.api import authorization from glance.api import common from glance.api import policy +from glance.api.v2 import policy as api_policy from glance.common import exception from glance.common import location_strategy from glance.common import store_utils @@ -562,14 +563,17 @@ class ImagesController(object): @utils.mutating def update(self, req, image_id, changes): - image_repo = self.gateway.get_repo(req.context) + image_repo = self.gateway.get_repo(req.context, + authorization_layer=False) try: image = image_repo.get(image_id) + api_pol = api_policy.ImageAPIPolicy(req.context, image, + self.policy) for change in changes: change_method_name = '_do_%s' % change['op'] change_method = getattr(self, change_method_name) - change_method(req, image, change) + change_method(req, image, api_pol, change) if changes: image_repo.save(image) @@ -595,7 +599,7 @@ class ImagesController(object): return image - def _do_replace(self, req, image, change): + def _do_replace(self, req, image, api_pol, change): path = change['path'] path_root = path[0] value = change['value'] @@ -603,11 +607,13 @@ class ImagesController(object): msg = _("Cannot set locations to empty list.") raise webob.exc.HTTPForbidden(msg) elif path_root == 'locations' and value: + api_pol.update_locations() self._do_replace_locations(image, value) elif path_root == 'owner' and req.context.is_admin == False: msg = _("Owner can't be updated by non admin.") raise webob.exc.HTTPForbidden(msg) else: + api_pol.update_property(path_root, value) if hasattr(image, path_root): setattr(image, path_root, value) elif path_root in image.extra_properties: @@ -616,14 +622,16 @@ class ImagesController(object): msg = _("Property %s does not exist.") raise webob.exc.HTTPConflict(msg % path_root) - def _do_add(self, req, image, change): + def _do_add(self, req, image, api_pol, change): path = change['path'] path_root = path[0] value = change['value'] json_schema_version = change.get('json_schema_version', 10) if path_root == 'locations': + api_pol.update_locations() self._do_add_locations(image, path[1], value) else: + api_pol.update_property(path_root, value) if ((hasattr(image, path_root) or path_root in image.extra_properties) and json_schema_version == 4): @@ -634,15 +642,17 @@ class ImagesController(object): else: image.extra_properties[path_root] = value - def _do_remove(self, req, image, change): + def _do_remove(self, req, image, api_pol, change): path = change['path'] path_root = path[0] if path_root == 'locations': + api_pol.delete_locations() try: self._do_remove_locations(image, path[1]) except exception.Forbidden as e: raise webob.exc.HTTPForbidden(e.msg) else: + api_pol.update_property(path_root) if hasattr(image, path_root): msg = _("Property %s may not be removed.") raise webob.exc.HTTPForbidden(msg % path_root) diff --git a/glance/api/v2/policy.py b/glance/api/v2/policy.py index cd89f116a7..a9b7fafb2a 100644 --- a/glance/api/v2/policy.py +++ b/glance/api/v2/policy.py @@ -49,3 +49,61 @@ class APIPolicyBase(object): return True except webob.exc.HTTPForbidden: return False + + +class ImageAPIPolicy(APIPolicyBase): + def __init__(self, context, image, enforcer=None): + super(ImageAPIPolicy, self).__init__(context, + policy.ImageTarget(image), + enforcer) + self._image = image + + def _enforce(self, rule_name): + """Translate Forbidden->NotFound for images.""" + try: + super(ImageAPIPolicy, self)._enforce(rule_name) + except webob.exc.HTTPForbidden: + # If we are checking get_image, then Forbidden means the + # user cannot see this image, so raise NotFound. If we are + # checking anything else and get Forbidden, then raise + # NotFound in that case as well to avoid exposing images + # the user can not see, while preserving the Forbidden + # behavior for the ones they can see. + if rule_name == 'get_image' or not self.check('get_image'): + raise webob.exc.HTTPNotFound() + raise + + def check(self, name, *args): + try: + return super(ImageAPIPolicy, self).check(name, *args) + except webob.exc.HTTPNotFound: + # NOTE(danms): Since our _enforce can raise NotFound, that + # too means a False check response. + return False + + def _enforce_visibility(self, visibility): + # NOTE(danms): Use the existing enforcement routine for now, + # which shows that we're enforcing the same behavior. In the + # future, that should probably be moved here. + try: + policy._enforce_image_visibility(self.enforcer, self._context, + visibility, self._target) + except exception.Forbidden as e: + raise webob.exc.HTTPForbidden(explanation=str(e)) + + def update_property(self, name, value=None): + if name == 'visibility': + # NOTE(danms): Visibility changes have their own policy, + # so check that first, followed by the general + # modify_image policy below. + self._enforce_visibility(value) + self._enforce('modify_image') + + def update_locations(self): + self._enforce('set_image_location') + + def delete_locations(self): + self._enforce('delete_image_location') + + def get_image(self): + self._enforce('get_image') diff --git a/glance/tests/functional/v2/test_images_api_policy.py b/glance/tests/functional/v2/test_images_api_policy.py new file mode 100644 index 0000000000..19c7378fa8 --- /dev/null +++ b/glance/tests/functional/v2/test_images_api_policy.py @@ -0,0 +1,162 @@ +# Copyright 2021 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +import oslo_policy.policy + +from glance.api import policy +from glance.tests import functional + + +class TestImagesPolicy(functional.SynchronousAPIBase): + def setUp(self): + super(TestImagesPolicy, self).setUp() + self.policy = policy.Enforcer() + + def set_policy_rules(self, rules): + self.policy.set_rules( + oslo_policy.policy.Rules.from_dict(rules), + overwrite=True) + + def start_server(self): + with mock.patch.object(policy, 'Enforcer') as mock_enf: + mock_enf.return_value = self.policy + super(TestImagesPolicy, self).start_server() + + def test_image_update_basic(self): + self.start_server() + image_id = self._create_and_upload() + + # First make sure image update works with the default policy + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'add', + 'path': '/mykey1', + 'value': 'foo'}) + self.assertEqual(200, resp.status_code, resp.text) + self.assertEqual( + 'foo', + self.api_get('/v2/images/%s' % image_id).json['mykey1']) + + # Now disable modify_image permissions and make sure any other + # attempts fail + self.set_policy_rules({'get_image': '', + 'modify_image': '!'}) + + # Add should fail + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'add', + 'path': '/mykey2', + 'value': 'foo'}) + self.assertEqual(403, resp.status_code) + self.assertNotIn( + 'mykey2', + self.api_get('/v2/images/%s' % image_id).json) + + # Replace should fail, old value should persist + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'replace', + 'path': '/mykey1', + 'value': 'bar'}) + self.assertEqual(403, resp.status_code) + self.assertEqual( + 'foo', + self.api_get('/v2/images/%s' % image_id).json['mykey1']) + + # Remove should fail, old value should persist + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'remove', + 'path': '/mykey1'}) + self.assertEqual(403, resp.status_code) + self.assertEqual( + 'foo', + self.api_get('/v2/images/%s' % image_id).json['mykey1']) + + # Now disable get_image permissions and we should get a 404 + # instead of a 403 when trying to do the same operation as above. + # Remove should fail, old value should persist + self.set_policy_rules({'get_image': '!', + 'modify_image': '!'}) + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'remove', + 'path': '/mykey1'}) + self.assertEqual(404, resp.status_code) + + @mock.patch('glance.location._check_image_location', new=lambda *a: 0) + @mock.patch('glance.location.ImageRepoProxy._set_acls', new=lambda *a: 0) + def test_image_update_locations(self): + self.config(show_multiple_locations=True) + self.start_server() + image_id = self._create_and_upload() + + # First make sure we can add and delete locations + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'add', + 'path': '/locations/0', + 'value': {'url': 'http://foo.bar', + 'metadata': {}}}) + self.assertEqual(200, resp.status_code, resp.text) + self.assertEqual(2, + len(self.api_get( + '/v2/images/%s' % image_id).json['locations'])) + self.assertEqual( + 'http://foo.bar', + self.api_get( + '/v2/images/%s' % image_id).json['locations'][1]['url']) + + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'remove', + 'path': '/locations/0'}) + self.assertEqual(200, resp.status_code, resp.text) + self.assertEqual(1, + len(self.api_get( + '/v2/images/%s' % image_id).json['locations'])) + + # Add another while we still can so we can try to delete it below + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'add', + 'path': '/locations/0', + 'value': {'url': 'http://foo.baz', + 'metadata': {}}}) + self.assertEqual(200, resp.status_code, resp.text) + self.assertEqual(2, + len(self.api_get( + '/v2/images/%s' % image_id).json['locations'])) + + # Now disable set/delete_image_location permissions and make + # sure any other attempts fail + self.set_policy_rules({'get_image': '', + 'get_image_location': '', + 'set_image_location': '!', + 'delete_image_location': '!'}) + + # Make sure we cannot delete the above or add another + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'remove', + 'path': '/locations/0'}) + self.assertEqual(403, resp.status_code, resp.text) + self.assertEqual(2, + len(self.api_get( + '/v2/images/%s' % image_id).json['locations'])) + + resp = self.api_patch('/v2/images/%s' % image_id, + {'op': 'add', + 'path': '/locations/0', + 'value': {'url': 'http://foo.baz', + 'metadata': {}}}) + self.assertEqual(403, resp.status_code, resp.text) + self.assertEqual(2, + len(self.api_get( + '/v2/images/%s' % image_id).json['locations'])) diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index e2e594d7a7..10011e7e91 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -1409,7 +1409,8 @@ class TestImagesController(base.IsolatedUnitTest): def test_update_format_properties(self): statuses_for_immutability = ['active', 'saving', 'killed'] - request = unit_test_utils.get_fake_request(is_admin=True) + request = unit_test_utils.get_fake_request(roles=['admin'], + is_admin=True) for status in statuses_for_immutability: image = { 'id': str(uuid.uuid4()), diff --git a/glance/tests/unit/v2/test_v2_policy.py b/glance/tests/unit/v2/test_v2_policy.py index f0c1551285..500541f17e 100644 --- a/glance/tests/unit/v2/test_v2_policy.py +++ b/glance/tests/unit/v2/test_v2_policy.py @@ -54,3 +54,88 @@ class APIPolicyBase(utils.BaseTestCase): # Check fails self.enforcer.enforce.side_effect = exception.Forbidden self.assertFalse(self.policy.check('_enforce', 'fake_rule')) + + +class APIImagePolicy(APIPolicyBase): + def setUp(self): + super(APIImagePolicy, self).setUp() + self.image = mock.MagicMock() + self.policy = policy.ImageAPIPolicy(self.context, self.image, + enforcer=self.enforcer) + + def test_enforce(self): + self.assertRaises(webob.exc.HTTPNotFound, + super(APIImagePolicy, self).test_enforce) + + @mock.patch('glance.api.policy._enforce_image_visibility') + def test_enforce_visibility(self, mock_enf): + # Visibility passes + self.policy._enforce_visibility('something') + mock_enf.assert_called_once_with(self.enforcer, + self.context, + 'something', + mock.ANY) + + # Make sure that Forbidden gets caught and translated + mock_enf.side_effect = exception.Forbidden + self.assertRaises(webob.exc.HTTPForbidden, + self.policy._enforce_visibility, 'something') + + # Any other exception comes straight through + mock_enf.side_effect = exception.ImageNotFound + self.assertRaises(exception.ImageNotFound, + self.policy._enforce_visibility, 'something') + + def test_update_property(self): + with mock.patch.object(self.policy, '_enforce') as mock_enf: + self.policy.update_property('foo', None) + mock_enf.assert_called_once_with('modify_image') + + with mock.patch.object(self.policy, '_enforce_visibility') as mock_enf: + self.policy.update_property('visibility', 'foo') + mock_enf.assert_called_once_with('foo') + + def test_update_locations(self): + self.policy.update_locations() + self.enforcer.enforce.assert_called_once_with(self.context, + 'set_image_location', + mock.ANY) + + def test_delete_locations(self): + self.policy.delete_locations() + self.enforcer.enforce.assert_called_once_with(self.context, + 'delete_image_location', + mock.ANY) + + def test_enforce_exception_behavior(self): + with mock.patch.object(self.policy.enforcer, 'enforce') as mock_enf: + # First make sure we can update if allowed + self.policy.update_property('foo', None) + self.assertTrue(mock_enf.called) + + # Make sure that if modify_image and get_image both return + # Forbidden then we should get NotFound. This is because + # we are not allowed to delete the image, nor see that it + # even exists. + mock_enf.reset_mock() + mock_enf.side_effect = exception.Forbidden + self.assertRaises(webob.exc.HTTPNotFound, + self.policy.update_property, 'foo', None) + # Make sure we checked modify_image, and then get_image. + mock_enf.assert_has_calls([ + mock.call(mock.ANY, 'modify_image', mock.ANY), + mock.call(mock.ANY, 'get_image', mock.ANY)]) + + # Make sure that if modify_image is disallowed, but + # get_image is allowed, that we get Forbidden. This is + # because we are allowed to see the image, but not modify + # it, so 403 indicates that without confusing the user and + # returning "not found" for an image they are able to GET. + mock_enf.reset_mock() + mock_enf.side_effect = [exception.Forbidden, lambda *a: None] + self.assertRaises(webob.exc.HTTPForbidden, + self.policy.update_property, 'foo', None) + # Make sure we checked modify_image, and then get_image. + mock_enf.assert_has_calls([ + mock.call(mock.ANY, 'modify_image', mock.ANY), + mock.call(mock.ANY, 'get_image', mock.ANY)])