Make ImageTarget behave like a dictionary

This is required because oslo_policy's 'enforce' method expects a dict-like
object as its second argument.

Change-Id: I9187b6805d3b2cd351189e34dd2f9db3158f6b8d
Closes-Bug: #1720354
(cherry-picked from commit 3134ee07b2)
This commit is contained in:
Cyril Roelandt 2017-10-13 23:22:16 +02:00
parent 0a2074ecef
commit 2ae4541f3e
2 changed files with 52 additions and 11 deletions

View File

@ -16,6 +16,7 @@
"""Policy Engine For Glance"""
import collections
import copy
from oslo_config import cfg
@ -106,7 +107,8 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
self.policy.enforce(self.context, 'get_image', {})
raise
else:
self.policy.enforce(self.context, 'get_image', ImageTarget(image))
self.policy.enforce(self.context, 'get_image',
dict(ImageTarget(image)))
return image
def list(self, *args, **kwargs):
@ -114,11 +116,11 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
return super(ImageRepoProxy, self).list(*args, **kwargs)
def save(self, image, from_state=None):
self.policy.enforce(self.context, 'modify_image', image.target)
self.policy.enforce(self.context, 'modify_image', dict(image.target))
return super(ImageRepoProxy, self).save(image, from_state=from_state)
def add(self, image):
self.policy.enforce(self.context, 'add_image', image.target)
self.policy.enforce(self.context, 'add_image', dict(image.target))
return super(ImageRepoProxy, self).add(image)
@ -166,7 +168,7 @@ class ImageProxy(glance.domain.proxy.Image):
self.image.locations = new_locations
def delete(self):
self.policy.enforce(self.context, 'delete_image', self.target)
self.policy.enforce(self.context, 'delete_image', dict(self.target))
return self.image.delete()
def deactivate(self):
@ -378,7 +380,7 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
task_proxy_kwargs=proxy_kwargs)
class ImageTarget(object):
class ImageTarget(collections.Mapping):
SENTINEL = object()
def __init__(self, target):
@ -387,6 +389,9 @@ class ImageTarget(object):
:param target: Object being targeted
"""
self.target = target
self._target_keys = [k for k in dir(ImageProxy)
if not k.startswith('__')
if not callable(getattr(ImageProxy, k))]
def __getitem__(self, key):
"""Return the value of 'key' from the target.
@ -406,6 +411,23 @@ class ImageTarget(object):
value = None
return value
def get(self, key, default=None):
try:
return self.__getitem__(key)
except KeyError:
return default
def __len__(self):
length = len(self._target_keys)
length += len(getattr(self.target, 'extra_properties', {}))
return length
def __iter__(self):
for key in self._target_keys:
yield key
for key in getattr(self.target, 'extra_properties', {}).keys():
yield key
def key_transforms(self, key):
if key == 'id':
key = 'image_id'

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import os.path
import mock
@ -29,6 +30,12 @@ from glance.tests import utils as test_utils
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
class IterableMock(mock.Mock, collections.Iterable):
def __iter__(self):
while False:
yield None
class ImageRepoStub(object):
def get(self, *args, **kwargs):
return 'image_from_get'
@ -57,6 +64,18 @@ class ImageStub(object):
self.disk_format = disk_format
self.status = status
self.extra_properties = extra_properties
self.checksum = 'c2e5db72bd7fd153f53ede5da5a06de3'
self.created_at = '2013-09-28T15:27:36Z'
self.updated_at = '2013-09-28T15:27:37Z'
self.locations = []
self.min_disk = 0
self.min_ram = 0
self.name = 'image_name'
self.owner = 'tenant1'
self.protected = False
self.size = 0
self.virtual_size = 0
self.tags = []
def delete(self):
self.status = 'deleted'
@ -266,24 +285,24 @@ class TestImagePolicy(test_utils.BaseTestCase):
def test_delete_image_allowed(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
args = dict(image.target)
image.delete()
self.assertEqual('deleted', image.status)
self.policy.enforce.assert_called_once_with({}, "delete_image",
image.target)
self.policy.enforce.assert_called_once_with({}, "delete_image", args)
def test_get_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_target = mock.Mock()
image_target = IterableMock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.get, UUID1)
self.policy.enforce.assert_called_once_with({}, "get_image",
image_target)
dict(image_target))
def test_get_image_allowed(self):
image_target = mock.Mock()
image_target = IterableMock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
@ -292,7 +311,7 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.assertIsInstance(output, glance.api.policy.ImageProxy)
self.assertEqual('image_from_get', output.image)
self.policy.enforce.assert_called_once_with({}, "get_image",
image_target)
dict(image_target))
def test_get_images_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden