Check policies for Image Cache in API

This patch enforces policy checks required for caching images
in API layer.

Partially-Implements: blueprint policy-refactor
Depends-On: https://review.opendev.org/c/openstack/nova/+/688802

Change-Id: Ie17b8f5bf308b8f07915ea18ace9b49955b8f0f0
This commit is contained in:
Abhishek Kekane 2021-08-24 09:06:58 +00:00
parent ab0c95da68
commit ea13046919
7 changed files with 306 additions and 4 deletions

View File

@ -21,6 +21,7 @@ from oslo_log import log as logging
import webob.exc
from glance.api import policy
from glance.api.v2 import policy as api_policy
from glance.common import exception
from glance.common import wsgi
from glance import image_cache
@ -40,7 +41,8 @@ class CacheController(object):
def _enforce(self, req):
"""Authorize request against 'manage_image_cache' policy"""
try:
self.policy.enforce(req.context, 'manage_image_cache', {})
api_policy.CacheImageAPIPolicy(
req.context, enforcer=self.policy).manage_image_cache()
except exception.Forbidden:
LOG.debug("User not permitted to manage the image cache")
raise webob.exc.HTTPForbidden()

View File

@ -104,6 +104,17 @@ class APIPolicyBase(object):
return False
class CacheImageAPIPolicy(APIPolicyBase):
def __init__(self, context, target=None, enforcer=None):
self._context = context
self._target = target or {}
self.enforcer = enforcer or policy.Enforcer()
super(CacheImageAPIPolicy, self).__init__(context, target, enforcer)
def manage_image_cache(self):
self._enforce('manage_image_cache')
class ImageAPIPolicy(APIPolicyBase):
def __init__(self, context, image, enforcer=None):
"""Image API policy module.

View File

@ -43,7 +43,7 @@ class Prefetcher(base.CacheApp):
ctx = context.RequestContext(is_admin=True, show_deleted=True)
try:
image_repo = self.gateway.get_repo(ctx)
image_repo = self.gateway.get_repo(ctx, authorization_layer=False)
image = image_repo.get(image_id)
except exception.NotFound:
LOG.warning(_LW("Image '%s' not found"), image_id)

View File

@ -1540,6 +1540,14 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
[filter:fakeauth]
paste.filter_factory = glance.tests.utils:\
FakeAuthMiddleware.factory
[filter:cache]
paste.filter_factory = glance.api.middleware.cache:\
CacheFilter.factory
[filter:cachemanage]
paste.filter_factory = glance.api.middleware.cache_manage:\
CacheManageFilter.factory
[pipeline:glance-api-cachemanagement]
pipeline = context cache cachemanage rootapp
[pipeline:glance-api]
pipeline = context rootapp
[composite:rootapp]
@ -1588,7 +1596,7 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
self.setup_simple_paste()
self.setup_stores()
def start_server(self):
def start_server(self, enable_cache=False):
"""Builds and "starts" the API server.
Note that this doesn't actually "start" anything like
@ -1596,7 +1604,12 @@ class SynchronousAPIBase(test_utils.BaseTestCase):
to make it seem like the same sort of pattern.
"""
config.set_config_defaults()
self.api = config.load_paste_app('glance-api',
root_app = 'glance-api'
if enable_cache:
root_app = 'glance-api-cachemanagement'
self.config(image_cache_dir=self._store_dir('cache'))
self.api = config.load_paste_app(root_app,
conf_file=self.paste_config)
secure_rbac = bool(os.getenv('OS_GLANCE_TEST_RBAC_DEFAULTS'))
self.config(enforce_secure_rbac=secure_rbac)

View File

@ -0,0 +1,242 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslo_policy.policy
from glance.api import policy
from glance.image_cache import prefetcher
from glance.tests import functional
class TestCacheImagesPolicy(functional.SynchronousAPIBase):
def setUp(self):
super(TestCacheImagesPolicy, self).setUp()
self.policy = policy.Enforcer(suppress_deprecation_warnings=True)
def set_policy_rules(self, rules):
self.policy.set_rules(
oslo_policy.policy.Rules.from_dict(rules),
overwrite=True)
def start_server(self):
with mock.patch.object(policy, 'Enforcer') as mock_enf:
mock_enf.return_value = self.policy
super(TestCacheImagesPolicy, self).start_server(enable_cache=True)
def _create_upload_and_cache(self, cache_image=False,
expected_code=200):
image_id = self._create_and_upload()
# Queue image for caching
path = '/v2/queued_images/%s' % image_id
response = self.api_put(path)
self.assertEqual(expected_code, response.status_code)
if cache_image:
# NOTE(abhishekk): Here we are not running periodic job which
# caches queued images as precaching is not part of this
# patch, so to test all caching operations we are using this
# way to cache images for us
cache_prefetcher = prefetcher.Prefetcher()
cache_prefetcher.run()
return image_id
def test_queued_images(self):
self.start_server()
# Verify that you can queue image for caching
self._create_upload_and_cache(expected_code=200)
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!',
'add_image': '',
'upload_image': ''
})
self._create_upload_and_cache(expected_code=403)
def test_get_queued_images(self):
self.start_server()
# Create image and queue it for caching
image_id = self._create_upload_and_cache()
# make sure you are able to get queued images
path = '/v2/queued_images'
response = self.api_get(path)
self.assertEqual(200, response.status_code)
output = response.json
self.assertIn(image_id, output['queued_images'])
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!'
})
response = self.api_get(path)
self.assertEqual(403, response.status_code)
def test_delete_queued_image(self):
self.start_server()
# Create image and queue it for caching
image_id = self._create_upload_and_cache()
# Create another image while you can
second_image_id = self._create_upload_and_cache()
# make sure you are able to delete queued image
path = '/v2/queued_images/%s' % image_id
response = self.api_delete(path)
self.assertEqual(200, response.status_code)
# verify image is deleted from queue list
path = '/v2/queued_images'
response = self.api_get(path)
output = response.json
self.assertNotIn(image_id, output['queued_images'])
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!'
})
path = '/v2/queued_images/%s' % second_image_id
response = self.api_delete(path)
self.assertEqual(403, response.status_code)
def test_delete_queued_images(self):
self.start_server()
# Create image and queue it for caching
self._create_upload_and_cache()
# Create another image while you can
self._create_upload_and_cache()
# make sure you are able to delete queued image
path = '/v2/queued_images'
response = self.api_delete(path)
self.assertEqual(200, response.status_code)
# verify images are deleted from queue list
path = '/v2/queued_images'
response = self.api_get(path)
output = response.json
self.assertEqual([], output['queued_images'])
# Create another image and queue it for caching
image_id = self._create_upload_and_cache()
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!'
})
path = '/v2/queued_images'
response = self.api_delete(path)
self.assertEqual(403, response.status_code)
# Verify that image is still present in queue list
self.set_policy_rules({
'manage_image_cache': '',
})
path = '/v2/queued_images'
response = self.api_get(path)
output = response.json
self.assertIn(image_id, output['queued_images'])
def test_get_cached_images(self):
self.start_server()
# Create image and cache it
image_id = self._create_upload_and_cache(cache_image=True)
# make sure you are able to get cached images
path = '/v2/cached_images'
response = self.api_get(path)
self.assertEqual(200, response.status_code)
output = response.json
self.assertEqual(image_id, output['cached_images'][0]['image_id'])
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!'
})
response = self.api_get(path)
self.assertEqual(403, response.status_code)
def test_delete_cached_image(self):
self.start_server()
# Create image and cache it
image_id = self._create_upload_and_cache(cache_image=True)
# Create another image while you can
second_image_id = self._create_upload_and_cache(cache_image=True)
# make sure you are able to delete cached image
path = '/v2/cached_images/%s' % image_id
response = self.api_delete(path)
self.assertEqual(200, response.status_code)
# verify image is deleted from cached list
path = '/v2/cached_images'
response = self.api_get(path)
output = response.json
self.assertEqual(1, len(output['cached_images']))
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!'
})
path = '/v2/cached_images/%s' % second_image_id
response = self.api_delete(path)
self.assertEqual(403, response.status_code)
def test_delete_cached_images(self):
self.start_server()
# Create image and cache it
self._create_upload_and_cache(cache_image=True)
# Create another image while you can
self._create_upload_and_cache(cache_image=True)
# make sure you are able to delete cached image
path = '/v2/cached_images'
response = self.api_delete(path)
self.assertEqual(200, response.status_code)
# verify images are deleted from cached list
path = '/v2/cached_images'
response = self.api_get(path)
output = response.json
self.assertEqual(0, len(output['cached_images']))
# Create another image and cache it
self._create_upload_and_cache(cache_image=True)
# Now disable manage_image_cache to ensure you will get
# 403 Forbidden error
self.set_policy_rules({
'manage_image_cache': '!'
})
path = '/v2/cached_images'
response = self.api_delete(path)
self.assertEqual(403, response.status_code)
# Verify that image is still present in cache
self.set_policy_rules({
'manage_image_cache': '',
})
path = '/v2/cached_images'
response = self.api_get(path)
output = response.json
self.assertEqual(1, len(output['cached_images']))

View File

@ -17,6 +17,7 @@ from contextlib import contextmanager
import datetime
import os
import time
from unittest import mock
import fixtures
from oslo_utils import secretutils
@ -27,6 +28,7 @@ from six.moves import range
from glance.common import exception
from glance import image_cache
from glance.image_cache import prefetcher
from glance.tests import utils as test_utils
from glance.tests.utils import skip_if_disabled
from glance.tests.utils import xattr_writes_supported
@ -564,3 +566,20 @@ class TestImageCacheNoDep(test_utils.BaseTestCase):
caching_iter = cache.get_caching_iter('dummy_id', None, iter(data))
self.assertEqual(data, list(caching_iter))
class TestImagePrefetcher(test_utils.BaseTestCase):
def setUp(self):
super(TestImagePrefetcher, self).setUp()
self.cache_dir = self.useFixture(fixtures.TempDir()).path
self.config(image_cache_dir=self.cache_dir,
image_cache_driver='xattr',
image_cache_max_size=5 * units.Ki)
self.prefetcher = prefetcher.Prefetcher()
def test_fetch_image_into_cache_without_auth(self):
with mock.patch.object(self.prefetcher.gateway,
'get_repo') as mock_get:
self.prefetcher.fetch_image_into_cache('fake-image-id')
mock_get.assert_called_once_with(mock.ANY,
authorization_layer=False)

View File

@ -738,3 +738,18 @@ class TestTasksAPIPolicy(APIPolicyBase):
self.enforcer.enforce.assert_called_once_with(self.context,
'tasks_api_access',
mock.ANY)
class TestCacheImageAPIPolicy(APIPolicyBase):
def setUp(self):
super(TestCacheImageAPIPolicy, self).setUp()
self.enforcer = mock.MagicMock()
self.context = mock.MagicMock()
self.policy = policy.CacheImageAPIPolicy(
self.context, enforcer=self.enforcer)
def test_manage_image_cache(self):
self.policy.manage_image_cache()
self.enforcer.enforce.assert_called_once_with(self.context,
'manage_image_cache',
mock.ANY)