Using policies for protected properties

This patch extends the way we can define rules for property protections.
It allows us to use the roles/rules defined in policy.json and leverage
the policy language for property protections as well.

DocImpact

Related to bp api-v2-property-protection

Change-Id: I4789ebb2645171280185d2c76138e78d954b5954
This commit is contained in:
iccha.sethi 2013-09-15 15:37:57 +00:00
parent 91174889c3
commit eb87f1fae8
19 changed files with 735 additions and 140 deletions

View File

@ -107,16 +107,21 @@ workers = 1
# (string value). This setting needs to be the same for both # (string value). This setting needs to be the same for both
# glance-scrubber and glance-api. # glance-scrubber and glance-api.
#lock_path=<None> #lock_path=<None>
#
# Property Protections config file # Property Protections config file
# This file contains the rules for property protections and the roles # This file contains the rules for property protections and the roles/policies
# associated with it. # associated with it.
# If this config value is not specified, by default, property protections # If this config value is not specified, by default, property protections
# won't be enforced. # won't be enforced.
# If a value is specified and the file is not found, then an # If a value is specified and the file is not found, then the glance-api
# HTTPInternalServerError will be thrown. # service will not start.
#property_protection_file = #property_protection_file =
# Specify whether 'roles' or 'policies' are used in the
# property_protection_file.
# The default value for property_protection_rule_format is 'roles'.
#property_protection_rule_format = roles
# Set a system wide quota for every user. This value is the total number # Set a system wide quota for every user. This value is the total number
# of bytes that a user can use across all storage systems. A value of # of bytes that a user can use across all storage systems. A value of
# 0 means unlimited. # 0 means unlimited.

View File

@ -0,0 +1,34 @@
# property-protections-policies.conf.sample
#
# This file is an example config file for when
# property_protection_rule_format=policies is enabled.
#
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions. You may refer to policies defined
# in policy.json.
# The property rules will be applied in the order specified. Once
# a match is found the remaining property rules will not be applied.
#
# WARNING:
# * If the reg ex specified below does not compile, then
# the glance-api service fails to start. (Guide for reg ex python compiler
# used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then the glance-api service fails to start.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
#
# NOTE: Only one policy can be specified per action. If multiple policies are
# specified, then the glance-api service fails to start.
[^x_.*]
create = default
read = default
update = default
delete = default
[.*]
create = context_is_admin
read = context_is_admin
update = context_is_admin
delete = context_is_admin

View File

@ -0,0 +1,32 @@
# property-protections-roles.conf.sample
#
# This file is an example config file for when
# property_protection_rule_format=roles is enabled.
#
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions.
# The property rules will be applied in the order specified. Once
# a match is found the remaining property rules will not be applied.
#
# WARNING:
# * If the reg ex specified below does not compile, then
# glance-api service will not start. (Guide for reg ex python compiler used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then the glance-api service will not start.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
#
# NOTE: Multiple roles can be specified for a given operation. These roles must
# be comma separated.
[^x_.*]
create = admin,member
read = admin,member
update = admin,member
delete = admin,member
[.*]
create = admin
read = admin
update = admin
delete = admin

View File

@ -1,25 +0,0 @@
# property-protections.conf.sample
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions. You may refer to roles defined
# in policy.json
# The property rules will be applied in the order specified below. Once
# a match is found the remaining property rules will not be traversed through.
# WARNING:
# * If the reg ex specified below does not compile, then
# HTTPInternalServerErrors will be thrown. (Guide for reg ex python compiler used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then that operation for the given regex is disabled for all roles.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
[^x_.*]
create = admin,member
read = admin,member
update = admin,member
delete = admin,member
[.*]
create = admin
read = admin
update = admin
delete = admin

View File

@ -55,12 +55,21 @@ class Enforcer(object):
self.policy_path = self._find_policy_file() self.policy_path = self._find_policy_file()
self.policy_file_mtime = None self.policy_file_mtime = None
self.policy_file_contents = None self.policy_file_contents = None
self.load_rules()
def set_rules(self, rules): def set_rules(self, rules):
"""Create a new Rules object based on the provided dict of rules""" """Create a new Rules object based on the provided dict of rules"""
rules_obj = policy.Rules(rules, self.default_rule) rules_obj = policy.Rules(rules, self.default_rule)
policy.set_rules(rules_obj) policy.set_rules(rules_obj)
def add_rules(self, rules):
"""Add new rules to the Rules object"""
if policy._rules:
rules_obj = policy.Rules(rules)
policy._rules.update(rules_obj)
else:
self.set_rules(rules)
def load_rules(self): def load_rules(self):
"""Set the rules found in the json file on disk""" """Set the rules found in the json file on disk"""
if self.policy_path: if self.policy_path:
@ -112,8 +121,6 @@ class Enforcer(object):
:raises: `glance.common.exception.Forbidden` :raises: `glance.common.exception.Forbidden`
:returns: A non-False value if access is allowed. :returns: A non-False value if access is allowed.
""" """
self.load_rules()
credentials = { credentials = {
'roles': context.roles, 'roles': context.roles,
'user': context.user, 'user': context.user,

View File

@ -23,7 +23,6 @@ class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, image_factory, context, property_rules): def __init__(self, image_factory, context, property_rules):
self.image_factory = image_factory self.image_factory = image_factory
self.context = context self.context = context
self.roles = self.context.roles
self.property_rules = property_rules self.property_rules = property_rules
kwargs = {'context': self.context, kwargs = {'context': self.context,
'property_rules': self.property_rules} 'property_rules': self.property_rules}
@ -38,7 +37,7 @@ class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory):
extra_properties = {} extra_properties = {}
for key in extra_props.keys(): for key in extra_props.keys():
if self.property_rules.check_property_rules(key, 'create', if self.property_rules.check_property_rules(key, 'create',
self.roles): self.context):
extra_properties[key] = extra_props[key] extra_properties[key] = extra_props[key]
else: else:
raise exception.ReservedProperty(property=key) raise exception.ReservedProperty(property=key)
@ -72,11 +71,10 @@ class ProtectedImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, property_rules): def __init__(self, image, context, property_rules):
self.image = image self.image = image
self.context = context self.context = context
self.roles = self.context.roles
self.property_rules = property_rules self.property_rules = property_rules
self.image.extra_properties = ExtraPropertiesProxy( self.image.extra_properties = ExtraPropertiesProxy(
self.roles, self.context,
self.image.extra_properties, self.image.extra_properties,
self.property_rules) self.property_rules)
super(ProtectedImageProxy, self).__init__(self.image) super(ProtectedImageProxy, self).__init__(self.image)
@ -84,18 +82,18 @@ class ProtectedImageProxy(glance.domain.proxy.Image):
class ExtraPropertiesProxy(glance.domain.ExtraProperties): class ExtraPropertiesProxy(glance.domain.ExtraProperties):
def __init__(self, roles, extra_props, property_rules): def __init__(self, context, extra_props, property_rules):
self.roles = roles self.context = context
self.property_rules = property_rules self.property_rules = property_rules
extra_properties = {} extra_properties = {}
for key in extra_props.keys(): for key in extra_props.keys():
if self.property_rules.check_property_rules(key, 'read', if self.property_rules.check_property_rules(key, 'read',
self.roles): self.context):
extra_properties[key] = extra_props[key] extra_properties[key] = extra_props[key]
super(ExtraPropertiesProxy, self).__init__(extra_properties) super(ExtraPropertiesProxy, self).__init__(extra_properties)
def __getitem__(self, key): def __getitem__(self, key):
if self.property_rules.check_property_rules(key, 'read', self.roles): if self.property_rules.check_property_rules(key, 'read', self.context):
return dict.__getitem__(self, key) return dict.__getitem__(self, key)
else: else:
raise KeyError raise KeyError
@ -108,13 +106,13 @@ class ExtraPropertiesProxy(glance.domain.ExtraProperties):
try: try:
if self.__getitem__(key): if self.__getitem__(key):
if self.property_rules.check_property_rules(key, 'update', if self.property_rules.check_property_rules(key, 'update',
self.roles): self.context):
return dict.__setitem__(self, key, value) return dict.__setitem__(self, key, value)
else: else:
raise exception.ReservedProperty(property=key) raise exception.ReservedProperty(property=key)
except KeyError: except KeyError:
if self.property_rules.check_property_rules(key, 'create', if self.property_rules.check_property_rules(key, 'create',
self.roles): self.context):
return dict.__setitem__(self, key, value) return dict.__setitem__(self, key, value)
else: else:
raise exception.ReservedProperty(property=key) raise exception.ReservedProperty(property=key)
@ -124,7 +122,7 @@ class ExtraPropertiesProxy(glance.domain.ExtraProperties):
raise KeyError raise KeyError
if self.property_rules.check_property_rules(key, 'delete', if self.property_rules.check_property_rules(key, 'delete',
self.roles): self.context):
return dict.__delitem__(self, key) return dict.__delitem__(self, key)
else: else:
raise exception.ReservedProperty(property=key) raise exception.ReservedProperty(property=key)

