Check delete_image policy in the API

Note that this adds a clause to the delete exception handler which
returns 404 if the user can't delete the image via policy, but leaves
the existing internal forbidden->403 handler because of how image
property protections work.

Partially-Implements: blueprint policy-refactor

Change-Id: I5f2a406e31b706906b71ea5ecb4f1a53674c97fa
This commit is contained in:
Abhishek Kekane 2021-06-25 10:42:55 +00:00 committed by Dan Smith
parent 39198f93f8
commit c87bfbbcd7
5 changed files with 131 additions and 6 deletions

View File

@ -805,13 +805,24 @@ class ImagesController(object):
# continue on the local worker to match the user's
# expectations. If the image is already deleted, the caller
# will catch this NotFound like normal.
return self.gateway.get_repo(req.context).get(image.image_id)
return self.gateway.get_repo(
req.context,
authorization_layer=False).get(image.image_id)
@utils.mutating
def delete(self, req, image_id):
image_repo = self.gateway.get_repo(req.context)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
try:
image = image_repo.get(image_id)
# NOTE(abhishekk): This is the right place to check whether user
# have permission to delete the image and remove the policy check
# later from the policy layer.
api_pol = api_policy.ImageAPIPolicy(req.context, image,
self.policy)
api_pol.delete_image()
if self.is_proxyable(image):
# NOTE(danms): Image is staged on another worker; proxy the
# delete request to that worker with the user's token, as if

View File

@ -141,3 +141,10 @@ class ImageAPIPolicy(APIPolicyBase):
def get_images(self):
self._enforce('get_images')
def delete_image(self):
self._enforce('delete_image')
# TODO(danms): Remove this legacy fallback when secure RBAC
# replaces the legacy policy.
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)

View File

@ -204,3 +204,52 @@ class TestImagesPolicy(functional.SynchronousAPIBase):
self.assertEqual(403, resp.status_code)
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual(404, resp.status_code)
def test_image_delete(self):
self.start_server()
image_id = self._create_and_upload()
# Make sure we can delete the image
resp = self.api_delete('/v2/images/%s' % image_id)
self.assertEqual(204, resp.status_code)
# Make sure it is really gone
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual(404, resp.status_code)
# Make sure we get a 404 trying to delete a non-existent image
resp = self.api_delete('/v2/images/%s' % image_id)
self.assertEqual(404, resp.status_code)
image_id = self._create_and_upload()
# Now disable delete permissions, but allow get_image
self.set_policy_rules({'get_image': '',
'delete_image': '!'})
# Make sure delete returns 403 because we can see the image,
# just not delete it
resp = self.api_delete('/v2/images/%s' % image_id)
self.assertEqual(403, resp.status_code)
# Now disable delete permissions, including get_image
self.set_policy_rules({'get_image': '!',
'delete_image': '!'})
# Make sure delete returns 404 because we can not see nor
# delete it
resp = self.api_delete('/v2/images/%s' % image_id)
self.assertEqual(404, resp.status_code)
# Now allow delete, but disallow get_image, just to prove that
# you do not need get_image in order to be granted delete, and
# that we only use it for error code determination if
# permission is denied.
self.set_policy_rules({'get_image': '!',
'delete_image': ''})
# Make sure delete returns 204 because even though we can not
# see the image, we can delete it
resp = self.api_delete('/v2/images/%s' % image_id)
self.assertEqual(204, resp.status_code)

View File

@ -39,6 +39,7 @@ from glance.common import exception
from glance.common import store_utils
from glance.common import timeutils
from glance import domain
import glance.notifier
import glance.schema
from glance.tests.unit import base
from glance.tests.unit.keymgr import fake as fake_keymgr
@ -957,7 +958,7 @@ class TestImagesController(base.IsolatedUnitTest):
request = unit_test_utils.get_fake_request(
'/v2/images/%s' % UUID4, method='DELETE')
with mock.patch.object(
glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
glance.notifier.ImageRepoProxy, 'get') as mock_get:
mock_get.return_value = FakeImage(status='uploading')
mock_get.return_value.extra_properties['os_glance_stage_host'] = (
'https://glance-worker1.openstack.org')
@ -1013,7 +1014,7 @@ class TestImagesController(base.IsolatedUnitTest):
request = unit_test_utils.get_fake_request(
'/v2/images/%s' % UUID4, method='DELETE')
with mock.patch.object(
glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
glance.notifier.ImageRepoProxy, 'get') as mock_get:
mock_get.return_value = FakeImage(status='uploading')
mock_get.return_value.extra_properties['os_glance_stage_host'] = (
'https://glance-worker1.openstack.org')
@ -1035,8 +1036,8 @@ class TestImagesController(base.IsolatedUnitTest):
json=None, timeout=60)
@mock.patch('glance.context.get_ksa_client')
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'get')
@mock.patch.object(glance.api.authorization.ImageRepoProxy, 'remove')
@mock.patch.object(glance.notifier.ImageRepoProxy, 'get')
@mock.patch.object(glance.notifier.ImageRepoProxy, 'remove')
def test_image_delete_deletes_locally_on_error(self, mock_remove, mock_get,
mock_client):
# Make sure that if the proxy delete fails due to a connection error
@ -3063,6 +3064,26 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual('deleted', deleted_img['status'])
self.assertNotIn('%s/%s' % (BASE_URI, UUID1), self.store.data)
def test_delete_not_allowed_by_policy(self):
request = unit_test_utils.get_fake_request()
with mock.patch.object(self.controller.policy, 'enforce') as mock_enf:
mock_enf.side_effect = webob.exc.HTTPForbidden()
exc = self.assertRaises(webob.exc.HTTPNotFound,
self.controller.delete, request, UUID1)
self.assertTrue(mock_enf.called)
# Make sure we did not leak details of the original Forbidden
# error into the NotFound returned to the client.
self.assertEqual('The resource could not be found.', str(exc))
# Now reject the delete_image call, but allow get_image to ensure that
# we properly see a Forbidden result.
with mock.patch.object(self.controller.policy, 'enforce') as mock_enf:
mock_enf.side_effect = [webob.exc.HTTPForbidden(),
lambda *a: None]
exc = self.assertRaises(webob.exc.HTTPForbidden,
self.controller.delete, request, UUID1)
self.assertTrue(mock_enf.called)
@mock.patch.object(store, 'get_store_from_store_identifier')
@mock.patch.object(store.location, 'get_location_from_uri_and_backend')
@mock.patch.object(store_utils, 'get_dir_separator')

View File

@ -185,3 +185,40 @@ class APIImagePolicy(APIPolicyBase):
self.enforcer.enforce.assert_called_once_with(self.context,
'get_images',
mock.ANY)
def test_delete_image(self):
self.policy.delete_image()
self.enforcer.enforce.assert_called_once_with(self.context,
'delete_image',
mock.ANY)
def test_delete_image_falls_back_to_legacy(self):
self.config(enforce_secure_rbac=False)
# As admin, image is mutable even if owner does not match
self.context.is_admin = True
self.context.owner = 'someuser'
self.image.owner = 'someotheruser'
self.policy.delete_image()
# As non-admin, owner matches, so we're good
self.context.is_admin = False
self.context.owner = 'someuser'
self.image.owner = 'someuser'
self.policy.delete_image()
# If owner does not match, we fail
self.image.owner = 'someotheruser'
self.assertRaises(exception.Forbidden,
self.policy.delete_image)
# Make sure we are checking the legacy handler
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.delete_image()
m.assert_called_once_with(self.context, self.image)
# Make sure we are not checking it if enforce_secure_rbac=True
self.config(enforce_secure_rbac=True)
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.delete_image()
self.assertFalse(m.called)