Check upload_image policy in the API

Note that this adds a clause to the upload exception handler which
returns 404 if the user can't upload the image via policy, but leaves
the existing internal forbidden->403 handler because of how image
property protections work.

Partially-Implements: blueprint policy-refactor

Change-Id: I1353aacf595aa36c8c4823fbe7c6d0600a532f73
This commit is contained in:
Abhishek Kekane 2021-08-12 21:13:06 +00:00
parent 9e002a77f2
commit 4407d6ceb5
5 changed files with 135 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import six
import webob.exc
import glance.api.policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import trust_auth
from glance.common import utils
@ -140,14 +141,19 @@ class ImageDataController(object):
request=req,
content_type='text/plain')
image_repo = self.gateway.get_repo(req.context)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image = None
refresher = None
cxt = req.context
try:
image = image_repo.get(image_id)
target = {'project_id': image.owner}
self.policy.enforce(cxt, 'upload_image', target)
# NOTE(abhishekk): This is the right place to check whether user
# have permission to upload the image and remove the policy check
# later from the policy layer.
api_pol = api_policy.ImageAPIPolicy(req.context, image,
self.policy)
api_pol.upload_image()
image.status = 'saving'
try:
# create a trust if backend is registry

View File

@ -149,6 +149,13 @@ class ImageAPIPolicy(APIPolicyBase):
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
def upload_image(self):
self._enforce('upload_image')
# TODO(danms): Remove this legacy fallback when secure RBAC
# replaces the legacy policy.
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
class MetadefAPIPolicy(APIPolicyBase):
def __init__(self, context, md_resource=None, target=None, enforcer=None):

View File

@ -253,3 +253,44 @@ class TestImagesPolicy(functional.SynchronousAPIBase):
# see the image, we can delete it
resp = self.api_delete('/v2/images/%s' % image_id)
self.assertEqual(204, resp.status_code)
def test_image_upload(self):
self.start_server()
# Make sure we can upload the image
self._create_and_upload(expected_code=204)
# Now disable upload permissions, but allow get_image
self.set_policy_rules({
'add_image': '',
'get_image': '',
'upload_image': '!'
})
# Make sure upload returns 403 because we can see the image,
# just not upload data to it
self._create_and_upload(expected_code=403)
# Now disable upload permissions, including get_image
self.set_policy_rules({
'add_image': '',
'get_image': '!',
'upload_image': '!',
})
# Make sure upload returns 404 because we can not see nor
# upload data to it
self._create_and_upload(expected_code=404)
# Now allow upload, but disallow get_image, just to prove that
# you do not need get_image in order to be granted upload, and
# that we only use it for error code determination if
# permission is denied.
self.set_policy_rules({
'add_image': '',
'get_image': '!',
'upload_image': ''})
# Make sure upload returns 204 because even though we can not
# see the image, we can upload data to it
self._create_and_upload(expected_code=204)

View File

