Check policies for staging operation in API

This patch enforces policy checks required for staging image data
in API layer. Image mutability check is maintained if secure RBAC
is not enabled in deployments.

Partially-Implements: blueprint policy-refactor

Change-Id: Iba990e59c454a2e9f37a0c659d8634583d3a8bf7
This commit is contained in:
Abhishek Kekane 2021-08-14 09:39:07 +00:00
parent 6ffd80f9c0
commit fa6f871e20
4 changed files with 144 additions and 4 deletions

View File

@ -329,8 +329,24 @@ class ImageDataController(object):
raise webob.exc.HTTPRequestEntityTooLarge(explanation=str(e),
request=req)
image_repo = self.gateway.get_repo(req.context)
image = None
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
# NOTE(abhishekk): stage API call does not have its own policy but
# it requires get_image access, this is the right place to check
# whether user has access to image or not
try:
image = image_repo.get(image_id)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=e.msg)
api_pol = api_policy.ImageAPIPolicy(req.context, image,
enforcer=self.policy)
try:
api_pol.modify_image()
except exception.Forbidden as e:
# NOTE(abhishekk): This will throw Forbidden if S-RBAC is not
# enabled
raise webob.exc.HTTPForbidden(explanation=e.msg)
# NOTE(jokke): this is horrible way to do it but as long as
# glance_store is in a shape it is, the only way. Don't hold me
@ -368,7 +384,6 @@ class ImageDataController(object):
staging_store = _build_staging_store()
try:
image = image_repo.get(image_id)
image.status = 'uploading'
image_repo.save(image, from_state='queued')
ks_quota.enforce_image_count_uploading(req.context,

View File

@ -128,7 +128,7 @@ class ImageAPIPolicy(APIPolicyBase):
# so check that first, followed by the general
# modify_image policy below.
self._enforce_visibility(value)
self._enforce('modify_image')
self.modify_image()
def update_locations(self):
self._enforce('set_image_location')
@ -159,6 +159,13 @@ class ImageAPIPolicy(APIPolicyBase):
def download_image(self):
self._enforce('download_image')
def modify_image(self):
self._enforce('modify_image')
# TODO(danms): Remove this legacy fallback when secure RBAC
# replaces the legacy policy.
if not CONF.enforce_secure_rbac:
check_is_image_mutable(self._context, self._image)
class MetadefAPIPolicy(APIPolicyBase):
def __init__(self, context, md_resource=None, target=None, enforcer=None):

View File

@ -16,6 +16,7 @@
from unittest import mock
import oslo_policy.policy
from oslo_utils.fixture import uuidsentinel as uuids
from glance.api import policy
from glance.tests import functional
@ -341,3 +342,83 @@ class TestImagesPolicy(functional.SynchronousAPIBase):
response = self.api_get(path)
self.assertEqual(200, response.status_code)
self.assertEqual('IMAGEDATA', response.text)
def test_image_stage(self):
self.start_server()
# First make sure we can perform staging operation
self._create_and_stage(expected_code=204)
# Now disable get_image permissions, but allow modify_image
# should return 204 as well, means even if we can not see
# image details, we can stage data for it.
self.set_policy_rules({
'get_image': '!',
'modify_image': '',
'add_image': ''
})
self._create_and_stage(expected_code=204)
# Now allow get_image and disable modify_image should return 403
self.set_policy_rules({
'get_image': '',
'modify_image': '!',
'add_image': ''
})
self._create_and_stage(expected_code=403)
# Now disabling both permissions will return 404
self.set_policy_rules({
'get_image': '!',
'modify_image': '!',
'add_image': ''
})
self._create_and_stage(expected_code=404)
# create shared visibility image and stage by 2nd project should
# return 404 until it is actually shared with that project.
self.set_policy_rules({
'get_image': '',
'modify_image': '!',
'add_image': '',
'add_member': ''
})
resp = self.api_post('/v2/images',
json={'name': 'foo',
'container_format': 'bare',
'disk_format': 'raw',
'visibility': 'shared'})
self.assertEqual(201, resp.status_code, resp.text)
image = resp.json
# Now stage data using another project details
headers = self._headers({
'X-Project-Id': 'fake-tenant-id',
'Content-Type': 'application/octet-stream'
})
resp = self.api_put(
'/v2/images/%s/stage' % image['id'],
headers=headers,
data=b'IMAGEDATA')
self.assertEqual(404, resp.status_code)
# Now share image with another project and then staging
# data by that project should return 403
path = '/v2/images/%s/members' % image['id']
data = {
'member': uuids.random_member
}
response = self.api_post(path, json=data)
member = response.json
self.assertEqual(200, response.status_code)
self.assertEqual(image['id'], member['image_id'])
# Now stage data using another project details
headers = self._headers({
'X-Project-Id': uuids.random_member,
'X-Roles': 'member',
'Content-Type': 'application/octet-stream'
})
resp = self.api_put(
'/v2/images/%s/stage' % image['id'],
headers=headers,
data=b'IMAGEDATA')
self.assertEqual(403, resp.status_code)

View File

@ -266,6 +266,43 @@ class APIImagePolicy(APIPolicyBase):
'download_image',
mock.ANY)
def test_modify_image(self):
self.policy.modify_image()
self.enforcer.enforce.assert_called_once_with(self.context,
'modify_image',
mock.ANY)
def test_modify_image_falls_back_to_legacy(self):
self.config(enforce_secure_rbac=False)
# As admin, image is mutable even if owner does not match
self.context.is_admin = True
self.context.owner = 'someuser'
self.image.owner = 'someotheruser'
self.policy.modify_image()
# As non-admin, owner matches, so we're good
self.context.is_admin = False
self.context.owner = 'someuser'
self.image.owner = 'someuser'
self.policy.modify_image()
# If owner does not match, we fail
self.image.owner = 'someotheruser'
self.assertRaises(exception.Forbidden,
self.policy.modify_image)
# Make sure we are checking the legacy handler
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.modify_image()
m.assert_called_once_with(self.context, self.image)
# Make sure we are not checking it if enforce_secure_rbac=True
self.config(enforce_secure_rbac=True)
with mock.patch('glance.api.v2.policy.check_is_image_mutable') as m:
self.policy.modify_image()
self.assertFalse(m.called)
class TestMetadefAPIPolicy(APIPolicyBase):
def setUp(self):