Add domain proxies that stop unauthorized actions
This patch introduces domain objects that handle context-based authorization checks. With this approach, we can eventually remove ownership checks from the database apis. Part of implementing bp:glance-domain-logic-layer Change-Id: I30c7444220013f17dab6479f1b00f1598ab424d0
This commit is contained in:
parent
1af85e279f
commit
308b832eb1
|
@ -0,0 +1,145 @@
|
|||
# Copyright 2012 OpenStack, LLC
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from glance.common import exception
|
||||
import glance.domain
|
||||
|
||||
|
||||
def is_image_mutable(context, image):
|
||||
"""Return True if the image is mutable in this context."""
|
||||
if context.is_admin:
|
||||
return True
|
||||
|
||||
if image.owner is None or context.owner is None:
|
||||
return False
|
||||
|
||||
return image.owner == context.owner
|
||||
|
||||
|
||||
def proxy_image(context, image):
|
||||
if is_image_mutable(context, image):
|
||||
return image
|
||||
else:
|
||||
return ImmutableImageProxy(image)
|
||||
|
||||
|
||||
class ImageRepoProxy(glance.domain.ImageRepoProxy):
|
||||
|
||||
def __init__(self, image_repo, context):
|
||||
self.context = context
|
||||
self.image_repo = image_repo
|
||||
super(ImageRepoProxy, self).__init__(image_repo)
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
image = self.image_repo.get(*args, **kwargs)
|
||||
return proxy_image(self.context, image)
|
||||
|
||||
def list(self, *args, **kwargs):
|
||||
images = self.image_repo.list(*args, **kwargs)
|
||||
return [proxy_image(self.context, i) for i in images]
|
||||
|
||||
|
||||
class ImageFactoryProxy(object):
|
||||
|
||||
def __init__(self, image_factory, context):
|
||||
self.image_factory = image_factory
|
||||
self.context = context
|
||||
|
||||
def new_image(self, **kwargs):
|
||||
owner = kwargs.pop('owner', self.context.owner)
|
||||
|
||||
if not self.context.is_admin:
|
||||
if owner is None or owner != self.context.owner:
|
||||
message = _("You are not permitted to create images "
|
||||
"owned by '%s'.")
|
||||
raise exception.Forbidden(message % owner)
|
||||
|
||||
return self.image_factory.new_image(owner=owner, **kwargs)
|
||||
|
||||
|
||||
def _immutable_attr(target, attr, proxy=None):
|
||||
|
||||
def get_attr(self):
|
||||
value = getattr(getattr(self, target), attr)
|
||||
if proxy is not None:
|
||||
value = proxy(value)
|
||||
return value
|
||||
|
||||
def forbidden(self, *args, **kwargs):
|
||||
message = _("You are not permitted to modify '%s' on this image.")
|
||||
raise exception.Forbidden(message % attr)
|
||||
|
||||
return property(get_attr, forbidden, forbidden)
|
||||
|
||||
|
||||
class ImmutableProperties(dict):
|
||||
def forbidden_key(self, key, *args, **kwargs):
|
||||
message = _("You are not permitted to modify '%s' on this image.")
|
||||
raise exception.Forbidden(message % key)
|
||||
|
||||
def forbidden(self, *args, **kwargs):
|
||||
message = _("You are not permitted to modify this image.")
|
||||
raise exception.Forbidden(message)
|
||||
|
||||
__delitem__ = forbidden_key
|
||||
__setitem__ = forbidden_key
|
||||
pop = forbidden
|
||||
popitem = forbidden
|
||||
setdefault = forbidden
|
||||
update = forbidden
|
||||
|
||||
|
||||
class ImmutableTags(set):
|
||||
def forbidden(self, *args, **kwargs):
|
||||
message = _("You are not permitted to modify tags on this image.")
|
||||
raise exception.Forbidden(message)
|
||||
|
||||
add = forbidden
|
||||
clear = forbidden
|
||||
difference_update = forbidden
|
||||
intersection_update = forbidden
|
||||
pop = forbidden
|
||||
remove = forbidden
|
||||
symmetric_difference_update = forbidden
|
||||
update = forbidden
|
||||
|
||||
|
||||
class ImmutableImageProxy(object):
|
||||
def __init__(self, base):
|
||||
self.base = base
|
||||
|
||||
name = _immutable_attr('base', 'name')
|
||||
image_id = _immutable_attr('base', 'image_id')
|
||||
name = _immutable_attr('base', 'name')
|
||||
status = _immutable_attr('base', 'status')
|
||||
created_at = _immutable_attr('base', 'created_at')
|
||||
updated_at = _immutable_attr('base', 'updated_at')
|
||||
visibility = _immutable_attr('base', 'visibility')
|
||||
min_disk = _immutable_attr('base', 'min_disk')
|
||||
min_ram = _immutable_attr('base', 'min_ram')
|
||||
protected = _immutable_attr('base', 'protected')
|
||||
location = _immutable_attr('base', 'location')
|
||||
checksum = _immutable_attr('base', 'checksum')
|
||||
owner = _immutable_attr('base', 'owner')
|
||||
disk_format = _immutable_attr('base', 'disk_format')
|
||||
container_format = _immutable_attr('base', 'container_format')
|
||||
size = _immutable_attr('base', 'size')
|
||||
extra_properties = _immutable_attr('base', 'extra_properties',
|
||||
proxy=ImmutableProperties)
|
||||
tags = _immutable_attr('base', 'tags', proxy=ImmutableTags)
|
||||
|
||||
def delete(self):
|
||||
message = _("You are not permitted to delete this image.")
|
||||
raise exception.Forbidden(message)
|
|
@ -19,11 +19,21 @@ import json
|
|||
import stubout
|
||||
import webob
|
||||
|
||||
from glance.api import authorization
|
||||
from glance.common import auth
|
||||
from glance.common import exception
|
||||
import glance.domain
|
||||
from glance.openstack.common import timeutils
|
||||
from glance.tests import utils
|
||||
|
||||
|
||||
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
|
||||
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
|
||||
|
||||
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
|
||||
UUID2 = 'a85abd86-55b3-4d5b-b0b4-5d0a6e6042fc'
|
||||
|
||||
|
||||
class FakeResponse(object):
|
||||
"""
|
||||
Simple class that masks the inconsistency between
|
||||
|
@ -544,3 +554,215 @@ class TestEndpoints(utils.BaseTestCase):
|
|||
service_type='object-store',
|
||||
endpoint_region='foo',
|
||||
endpoint_type='internalURL')
|
||||
|
||||
|
||||
class TestImageMutability(utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImageMutability, self).setUp()
|
||||
self.image_factory = glance.domain.ImageFactory()
|
||||
|
||||
def _is_mutable(self, tenant, owner, is_admin=False):
|
||||
context = glance.context.RequestContext(tenant=tenant,
|
||||
is_admin=is_admin)
|
||||
image = self.image_factory.new_image(owner=owner)
|
||||
return authorization.is_image_mutable(context, image)
|
||||
|
||||
def test_admin_everything_mutable(self):
|
||||
self.assertTrue(self._is_mutable(None, None, is_admin=True))
|
||||
self.assertTrue(self._is_mutable(None, TENANT1, is_admin=True))
|
||||
self.assertTrue(self._is_mutable(TENANT1, None, is_admin=True))
|
||||
self.assertTrue(self._is_mutable(TENANT1, TENANT1, is_admin=True))
|
||||
self.assertTrue(self._is_mutable(TENANT1, TENANT2, is_admin=True))
|
||||
|
||||
def test_no_tenant_nothing_mutable(self):
|
||||
self.assertFalse(self._is_mutable(None, None))
|
||||
self.assertFalse(self._is_mutable(None, TENANT1))
|
||||
|
||||
def test_regular_user(self):
|
||||
self.assertFalse(self._is_mutable(TENANT1, None))
|
||||
self.assertFalse(self._is_mutable(TENANT1, TENANT2))
|
||||
self.assertTrue(self._is_mutable(TENANT1, TENANT1))
|
||||
|
||||
|
||||
class TestImmutableImage(utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestImmutableImage, self).setUp()
|
||||
image_factory = glance.domain.ImageFactory()
|
||||
image = image_factory.new_image(
|
||||
image_id=UUID1,
|
||||
name='Marvin',
|
||||
owner=TENANT1,
|
||||
disk_format='raw',
|
||||
container_format='bare',
|
||||
extra_properties={'foo': 'bar'},
|
||||
tags=['ping', 'pong'],
|
||||
)
|
||||
self.image = authorization.ImmutableImageProxy(image)
|
||||
|
||||
def _test_change(self, attr, value):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
setattr, self.image, attr, value)
|
||||
self.assertRaises(exception.Forbidden,
|
||||
delattr, self.image, attr)
|
||||
|
||||
def test_change_id(self):
|
||||
self._test_change('image_id', UUID2)
|
||||
|
||||
def test_change_name(self):
|
||||
self._test_change('name', 'Freddie')
|
||||
|
||||
def test_change_owner(self):
|
||||
self._test_change('owner', TENANT2)
|
||||
|
||||
def test_change_min_disk(self):
|
||||
self._test_change('min_disk', 100)
|
||||
|
||||
def test_change_min_ram(self):
|
||||
self._test_change('min_ram', 1024)
|
||||
|
||||
def test_change_disk_format(self):
|
||||
self._test_change('disk_format', 'vhd')
|
||||
|
||||
def test_change_container_format(self):
|
||||
self._test_change('container_format', 'ova')
|
||||
|
||||
def test_change_visibility(self):
|
||||
self._test_change('visibility', 'public')
|
||||
|
||||
def test_change_status(self):
|
||||
self._test_change('status', 'active')
|
||||
|
||||
def test_change_created_at(self):
|
||||
self._test_change('created_at', timeutils.utcnow())
|
||||
|
||||
def test_change_updated_at(self):
|
||||
self._test_change('updated_at', timeutils.utcnow())
|
||||
|
||||
def test_change_location(self):
|
||||
self._test_change('location', 'http://a/b/c')
|
||||
|
||||
def test_change_size(self):
|
||||
self._test_change('size', 32)
|
||||
|
||||
def test_change_tags(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
delattr, self.image, 'tags')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
setattr, self.image, 'tags', ['king', 'kong'])
|
||||
self.assertRaises(exception.Forbidden, self.image.tags.pop)
|
||||
self.assertRaises(exception.Forbidden, self.image.tags.clear)
|
||||
self.assertRaises(exception.Forbidden, self.image.tags.add, 'king')
|
||||
self.assertRaises(exception.Forbidden, self.image.tags.remove, 'ping')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.tags.update, set(['king', 'kong']))
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.tags.intersection_update, set([]))
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.tags.difference_update, set([]))
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.tags.symmetric_difference_update,
|
||||
set([]))
|
||||
|
||||
def test_change_properties(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
delattr, self.image, 'extra_properties')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
setattr, self.image, 'extra_properties', {})
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.__delitem__, 'foo')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.__setitem__, 'foo', 'b')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.__setitem__, 'z', 'j')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.pop)
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.popitem)
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.setdefault, 'p', 'j')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image.extra_properties.update, {})
|
||||
|
||||
|
||||
class TestImageFactoryProxy(utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestImageFactoryProxy, self).setUp()
|
||||
factory = glance.domain.ImageFactory()
|
||||
self.context = glance.context.RequestContext(tenant=TENANT1)
|
||||
self.image_factory = authorization.ImageFactoryProxy(factory,
|
||||
self.context)
|
||||
|
||||
def test_default_owner_is_set(self):
|
||||
image = self.image_factory.new_image()
|
||||
self.assertEqual(image.owner, TENANT1)
|
||||
|
||||
def test_wrong_owner_cannot_be_set(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image_factory.new_image, owner=TENANT2)
|
||||
|
||||
def test_cannot_set_owner_to_none(self):
|
||||
self.assertRaises(exception.Forbidden,
|
||||
self.image_factory.new_image, owner=None)
|
||||
|
||||
def test_admin_can_set_any_owner(self):
|
||||
self.context.is_admin = True
|
||||
image = self.image_factory.new_image(owner=TENANT2)
|
||||
self.assertEqual(image.owner, TENANT2)
|
||||
|
||||
def test_admin_can_set_owner_to_none(self):
|
||||
self.context.is_admin = True
|
||||
image = self.image_factory.new_image(owner=None)
|
||||
self.assertEqual(image.owner, None)
|
||||
|
||||
def test_admin_still_gets_default_tenant(self):
|
||||
self.context.is_admin = True
|
||||
image = self.image_factory.new_image()
|
||||
self.assertEqual(image.owner, TENANT1)
|
||||
|
||||
|
||||
class TestImageRepoProxy(utils.BaseTestCase):
|
||||
|
||||
class ImageRepoStub(object):
|
||||
def __init__(self, fixtures):
|
||||
self.fixtures = fixtures
|
||||
|
||||
def get(self, image_id):
|
||||
for f in self.fixtures:
|
||||
if f.image_id == image_id:
|
||||
return f
|
||||
else:
|
||||
raise ValueError(image_id)
|
||||
|
||||
def list(self, *args, **kwargs):
|
||||
return self.fixtures
|
||||
|
||||
def setUp(self):
|
||||
super(TestImageRepoProxy, self).setUp()
|
||||
image_factory = glance.domain.ImageFactory()
|
||||
self.fixtures = [
|
||||
image_factory.new_image(owner=TENANT1),
|
||||
image_factory.new_image(owner=TENANT2, visibility='public'),
|
||||
image_factory.new_image(owner=TENANT2),
|
||||
]
|
||||
self.context = glance.context.RequestContext(tenant=TENANT1)
|
||||
image_repo = self.ImageRepoStub(self.fixtures)
|
||||
self.image_repo = authorization.ImageRepoProxy(image_repo,
|
||||
self.context)
|
||||
|
||||
def test_get_mutable_image(self):
|
||||
image = self.image_repo.get(self.fixtures[0].image_id)
|
||||
self.assertTrue(image is self.fixtures[0])
|
||||
|
||||
def test_get_immutable_image(self):
|
||||
image = self.image_repo.get(self.fixtures[1].image_id)
|
||||
self.assertRaises(exception.Forbidden,
|
||||
setattr, image, 'name', 'Vince')
|
||||
|
||||
def test_list(self):
|
||||
images = self.image_repo.list()
|
||||
self.assertTrue(images[0] is self.fixtures[0])
|
||||
self.assertRaises(exception.Forbidden,
|
||||
setattr, images[1], 'name', 'Wally')
|
||||
self.assertRaises(exception.Forbidden,
|
||||
setattr, images[2], 'name', 'Calvin')
|
||||
|
|
Loading…
Reference in New Issue