Merge "Check add_image policy in the API"

This commit is contained in:
Zuul 2021-08-24 01:05:48 +00:00 committed by Gerrit Code Review
commit 646e15da44
6 changed files with 182 additions and 6 deletions

View File

@ -92,9 +92,17 @@ class ImagesController(object):
@utils.mutating
def create(self, req, image, extra_properties, tags):
image_factory = self.gateway.get_image_factory(req.context)
image_repo = self.gateway.get_repo(req.context)
image_factory = self.gateway.get_image_factory(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
try:
if 'owner' not in image:
image['owner'] = req.context.project_id
api_policy.ImageAPIPolicy(req.context, image,
self.policy).add_image()
ks_quota.enforce_image_count_total(req.context, req.context.owner)
image = image_factory.new_image(extra_properties=extra_properties,
tags=tags, **image)

View File

@ -53,6 +53,28 @@ def check_is_image_mutable(context, image):
raise exception.Forbidden(_('You do not own this image'))
def check_admin_or_same_owner(context, properties):
"""Check that legacy behavior on create with owner is preserved.
Legacy behavior requires a static check that owner is not
inconsistent with the context, unless the caller is an
admin. Enforce that here, if needed.
:param context: A RequestContext
:param properties: The properties being used to create the image, which may
contain an owner
:raises: exception.Forbidden if the context is not an admin and owner is
set to something other than the context's project
"""
if context.is_admin:
return
if context.project_id != properties.get('owner', context.project_id):
msg = _("You are not permitted to create images "
"owned by '%s'.")
raise exception.Forbidden(msg % properties['owner'])
class APIPolicyBase(object):
def __init__(self, context, target=None, enforcer=None):
self._context = context
@ -84,16 +106,53 @@ class APIPolicyBase(object):
class ImageAPIPolicy(APIPolicyBase):
def __init__(self, context, image, enforcer=None):
super(ImageAPIPolicy, self).__init__(context,
policy.ImageTarget(image),
enforcer)
"""Image API policy module.
:param context: The RequestContext
:param image: The ImageProxy object in question, or a dict of image
properties if no image is yet created or needed for
authorization context.
:param enforcer: The policy.Enforcer object to use for enforcement
operations. If not provided (or None), the default
enforcer will be selected.
"""
self._image = image
if not self.is_created:
# NOTE(danms): If we are being called with a dict of image
# properties then we are testing policies that involve
# creating an image or other image-related resources but
# without a specific image for context. The target is a
# dict of proposed image properties, similar to the
# dict-like interface the ImageTarget provides over
# a real Image object, with specific keys.
target = {'project_id': image.get('owner', context.project_id),
'owner': image.get('owner', context.project_id),
'visibility': image.get('visibility', 'private')}
else:
target = policy.ImageTarget(image)
super(ImageAPIPolicy, self).__init__(context, target, enforcer)
@property
def is_created(self):
"""Signal whether the image actually exists or not.
False if the image is only being proposed by a create operation,
True if it has already been created.
"""
return not isinstance(self._image, dict)
def _enforce(self, rule_name):
"""Translate Forbidden->NotFound for images."""
try:
super(ImageAPIPolicy, self)._enforce(rule_name)
except webob.exc.HTTPForbidden:
# If we are checking image policy before creating an
# image, or without a specific image for context, then we
# do not need to potentially hide the presence of anything
# based on visibility, so re-raise immediately.
if not self.is_created:
raise
# If we are checking get_image, then Forbidden means the
# user cannot see this image, so raise NotFound. If we are
# checking anything else and get Forbidden, then raise
@ -143,6 +202,25 @@ class ImageAPIPolicy(APIPolicyBase):
def get_image_location(self):
self._enforce('get_image_location')
def add_image(self):
try:
self._enforce('add_image')
except webob.exc.HTTPForbidden:
# NOTE(danms): If we fail add_image because the owner is
# different, alter the message to be informative and
# in-line with the current message users have been getting
# in the past.
if self._target['owner'] != self._context.project_id:
msg = _("You are not permitted to create images "
"owned by '%s'" % self._target['owner'])
raise webob.exc.HTTPForbidden(msg)
else:
raise
if 'visibility' in self._target:
self._enforce_visibility(self._target['visibility'])
if not CONF.enforce_secure_rbac:
check_admin_or_same_owner(self._context, self._target)
def get_image(self):
self._enforce('get_image')

View File

@ -69,6 +69,9 @@ ADMIN_OR_PROJECT_MEMBER_DOWNLOAD_IMAGE = (
f'role:admin or '
f'({PROJECT_MEMBER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC_OR_SHARED})'
)
ADMIN_OR_PROJECT_MEMBER_CREATE_IMAGE = (
f'role:admin or ({PROJECT_MEMBER} and project_id:%(owner)s)'
)
ADMIN_OR_SHARED_MEMBER = (

View File

@ -23,7 +23,7 @@ The image API now supports roles.
image_policies = [
policy.DocumentedRuleDefault(
name="add_image",
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=base.ADMIN_OR_PROJECT_MEMBER_CREATE_IMAGE,
scope_types=['system', 'project'],
description='Create new image',
operations=[

View File

@ -206,6 +206,36 @@ class TestImagesPolicy(functional.SynchronousAPIBase):
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual(404, resp.status_code)
def test_image_create(self):
self.start_server()
# Make sure we can create an image
self.assertEqual(201, self._create().status_code)
# Now disable add_image and make sure we get 403
self.set_policy_rules({'add_image': '!'})
self.assertEqual(403, self._create().status_code)
def test_image_create_by_another(self):
self.start_server()
# NOTE(danms): There is no policy override in this test,
# specifically to test that the defaults (for rbac and
# non-rbac) properly catch the attempt by a non-admin to
# create an image owned by someone else.
image = {'name': 'foo',
'container_format': 'bare',
'disk_format': 'raw',
'owner': 'someoneelse'}
resp = self.api_post('/v2/images',
json=image,
headers={'X-Roles': 'member'})
# Make sure we get the expected owner-specific error message
self.assertIn("You are not permitted to create images "
"owned by 'someoneelse'", resp.text)
def test_image_delete(self):
self.start_server()

View File

@ -223,6 +223,63 @@ class APIImagePolicy(APIPolicyBase):
'get_images',
mock.ANY)
def test_add_image(self):
generic_target = {'project_id': self.context.project_id,
'owner': self.context.project_id,
'visibility': 'private'}
self.policy = policy.ImageAPIPolicy(self.context, {},
enforcer=self.enforcer)
self.policy.add_image()
self.enforcer.enforce.assert_called_once_with(self.context,
'add_image',
generic_target)
def test_add_image_falls_back_to_legacy(self):
self.config(enforce_secure_rbac=False)
self.context.is_admin = False
self.policy = policy.ImageAPIPolicy(self.context, {'owner': 'else'},
enforcer=self.enforcer)
self.assertRaises(exception.Forbidden, self.policy.add_image)
# Make sure we're calling the legacy handler if secure_rbac is False
with mock.patch('glance.api.v2.policy.check_admin_or_same_owner') as m:
self.policy.add_image()
m.assert_called_once_with(self.context, {'project_id': 'else',
'owner': 'else',
'visibility': 'private'})
# Make sure we are not calling the legacy handler if
# secure_rbac is being used. We won't fail the check because
# our enforcer is a mock, just make sure we don't call that handler.
self.config(enforce_secure_rbac=True)
with mock.patch('glance.api.v2.policy.check_admin_or_same_owner') as m:
self.policy.add_image()
m.assert_not_called()
def test_add_image_translates_owner_failure(self):
self.policy = policy.ImageAPIPolicy(self.context, {'owner': 'else'},
enforcer=self.enforcer)
# Make sure add_image works with no exception
self.policy.add_image()
# Make sure we don't get in the way of any other exceptions
self.enforcer.enforce.side_effect = exception.Duplicate
self.assertRaises(exception.Duplicate, self.policy.add_image)
# If the exception is HTTPForbidden and the owner differs,
# make sure we get the proper message translation
self.enforcer.enforce.side_effect = webob.exc.HTTPForbidden('original')
exc = self.assertRaises(webob.exc.HTTPForbidden, self.policy.add_image)
self.assertIn('You are not permitted to create images owned by',
str(exc))
# If the owner does not differ, make sure we get the original reason
self.policy = policy.ImageAPIPolicy(self.context, {},
enforcer=self.enforcer)
exc = self.assertRaises(webob.exc.HTTPForbidden, self.policy.add_image)
self.assertIn('original', str(exc))
def test_delete_image(self):
self.policy.delete_image()
self.enforcer.enforce.assert_called_once_with(self.context,