@ -108,7 +108,7 @@ class FakeGateway(object):
self.policy = policy
self.repo = repo
def get_repo(self, context):
def get_repo(self, context, authorization_layer=True):
return self.repo
@ -126,6 +126,12 @@ class TestImagesController(base.StoreClearingUnitTest):
self.controller = glance.api.v2.image_data.ImageDataController()
self.controller.gateway = FakeGateway(db, store, notifier, policy,
self.image_repo)
# FIXME(abhishekk): Everything is fake in this test, so mocked the
# image mutable_check, Later we need to fix these tests to use
# some realistic data
patcher = mock.patch('glance.api.v2.policy.check_is_image_mutable')
patcher.start()
self.addCleanup(patcher.stop)
def test_download(self):
request = unit_test_utils.get_fake_request()
@ -187,6 +193,28 @@ class TestImagesController(base.StoreClearingUnitTest):
self.assertEqual('YYYY', image.data)
self.assertEqual(4, image.size)
def test_upload_not_allowed_by_policy(self):
request = unit_test_utils.get_fake_request()
with mock.patch.object(self.controller.policy, 'enforce') as mock_enf:
mock_enf.side_effect = webob.exc.HTTPForbidden()
exc = self.assertRaises(webob.exc.HTTPNotFound,
self.controller.upload, request,
unit_test_utils.UUID1, 'YYYY', 4)
self.assertTrue(mock_enf.called)
# Make sure we did not leak details of the original Forbidden
# error into the NotFound returned to the client.
self.assertEqual('The resource could not be found.', str(exc))
# Now reject the upload_image call, but allow get_image to ensure that
# we properly see a Forbidden result.
with mock.patch.object(self.controller.policy, 'enforce') as mock_enf:
mock_enf.side_effect = [webob.exc.HTTPForbidden(),
lambda *a: None]
exc = self.assertRaises(webob.exc.HTTPForbidden,
self.controller.upload, request,
unit_test_utils.UUID1, 'YYYY', 4)
self.assertTrue(mock_enf.called)
def test_upload_status(self):
request = unit_test_utils.get_fake_request()
image = FakeImage('abcd')
@ -217,13 +245,14 @@ class TestImagesController(base.StoreClearingUnitTest):
request = unit_test_utils.get_fake_request()
image = FakeImage('abcd', owner='tenant1')
self.image_repo.result = image
mock_enforce.side_effect = exception.Forbidden
mock_enforce.side_effect = [exception.Forbidden, lambda *a: None]
self.assertRaises(webob.exc.HTTPForbidden, self.controller.upload,
request, unit_test_utils.UUID2, 'YYYY', 4)
expected_target = {'project_id': 'tenant1'}
mock_enforce.assert_called_once_with(request.context,
"upload_image",
expected_target)
expected_call = [
mock.call(mock.ANY, 'upload_image', mock.ANY),
mock.call(mock.ANY, 'get_image', mock.ANY)
]
mock_enforce.has_calls(expected_call)
def test_upload_invalid(self):
request = unit_test_utils.get_fake_request()
@ -1019,6 +1048,12 @@ class TestMultiBackendImagesController(base.MultiStoreClearingUnitTest):
self.controller = glance.api.v2.image_data.ImageDataController()
self.controller.gateway = FakeGateway(db, store, notifier, policy,
self.image_repo)
# FIXME(abhishekk): Everything is fake in this test, so mocked the
# image muntable_check, Later we need to fix these tests to use
# some realistic data
patcher = mock.patch('glance.api.v2.policy.check_is_image_mutable')
patcher.start()
self.addCleanup(patcher.stop)
def test_upload(self):
request = unit_test_utils.get_fake_request()

View File

@ -223,6 +223,43 @@ class APIImagePolicy(APIPolicyBase):
self.policy.delete_image()
self.assertFalse(m.called)
def test_upload_image(self):
self.policy.upload_image()
self.enforcer.enforce.assert_called_once_with(self.context,
'upload_image',
mock.ANY)
def test_upload_image_falls_back_to_legacy(self):
self.config(enforce_secure_rbac=False)
# As admin, image is mutable even if owner does not match
self.context.is_admin = True
self.context.owner = 'someuser'
self.image.owner = 'someotheruser'
self.policy.upload_image()
# As non-admin, owner matches, so we're good
self.context.is_admin = False
self.context.owner = 'someuser'
self.image.owner = 'someuser'
self.policy.upload_image()
# If owner does not match, we fail
self.image.owner = 'someotheruser'
self.assertRaises(exception.Forbidden,
self.policy.upload_image)
# Make sure we are checking the legacy handler
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.upload_image()
m.assert_called_once_with(self.context, self.image)
# Make sure we are not checking it if enforce_secure_rbac=True
self.config(enforce_secure_rbac=True)
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.upload_image()
self.assertFalse(m.called)
class TestMetadefAPIPolicy(APIPolicyBase):
def setUp(self):