Image not downloaded without restricted property

If restrict download rule is configured in policy.json, and image is
added without protected property mentioned in "restricted" rule, then
normal users (other than admin) are not able to download the image.

Added logic in policy.py, to allow normal user to download image
without restricted property.

https://review.openstack.org/#/c/127923/
Above patch will fix the issue for master, but as it is too large to
be back-ported to stable/juno, Nikhil recommended to push only minimal
changes that fixes this issue after discussing it with the release manager.

In this patch, I have copied required changes from oslo-incubator
policy module from commit 33533b0d97639ed828a2fd5e874f16eb1ecfeaa4.
Note: In juno release, oslo-incubator policy module was not synced
with glance completely. 

Closes-Bug: #1387973
Change-Id: Ib85901b073c85ede7da2e9c7333426c8b8de3bb6
This commit is contained in:
abhishekkekane 2014-11-11 12:00:27 -08:00 committed by Abhishek Kekane
parent 41e143b217
commit c5e302ef22
5 changed files with 286 additions and 2 deletions

View File

@ -774,8 +774,12 @@ class GenericCheck(Check):
role:compute:admin
"""
# TODO(termie): do dict inspection via dot syntax
match = self.match % target
try:
match = self.match % target
except KeyError:
# While doing GenericCheck if key not
# present in Target return false
return False
try:
# Try to interpret self.kind as a literal

View File

@ -713,3 +713,55 @@ class TestApi(functional.FunctionalTest):
self.assertEqual(response.status, 404)
self.stop_servers()
def test_download_image_with_no_restricted_property_set_to_image(self):
"""
We test the following sequential series of actions:
0. POST /images with public image named Image1
and no custom properties
- Verify 201 returned
1. GET image
2. DELETE image1
- Delete the newly added image
"""
self.cleanup()
rules = {"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('test_key':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
image_data = "*" * FIVE_KB
headers = minimal_headers('Image1')
headers.update({'X-Roles': 'member'})
path = "http://%s:%d/v1/images" % ("127.0.0.1", self.api_port)
http = httplib2.Http()
response, content = http.request(path, 'POST', headers=headers,
body=image_data)
self.assertEqual(201, response.status)
data = jsonutils.loads(content)
image_id = data['image']['id']
self.assertEqual(data['image']['checksum'],
hashlib.md5(image_data).hexdigest())
self.assertEqual(data['image']['size'], FIVE_KB)
self.assertEqual(data['image']['name'], "Image1")
self.assertTrue(data['image']['is_public'])
# 1. GET image
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'GET')
self.assertEqual(200, response.status)
# 2. DELETE image1
path = "http://%s:%d/v1/images/%s" % ("127.0.0.1", self.api_port,
image_id)
http = httplib2.Http()
response, content = http.request(path, 'DELETE')
self.assertEqual(200, response.status)
self.stop_servers()

View File

@ -591,6 +591,71 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_download_image_with_no_restricted_property_set_to_image(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('test_key':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
# Create an image
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = jsonutils.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in six.iteritems(expected_image):
self.assertEqual(image[key], value, key)
# Upload data to image
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream'})
response = requests.put(path, headers=headers, data='ZZZZZ')
self.assertEqual(204, response.status_code)
# Get an image
path = self._url('/v2/images/%s/file' % image_id)
headers = self._headers({'Content-Type': 'application/octet-stream',
'X-Roles': '_member_'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_download_image_not_allowed_using_restricted_policy(self):
rules = {

View File

@ -565,6 +565,76 @@ class TestCacheMiddlewareProcessRequest(base.IsolatedUnitTest):
actual = cache_filter.process_request(request)
self.assertTrue(actual)
def test_v1_process_request_download_without_restricted_property(self):
"""
Test process_request for v1 api where _member_ role able to
download the image if restricted property is not added to the image.
"""
image_id = 'test1'
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': ''
}
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
actual = cache_filter.process_request(request)
self.assertTrue(actual)
def test_v2_process_request_download_without_restricted_property(self):
"""
Test process_request for v2 api where member role able to
download the image if restricted property is not added to the image.
"""
image_id = 'test1'
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['_member_'])
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
actual = cache_filter.process_request(request)
self.assertTrue(actual)
class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
def test_process_v1_DELETE_response(self):
@ -830,3 +900,84 @@ class TestCacheMiddlewareProcessResponse(base.IsolatedUnitTest):
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)
def test_v1_process_response_download_without_restricted_property(self):
"""
Test process_response for v1 api where _member_ role able to
download the image if restricted property is not added to the image.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v1')
def fake_get_v1_image_metadata(*args, **kwargs):
return {
'id': image_id,
'name': 'fake_image',
'status': 'active',
'created_at': '',
'min_disk': '10G',
'min_ram': '1024M',
'protected': False,
'locations': '',
'checksum': 'c1234',
'owner': '',
'disk_format': 'raw',
'container_format': 'bare',
'size': '123456789',
'virtual_size': '123456789',
'is_public': 'public',
'deleted': False,
'updated_at': ''
}
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v1_image_metadata = fake_get_v1_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v1/images/%s' % image_id)
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)
def test_v2_process_response_download_without_restricted_property(self):
"""
Test process_response for v2 api where member role able to
download the image if restricted property is not added to the image.
"""
image_id = 'test1'
def fake_fetch_request_info(*args, **kwargs):
return ('test1', 'GET', 'v2')
def fake_get_v2_image_metadata(*args, **kwargs):
image = ImageStub(image_id)
request.environ['api.cache.image'] = image
return glance.api.policy.ImageTarget(image)
cache_filter = ProcessRequestTestCacheFilter()
cache_filter._fetch_request_info = fake_fetch_request_info
cache_filter._get_v2_image_metadata = fake_get_v2_image_metadata
rules = {
"restricted":
"not ('test_1234':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
cache_filter.policy = glance.api.policy.Enforcer()
request = webob.Request.blank('/v2/images/test1/file')
request.context = context.RequestContext(roles=['_member_'])
resp = webob.Response(request=request)
actual = cache_filter.process_response(resp)
self.assertEqual(actual, resp)

View File

@ -2549,6 +2549,18 @@ class TestGlanceAPI(base.IsolatedUnitTest):
res = req.get_response(self.api)
self.assertEqual(res.status_int, 403)
def test_show_image_download_without_restricted_policy(self):
rules = {"context_is_admin": "role:admin",
"default": "",
"restricted":
"not ('test_key':%(x_test_key)s and role:_member_)",
"download_image": "role:admin or rule:restricted"}
self.set_policy_rules(rules)
req = webob.Request.blank("/images/%s" % UUID2)
req.headers['X-Auth-Token'] = 'user:tenant:_member_'
res = req.get_response(self.api)
self.assertEqual(res.status_int, 200)
def test_delete_image(self):
req = webob.Request.blank("/images/%s" % UUID2)
req.method = 'DELETE'