Property Protection Layer

This patch introduces the property protection layer and the tests
associated with it

Related to bp api-v2-property-protection

Change-Id: Iaf2111bf07377ef6a4c89a621888077cc14be776
This commit is contained in:
iccha.sethi 2013-08-22 17:10:20 -04:00
parent e8440d1ee8
commit 262bdf68b4
7 changed files with 557 additions and 3 deletions

View File

@ -0,0 +1,130 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance.common import exception
import glance.domain.proxy
class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, image_factory, context, property_rules):
self.image_factory = image_factory
self.context = context
self.roles = self.context.roles
self.property_rules = property_rules
kwargs = {'context': self.context,
'property_rules': self.property_rules}
super(ProtectedImageFactoryProxy, self).__init__(
image_factory,
proxy_class=ProtectedImageProxy,
proxy_kwargs=kwargs)
def new_image(self, **kwargs):
extra_props = kwargs.pop('extra_properties', {})
extra_properties = {}
for key in extra_props.keys():
if self.property_rules.check_property_rules(key, 'create',
self.roles):
extra_properties[key] = extra_props[key]
else:
raise exception.ReservedProperty(property=key)
return super(ProtectedImageFactoryProxy, self).\
new_image(extra_properties=extra_properties, **kwargs)
class ProtectedImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context, property_rules):
self.context = context
self.image_repo = image_repo
self.property_rules = property_rules
proxy_kwargs = {'context': self.context}
super(ProtectedImageRepoProxy, self).__init__(
image_repo, item_proxy_class=ProtectedImageProxy,
item_proxy_kwargs=proxy_kwargs)
def get(self, image_id):
return ProtectedImageProxy(self.image_repo.get(image_id),
self.context, self.property_rules)
def list(self, *args, **kwargs):
images = self.image_repo.list(*args, **kwargs)
return [ProtectedImageProxy(image, self.context, self.property_rules)
for image in images]
class ProtectedImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, property_rules):
self.image = image
self.context = context
self.roles = self.context.roles
self.property_rules = property_rules
self.image.extra_properties = ExtraPropertiesProxy(
self.roles,
self.image.extra_properties,
self.property_rules)
super(ProtectedImageProxy, self).__init__(self.image)
class ExtraPropertiesProxy(glance.domain.ExtraProperties):
def __init__(self, roles, extra_props, property_rules):
self.roles = roles
self.property_rules = property_rules
extra_properties = {}
for key in extra_props.keys():
if self.property_rules.check_property_rules(key, 'read',
self.roles):
extra_properties[key] = extra_props[key]
super(ExtraPropertiesProxy, self).__init__(extra_properties)
def __getitem__(self, key):
if self.property_rules.check_property_rules(key, 'read', self.roles):
return dict.__getitem__(self, key)
else:
raise KeyError
def __setitem__(self, key, value):
# NOTE(isethi): Exceptions are raised only for actions update, delete
# and create, where the user proactively interacts with the properties.
# A user cannot request to read a specific property, hence reads do
# raise an exception
try:
if self.__getitem__(key):
if self.property_rules.check_property_rules(key, 'update',
self.roles):
return dict.__setitem__(self, key, value)
else:
raise exception.ReservedProperty(property=key)
except KeyError:
if self.property_rules.check_property_rules(key, 'create',
self.roles):
return dict.__setitem__(self, key, value)
else:
raise exception.ReservedProperty(property=key)
def __delitem__(self, key):
if not super(ExtraPropertiesProxy, self).__getitem__(key):
raise KeyError
if self.property_rules.check_property_rules(key, 'delete',
self.roles):
return dict.__delitem__(self, key)
else:
raise exception.ReservedProperty(property=key)

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
from oslo.config import cfg
from glance.common import exception
@ -101,7 +102,8 @@ class Image(object):
self.disk_format = kwargs.pop('disk_format', None)
self.container_format = kwargs.pop('container_format', None)
self.size = kwargs.pop('size', None)
self.extra_properties = kwargs.pop('extra_properties', None) or {}
extra_properties = kwargs.pop('extra_properties', None) or {}
self.extra_properties = ExtraProperties(extra_properties)
self.tags = kwargs.pop('tags', None) or []
if kwargs:
message = "__init__() got unexpected keyword argument '%s'"
@ -156,6 +158,32 @@ class Image(object):
raise NotImplementedError()
class ExtraProperties(collections.MutableMapping, dict):
def __getitem__(self, key):
return dict.__getitem__(self, key)
def __setitem__(self, key, value):
return dict.__setitem__(self, key, value)
def __delitem__(self, key):
return dict.__delitem__(self, key)
def __eq__(self, other):
if isinstance(other, ExtraProperties):
return dict(self).__eq__(dict(other))
elif isinstance(other, dict):
return dict(self).__eq__(other)
else:
return False
def __len__(self):
return dict(self).__len__()
def keys(self):
return dict(self).keys()
class ImageMembership(object):
def __init__(self, image_id, member_id, created_at, updated_at,

View File

@ -4,6 +4,36 @@ read = admin,member
update = admin,member
delete = admin,member
[spl_create_prop]
create = admin,spl_role
read = admin,spl_role
update = admin
delete = admin
[spl_read_prop]
create = admin,spl_role
read = admin,spl_role
update = admin
delete = admin
[spl_read_only_prop]
create = admin
read = admin,spl_role
update = admin
delete = admin
[spl_update_prop]
create = admin,spl_role
read = admin,spl_role
update = admin,spl_role
delete = admin
[spl_delete_prop]
create = admin,spl_role
read = admin,spl_role
update = admin
delete = admin,spl_role
[.*]
create = admin
read = admin

View File

@ -0,0 +1,281 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance.api import property_protections
from glance.common import exception
from glance.common import property_utils
import glance.domain
from glance.tests import utils
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
class TestProtectedImageRepoProxy(utils.BaseTestCase):
class ImageRepoStub(object):
def __init__(self, fixtures):
self.fixtures = fixtures
def get(self, image_id):
for f in self.fixtures:
if f.image_id == image_id:
return f
else:
raise ValueError(image_id)
def list(self, *args, **kwargs):
return self.fixtures
def add(self, image):
self.fixtures.append(image)
def setUp(self):
super(TestProtectedImageRepoProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
self.image_factory = glance.domain.ImageFactory()
extra_props = {'spl_create_prop': 'c',
'spl_read_prop': 'r',
'spl_update_prop': 'u',
'spl_delete_prop': 'd',
'forbidden': 'prop'}
extra_props_2 = {'spl_read_prop': 'r', 'forbidden': 'prop'}
self.fixtures = [
self.image_factory.new_image(image_id='1', owner=TENANT1,
extra_properties=extra_props),
self.image_factory.new_image(owner=TENANT2, visibility='public'),
self.image_factory.new_image(image_id='3', owner=TENANT1,
extra_properties=extra_props_2),
]
self.context = glance.context.RequestContext(roles=['spl_role'])
image_repo = self.ImageRepoStub(self.fixtures)
self.image_repo = property_protections.ProtectedImageRepoProxy(
image_repo, self.context, self.property_rules)
def test_get_image(self):
image_id = '1'
result_image = self.image_repo.get(image_id)
result_extra_props = result_image.extra_properties
self.assertEqual(result_extra_props['spl_create_prop'], 'c')
self.assertEqual(result_extra_props['spl_read_prop'], 'r')
self.assertEqual(result_extra_props['spl_update_prop'], 'u')
self.assertEqual(result_extra_props['spl_delete_prop'], 'd')
self.assertFalse('forbidden' in result_extra_props.keys())
def test_list_image(self):
result_images = self.image_repo.list()
self.assertEquals(len(result_images), 3)
result_extra_props = result_images[0].extra_properties
self.assertEqual(result_extra_props['spl_create_prop'], 'c')
self.assertEqual(result_extra_props['spl_read_prop'], 'r')
self.assertEqual(result_extra_props['spl_update_prop'], 'u')
self.assertEqual(result_extra_props['spl_delete_prop'], 'd')
self.assertFalse('forbidden' in result_extra_props.keys())
result_extra_props = result_images[1].extra_properties
self.assertEqual(result_extra_props, {})
result_extra_props = result_images[2].extra_properties
self.assertEqual(result_extra_props['spl_read_prop'], 'r')
self.assertFalse('forbidden' in result_extra_props.keys())
class TestProtectedImageProxy(utils.BaseTestCase):
def setUp(self):
super(TestProtectedImageProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
class ImageStub(object):
def __init__(self, extra_prop):
self.extra_properties = extra_prop
def test_read_image_with_extra_prop(self):
context = glance.context.RequestContext(roles=['spl_role'])
extra_prop = {'spl_read_prop': 'read', 'spl_fake_prop': 'prop'}
image = self.ImageStub(extra_prop)
result_image = property_protections.ProtectedImageProxy(
image, context, self.property_rules)
result_extra_props = result_image.extra_properties
self.assertEqual(result_extra_props['spl_read_prop'], 'read')
self.assertFalse('spl_fake_prop' in result_extra_props.keys())
class TestExtraPropertiesProxy(utils.BaseTestCase):
def setUp(self):
super(TestExtraPropertiesProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
def test_read_extra_property_as_admin_role(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['admin']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
test_result = extra_prop_proxy['foo']
self.assertEqual(test_result, 'bar')
def test_read_extra_property_as_unpermitted_role(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['unpermitted_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo')
def test_update_extra_property_as_permitted_role_after_read(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['admin']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
extra_prop_proxy['foo'] = 'par'
self.assertEqual(extra_prop_proxy['foo'], 'par')
def test_update_extra_property_as_unpermitted_role_after_read(self):
extra_properties = {'spl_read_prop': 'bar'}
roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__,
'spl_read_prop', 'par')
def test_update_reserved_extra_property_as_permitted_role(self):
extra_properties = {'spl_create_prop': 'bar'}
roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, 'spl_create_prop',
'par')
def test_create_extra_property_as_permitted_role_after_read(self):
extra_properties = {}
roles = ['admin']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
extra_prop_proxy['boo'] = 'doo'
self.assertEqual(extra_prop_proxy['boo'], 'doo')
def test_create_reserved_extra_property_as_permitted_role(self):
extra_properties = {}
roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, 'boo',
'doo')
def test_delete_extra_property_as_admin_role(self):
extra_properties = {'foo': 'bar'}
roles = ['admin']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
del extra_prop_proxy['foo']
self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo')
def test_delete_nonexistant_extra_property_as_admin_role(self):
extra_properties = {}
roles = ['admin']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
self.assertRaises(KeyError, extra_prop_proxy.__delitem__, 'foo')
def test_delete_reserved_extra_property_as_permitted_role(self):
extra_properties = {'spl_read_prop': 'r'}
roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
# Ensure property has been created and can be read
self.assertEqual(extra_prop_proxy['spl_read_prop'], 'r')
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__delitem__, 'spl_read_prop')
def test_delete_nonexistant_extra_property_as_permitted_role(self):
extra_properties = {}
roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
self.assertRaises(KeyError,
extra_prop_proxy.__delitem__, 'spl_read_prop')
class TestProtectedImageFactoryProxy(utils.BaseTestCase):
def setUp(self):
super(TestProtectedImageFactoryProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
self.factory = glance.domain.ImageFactory()
def test_create_image_no_extra_prop(self):
self.context = glance.context.RequestContext(tenant=TENANT1,
roles=['spl_role'])
self.image_factory = property_protections.ProtectedImageFactoryProxy(
self.factory, self.context,
self.property_rules)
extra_props = {}
image = self.image_factory.new_image(extra_properties=extra_props)
expected_extra_props = {}
self.assertEqual(image.extra_properties, expected_extra_props)
def test_create_image_extra_prop(self):
self.context = glance.context.RequestContext(tenant=TENANT1,
roles=['spl_role'])
self.image_factory = property_protections.ProtectedImageFactoryProxy(
self.factory, self.context,
self.property_rules)
extra_props = {'spl_create_prop': 'c'}
image = self.image_factory.new_image(extra_properties=extra_props)
expected_extra_props = {'spl_create_prop': 'c'}
self.assertEqual(image.extra_properties, expected_extra_props)
def test_create_image_extra_prop_reserved_property(self):
self.context = glance.context.RequestContext(tenant=TENANT1,
roles=['spl_role'])
self.image_factory = property_protections.ProtectedImageFactoryProxy(
self.factory, self.context,
self.property_rules)
extra_props = {'foo': 'bar', 'spl_create_prop': 'c'}
# no reg ex for property 'foo' is mentioned for spl_role in config
self.assertRaises(exception.ReservedProperty,
self.image_factory.new_image,
extra_properties=extra_props)
def test_create_image_extra_prop_admin(self):
self.context = glance.context.RequestContext(tenant=TENANT1,
roles=['admin'])
self.image_factory = property_protections.ProtectedImageFactoryProxy(
self.factory, self.context,
self.property_rules)
extra_props = {'foo': 'bar', 'spl_create_prop': 'c'}
image = self.image_factory.new_image(extra_properties=extra_props)
expected_extra_props = {'foo': 'bar', 'spl_create_prop': 'c'}
self.assertEqual(image.extra_properties, expected_extra_props)
def test_create_image_extra_prop_invalid_role(self):
self.context = glance.context.RequestContext(tenant=TENANT1,
roles=['imaginary-role'])
self.image_factory = property_protections.ProtectedImageFactoryProxy(
self.factory, self.context,
self.property_rules)
extra_props = {'foo': 'bar', 'spl_create_prop': 'c'}
self.assertRaises(exception.ReservedProperty,
self.image_factory.new_image,
extra_properties=extra_props)

View File

@ -114,6 +114,20 @@ class TestImage(test_utils.BaseTestCase):
self.image = self.image_factory.new_image(
container_format='bear', disk_format='rawr')
def test_extra_properties(self):
self.image.extra_properties = {'foo': 'bar'}
self.assertEqual(self.image.extra_properties, {'foo': 'bar'})
def test_extra_properties_assign(self):
self.image.extra_properties['foo'] = 'bar'
self.assertEqual(self.image.extra_properties, {'foo': 'bar'})
def test_delete_extra_properties(self):
self.image.extra_properties = {'foo': 'bar'}
self.assertEqual(self.image.extra_properties, {'foo': 'bar'})
del self.image.extra_properties['foo']
self.assertEqual(self.image.extra_properties, {})
def test_visibility_enumerated(self):
self.image.visibility = 'public'
self.image.visibility = 'private'
@ -191,3 +205,73 @@ class TestImageMemberFactory(test_utils.BaseTestCase):
self.assertEqual(image_member.created_at, image_member.updated_at)
self.assertEqual(image_member.status, 'pending')
self.assertTrue(image_member.member_id is not None)
class TestExtraProperties(test_utils.BaseTestCase):
def test_getitem(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
self.assertEqual(extra_properties['foo'], 'bar')
self.assertEqual(extra_properties['snitch'], 'golden')
def test_getitem_with_no_items(self):
extra_properties = domain.ExtraProperties()
self.assertRaises(KeyError, extra_properties.__getitem__, 'foo')
def test_setitem(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
extra_properties['foo'] = 'baz'
self.assertEqual(extra_properties['foo'], 'baz')
def test_delitem(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
del extra_properties['foo']
self.assertRaises(KeyError, extra_properties.__getitem__, 'foo')
self.assertEqual(extra_properties['snitch'], 'golden')
def test_len_with_zero_items(self):
extra_properties = domain.ExtraProperties()
self.assertEqual(len(extra_properties), 0)
def test_len_with_non_zero_items(self):
extra_properties = domain.ExtraProperties()
extra_properties['foo'] = 'bar'
extra_properties['snitch'] = 'golden'
self.assertEqual(len(extra_properties), 2)
def test_eq_with_a_dict(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
ref_extra_properties = {'foo': 'bar', 'snitch': 'golden'}
self.assertEqual(extra_properties, ref_extra_properties)
def test_eq_with_an_object_of_ExtraProperties(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
ref_extra_properties = domain.ExtraProperties()
ref_extra_properties['snitch'] = 'golden'
ref_extra_properties['foo'] = 'bar'
self.assertEqual(extra_properties, ref_extra_properties)
def test_eq_with_uneqal_dict(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
ref_extra_properties = {'boo': 'far', 'gnitch': 'solden'}
self.assertFalse(extra_properties.__eq__(ref_extra_properties))
def test_eq_with_unequal_ExtraProperties_object(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
ref_extra_properties = domain.ExtraProperties()
ref_extra_properties['gnitch'] = 'solden'
ref_extra_properties['boo'] = 'far'
self.assertFalse(extra_properties.__eq__(ref_extra_properties))
def test_eq_with_incompatible_object(self):
a_dict = {'foo': 'bar', 'snitch': 'golden'}
extra_properties = domain.ExtraProperties(a_dict)
random_list = ['foo', 'bar']
self.assertFalse(extra_properties.__eq__(random_list))

View File

@ -40,14 +40,14 @@ BASE_URI = 'swift+http://storeurl.com/container'
def get_fake_request(path='', method='POST', is_admin=False, user=USER1,
tenant=TENANT1):
roles=['member'], tenant=TENANT1):
req = wsgi.Request.blank(path)
req.method = method
kwargs = {
'user': user,
'tenant': tenant,
'roles': [],
'roles': roles,
'is_admin': is_admin,
}

View File

@ -37,6 +37,7 @@ from glance.common import config
from glance.common import exception
from glance.common import wsgi
from glance import context
from glance.common import property_utils
CONF = cfg.CONF