View File

@ -137,7 +137,7 @@ class Controller(controller.BaseController):
self.policy = policy.Enforcer() self.policy = policy.Enforcer()
self.pool = eventlet.GreenPool(size=1024) self.pool = eventlet.GreenPool(size=1024)
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
self.prop_enforcer = property_utils.PropertyRules() self.prop_enforcer = property_utils.PropertyRules(self.policy)
else: else:
self.prop_enforcer = None self.prop_enforcer = None
@ -160,7 +160,7 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
for key in create_props: for key in create_props:
if (self.prop_enforcer.check_property_rules( if (self.prop_enforcer.check_property_rules(
key, 'create', req.context.roles) is False): key, 'create', req.context) is False):
msg = _("Property '%s' is protected" % key) msg = _("Property '%s' is protected" % key)
LOG.debug(msg) LOG.debug(msg)
raise HTTPForbidden(explanation=msg, raise HTTPForbidden(explanation=msg,
@ -177,7 +177,7 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
for key in image_meta['properties'].keys(): for key in image_meta['properties'].keys():
if (self.prop_enforcer.check_property_rules( if (self.prop_enforcer.check_property_rules(
key, 'read', req.context.roles) is False): key, 'read', req.context) is False):
image_meta['properties'].pop(key) image_meta['properties'].pop(key)
def _enforce_update_protected_props(self, update_props, image_meta, def _enforce_update_protected_props(self, update_props, image_meta,
@ -200,9 +200,9 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
for key in update_props: for key in update_props:
has_read = self.prop_enforcer.check_property_rules( has_read = self.prop_enforcer.check_property_rules(
key, 'read', req.context.roles) key, 'read', req.context)
if ((self.prop_enforcer.check_property_rules( if ((self.prop_enforcer.check_property_rules(
key, 'update', req.context.roles) is False and key, 'update', req.context) is False and
image_meta['properties'][key] != image_meta['properties'][key] !=
orig_meta['properties'][key]) or not has_read): orig_meta['properties'][key]) or not has_read):
msg = _("Property '%s' is protected" % key) msg = _("Property '%s' is protected" % key)
@ -232,13 +232,13 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
for key in delete_props: for key in delete_props:
if (self.prop_enforcer.check_property_rules( if (self.prop_enforcer.check_property_rules(
key, 'read', req.context.roles) is False): key, 'read', req.context) is False):
# NOTE(bourke): if read protected, re-add to image_meta to # NOTE(bourke): if read protected, re-add to image_meta to
# prevent deletion # prevent deletion
image_meta['properties'][key] = \ image_meta['properties'][key] = \
orig_meta['properties'][key] orig_meta['properties'][key]
elif (self.prop_enforcer.check_property_rules( elif (self.prop_enforcer.check_property_rules(
key, 'delete', req.context.roles) is False): key, 'delete', req.context) is False):
msg = _("Property '%s' is protected" % key) msg = _("Property '%s' is protected" % key)
LOG.debug(msg) LOG.debug(msg)
raise HTTPForbidden(explanation=msg, raise HTTPForbidden(explanation=msg,

View File

@ -136,6 +136,10 @@ class InvalidSortKey(Invalid):
message = _("Sort key supplied was not valid.") message = _("Sort key supplied was not valid.")
class InvalidPropertyProtectionConfiguration(Invalid):
message = _("Invalid configuration in property protection file.")
class InvalidFilterRangeValue(Invalid): class InvalidFilterRangeValue(Invalid):
message = _("Unable to filter using the specified range.") message = _("Unable to filter using the specified range.")

View File

@ -20,8 +20,11 @@ import re
from oslo.config import cfg from oslo.config import cfg
import webob.exc import webob.exc
import glance.api.policy
from glance.common import exception
from glance.common.ordereddict import OrderedDict from glance.common.ordereddict import OrderedDict
from glance.openstack.common import log as logging from glance.openstack.common import log as logging
from glance.openstack.common import policy
# NOTE(bourke): The default dict_type is collections.OrderedDict in py27, but # NOTE(bourke): The default dict_type is collections.OrderedDict in py27, but
# we must set manually for compatibility with py26 # we must set manually for compatibility with py26
@ -32,6 +35,10 @@ property_opts = [
cfg.StrOpt('property_protection_file', cfg.StrOpt('property_protection_file',
default=None, default=None,
help=_('The location of the property protection file.')), help=_('The location of the property protection file.')),
cfg.StrOpt('property_protection_rule_format',
default='roles',
help=_('This config value indicates whether "roles" or '
'"policies" are used in the property protection file.')),
] ]
CONF = cfg.CONF CONF = cfg.CONF
@ -46,8 +53,13 @@ def is_property_protection_enabled():
class PropertyRules(object): class PropertyRules(object):
def __init__(self): def __init__(self, policy_enforcer=None):
self.rules = [] self.rules = []
self.prop_exp_mapping = {}
self.policies = []
self.policy_enforcer = policy_enforcer or glance.api.policy.Enforcer()
self.prop_prot_rule_format = CONF.property_protection_rule_format
self.prop_prot_rule_format = self.prop_prot_rule_format.lower()
self._load_rules() self._load_rules()
def _load_rules(self): def _load_rules(self):
@ -58,7 +70,14 @@ class PropertyRules(object):
msg = (_("Couldn't find property protection file %s:%s.") % msg = (_("Couldn't find property protection file %s:%s.") %
(CONF.property_protection_file, e)) (CONF.property_protection_file, e))
LOG.error(msg) LOG.error(msg)
raise webob.exc.HTTPInternalServerError(explanation=msg) raise exception.InvalidPropertyProtectionConfiguration()
if self.prop_prot_rule_format not in ['policies', 'roles']:
msg = _("Invalid value '%s' for 'property_protection_rule_format'"
". The permitted values are 'roles' and 'policies'" %
self.prop_prot_rule_format)
LOG.error(msg)
raise exception.InvalidPropertyProtectionConfiguration()
operations = ['create', 'read', 'update', 'delete'] operations = ['create', 'read', 'update', 'delete']
properties = CONFIG.sections() properties = CONFIG.sections()
@ -67,10 +86,25 @@ class PropertyRules(object):
compiled_rule = self._compile_rule(property_exp) compiled_rule = self._compile_rule(property_exp)
for operation in operations: for operation in operations:
roles = CONFIG.get(property_exp, operation) permissions = CONFIG.get(property_exp, operation)
if roles: if permissions:
roles = [role.strip() for role in roles.split(',')] if self.prop_prot_rule_format == 'policies':
property_dict[operation] = roles if ',' in permissions:
msg = _("Multiple policies '%s' not allowed for a"
" given operation. Policies can be "
"combined in the policy file" %
permissions)
LOG.error(msg)
raise exception.\
InvalidPropertyProtectionConfiguration()
self.prop_exp_mapping[compiled_rule] = property_exp
self._add_policy_rules(property_exp, operation,
permissions)
permissions = [permissions]
else:
permissions = [permission.strip() for permission in
permissions.split(',')]
property_dict[operation] = permissions
else: else:
property_dict[operation] = [] property_dict[operation] = []
msg = _(('Property protection on operation %s for rule ' msg = _(('Property protection on operation %s for rule '
@ -88,9 +122,34 @@ class PropertyRules(object):
msg = (_("Encountered a malformed property protection rule %s:%s.") msg = (_("Encountered a malformed property protection rule %s:%s.")
% (rule, e)) % (rule, e))
LOG.error(msg) LOG.error(msg)
raise webob.exc.HTTPInternalServerError(explanation=msg) raise exception.InvalidPropertyProtectionConfiguration()
def check_property_rules(self, property_name, action, roles): def _add_policy_rules(self, property_exp, action, rule):
""" Add policy rules to the policy enforcer.
For example, if the file listed as property_protection_file has:
[prop_a]
create = glance_creator
then the corresponding policy rule would be:
"prop_a:create": "rule:glance_creator"
where glance_creator is defined in policy.json. For example:
"glance:creator": "role:admin or role:glance_create_user"
"""
rule = "rule:%s" % rule
rule_name = "%s:%s" % (property_exp, action)
rule_dict = {}
rule_dict[rule_name] = policy.parse_rule(rule)
self.policy_enforcer.add_rules(rule_dict)
def _check_policy(self, property_exp, action, context):
try:
target = ":".join([property_exp, action])
self.policy_enforcer.enforce(context, target, {})
except exception.Forbidden:
return False
return True
def check_property_rules(self, property_name, action, context):
roles = context.roles
if not self.rules: if not self.rules:
return True return True
@ -99,6 +158,12 @@ class PropertyRules(object):
for rule_exp, rule in self.rules: for rule_exp, rule in self.rules:
if rule_exp.search(str(property_name)): if rule_exp.search(str(property_name)):
if set(roles).intersection(set(rule.get(action))): rule_roles = rule.get(action)
return True if rule_roles:
if self.prop_prot_rule_format == 'policies':
prop_exp_key = self.prop_exp_mapping[rule_exp]
return self._check_policy(prop_exp_key, action,
context)
if set(roles).intersection(set(rule_roles)):
return True
return False return False

View File

@ -44,7 +44,7 @@ class Gateway(object):
notifier_image_factory = glance.notifier.ImageFactoryProxy( notifier_image_factory = glance.notifier.ImageFactoryProxy(
policy_image_factory, context, self.notifier) policy_image_factory, context, self.notifier)
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
property_rules = property_utils.PropertyRules() property_rules = property_utils.PropertyRules(self.policy)
protected_image_factory = property_protections.\ protected_image_factory = property_protections.\
ProtectedImageFactoryProxy(notifier_image_factory, context, ProtectedImageFactoryProxy(notifier_image_factory, context,
property_rules) property_rules)
@ -74,7 +74,7 @@ class Gateway(object):
notifier_image_repo = glance.notifier.ImageRepoProxy( notifier_image_repo = glance.notifier.ImageRepoProxy(
policy_image_repo, context, self.notifier) policy_image_repo, context, self.notifier)
if property_utils.is_property_protection_enabled(): if property_utils.is_property_protection_enabled():
property_rules = property_utils.PropertyRules() property_rules = property_utils.PropertyRules(self.policy)
protected_image_repo = property_protections.\ protected_image_repo = property_protections.\
ProtectedImageRepoProxy(notifier_image_repo, context, ProtectedImageRepoProxy(notifier_image_repo, context,
property_rules) property_rules)

View File

@ -1,4 +1,5 @@
{ {
"context_is_admin": "role:admin", "context_is_admin": "role:admin",
"default": "" "default": "",
"glance_creator": "role:admin or role:spl_role"
} }

View File

@ -0,0 +1,17 @@
[spl_creator_policy]
create = glance_creator
read = glance_creator
update = context_is_admin
delete = context_is_admin
[spl_default_policy]
create = context_is_admin
read = default
update = context_is_admin
delete = context_is_admin
[.*]
create = context_is_admin
read = context_is_admin
update = context_is_admin
delete = context_is_admin

View File

@ -310,6 +310,7 @@ class ApiServer(Server):
self.image_cache_driver = 'sqlite' self.image_cache_driver = 'sqlite'
self.policy_file = policy_file self.policy_file = policy_file
self.policy_default_rule = 'default' self.policy_default_rule = 'default'
self.property_protection_rule_format = 'roles'
self.needs_database = True self.needs_database = True
default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
@ -370,6 +371,7 @@ enable_v2_api = %(enable_v2_api)s
lock_path = %(lock_path)s lock_path = %(lock_path)s
enable_v2_api= %(enable_v2_api)s enable_v2_api= %(enable_v2_api)s
property_protection_file = %(property_protection_file)s property_protection_file = %(property_protection_file)s
property_protection_rule_format = %(property_protection_rule_format)s
[paste_deploy] [paste_deploy]
flavor = %(deployment_flavor)s flavor = %(deployment_flavor)s
""" """
@ -575,8 +577,12 @@ class FunctionalTest(test_utils.BaseTestCase):
self.copy_data_file('schema-image.json', conf_dir) self.copy_data_file('schema-image.json', conf_dir)
self.copy_data_file('policy.json', conf_dir) self.copy_data_file('policy.json', conf_dir)
self.copy_data_file('property-protections.conf', conf_dir) self.copy_data_file('property-protections.conf', conf_dir)
self.property_file = os.path.join(conf_dir, self.copy_data_file('property-protections-policies.conf', conf_dir)
'property-protections.conf') self.property_file_roles = os.path.join(conf_dir,
'property-protections.conf')
property_policies = 'property-protections-policies.conf'
self.property_file_policies = os.path.join(conf_dir,
property_policies)
self.policy_file = os.path.join(conf_dir, 'policy.json') self.policy_file = os.path.join(conf_dir, 'policy.json')
self.api_server = ApiServer(self.test_dir, self.api_server = ApiServer(self.test_dir,

View File

@ -368,9 +368,9 @@ class TestImages(functional.FunctionalTest):
self.stop_servers() self.stop_servers()
def test_property_protections(self): def test_property_protections_with_roles(self):
# Enable property protection # Enable property protection
self.api_server.property_protection_file = self.property_file self.api_server.property_protection_file = self.property_file_roles
self.start_servers(**self.__dict__.copy()) self.start_servers(**self.__dict__.copy())
# Image list should be empty # Image list should be empty
@ -427,6 +427,7 @@ class TestImages(functional.FunctionalTest):
data = json.dumps({'name': 'image-1', data = json.dumps({'name': 'image-1',
'disk_format': 'aki', 'container_format': 'aki', 'disk_format': 'aki', 'container_format': 'aki',
'spl_create_prop': 'create_bar', 'spl_create_prop': 'create_bar',
'spl_create_prop_policy': 'create_policy_bar',
'spl_read_prop': 'read_bar', 'spl_read_prop': 'read_bar',
'spl_update_prop': 'update_bar', 'spl_update_prop': 'update_bar',
'spl_delete_prop': 'delete_bar'}) 'spl_delete_prop': 'delete_bar'})
@ -495,6 +496,155 @@ class TestImages(functional.FunctionalTest):
self.stop_servers() self.stop_servers()
def test_property_protections_with_policies(self):
# Enable property protection
self.api_server.property_protection_file = self.property_file_policies
self.api_server.property_protection_rule_format = 'policies'
self.start_servers(**self.__dict__.copy())
# Image list should be empty
path = self._url('/v2/images')
response = requests.get(path, headers=self._headers())
self.assertEqual(200, response.status_code)
images = json.loads(response.text)['images']
self.assertEqual(0, len(images))
## Create an image for role member with extra props
# Raises 403 since user is not allowed to set 'foo'
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = json.dumps({'name': 'image-1', 'foo': 'bar',
'disk_format': 'aki', 'container_format': 'aki',
'x_owner_foo': 'o_s_bar'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(403, response.status_code)
## Create an image for role member without 'foo'
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = json.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = json.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in expected_image.items():
self.assertEqual(image[key], value, key)
# Create an image for role spl_role with extra props
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'spl_role, admin'})
data = json.dumps({'name': 'image-1',
'disk_format': 'aki', 'container_format': 'aki',
'spl_creator_policy': 'creator_bar',
'spl_default_policy': 'default_bar'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
image = json.loads(response.text)
image_id = image['id']
self.assertEqual('creator_bar', image['spl_creator_policy'])
self.assertEqual('default_bar', image['spl_default_policy'])
# Attempt to replace a property which is permitted
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers = self._headers({'content-type': media_type,
'X-Roles': 'admin'})
data = json.dumps([
{'op': 'replace', 'path': '/spl_creator_policy', 'value': 'r'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code, response.text)
# Returned image entity should reflect the changes
image = json.loads(response.text)
# 'spl_creator_policy' has update permission for admin
# hence the value has changed
self.assertEqual('r', image['spl_creator_policy'])
# Attempt to replace a property which is forbidden
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers = self._headers({'content-type': media_type,
'X-Roles': 'spl_role'})
data = json.dumps([
{'op': 'replace', 'path': '/spl_creator_policy', 'value': 'z'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(403, response.status_code, response.text)
# Attempt to read properties
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({'content-type': media_type,
'X-Roles': 'random_role'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
image = json.loads(response.text)
# 'random_role' is allowed read 'spl_default_policy'.
self.assertEqual(image['spl_default_policy'], 'default_bar')
# 'random_role' is forbidden to read 'spl_creator_policy'.
self.assertFalse('spl_creator_policy' in image)
# Attempt to add and remove properties which are permitted
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers = self._headers({'content-type': media_type,
'X-Roles': 'admin'})
data = json.dumps([
{'op': 'remove', 'path': '/spl_creator_policy'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code, response.text)
# Returned image entity should reflect the changes
image = json.loads(response.text)
# 'spl_creator_policy' has delete permission for admin
# hence the value has been deleted
self.assertFalse('spl_creator_policy' in image)
# Attempt to read a property that is permitted
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({'content-type': media_type,
'X-Roles': 'random_role'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Returned image entity should reflect the changes
image = json.loads(response.text)
self.assertEqual(image['spl_default_policy'], 'default_bar')
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_tag_lifecycle(self): def test_tag_lifecycle(self):
# Create an image with a tag - duplicate should be ignored # Create an image with a tag - duplicate should be ignored
path = self._url('/v2/images') path = self._url('/v2/images')

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from glance.api import policy
from glance.api import property_protections from glance.api import property_protections
from glance.common import exception from glance.common import exception
from glance.common import property_utils from glance.common import property_utils
@ -48,7 +49,8 @@ class TestProtectedImageRepoProxy(utils.BaseTestCase):
def setUp(self): def setUp(self):
super(TestProtectedImageRepoProxy, self).setUp() super(TestProtectedImageRepoProxy, self).setUp()
self.set_property_protections() self.set_property_protections()
self.property_rules = property_utils.PropertyRules() self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
self.image_factory = glance.domain.ImageFactory() self.image_factory = glance.domain.ImageFactory()
extra_props = {'spl_create_prop': 'c', extra_props = {'spl_create_prop': 'c',
'spl_read_prop': 'r', 'spl_read_prop': 'r',
@ -101,7 +103,8 @@ class TestProtectedImageProxy(utils.BaseTestCase):
def setUp(self): def setUp(self):
super(TestProtectedImageProxy, self).setUp() super(TestProtectedImageProxy, self).setUp()
self.set_property_protections() self.set_property_protections()
self.property_rules = property_utils.PropertyRules() self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
class ImageStub(object): class ImageStub(object):
def __init__(self, extra_prop): def __init__(self, extra_prop):
@ -123,92 +126,93 @@ class TestExtraPropertiesProxy(utils.BaseTestCase):
def setUp(self): def setUp(self):
super(TestExtraPropertiesProxy, self).setUp() super(TestExtraPropertiesProxy, self).setUp()
self.set_property_protections() self.set_property_protections()
self.property_rules = property_utils.PropertyRules() self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
def test_read_extra_property_as_admin_role(self): def test_read_extra_property_as_admin_role(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'} extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['admin'] context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
test_result = extra_prop_proxy['foo'] test_result = extra_prop_proxy['foo']
self.assertEqual(test_result, 'bar') self.assertEqual(test_result, 'bar')
def test_read_extra_property_as_unpermitted_role(self): def test_read_extra_property_as_unpermitted_role(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'} extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['unpermitted_role'] context = glance.context.RequestContext(roles=['unpermitted_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo') self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo')
def test_update_extra_property_as_permitted_role_after_read(self): def test_update_extra_property_as_permitted_role_after_read(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'} extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['admin'] context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
extra_prop_proxy['foo'] = 'par' extra_prop_proxy['foo'] = 'par'
self.assertEqual(extra_prop_proxy['foo'], 'par') self.assertEqual(extra_prop_proxy['foo'], 'par')
def test_update_extra_property_as_unpermitted_role_after_read(self): def test_update_extra_property_as_unpermitted_role_after_read(self):
extra_properties = {'spl_read_prop': 'bar'} extra_properties = {'spl_read_prop': 'bar'}
roles = ['spl_role'] context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty, self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, extra_prop_proxy.__setitem__,
'spl_read_prop', 'par') 'spl_read_prop', 'par')
def test_update_reserved_extra_property_as_permitted_role(self): def test_update_reserved_extra_property(self):
extra_properties = {'spl_create_prop': 'bar'} extra_properties = {'spl_create_prop': 'bar'}
roles = ['spl_role'] context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty, self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, 'spl_create_prop', extra_prop_proxy.__setitem__, 'spl_create_prop',
'par') 'par')
def test_create_extra_property_as_permitted_role_after_read(self): def test_create_extra_property_admin(self):
extra_properties = {} extra_properties = {}
roles = ['admin'] context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
extra_prop_proxy['boo'] = 'doo' extra_prop_proxy['boo'] = 'doo'
self.assertEqual(extra_prop_proxy['boo'], 'doo') self.assertEqual(extra_prop_proxy['boo'], 'doo')
def test_create_reserved_extra_property_as_permitted_role(self): def test_create_reserved_extra_property(self):
extra_properties = {} extra_properties = {}
roles = ['spl_role'] context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty, self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, 'boo', extra_prop_proxy.__setitem__, 'boo',
'doo') 'doo')
def test_delete_extra_property_as_admin_role(self): def test_delete_extra_property_as_admin_role(self):
extra_properties = {'foo': 'bar'} extra_properties = {'foo': 'bar'}
roles = ['admin'] context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
del extra_prop_proxy['foo'] del extra_prop_proxy['foo']
self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo') self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo')
def test_delete_nonexistant_extra_property_as_admin_role(self): def test_delete_nonexistant_extra_property_as_admin_role(self):
extra_properties = {} extra_properties = {}
roles = ['admin'] context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
self.assertRaises(KeyError, extra_prop_proxy.__delitem__, 'foo') self.assertRaises(KeyError, extra_prop_proxy.__delitem__, 'foo')
def test_delete_reserved_extra_property_as_permitted_role(self): def test_delete_reserved_extra_property(self):
extra_properties = {'spl_read_prop': 'r'} extra_properties = {'spl_read_prop': 'r'}
roles = ['spl_role'] context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules) context, extra_properties, self.property_rules)
# Ensure property has been created and can be read # Ensure property has been created and can be read
self.assertEqual(extra_prop_proxy['spl_read_prop'], 'r') self.assertEqual(extra_prop_proxy['spl_read_prop'], 'r')
self.assertRaises(exception.ReservedProperty, self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__delitem__, 'spl_read_prop') extra_prop_proxy.__delitem__, 'spl_read_prop')
def test_delete_nonexistant_extra_property_as_permitted_role(self): def test_delete_nonexistant_extra_property(self):
extra_properties = {} extra_properties = {}
roles = ['spl_role'] roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy( extra_prop_proxy = property_protections.ExtraPropertiesProxy(
@ -221,7 +225,8 @@ class TestProtectedImageFactoryProxy(utils.BaseTestCase):
def setUp(self): def setUp(self):
super(TestProtectedImageFactoryProxy, self).setUp() super(TestProtectedImageFactoryProxy, self).setUp()
self.set_property_protections() self.set_property_protections()
self.property_rules = property_utils.PropertyRules() self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
self.factory = glance.domain.ImageFactory() self.factory = glance.domain.ImageFactory()
def test_create_image_no_extra_prop(self): def test_create_image_no_extra_prop(self):

View File

@ -15,8 +15,11 @@
import webob.exc import webob.exc
from glance.api import policy
from glance.common import exception
from glance.common import property_utils from glance.common import property_utils
from glance.tests import utils import glance.context
from glance.tests.unit import base
CONFIG_SECTIONS = [ CONFIG_SECTIONS = [
'^x_owner_.*', '^x_owner_.*',
@ -30,16 +33,20 @@ CONFIG_SECTIONS = [
] ]
class TestPropertyRules(utils.BaseTestCase): def create_context(policy, roles=[]):
return glance.context.RequestContext(roles=roles,
policy_enforcer=policy)
class TestPropertyRulesWithRoles(base.IsolatedUnitTest):
def setUp(self): def setUp(self):
super(TestPropertyRules, self).setUp() super(TestPropertyRulesWithRoles, self).setUp()
self.set_property_protections() self.set_property_protections()
self.policy = policy.Enforcer()
def tearDown(self): def tearDown(self):
for section in property_utils.CONFIG.sections(): super(TestPropertyRulesWithRoles, self).tearDown()
property_utils.CONFIG.remove_section(section)
super(TestPropertyRules, self).tearDown()
def test_is_property_protections_enabled_true(self): def test_is_property_protections_enabled_true(self):
self.config(property_protection_file="property-protections.conf") self.config(property_protection_file="property-protections.conf")
@ -51,7 +58,7 @@ class TestPropertyRules(utils.BaseTestCase):
def test_property_protection_file_doesnt_exist(self): def test_property_protection_file_doesnt_exist(self):
self.config(property_protection_file='fake-file.conf') self.config(property_protection_file='fake-file.conf')
self.assertRaises(webob.exc.HTTPInternalServerError, self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules) property_utils.PropertyRules)
def test_property_protection_with_malformed_rule(self): def test_property_protection_with_malformed_rule(self):
@ -60,7 +67,7 @@ class TestPropertyRules(utils.BaseTestCase):
'update': ['fake-role'], 'update': ['fake-role'],
'delete': ['fake-role']}} 'delete': ['fake-role']}}
self.set_property_protection_rules(malformed_rules) self.set_property_protection_rules(malformed_rules)
self.assertRaises(webob.exc.HTTPInternalServerError, self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules) property_utils.PropertyRules)
def test_property_protection_with_missing_operation(self): def test_property_protection_with_missing_operation(self):
@ -68,7 +75,7 @@ class TestPropertyRules(utils.BaseTestCase):
'update': ['fake-role'], 'update': ['fake-role'],
'delete': ['fake-role']}} 'delete': ['fake-role']}}
self.set_property_protection_rules(rules_with_missing_operation) self.set_property_protection_rules(rules_with_missing_operation)
self.assertRaises(webob.exc.HTTPInternalServerError, self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules) property_utils.PropertyRules)
def test_property_protection_with_misspelt_operation(self): def test_property_protection_with_misspelt_operation(self):
@ -77,7 +84,7 @@ class TestPropertyRules(utils.BaseTestCase):
'update': ['fake-role'], 'update': ['fake-role'],
'delete': ['fake-role']}} 'delete': ['fake-role']}}
self.set_property_protection_rules(rules_with_misspelt_operation) self.set_property_protection_rules(rules_with_misspelt_operation)
self.assertRaises(webob.exc.HTTPInternalServerError, self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules) property_utils.PropertyRules)
def test_property_protection_with_whitespace(self): def test_property_protection_with_whitespace(self):
@ -92,81 +99,85 @@ class TestPropertyRules(utils.BaseTestCase):
self.set_property_protection_rules(rules_whitespace) self.set_property_protection_rules(rules_whitespace)
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop_1', self.assertTrue(self.rules_checker.check_property_rules('test_prop_1',
'read', ['member'])) 'read', create_context(self.policy, ['member'])))
self.assertTrue(self.rules_checker.check_property_rules('test_prop_1', self.assertTrue(self.rules_checker.check_property_rules('test_prop_1',
'read', ['fake-role'])) 'read', create_context(self.policy, ['fake-role'])))
def test_check_property_rules_invalid_action(self): def test_check_property_rules_invalid_action(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop', self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'hall', ['admin'])) 'hall', create_context(self.policy, ['admin'])))
def test_check_property_rules_read_permitted_admin_role(self): def test_check_property_rules_read_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop', self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'read', ['admin'])) 'read', create_context(self.policy, ['admin'])))
def test_check_property_rules_read_permitted_specific_role(self): def test_check_property_rules_read_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules( self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'read', ['member'])) 'x_owner_prop', 'read',
create_context(self.policy, ['member'])))
def test_check_property_rules_read_unpermitted_role(self): def test_check_property_rules_read_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop', self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'read', ['member'])) 'read', create_context(self.policy, ['member'])))
def test_check_property_rules_create_permitted_admin_role(self): def test_check_property_rules_create_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop', self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'create', ['admin'])) 'create', create_context(self.policy, ['admin'])))
def test_check_property_rules_create_permitted_specific_role(self): def test_check_property_rules_create_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules( self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'create', ['member'])) 'x_owner_prop', 'create',
create_context(self.policy, ['member'])))
def test_check_property_rules_create_unpermitted_role(self): def test_check_property_rules_create_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop', self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'create', ['member'])) 'create', create_context(self.policy, ['member'])))
def test_check_property_rules_update_permitted_admin_role(self): def test_check_property_rules_update_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop', self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'update', ['admin'])) 'update', create_context(self.policy, ['admin'])))
def test_check_property_rules_update_permitted_specific_role(self): def test_check_property_rules_update_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules( self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'update', ['member'])) 'x_owner_prop', 'update',
create_context(self.policy, ['member'])))
def test_check_property_rules_update_unpermitted_role(self): def test_check_property_rules_update_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop', self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'update', ['member'])) 'update', create_context(self.policy, ['member'])))
def test_check_property_rules_delete_permitted_admin_role(self): def test_check_property_rules_delete_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop', self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'delete', ['admin'])) 'delete', create_context(self.policy, ['admin'])))
def test_check_property_rules_delete_permitted_specific_role(self): def test_check_property_rules_delete_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules( self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'delete', ['member'])) 'x_owner_prop', 'delete',
create_context(self.policy, ['member'])))
def test_check_property_rules_delete_unpermitted_role(self): def test_check_property_rules_delete_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop', self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'delete', ['member'])) 'delete', create_context(self.policy, ['member'])))
def test_property_config_loaded_in_order(self): def test_property_config_loaded_in_order(self):
""" """
Verify the order of loaded config sections matches that from the Verify the order of loaded config sections matches that from the
configuration file configuration file
""" """
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertEqual(property_utils.CONFIG.sections(), CONFIG_SECTIONS) self.assertEqual(property_utils.CONFIG.sections(), CONFIG_SECTIONS)
def test_property_rules_loaded_in_order(self): def test_property_rules_loaded_in_order(self):
@ -174,7 +185,77 @@ class TestPropertyRules(utils.BaseTestCase):
Verify rules are iterable in the same order as read from the config Verify rules are iterable in the same order as read from the config
file file
""" """
self.rules_checker = property_utils.PropertyRules() self.rules_checker = property_utils.PropertyRules(self.policy)
for i in xrange(len(property_utils.CONFIG.sections())): for i in xrange(len(property_utils.CONFIG.sections())):
self.assertEqual(property_utils.CONFIG.sections()[i], self.assertEqual(property_utils.CONFIG.sections()[i],
self.rules_checker.rules[i][0].pattern) self.rules_checker.rules[i][0].pattern)
class TestPropertyRulesWithPolicies(base.IsolatedUnitTest):
def setUp(self):
super(TestPropertyRulesWithPolicies, self).setUp()
self.set_property_protections(use_policies=True)
self.policy = policy.Enforcer()
self.rules_checker = property_utils.PropertyRules(self.policy)
def tearDown(self):
super(TestPropertyRulesWithPolicies, self).tearDown()
def test_check_property_rules_create_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'create',
create_context(self.policy, ['spl_role'])))
def test_check_property_rules_create_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'create',
create_context(self.policy, ['fake-role'])))
def test_check_property_rules_read_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'read',
create_context(self.policy, ['spl_role'])))
def test_check_property_rules_read_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'read',
create_context(self.policy, ['fake-role'])))
def test_check_property_rules_update_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'update',
create_context(self.policy, ['admin'])))
def test_check_property_rules_update_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'update',
create_context(self.policy, ['fake-role'])))
def test_check_property_rules_delete_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'delete',
create_context(self.policy, ['admin'])))
def test_check_property_rules_delete_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'delete',
create_context(self.policy, ['fake-role'])))
def test_property_protection_with_malformed_rule(self):
malformed_rules = {'^[0-9)': {'create': ['fake-policy'],
'read': ['fake-policy'],
'update': ['fake-policy'],
'delete': ['fake-policy']}}
self.set_property_protection_rules(malformed_rules)
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)
def test_property_protection_with_multiple_policies(self):
malformed_rules = {'^x_.*': {'create': ['fake-policy, another_pol'],
'read': ['fake-policy'],
'update': ['fake-policy'],
'delete': ['fake-policy']}}
self.set_property_protection_rules(malformed_rules)
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)

View File

@ -2487,7 +2487,7 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
def test_prop_protection_with_create_and_permitted_role(self): def test_prop_protection_with_create_and_permitted_role(self):
""" """
As admin role, create and image and verify permitted role 'member' can As admin role, create an image and verify permitted role 'member' can
create a protected property create a protected property
""" """
image_id = self._create_admin_image() image_id = self._create_admin_image()
@ -2501,6 +2501,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['image'] res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['x_owner_foo'], 'bar') self.assertEqual(res_body['properties']['x_owner_foo'], 'bar')
def test_prop_protection_with_permitted_policy_config(self):
"""
As admin role, create an image and verify permitted role 'member' can
create a protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image()
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:admin',
'x-image-meta-property-spl_create_prop_policy': 'bar'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEqual(output.status_int, 200)
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['spl_create_prop_policy'],
'bar')
def test_prop_protection_with_create_and_unpermitted_role(self): def test_prop_protection_with_create_and_unpermitted_role(self):
""" """
As admin role, create an image and verify unpermitted role As admin role, create an image and verify unpermitted role
@ -2605,6 +2624,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['images'][0] res_body = json.loads(output.body)['images'][0]
self.assertEqual(res_body['properties']['x_owner_foo'], 'bar') self.assertEqual(res_body['properties']['x_owner_foo'], 'bar')
def test_prop_protection_with_detail_and_permitted_policy(self):
"""
As admin role, create an image with a protected property, and verify
permitted role 'member' can read that protected property via
/images/detail
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
method='GET', path='/images/detail')
headers = {'x-auth-token': 'user:tenant:member'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEqual(output.status_int, 200)
res_body = json.loads(output.body)['images'][0]
self.assertEqual(res_body['properties']['x_owner_foo'], 'bar')
def test_prop_protection_with_detail_and_unpermitted_role(self): def test_prop_protection_with_detail_and_unpermitted_role(self):
""" """
As admin role, create an image with a protected property, and verify As admin role, create an image with a protected property, and verify
@ -2624,6 +2662,26 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
self.assertNotIn('x-image-meta-property-x_owner_foo', self.assertNotIn('x-image-meta-property-x_owner_foo',
res_body['properties']) res_body['properties'])
def test_prop_protection_with_detail_and_unpermitted_policy(self):
"""
As admin role, create an image with a protected property, and verify
permitted role 'fake_role' can *not* read that protected property via
/images/detail
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
method='GET', path='/images/detail')
headers = {'x-auth-token': 'user:tenant:fake_role'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEqual(output.status_int, 200)
res_body = json.loads(output.body)['images'][0]
self.assertNotIn('x-image-meta-property-x_owner_foo',
res_body['properties'])
def test_prop_protection_with_update_and_permitted_role(self): def test_prop_protection_with_update_and_permitted_role(self):
""" """
As admin role, create an image with protected property, and verify As admin role, create an image with protected property, and verify
@ -2641,6 +2699,24 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['image'] res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['x_owner_foo'], 'baz') self.assertEqual(res_body['properties']['x_owner_foo'], 'baz')
def test_prop_protection_with_update_and_permitted_policy(self):
"""
As admin role, create an image with protected property, and verify
permitted role 'admin' can update that protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-spl_default_policy': 'bar'})
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:admin',
'x-image-meta-property-spl_default_policy': 'baz'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['spl_default_policy'], 'baz')
def test_prop_protection_with_update_and_unpermitted_role(self): def test_prop_protection_with_update_and_unpermitted_role(self):
""" """
As admin role, create an image with protected property, and verify As admin role, create an image with protected property, and verify
@ -2659,6 +2735,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
self.assertIn("Property '%s' is protected" % self.assertIn("Property '%s' is protected" %
"x_owner_foo", output.body) "x_owner_foo", output.body)
def test_prop_protection_with_update_and_unpermitted_policy(self):
"""
As admin role, create an image with protected property, and verify
unpermitted role 'fake_role' can *not* update that protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:fake_role',
'x-image-meta-property-x_owner_foo': 'baz'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEquals(output.status_int, webob.exc.HTTPForbidden.code)
self.assertIn("Property '%s' is protected" %
"x_owner_foo", output.body)
def test_prop_protection_update_without_read(self): def test_prop_protection_update_without_read(self):
""" """
Test protected property cannot be updated without read permission Test protected property cannot be updated without read permission
@ -2711,6 +2806,24 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['image'] res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties'], {}) self.assertEqual(res_body['properties'], {})
def test_prop_protection_with_delete_and_permitted_policy(self):
"""
As admin role, create an image with protected property, and verify
permitted role 'member' can can delete that protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:member',
'X-Glance-Registry-Purge-Props': 'True'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties'], {})
def test_prop_protection_with_delete_and_unpermitted_read(self): def test_prop_protection_with_delete_and_unpermitted_read(self):
""" """
Test protected property cannot be deleted without read permission Test protected property cannot be deleted without read permission

View File

@ -105,7 +105,7 @@ def _db_image_member_fixture(image_id, member_id, **kwargs):
return obj return obj
class TestImagesController(test_utils.BaseTestCase): class TestImagesController(base.IsolatedUnitTest):
def setUp(self): def setUp(self):
super(TestImagesController, self).setUp() super(TestImagesController, self).setUp()
@ -607,6 +607,11 @@ class TestImagesController(test_utils.BaseTestCase):
self.controller.update, request, UUID1, changes) self.controller.update, request, UUID1, changes)
def test_prop_protection_with_create_and_permitted_role(self): def test_prop_protection_with_create_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -621,7 +626,66 @@ class TestImagesController(test_utils.BaseTestCase):
created_image.image_id, changes) created_image.image_id, changes)
self.assertEqual(output.extra_properties['x_owner_foo'], 'bar') self.assertEqual(output.extra_properties['x_owner_foo'], 'bar')
def test_prop_protection_with_update_and_permitted_policy(self):
self.set_property_protections(use_policies=True)
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
request = unit_test_utils.get_fake_request(roles=['spl_role'])
image = {'name': 'image-1'}
extra_props = {'spl_creator_policy': 'bar'}
created_image = self.controller.create(request, image=image,
extra_properties=extra_props,
tags=[])
self.assertEqual(created_image.extra_properties['spl_creator_policy'],
'bar')
another_request = unit_test_utils.get_fake_request(roles=['spl_role'])
changes = [
{'op': 'replace', 'path': ['spl_creator_policy'], 'value': 'par'},
]
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
another_request, created_image.image_id, changes)
another_request = unit_test_utils.get_fake_request(roles=['admin'])
output = self.controller.update(another_request,
created_image.image_id, changes)
self.assertEqual(output.extra_properties['spl_creator_policy'],
'par')
def test_prop_protection_with_create_with_patch_and_policy(self):
self.set_property_protections(use_policies=True)
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
request = unit_test_utils.get_fake_request(roles=['spl_role', 'admin'])
image = {'name': 'image-1'}
extra_props = {'spl_default_policy': 'bar'}
created_image = self.controller.create(request, image=image,
extra_properties=extra_props,
tags=[])
another_request = unit_test_utils.get_fake_request(roles=['fake_role'])
changes = [
{'op': 'add', 'path': ['spl_creator_policy'], 'value': 'bar'},
]
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
another_request, created_image.image_id, changes)
another_request = unit_test_utils.get_fake_request(roles=['spl_role'])
output = self.controller.update(another_request,
created_image.image_id, changes)
self.assertEqual(output.extra_properties['spl_creator_policy'],
'bar')
def test_prop_protection_with_create_and_unpermitted_role(self): def test_prop_protection_with_create_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -638,6 +702,11 @@ class TestImagesController(test_utils.BaseTestCase):
created_image.image_id, changes) created_image.image_id, changes)
def test_prop_protection_with_show_and_permitted_role(self): def test_prop_protection_with_show_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -650,6 +719,11 @@ class TestImagesController(test_utils.BaseTestCase):
self.assertEqual(output.extra_properties['x_owner_foo'], 'bar') self.assertEqual(output.extra_properties['x_owner_foo'], 'bar')
def test_prop_protection_with_show_and_unpermitted_role(self): def test_prop_protection_with_show_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['member']) request = unit_test_utils.get_fake_request(roles=['member'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -663,6 +737,11 @@ class TestImagesController(test_utils.BaseTestCase):
'x_owner_foo') 'x_owner_foo')
def test_prop_protection_with_update_and_permitted_role(self): def test_prop_protection_with_update_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -679,6 +758,11 @@ class TestImagesController(test_utils.BaseTestCase):
self.assertEqual(output.extra_properties['x_owner_foo'], 'baz') self.assertEqual(output.extra_properties['x_owner_foo'], 'baz')
def test_prop_protection_with_update_and_unpermitted_role(self): def test_prop_protection_with_update_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -694,6 +778,11 @@ class TestImagesController(test_utils.BaseTestCase):
request, UUID1, changes) request, UUID1, changes)
def test_prop_protection_with_delete_and_permitted_role(self): def test_prop_protection_with_delete_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}
@ -711,6 +800,11 @@ class TestImagesController(test_utils.BaseTestCase):
'x_owner_foo') 'x_owner_foo')
def test_prop_protection_with_delete_and_unpermitted_role(self): def test_prop_protection_with_delete_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections() self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin']) request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'} image = {'name': 'image-1'}

View File

@ -65,11 +65,19 @@ class BaseTestCase(testtools.TestCase):
self.stubs.SmartUnsetAll() self.stubs.SmartUnsetAll()
super(BaseTestCase, self).tearDown() super(BaseTestCase, self).tearDown()
def set_property_protections(self): def set_property_protections(self, use_policies=False):
self.property_file = self._copy_data_file('property-protections.conf', self.unset_property_protections()
self.test_dir) conf_file = "property-protections.conf"
if use_policies:
conf_file = "property-protections-policies.conf"
self.config(property_protection_rule_format="policies")
self.property_file = self._copy_data_file(conf_file, self.test_dir)
self.config(property_protection_file=self.property_file) self.config(property_protection_file=self.property_file)
def unset_property_protections(self):
for section in property_utils.CONFIG.sections():
property_utils.CONFIG.remove_section(section)
def _copy_data_file(self, file_name, dst_dir): def _copy_data_file(self, file_name, dst_dir):
src_file_name = os.path.join('glance/tests/etc', file_name) src_file_name = os.path.join('glance/tests/etc', file_name)
shutil.copy(src_file_name, dst_dir) shutil.copy(src_file_name, dst_dir)