Implement and Enable Community Images

This change replaces the existing boolean 'is_public' column for
the 'images' table with enum 'visibility' column featuring the
four explicit visibility values - public, private, shared,
and community.

This change also implements and enables all backend code to
utilize the new values.

Co-Authored-By: Timothy Symanczyk <timothy_symanczyk@symantec.com>
Co-Authored-By: Dharini Chandrasekar <dharini.chandrasekar@intel.com>

Implements: blueprint community-level-v2-image-sharing
Closes-Bug: #1394299
Closes-Bug: #1452443
Depends-On: I6e3268f3712cbc0aadb51d204c694023b92d55a5
Change-Id: I94bc7708b291ce37319539e27b3e88c9a17e1a9f
This commit is contained in:
Timothy Symanczyk 2016-09-03 07:57:50 -07:00 committed by Dharini Chandrasekar
parent dcdfdf1bbd
commit 265659e8c3
34 changed files with 1078 additions and 293 deletions

View File

@ -71,13 +71,18 @@ The actions that may have a rule enforced on them are:
* ``PUT /v1/images/<IMAGE_ID>``
* ``PUT /v2/images/<IMAGE_ID>``
* ``publicize_image`` - Create or update images with attribute
* ``publicize_image`` - Create or update public images
* ``POST /v1/images`` with attribute ``is_public`` = ``true``
* ``PUT /v1/images/<IMAGE_ID>`` with attribute ``is_public`` = ``true``
* ``POST /v2/images`` with attribute ``visibility`` = ``public``
* ``PUT /v2/images/<IMAGE_ID>`` with attribute ``visibility`` = ``public``
* ``communitize_image`` - Create or update community images
* ``POST /v2/images`` with attribute ``visibility`` = ``community``
* ``PUT /v2/images/<IMAGE_ID>`` with attribute ``visibility`` = ``community``
* ``delete_image`` - Delete an image entity and associated binary data
* ``DELETE /v1/images/<IMAGE_ID>``

View File

@ -8,6 +8,7 @@
"get_images": "",
"modify_image": "",
"publicize_image": "role:admin",
"communitize_image": "",
"copy_from": "",
"download_image": "",

View File

@ -112,6 +112,12 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
return [proxy_image(self.context, i) for i in images]
def _validate_image_accepts_members(visibility):
if visibility != 'shared':
message = _("Only shared images have members.")
raise exception.Forbidden(message)
class ImageMemberRepoProxy(glance.domain.proxy.MemberRepo):
def __init__(self, member_repo, image, context):
@ -124,12 +130,7 @@ class ImageMemberRepoProxy(glance.domain.proxy.MemberRepo):
member_repo,
member_proxy_class=ImageMemberProxy,
member_proxy_kwargs=proxy_kwargs)
self._check_image_visibility()
def _check_image_visibility(self):
if self.image.visibility == 'public':
message = _("Public images do not have members.")
raise exception.Forbidden(message)
_validate_image_accepts_members(self.image.visibility)
def get(self, member_id):
if (self.context.is_admin or
@ -220,9 +221,7 @@ class ImageMemberFactoryProxy(glance.domain.proxy.ImageMembershipFactory):
"for the image.")
raise exception.Forbidden(message)
if image.visibility == 'public':
message = _("Public images do not have members.")
raise exception.Forbidden(message)
_validate_image_accepts_members(image.visibility)
return self.image_member_factory.new_image_member(image, member_id)

View File

@ -122,6 +122,13 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
return super(ImageRepoProxy, self).add(image)
def _enforce_image_visibility(policy, context, visibility, target):
if visibility == 'public':
policy.enforce(context, 'publicize_image', target)
elif visibility == 'community':
policy.enforce(context, 'communitize_image', target)
class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, policy):
@ -137,8 +144,8 @@ class ImageProxy(glance.domain.proxy.Image):
@visibility.setter
def visibility(self, value):
if value == 'public':
self.policy.enforce(self.context, 'publicize_image', self.target)
_enforce_image_visibility(self.policy, self.context, value,
self.target)
self.image.visibility = value
@property
@ -206,8 +213,8 @@ class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
proxy_kwargs=proxy_kwargs)
def new_image(self, **kwargs):
if kwargs.get('visibility') == 'public':
self.policy.enforce(self.context, 'publicize_image', {})
_enforce_image_visibility(self.policy, self.context,
kwargs.get('visibility'), {})
return super(ImageFactoryProxy, self).new_image(**kwargs)

View File

@ -50,8 +50,8 @@ class ImageMembersController(object):
def _get_member_repo(self, req, image):
try:
# For public images, a forbidden exception with message
# "Public images do not have members" is thrown.
# For public, private, and community images, a forbidden exception
# with message "Only shared images have members." is thrown.
return self.gateway.get_member_repo(image, req.context)
except exception.Forbidden as e:
msg = (_("Error fetching members of image %(image_id)s: "

View File

@ -624,7 +624,7 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer):
def _get_filters(self, filters):
visibility = filters.get('visibility')
if visibility:
if visibility not in ['public', 'private', 'shared']:
if visibility not in ['community', 'public', 'private', 'shared']:
msg = _('Invalid visibility value: %s') % visibility
raise webob.exc.HTTPBadRequest(explanation=msg)
changes_since = filters.get('changes-since', None)
@ -858,7 +858,7 @@ def get_base_properties():
'visibility': {
'type': 'string',
'description': _('Scope of image accessibility'),
'enum': ['public', 'private'],
'enum': ['community', 'public', 'private', 'shared'],
},
'protected': {
'type': 'boolean',

View File

@ -186,7 +186,6 @@ class ImageRepo(object):
return images
def _format_image_from_db(self, db_image, db_tags):
visibility = 'public' if db_image['is_public'] else 'private'
properties = {}
for prop in db_image.pop('properties'):
# NOTE(markwash) db api requires us to filter deleted
@ -204,7 +203,7 @@ class ImageRepo(object):
status=db_image['status'],
created_at=db_image['created_at'],
updated_at=db_image['updated_at'],
visibility=visibility,
visibility=db_image['visibility'],
min_disk=db_image['min_disk'],
min_ram=db_image['min_ram'],
protected=db_image['protected'],
@ -246,7 +245,7 @@ class ImageRepo(object):
'container_format': image.container_format,
'size': image.size,
'virtual_size': image.virtual_size,
'is_public': image.visibility == 'public',
'visibility': image.visibility,
'properties': dict(image.extra_properties),
}

View File

@ -54,13 +54,14 @@ def _get_client(func):
@_get_client
def image_create(client, values):
def image_create(client, values, v1_mode=False):
"""Create an image from the values dictionary."""
return client.image_create(values=values)
return client.image_create(values=values, v1_mode=v1_mode)
@_get_client
def image_update(client, image_id, values, purge_props=False, from_state=None):
def image_update(client, image_id, values, purge_props=False, from_state=None,
v1_mode=False):
"""
Set the given properties on an image and update it.
@ -68,7 +69,9 @@ def image_update(client, image_id, values, purge_props=False, from_state=None):
"""
return client.image_update(values=values,
image_id=image_id,
purge_props=purge_props, from_state=from_state)
purge_props=purge_props,
from_state=from_state,
v1_mode=v1_mode)
@_get_client
@ -78,9 +81,10 @@ def image_destroy(client, image_id):
@_get_client
def image_get(client, image_id, force_show_deleted=False):
def image_get(client, image_id, force_show_deleted=False, v1_mode=False):
return client.image_get(image_id=image_id,
force_show_deleted=force_show_deleted)
force_show_deleted=force_show_deleted,
v1_mode=v1_mode)
def is_image_visible(context, image, status=None):
@ -93,8 +97,8 @@ def is_image_visible(context, image, status=None):
if image['owner'] is None:
return True
# Image is_public == image visible
if image['is_public']:
# Public or Community visibility == image visible
if image['visibility'] in ['public', 'community']:
return True
# Perform tests based on whether we have an owner
@ -102,13 +106,14 @@ def is_image_visible(context, image, status=None):
if context.owner == image['owner']:
return True
# Figure out if this image is shared with that tenant
members = image_member_find(context,
image_id=image['id'],
member=context.owner,
status=status)
if members:
return True
if 'shared' == image['visibility']:
# Figure out if this image is shared with that tenant
members = image_member_find(context,
image_id=image['id'],
member=context.owner,
status=status)
if members:
return True
# Private image
return False
@ -118,7 +123,7 @@ def is_image_visible(context, image, status=None):
def image_get_all(client, filters=None, marker=None, limit=None,
sort_key=None, sort_dir=None,
member_status='accepted', is_public=None,
admin_as_user=False, return_tag=False):
admin_as_user=False, return_tag=False, v1_mode=False):
"""
Get all images that match zero or more filters.
@ -139,6 +144,8 @@ def image_get_all(client, filters=None, marker=None, limit=None,
:param return_tag: To indicates whether image entry in result includes it
relevant tag entries. This could improve upper-layer
query performance, to prevent using separated calls
:param v1_mode: If true, mutates the 'visibility' value of each image
into the v1-compatible field 'is_public'
"""
sort_key = ['created_at'] if not sort_key else sort_key
sort_dir = ['desc'] if not sort_dir else sort_dir
@ -147,7 +154,8 @@ def image_get_all(client, filters=None, marker=None, limit=None,
member_status=member_status,
is_public=is_public,
admin_as_user=admin_as_user,
return_tag=return_tag)
return_tag=return_tag,
v1_mode=v1_mode)
@_get_client

View File

@ -24,6 +24,7 @@ import six
from glance.common import exception
from glance.common import timeutils
from glance.common import utils
from glance.db import utils as db_utils
from glance.i18n import _, _LI, _LW
@ -199,6 +200,7 @@ def _image_update(image, values, properties):
if 'properties' not in image.keys():
image['properties'] = []
image['properties'].extend(properties)
values = db_utils.ensure_image_dict_v2_compliant(values)
image.update(values)
return image
@ -212,7 +214,7 @@ def _image_format(image_id, **values):
'locations': [],
'status': 'queued',
'protected': False,
'is_public': False,
'visibility': 'shared',
'container_format': None,
'disk_format': None,
'min_ram': 0,
@ -259,27 +261,39 @@ def _filter_images(images, filters, context,
member=context.owner, status=status)
is_member = len(member) > 0
has_ownership = context.owner and image['owner'] == context.owner
can_see = (image['is_public'] or has_ownership or is_member or
(context.is_admin and not admin_as_user))
image_is_public = image['visibility'] == 'public'
image_is_community = image['visibility'] == 'community'
image_is_shared = image['visibility'] == 'shared'
acts_as_admin = context.is_admin and not admin_as_user
can_see = (image_is_public
or image_is_community
or has_ownership
or (is_member and image_is_shared)
or acts_as_admin)
if not can_see:
continue
if visibility:
if visibility == 'public':
if not image['is_public']:
if not image_is_public:
continue
elif visibility == 'private':
if image['is_public']:
if not (image['visibility'] == 'private'):
continue
if not (has_ownership or (context.is_admin
and not admin_as_user)):
if not (has_ownership or acts_as_admin):
continue
elif visibility == 'shared':
if not is_member:
if not image_is_shared:
continue
elif visibility == 'community':
if not image_is_community:
continue
else:
if (not has_ownership) and image_is_community:
continue
if is_public is not None:
if not image['is_public'] == is_public:
if not image_is_public == is_public:
continue
to_add = True
@ -420,17 +434,21 @@ def _image_get(context, image_id, force_show_deleted=False, status=None):
@log_call
def image_get(context, image_id, session=None, force_show_deleted=False):
image = _image_get(context, image_id, force_show_deleted)
return _normalize_locations(context, copy.deepcopy(image),
force_show_deleted=force_show_deleted)
def image_get(context, image_id, session=None, force_show_deleted=False,
v1_mode=False):
image = copy.deepcopy(_image_get(context, image_id, force_show_deleted))
image = _normalize_locations(context, image,
force_show_deleted=force_show_deleted)
if v1_mode:
image = db_utils.mutate_image_dict_to_v1(image)
return image
@log_call
def image_get_all(context, filters=None, marker=None, limit=None,
sort_key=None, sort_dir=None,
member_status='accepted', is_public=None,
admin_as_user=False, return_tag=False):
admin_as_user=False, return_tag=False, v1_mode=False):
filters = filters or {}
images = DATA['images'].values()
images = _filter_images(images, filters, context, member_status,
@ -446,6 +464,9 @@ def image_get_all(context, filters=None, marker=None, limit=None,
force_show_deleted=force_show_deleted)
if return_tag:
img['tags'] = image_tag_get_all(context, img['id'])
if v1_mode:
img = db_utils.mutate_image_dict_to_v1(img)
res.append(img)
return res
@ -677,7 +698,7 @@ def _normalize_locations(context, image, force_show_deleted=False):
@log_call
def image_create(context, image_values):
def image_create(context, image_values, v1_mode=False):
global DATA
image_id = image_values.get('id', str(uuid.uuid4()))
@ -691,7 +712,7 @@ def image_create(context, image_values):
'virtual_size', 'checksum', 'locations', 'owner',
'protected', 'is_public', 'container_format',
'disk_format', 'created_at', 'updated_at', 'deleted',
'deleted_at', 'properties', 'tags'])
'deleted_at', 'properties', 'tags', 'visibility'])
incorrect_keys = set(image_values.keys()) - allowed_keys
if incorrect_keys:
@ -702,12 +723,15 @@ def image_create(context, image_values):
DATA['images'][image_id] = image
DATA['tags'][image_id] = image.pop('tags', [])
return _normalize_locations(context, copy.deepcopy(image))
image = _normalize_locations(context, copy.deepcopy(image))
if v1_mode:
image = db_utils.mutate_image_dict_to_v1(image)
return image
@log_call
def image_update(context, image_id, image_values, purge_props=False,
from_state=None):
from_state=None, v1_mode=False):
global DATA
try:
image = DATA['images'][image_id]
@ -730,7 +754,11 @@ def image_update(context, image_id, image_values, purge_props=False,
image['updated_at'] = timeutils.utcnow()
_image_update(image, image_values, new_properties)
DATA['images'][image_id] = image
return _normalize_locations(context, copy.deepcopy(image))
image = _normalize_locations(context, copy.deepcopy(image))
if v1_mode:
image = db_utils.mutate_image_dict_to_v1(image)
return image
@log_call
@ -828,8 +856,8 @@ def is_image_visible(context, image, status=None):
if image['owner'] is None:
return True
# Image is_public == image visible
if image['is_public']:
# Public or Community visibility == image visible
if image['visibility'] in ['public', 'community']:
return True
# Perform tests based on whether we have an owner
@ -840,12 +868,14 @@ def is_image_visible(context, image, status=None):
# Figure out if this image is shared with that tenant
if status == 'all':
status = None
members = image_member_find(context,
image_id=image['id'],
member=context.owner,
status=status)
if members:
return True
if 'shared' == image['visibility']:
members = image_member_find(context,
image_id=image['id'],
member=context.owner,
status=status)
if members:
return True
# Private image
return False

View File

@ -53,6 +53,7 @@ from glance.db.sqlalchemy.metadef_api import object as metadef_object_api
from glance.db.sqlalchemy.metadef_api import property as metadef_property_api
from glance.db.sqlalchemy.metadef_api import tag as metadef_tag_api
from glance.db.sqlalchemy import models
from glance.db import utils as db_utils
from glance import glare as ga
from glance.i18n import _, _LW, _LI
@ -133,28 +134,35 @@ def _check_mutate_authorization(context, image_ref):
if not is_image_mutable(context, image_ref):
LOG.warn(_LW("Attempted to modify image user did not own."))
msg = _("You do not own this image")
if image_ref.is_public:
exc_class = exception.ForbiddenPublicImage
else:
if image_ref.visibility in ['private', 'shared']:
exc_class = exception.Forbidden
else:
# 'public', or 'community'
exc_class = exception.ForbiddenPublicImage
raise exc_class(msg)
def image_create(context, values):
def image_create(context, values, v1_mode=False):
"""Create an image from the values dictionary."""
return _image_update(context, values, None, purge_props=False)
image = _image_update(context, values, None, purge_props=False)
if v1_mode:
image = db_utils.mutate_image_dict_to_v1(image)
return image
def image_update(context, image_id, values, purge_props=False,
from_state=None):
from_state=None, v1_mode=False):
"""
Set the given properties on an image and update it.
:raises: ImageNotFound if image does not exist.
"""
return _image_update(context, values, image_id, purge_props,
from_state=from_state)
image = _image_update(context, values, image_id, purge_props,
from_state=from_state)
if v1_mode:
image = db_utils.mutate_image_dict_to_v1(image)
return image
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
@ -213,11 +221,14 @@ def _normalize_tags(image):
return image
def image_get(context, image_id, session=None, force_show_deleted=False):
def image_get(context, image_id, session=None, force_show_deleted=False,
v1_mode=False):
image = _image_get(context, image_id, session=session,
force_show_deleted=force_show_deleted)
image = _normalize_locations(context, image.to_dict(),
force_show_deleted=force_show_deleted)
if v1_mode:
image = db_utils.mutate_image_dict_to_v1(image)
return image
@ -290,8 +301,8 @@ def is_image_visible(context, image, status=None):
if image['owner'] is None:
return True
# Image is_public == image visible
if image['is_public']:
# Public or Community visibility == image visible
if image['visibility'] in ['public', 'community']:
return True
# Perform tests based on whether we have an owner
@ -299,13 +310,14 @@ def is_image_visible(context, image, status=None):
if context.owner == image['owner']:
return True
# Figure out if this image is shared with that tenant
members = image_member_find(context,
image_id=image['id'],
member=context.owner,
status=status)
if members:
return True
if 'shared' == image['visibility']:
# Figure out if this image is shared with that tenant
members = image_member_find(context,
image_id=image['id'],
member=context.owner,
status=status)
if members:
return True
# Private image
return False
@ -456,18 +468,15 @@ def _make_conditions_from_filters(filters, is_public=None):
tag_conditions = []
if is_public is not None:
image_conditions.append(models.Image.is_public == is_public)
if is_public:
image_conditions.append(models.Image.visibility == 'public')
else:
image_conditions.append(models.Image.visibility != 'public')
if 'checksum' in filters:
checksum = filters.pop('checksum')
image_conditions.append(models.Image.checksum == checksum)
if 'is_public' in filters:
key = 'is_public'
value = filters.pop('is_public')
prop_filters = _make_image_property_condition(key=key, value=value)
prop_conditions.append(prop_filters)
for (k, v) in filters.pop('properties', {}).items():
prop_filters = _make_image_property_condition(key=k, value=v)
prop_conditions.append(prop_filters)
@ -571,6 +580,7 @@ def _select_images_query(context, image_conditions, admin_as_user,
models.Image.members).filter(img_conditional_clause)
if regular_user:
member_filters = [models.ImageMember.deleted == False]
member_filters.extend([models.Image.visibility == 'shared'])
if context.owner is not None:
member_filters.extend([models.ImageMember.member == context.owner])
if member_status != 'all':
@ -578,14 +588,13 @@ def _select_images_query(context, image_conditions, admin_as_user,
models.ImageMember.status == member_status])
query_member = query_member.filter(sa_sql.and_(*member_filters))
# NOTE(venkatesh) if the 'visibility' is set to 'shared', we just
# query the image members table. No union is required.
if visibility is not None and visibility == 'shared':
return query_member
query_image = session.query(models.Image).filter(img_conditional_clause)
if regular_user:
query_image = query_image.filter(models.Image.is_public == True)
visibility_filters = [
models.Image.visibility == 'public',
models.Image.visibility == 'community',
]
query_image = query_image .filter(sa_sql.or_(*visibility_filters))
query_image_owner = None
if context.owner is not None:
query_image_owner = session.query(models.Image).filter(
@ -604,7 +613,7 @@ def _select_images_query(context, image_conditions, admin_as_user,
def image_get_all(context, filters=None, marker=None, limit=None,
sort_key=None, sort_dir=None,
member_status='accepted', is_public=None,
admin_as_user=False, return_tag=False):
admin_as_user=False, return_tag=False, v1_mode=False):
"""
Get all images that match zero or more filters.
@ -625,6 +634,8 @@ def image_get_all(context, filters=None, marker=None, limit=None,
:param return_tag: To indicates whether image entry in result includes it
relevant tag entries. This could improve upper-layer
query performance, to prevent using separated calls
:param v1_mode: If true, mutates the 'visibility' value of each image
into the v1-compatible field 'is_public'
"""
sort_key = ['created_at'] if not sort_key else sort_key
@ -650,12 +661,22 @@ def image_get_all(context, filters=None, marker=None, limit=None,
admin_as_user,
member_status,
visibility)
if visibility is not None:
if visibility == 'public':
query = query.filter(models.Image.is_public == True)
elif visibility == 'private':
query = query.filter(models.Image.is_public == False)
# with a visibility, we always and only include images with that
# visibility
query = query.filter(models.Image.visibility == visibility)
elif context.owner is None:
# without either a visibility or an owner, we never include
# 'community' images
query = query.filter(models.Image.visibility != 'community')
else:
# without a visibility and with an owner, we only want to include
# 'community' images if and only if they are owned by this owner
community_filters = [
models.Image.owner == context.owner,
models.Image.visibility != 'community',
]
query = query.filter(sa_sql.or_(*community_filters))
if prop_cond:
for prop_condition in prop_cond:
@ -697,6 +718,8 @@ def image_get_all(context, filters=None, marker=None, limit=None,
force_show_deleted=showing_deleted)
if return_tag:
image_dict = _normalize_tags(image_dict)
if v1_mode:
image_dict = db_utils.mutate_image_dict_to_v1(image_dict)
images.append(image_dict)
return images
@ -804,7 +827,11 @@ def _image_update(context, values, image_id, purge_props=False,
if 'min_disk' in values:
values['min_disk'] = int(values['min_disk'] or 0)
values['is_public'] = bool(values.get('is_public', False))
if 'is_public' in values:
values = db_utils.ensure_image_dict_v2_compliant(values)
else:
values['visibility'] = values.get('visibility', 'shared')
values['protected'] = bool(values.get('protected', False))
image_ref = models.Image()

View File

@ -0,0 +1,51 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, Enum, Index, MetaData, Table, select, not_
from sqlalchemy.engine import reflection
def upgrade(migrate_engine):
meta = MetaData(bind=migrate_engine)
images = Table('images', meta, autoload=True)
enum = Enum('private', 'public', 'shared', 'community', metadata=meta,
name='image_visibility')
enum.create()
images.create_column(Column('visibility', enum, nullable=False,
server_default='shared'))
visibility_index = Index('visibility_image_idx', images.c.visibility)
visibility_index.create(migrate_engine)
images.update(values={'visibility': 'public'}).where(
images.c.is_public).execute()
image_members = Table('image_members', meta, autoload=True)
# NOTE(dharinic): Mark all the non-public images as 'private' first
images.update().values(visibility='private').where(
not_(images.c.is_public)).execute()
# NOTE(dharinic): Identify 'shared' images from the above
images.update().values(visibility='shared').where(
images.c.visibility != 'public' and images.c.id.in_(select(
[image_members.c.image_id]).distinct().where(
not_(image_members.c.deleted)))).execute()
insp = reflection.Inspector.from_engine(migrate_engine)
for index in insp.get_indexes('images'):
if 'ix_images_is_public' == index['name']:
Index('ix_images_is_public', images.c.is_public).drop()
break
images.c.is_public.drop()

View File

@ -0,0 +1,162 @@
CREATE TEMPORARY TABLE images_backup (
id VARCHAR(36) NOT NULL,
name VARCHAR(255),
size INTEGER,
status VARCHAR(30) NOT NULL,
is_public BOOLEAN NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME,
deleted_at DATETIME,
deleted BOOLEAN NOT NULL,
disk_format VARCHAR(20),
container_format VARCHAR(20),
checksum VARCHAR(32),
owner VARCHAR(255),
min_disk INTEGER NOT NULL,
min_ram INTEGER NOT NULL,
protected BOOLEAN DEFAULT 0 NOT NULL,
virtual_size INTEGER,
PRIMARY KEY (id),
CHECK (is_public IN (0, 1)),
CHECK (deleted IN (0, 1)),
CHECK (protected IN (0, 1))
);
INSERT INTO images_backup
SELECT id,
name,
size,
status,
is_public,
created_at,
updated_at,
deleted_at,
deleted,
disk_format,
container_format,
checksum,
owner,
min_disk,
min_ram,
protected,
virtual_size
FROM images;
DROP TABLE images;
CREATE TABLE images (
id VARCHAR(36) NOT NULL,
name VARCHAR(255),
size INTEGER,
status VARCHAR(30) NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME,
deleted_at DATETIME,
deleted BOOLEAN NOT NULL,
disk_format VARCHAR(20),
container_format VARCHAR(20),
checksum VARCHAR(32),
owner VARCHAR(255),
min_disk INTEGER NOT NULL,
min_ram INTEGER NOT NULL,
protected BOOLEAN DEFAULT 0 NOT NULL,
virtual_size INTEGER,
visibility VARCHAR(9) DEFAULT 'shared' NOT NULL,
PRIMARY KEY (id),
CHECK (deleted IN (0, 1)),
CHECK (protected IN (0, 1)),
CONSTRAINT image_visibility CHECK (visibility IN ('private', 'public', 'shared', 'community'))
);
CREATE INDEX checksum_image_idx ON images (checksum);
CREATE INDEX visibility_image_idx ON images (visibility);
CREATE INDEX ix_images_deleted ON images (deleted);
CREATE INDEX owner_image_idx ON images (owner);
CREATE INDEX created_at_image_idx ON images (created_at);
CREATE INDEX updated_at_image_idx ON images (updated_at);
-- Copy over all the 'public' rows
INSERT INTO images (
id,
name,
size,
status,
created_at,
updated_at,
deleted_at,
deleted,
disk_format,
container_format,
checksum,
owner,
min_disk,
min_ram,
protected,
virtual_size
)
SELECT id,
name,
size,
status,
created_at,
updated_at,
deleted_at,
deleted,
disk_format,
container_format,
checksum,
owner,
min_disk,
min_ram,
protected,
virtual_size
FROM images_backup
WHERE is_public=1;
UPDATE images SET visibility='public';
-- Now copy over the 'private' rows
INSERT INTO images (
id,
name,
size,
status,
created_at,
updated_at,
deleted_at,
deleted,
disk_format,
container_format,
checksum,
owner,
min_disk,
min_ram,
protected,
virtual_size
)
SELECT id,
name,
size,
status,
created_at,
updated_at,
deleted_at,
deleted,
disk_format,
container_format,
checksum,
owner,
min_disk,
min_ram,
protected,
virtual_size
FROM images_backup
WHERE is_public=0;
UPDATE images SET visibility='private' WHERE visibility='shared';
UPDATE images SET visibility='shared' WHERE visibility <> 'public' AND id IN (SELECT DISTINCT image_id FROM image_members WHERE deleted != 1);
DROP TABLE images_backup;

View File

@ -26,6 +26,7 @@ from sqlalchemy import BigInteger
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Enum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import ForeignKey
from sqlalchemy import Index
@ -114,7 +115,7 @@ class Image(BASE, GlanceBase):
"""Represents an image in the datastore."""
__tablename__ = 'images'
__table_args__ = (Index('checksum_image_idx', 'checksum'),
Index('ix_images_is_public', 'is_public'),
Index('visibility_image_idx', 'visibility'),
Index('ix_images_deleted', 'deleted'),
Index('owner_image_idx', 'owner'),
Index('created_at_image_idx', 'created_at'),
@ -128,7 +129,9 @@ class Image(BASE, GlanceBase):
size = Column(BigInteger().with_variant(Integer, "sqlite"))
virtual_size = Column(BigInteger().with_variant(Integer, "sqlite"))
status = Column(String(30), nullable=False)
is_public = Column(Boolean, nullable=False, default=False)
visibility = Column(Enum('private', 'public', 'shared', 'community',
name='image_visibility'), nullable=False,
server_default='shared')
checksum = Column(String(32))
min_disk = Column(Integer, nullable=False, default=0)
min_ram = Column(Integer, nullable=False, default=0)

40
glance/db/utils.py Normal file
View File

@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance.common import exception
def mutate_image_dict_to_v1(image):
"""
Replaces a v2-style image dictionary's 'visibility' member with the
equivalent v1-style 'is_public' member.
"""
visibility = image.pop('visibility')
is_image_public = 'public' == visibility
image['is_public'] = is_image_public
return image
def ensure_image_dict_v2_compliant(image):
"""
Accepts an image dictionary that contains a v1-style 'is_public' member
and returns the equivalent v2-style image dictionary.
"""
if ('is_public' in image):
if ('visibility' in image):
msg = _("Specifying both 'visibility' and 'is_public' is not "
"permiitted.")
raise exception.Invalid(msg)
else:
image['visibility'] = ('public' if image.pop('is_public') else
'shared')
return image

View File

@ -68,7 +68,7 @@ class ImageFactory(object):
if key in properties:
raise exception.ReservedProperty(property=key)
def new_image(self, image_id=None, name=None, visibility='private',
def new_image(self, image_id=None, name=None, visibility='shared',
min_disk=0, min_ram=0, protected=False, owner=None,
disk_format=None, container_format=None,
extra_properties=None, tags=None, **other_args):
@ -116,7 +116,7 @@ class Image(object):
self.created_at = created_at
self.updated_at = updated_at
self.name = kwargs.pop('name', None)
self.visibility = kwargs.pop('visibility', 'private')
self.visibility = kwargs.pop('visibility', 'shared')
self.min_disk = kwargs.pop('min_disk', 0)
self.min_ram = kwargs.pop('min_ram', 0)
self.protected = kwargs.pop('protected', False)
@ -172,9 +172,9 @@ class Image(object):
@visibility.setter
def visibility(self, visibility):
if visibility not in ('public', 'private'):
raise ValueError(_('Visibility must be either "public" '
'or "private"'))
if visibility not in ('community', 'public', 'private', 'shared'):
raise ValueError(_('Visibility must be one of "community", '
'"public", "private", or "shared"'))
self._visibility = visibility
@property

View File

@ -157,6 +157,7 @@ def format_image_notification(image):
'size': image.size,
'virtual_size': image.virtual_size,
'is_public': image.visibility == 'public',
'visibility': image.visibility,
'properties': dict(image.extra_properties),
'tags': list(image.tags),
'deleted': False,

View File

@ -119,7 +119,7 @@ class Controller(object):
del params['is_public']
try:
return self.db_api.image_get_all(context, filters=filters,
**params)
v1_mode=True, **params)
except exception.ImageNotFound:
LOG.warn(_LW("Invalid marker. Image %(id)s could not be "
"found.") % {'id': params.get('marker')})
@ -358,7 +358,7 @@ class Controller(object):
def show(self, req, id):
"""Return data about the given image id."""
try:
image = self.db_api.image_get(req.context, id)
image = self.db_api.image_get(req.context, id, v1_mode=True)
LOG.debug("Successfully retrieved image %(id)s", {'id': id})
except exception.ImageNotFound:
LOG.info(_LI("Image %(id)s not found"), {'id': id})
@ -438,7 +438,8 @@ class Controller(object):
try:
image_data = _normalize_image_location_for_db(image_data)
image_data = self.db_api.image_create(req.context, image_data)
image_data = self.db_api.image_create(req.context, image_data,
v1_mode=True)
image_data = dict(image=make_image_dict(image_data))
LOG.info(_LI("Successfully created image %(id)s"),
{'id': image_data['image']['id']})
@ -494,7 +495,8 @@ class Controller(object):
updated_image = self.db_api.image_update(req.context, id,
image_data,
purge_props=purge_props,
from_state=from_state)
from_state=from_state,
v1_mode=True)
LOG.info(_LI("Updating metadata for image %(id)s"), {'id': id})
return dict(image=make_image_dict(updated_image))

View File

@ -63,7 +63,7 @@ class Controller(object):
Get the members of an image.
"""
try:
self.db_api.image_get(req.context, image_id)
self.db_api.image_get(req.context, image_id, v1_mode=True)
except exception.NotFound:
msg = _("Image %(id)s not found") % {'id': image_id}
LOG.warn(msg)
@ -97,7 +97,7 @@ class Controller(object):
# Make sure the image exists
try:
image = self.db_api.image_get(req.context, image_id)
image = self.db_api.image_get(req.context, image_id, v1_mode=True)
except exception.NotFound:
msg = _("Image %(id)s not found") % {'id': image_id}
LOG.warn(msg)
@ -217,7 +217,7 @@ class Controller(object):
# Make sure the image exists
try:
image = self.db_api.image_get(req.context, image_id)
image = self.db_api.image_get(req.context, image_id, v1_mode=True)
except exception.NotFound:
msg = _("Image %(id)s not found") % {'id': image_id}
LOG.warn(msg)
@ -281,7 +281,7 @@ class Controller(object):
# Make sure the image exists
try:
image = self.db_api.image_get(req.context, image_id)
image = self.db_api.image_get(req.context, image_id, v1_mode=True)
except exception.NotFound:
msg = _("Image %(id)s not found") % {'id': image_id}
LOG.warn(msg)

View File

@ -9,6 +9,7 @@
"get_images": "",
"modify_image": "",
"publicize_image": "",
"communitize_image": "",
"copy_from": "",
"download_image": "",

View File

@ -61,6 +61,8 @@ def build_image_fixture(**kwargs):
'metadata': {}, 'status': 'active'}],
'properties': {},
}
if 'visibility' in kwargs:
image.pop('is_public')
image.update(kwargs)
return image
@ -156,7 +158,7 @@ class DriverTests(object):
self.assertEqual(0, image['min_ram'])
self.assertEqual(0, image['min_disk'])
self.assertIsNone(image['owner'])
self.assertFalse(image['is_public'])
self.assertEqual('shared', image['visibility'])
self.assertIsNone(image['size'])
self.assertIsNone(image['checksum'])
self.assertIsNone(image['disk_format'])
@ -1083,7 +1085,7 @@ class DriverTests(object):
auth_token='user:%s:user' % TENANT2,
owner_is_tenant=False)
UUIDX = str(uuid.uuid4())
# We need private image and context.owner should not match image
# We need a shared image and context.owner should not match image
# owner
self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
@ -1136,7 +1138,7 @@ class DriverTests(object):
auth_token='user:%s:user' % TENANT2,
owner_is_tenant=False)
UUIDX = str(uuid.uuid4())
# We need private image and context.owner should not match image
# We need a shared image and context.owner should not match image
# owner
image = self.db_api.image_create(ctxt1, {'id': UUIDX,
'status': 'queued',
@ -1156,6 +1158,31 @@ class DriverTests(object):
result = self.db_api.is_image_visible(ctxt2, image)
self.assertFalse(result)
def test_is_community_image_visible(self):
TENANT1 = str(uuid.uuid4())
TENANT2 = str(uuid.uuid4())
owners_ctxt = context.RequestContext(is_admin=False, tenant=TENANT1,
auth_token='user:%s:user'
% TENANT1, owner_is_tenant=True)
viewing_ctxt = context.RequestContext(is_admin=False, user=TENANT2,
auth_token='user:%s:user'
% TENANT2, owner_is_tenant=False)
UUIDX = str(uuid.uuid4())
# We need a community image and context.owner should not match image
# owner
image = self.db_api.image_create(owners_ctxt,
{'id': UUIDX,
'status': 'queued',
'visibility': 'community',
'owner': TENANT1})
# image should be visible in every context
result = self.db_api.is_image_visible(owners_ctxt, image)
self.assertTrue(result)
result = self.db_api.is_image_visible(viewing_ctxt, image)
self.assertTrue(result)
def test_image_tag_create(self):
tag = self.db_api.image_tag_create(self.context, UUID1, 'snap')
self.assertEqual('snap', tag)
@ -1985,13 +2012,13 @@ class TestVisibility(test_utils.BaseTestCase):
'Tenant 1': self.tenant1,
'Tenant 2': self.tenant2,
}
visibilities = {'public': True, 'private': False}
visibilities = ['community', 'private', 'public', 'shared']
for owner_label, owner in owners.items():
for visibility, is_public in visibilities.items():
for visibility in visibilities:
fixture = {
'name': '%s, %s' % (owner_label, visibility),
'owner': owner,
'is_public': is_public,
'visibility': visibility,
}
fixtures.append(fixture)
return [build_image_fixture(**f) for f in fixtures]
@ -2003,95 +2030,125 @@ class TestVisibility(test_utils.BaseTestCase):
class VisibilityTests(object):
def test_unknown_admin_sees_all(self):
def test_unknown_admin_sees_all_but_community(self):
images = self.db_api.image_get_all(self.admin_none_context)
self.assertEqual(8, len(images))
self.assertEqual(12, len(images))
def test_unknown_admin_is_public_true(self):
images = self.db_api.image_get_all(self.admin_none_context,
is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_unknown_admin_is_public_false(self):
images = self.db_api.image_get_all(self.admin_none_context,
is_public=False)
self.assertEqual(4, len(images))
self.assertEqual(8, len(images))
for i in images:
self.assertFalse(i['is_public'])
self.assertTrue(i['visibility'] in ['shared', 'private'])
def test_unknown_admin_is_public_none(self):
images = self.db_api.image_get_all(self.admin_none_context)
self.assertEqual(8, len(images))
self.assertEqual(12, len(images))
def test_unknown_admin_visibility_public(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_unknown_admin_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'shared'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('shared', i['visibility'])
def test_unknown_admin_visibility_private(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'private'})
self.assertEqual(4, len(images))
for i in images:
self.assertFalse(i['is_public'])
self.assertEqual('private', i['visibility'])
def test_known_admin_sees_all(self):
def test_unknown_admin_visibility_community(self):
images = self.db_api.image_get_all(self.admin_none_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_known_admin_sees_all_but_others_community_images(self):
images = self.db_api.image_get_all(self.admin_context)
self.assertEqual(8, len(images))
self.assertEqual(13, len(images))
def test_known_admin_is_public_true(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_known_admin_is_public_false(self):
images = self.db_api.image_get_all(self.admin_context,
is_public=False)
self.assertEqual(4, len(images))
self.assertEqual(9, len(images))
for i in images:
self.assertFalse(i['is_public'])
self.assertTrue(i['visibility']
in ['shared', 'private', 'community'])
def test_known_admin_is_public_none(self):
images = self.db_api.image_get_all(self.admin_context)
self.assertEqual(8, len(images))
self.assertEqual(13, len(images))
def test_admin_as_user_true(self):
images = self.db_api.image_get_all(self.admin_context,
admin_as_user=True)
self.assertEqual(5, len(images))
self.assertEqual(7, len(images))
for i in images:
self.assertTrue(i['is_public'] or i['owner'] == self.admin_tenant)
self.assertTrue(('public' == i['visibility'])
or i['owner'] == self.admin_tenant)
def test_known_admin_visibility_public(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_known_admin_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'shared'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('shared', i['visibility'])
def test_known_admin_visibility_private(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'private'})
self.assertEqual(4, len(images))
for i in images:
self.assertFalse(i['is_public'])
self.assertEqual('private', i['visibility'])
def test_known_admin_visibility_community(self):
images = self.db_api.image_get_all(self.admin_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_what_unknown_user_sees(self):
images = self.db_api.image_get_all(self.none_context)
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_unknown_user_is_public_true(self):
images = self.db_api.image_get_all(self.none_context, is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_unknown_user_is_public_false(self):
images = self.db_api.image_get_all(self.none_context, is_public=False)
@ -2101,25 +2158,37 @@ class VisibilityTests(object):
images = self.db_api.image_get_all(self.none_context)
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_unknown_user_visibility_public(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_unknown_user_visibility_shared(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
def test_unknown_user_visibility_private(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'private'})
self.assertEqual(0, len(images))
def test_unknown_user_visibility_community(self):
images = self.db_api.image_get_all(self.none_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def test_what_tenant1_sees(self):
images = self.db_api.image_get_all(self.tenant1_context)
self.assertEqual(5, len(images))
self.assertEqual(7, len(images))
for i in images:
if not i['is_public']:
if not ('public' == i['visibility']):
self.assertEqual(i['owner'], self.tenant1)
def test_tenant1_is_public_true(self):
@ -2127,20 +2196,22 @@ class VisibilityTests(object):
is_public=True)
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_tenant1_is_public_false(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False)
self.assertEqual(1, len(images))
self.assertFalse(images[0]['is_public'])
self.assertEqual(images[0]['owner'], self.tenant1)
self.assertEqual(3, len(images))
for i in images:
self.assertEqual(i['owner'], self.tenant1)
self.assertTrue(i['visibility']
in ['private', 'shared', 'community'])
def test_tenant1_is_public_none(self):
images = self.db_api.image_get_all(self.tenant1_context)
self.assertEqual(5, len(images))
self.assertEqual(7, len(images))
for i in images:
if not i['is_public']:
if not ('public' == i['visibility']):
self.assertEqual(self.tenant1, i['owner'])
def test_tenant1_visibility_public(self):
@ -2148,20 +2219,34 @@ class VisibilityTests(object):
filters={'visibility': 'public'})
self.assertEqual(4, len(images))
for i in images:
self.assertTrue(i['is_public'])
self.assertEqual('public', i['visibility'])
def test_tenant1_visibility_shared(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'shared'})
self.assertEqual(1, len(images))
self.assertEqual('shared', images[0]['visibility'])
self.assertEqual(self.tenant1, images[0]['owner'])
def test_tenant1_visibility_private(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'private'})
self.assertEqual(1, len(images))
self.assertFalse(images[0]['is_public'])
self.assertEqual('private', images[0]['visibility'])
self.assertEqual(self.tenant1, images[0]['owner'])
def test_tenant1_visibility_community(self):
images = self.db_api.image_get_all(self.tenant1_context,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
for i in images:
self.assertEqual('community', i['visibility'])
def _setup_is_public_red_herring(self):
values = {
'name': 'Red Herring',
'owner': self.tenant1,
'is_public': False,
'visibility': 'shared',
'properties': {'is_public': 'silly'}
}
fixture = build_image_fixture(**values)
@ -2196,6 +2281,16 @@ class VisibilityTests(object):
filters={'visibility': 'public'})
self.assertEqual(0, len(images))
def test_admin_is_public_true_and_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
def test_admin_is_public_false_and_visibility_shared(self):
images = self.db_api.image_get_all(self.admin_context, is_public=False,
filters={'visibility': 'shared'})
self.assertEqual(4, len(images))
def test_admin_is_public_true_and_visibility_private(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'private'})
@ -2206,6 +2301,16 @@ class VisibilityTests(object):
filters={'visibility': 'private'})
self.assertEqual(4, len(images))
def test_admin_is_public_true_and_visibility_community(self):
images = self.db_api.image_get_all(self.admin_context, is_public=True,
filters={'visibility': 'community'})
self.assertEqual(0, len(images))
def test_admin_is_public_false_and_visibility_community(self):
images = self.db_api.image_get_all(self.admin_context, is_public=False,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
def test_tenant1_is_public_true_and_visibility_public(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
@ -2218,6 +2323,18 @@ class VisibilityTests(object):
filters={'visibility': 'public'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_true_and_visibility_shared(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
filters={'visibility': 'shared'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_false_and_visibility_shared(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False,
filters={'visibility': 'shared'})
self.assertEqual(1, len(images))
def test_tenant1_is_public_true_and_visibility_private(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
@ -2230,6 +2347,18 @@ class VisibilityTests(object):
filters={'visibility': 'private'})
self.assertEqual(1, len(images))
def test_tenant1_is_public_true_and_visibility_community(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=True,
filters={'visibility': 'community'})
self.assertEqual(0, len(images))
def test_tenant1_is_public_false_and_visibility_community(self):
images = self.db_api.image_get_all(self.tenant1_context,
is_public=False,
filters={'visibility': 'community'})
self.assertEqual(4, len(images))
class TestMembershipVisibility(test_utils.BaseTestCase):
def setUp(self):
@ -2262,7 +2391,8 @@ class TestMembershipVisibility(test_utils.BaseTestCase):
members=[self.tenant1, self.tenant2])
def _create_image(self, name, owner, members=None):
image = build_image_fixture(name=name, owner=owner, is_public=False)
image = build_image_fixture(name=name, owner=owner,
visibility='shared')
self.image_ids[(owner, name)] = image['id']
self.db_api.image_create(self.admin_ctx, image)
for member in members or []:

View File

@ -187,7 +187,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -251,7 +251,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-2',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image2_id,
'protected': False,
'file': '/v2/images/%s/file' % image2_id,
@ -806,7 +806,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -875,7 +875,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -947,7 +947,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -1292,6 +1292,48 @@ class TestImages(functional.FunctionalTest):
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(http.OK, response.status_code)
def test_owning_tenant_can_communitize_image(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"communitize_image": "tenant:%(owner)s",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({
'Content-Type': 'application/openstack-images-v2.1-json-patch',
'X-Tenant-Id': TENANT1,
})
doc = [{'op': 'replace', 'path': '/visibility', 'value': 'community'}]
data = jsonutils.dumps(doc)
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code)
def test_owning_tenant_can_delete_image(self):
rules = {
"context_is_admin": "role:admin",
@ -1567,7 +1609,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -1716,7 +1758,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -1854,7 +1896,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -1882,7 +1924,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -2131,7 +2173,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -2159,7 +2201,7 @@ class TestImages(functional.FunctionalTest):
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'visibility': 'shared',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
@ -2733,7 +2775,7 @@ class TestImages(functional.FunctionalTest):
self.start_servers(**kwargs)
owners = ['admin', 'tenant1', 'tenant2', 'none']
visibilities = ['public', 'private']
visibilities = ['public', 'private', 'shared', 'community']
for owner in owners:
for visibility in visibilities:
@ -2761,7 +2803,7 @@ class TestImages(functional.FunctionalTest):
# 1. Known user sees public and their own images
images = list_images('tenant1')
self.assertEqual(5, len(images))
self.assertEqual(7, len(images))
for image in images:
self.assertTrue(image['visibility'] == 'public'
or 'tenant1' in image['name'])
@ -2779,54 +2821,102 @@ class TestImages(functional.FunctionalTest):
self.assertEqual('private', image['visibility'])
self.assertIn('tenant1', image['name'])
# 4. Unknown user sees only public images
# 4. Known user, visibility=shared, sees only their shared image
images = list_images('tenant1', visibility='shared')
self.assertEqual(1, len(images))
image = images[0]
self.assertEqual('shared', image['visibility'])
self.assertIn('tenant1', image['name'])
# 5. Known user, visibility=community, sees all community images
images = list_images('tenant1', visibility='community')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('community', image['visibility'])
# 6. Unknown user sees only public images
images = list_images('none')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('public', image['visibility'])
# 5. Unknown user, visibility=public, sees only public images
# 7. Unknown user, visibility=public, sees only public images
images = list_images('none', visibility='public')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('public', image['visibility'])
# 6. Unknown user, visibility=private, sees no images
# 8. Unknown user, visibility=private, sees no images
images = list_images('none', visibility='private')
self.assertEqual(0, len(images))
# 7. Unknown admin sees all images
images = list_images('none', role='admin')
self.assertEqual(8, len(images))
# 9. Unknown user, visibility=shared, sees no images
images = list_images('none', visibility='shared')
self.assertEqual(0, len(images))
# 8. Unknown admin, visibility=public, shows only public images
# 10. Unknown user, visibility=community, sees only community images
images = list_images('none', visibility='community')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('community', image['visibility'])
# 11. Unknown admin sees all images except for community images
images = list_images('none', role='admin')
self.assertEqual(12, len(images))
# 12. Unknown admin, visibility=public, shows only public images
images = list_images('none', role='admin', visibility='public')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('public', image['visibility'])
# 9. Unknown admin, visibility=private, sees only private images
# 13. Unknown admin, visibility=private, sees only private images
images = list_images('none', role='admin', visibility='private')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('private', image['visibility'])
# 10. Known admin sees all images
images = list_images('admin', role='admin')
self.assertEqual(8, len(images))
# 14. Unknown admin, visibility=shared, sees only shared images
images = list_images('none', role='admin', visibility='shared')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('shared', image['visibility'])
# 11. Known admin, visibility=public, sees all public images
# 15. Unknown admin, visibility=community, sees only community images
images = list_images('none', role='admin', visibility='community')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('community', image['visibility'])
# 16. Known admin sees all images, except community images owned by
# others
images = list_images('admin', role='admin')
self.assertEqual(13, len(images))
# 17. Known admin, visibility=public, sees all public images
images = list_images('admin', role='admin', visibility='public')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('public', image['visibility'])
# 12. Known admin, visibility=private, sees all private images
# 18. Known admin, visibility=private, sees all private images
images = list_images('admin', role='admin', visibility='private')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('private', image['visibility'])
# 19. Known admin, visibility=shared, sees all shared images
images = list_images('admin', role='admin', visibility='shared')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('shared', image['visibility'])
# 20. Known admin, visibility=community, sees all community images
images = list_images('admin', role='admin', visibility='community')
self.assertEqual(4, len(images))
for image in images:
self.assertEqual('community', image['visibility'])
self.stop_servers()
def test_update_locations(self):
@ -3260,7 +3350,7 @@ class TestImageMembers(functional.FunctionalTest):
self.assertEqual(0, len(images))
owners = ['tenant1', 'tenant2', 'admin']
visibilities = ['public', 'private']
visibilities = ['community', 'private', 'public', 'shared']
image_fixture = []
for owner in owners:
for visibility in visibilities:
@ -3277,12 +3367,12 @@ class TestImageMembers(functional.FunctionalTest):
self.assertEqual(http.CREATED, response.status_code)
image_fixture.append(jsonutils.loads(response.text))
# Image list should contain 4 images for tenant1
# Image list should contain 6 images for tenant1
path = self._url('/v2/images')
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.OK, response.status_code)
images = jsonutils.loads(response.text)['images']
self.assertEqual(4, len(images))
self.assertEqual(6, len(images))
# Image list should contain 3 images for TENANT3
path = self._url('/v2/images')
@ -3291,14 +3381,14 @@ class TestImageMembers(functional.FunctionalTest):
images = jsonutils.loads(response.text)['images']
self.assertEqual(3, len(images))
# Add Image member for tenant1-private image
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
# Add Image member for tenant1-shared image
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
body = jsonutils.dumps({'member': TENANT3})
response = requests.post(path, headers=get_header('tenant1'),
data=body)
self.assertEqual(http.OK, response.status_code)
image_member = jsonutils.loads(response.text)
self.assertEqual(image_fixture[1]['id'], image_member['image_id'])
self.assertEqual(image_fixture[3]['id'], image_member['image_id'])
self.assertEqual(TENANT3, image_member['member_id'])
self.assertIn('created_at', image_member)
self.assertIn('updated_at', image_member)
@ -3340,7 +3430,7 @@ class TestImageMembers(functional.FunctionalTest):
self.assertEqual(http.OK, response.status_code)
images = jsonutils.loads(response.text)['images']
self.assertEqual(1, len(images))
self.assertEqual(images[0]['name'], 'tenant1-private')
self.assertEqual(images[0]['name'], 'tenant1-shared')
# Image list should contain 0 image for TENANT3 with status rejected
# and visibility shared
@ -3366,54 +3456,54 @@ class TestImageMembers(functional.FunctionalTest):
images = jsonutils.loads(response.text)['images']
self.assertEqual(0, len(images))
# Image tenant2-private's image members list should contain no members
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
# Image tenant2-shared's image members list should contain no members
path = self._url('/v2/images/%s/members' % image_fixture[7]['id'])
response = requests.get(path, headers=get_header('tenant2'))
self.assertEqual(http.OK, response.status_code)
body = jsonutils.loads(response.text)
self.assertEqual(0, len(body['members']))
# Tenant 1, who is the owner cannot change status of image member
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
body = jsonutils.dumps({'status': 'accepted'})
response = requests.put(path, headers=get_header('tenant1'), data=body)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Tenant 1, who is the owner can get status of its own image member
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.OK, response.status_code)
body = jsonutils.loads(response.text)
self.assertEqual('pending', body['status'])
self.assertEqual(image_fixture[1]['id'], body['image_id'])
self.assertEqual(image_fixture[3]['id'], body['image_id'])
self.assertEqual(TENANT3, body['member_id'])
# Tenant 3, who is the member can get status of its own status
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
response = requests.get(path, headers=get_header(TENANT3))
self.assertEqual(http.OK, response.status_code)
body = jsonutils.loads(response.text)
self.assertEqual('pending', body['status'])
self.assertEqual(image_fixture[1]['id'], body['image_id'])
self.assertEqual(image_fixture[3]['id'], body['image_id'])
self.assertEqual(TENANT3, body['member_id'])
# Tenant 2, who not the owner cannot get status of image member
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
response = requests.get(path, headers=get_header('tenant2'))
self.assertEqual(http.NOT_FOUND, response.status_code)
# Tenant 3 can change status of image member
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
body = jsonutils.dumps({'status': 'accepted'})
response = requests.put(path, headers=get_header(TENANT3), data=body)
self.assertEqual(http.OK, response.status_code)
image_member = jsonutils.loads(response.text)
self.assertEqual(image_fixture[1]['id'], image_member['image_id'])
self.assertEqual(image_fixture[3]['id'], image_member['image_id'])
self.assertEqual(TENANT3, image_member['member_id'])
self.assertEqual('accepted', image_member['status'])
@ -3426,84 +3516,110 @@ class TestImageMembers(functional.FunctionalTest):
self.assertEqual(4, len(images))
# Tenant 3 invalid status change
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
body = jsonutils.dumps({'status': 'invalid-status'})
response = requests.put(path, headers=get_header(TENANT3), data=body)
self.assertEqual(http.BAD_REQUEST, response.status_code)
# Owner cannot change status of image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
body = jsonutils.dumps({'status': 'accepted'})
response = requests.put(path, headers=get_header('tenant1'), data=body)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Add Image member for tenant2-private image
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
# Add Image member for tenant2-shared image
path = self._url('/v2/images/%s/members' % image_fixture[7]['id'])
body = jsonutils.dumps({'member': TENANT4})
response = requests.post(path, headers=get_header('tenant2'),
data=body)
self.assertEqual(http.OK, response.status_code)
image_member = jsonutils.loads(response.text)
self.assertEqual(image_fixture[3]['id'], image_member['image_id'])
self.assertEqual(image_fixture[7]['id'], image_member['image_id'])
self.assertEqual(TENANT4, image_member['member_id'])
self.assertIn('created_at', image_member)
self.assertIn('updated_at', image_member)
# Add Image member to public image
path = self._url('/v2/images/%s/members' % image_fixture[2]['id'])
body = jsonutils.dumps({'member': TENANT2})
response = requests.post(path, headers=get_header('tenant1'),
data=body)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Add Image member to private image
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
body = jsonutils.dumps({'member': TENANT2})
response = requests.post(path, headers=get_header('tenant1'),
data=body)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Add Image member to community image
path = self._url('/v2/images/%s/members' % image_fixture[0]['id'])
body = jsonutils.dumps({'member': TENANT2})
response = requests.post(path, headers=get_header('tenant1'),
data=body)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Image tenant1-private's members list should contain 1 member
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
# Image tenant1-shared's members list should contain 1 member
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.OK, response.status_code)
body = jsonutils.loads(response.text)
self.assertEqual(1, len(body['members']))
# Admin can see any members
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
response = requests.get(path, headers=get_header('tenant1', 'admin'))
self.assertEqual(http.OK, response.status_code)
body = jsonutils.loads(response.text)
self.assertEqual(1, len(body['members']))
# Image members not found for private image not owned by TENANT 1
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
path = self._url('/v2/images/%s/members' % image_fixture[7]['id'])
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.NOT_FOUND, response.status_code)
# Image members forbidden for public image
path = self._url('/v2/images/%s/members' % image_fixture[2]['id'])
response = requests.get(path, headers=get_header('tenant1'))
self.assertIn("Only shared images have members", response.text)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Image members forbidden for community image
path = self._url('/v2/images/%s/members' % image_fixture[0]['id'])
response = requests.get(path, headers=get_header('tenant1'))
self.assertIn("Public images do not have members", response.text)
self.assertIn("Only shared images have members", response.text)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Image members forbidden for private image
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
response = requests.get(path, headers=get_header('tenant1'))
self.assertIn("Only shared images have members", response.text)
self.assertEqual(http.FORBIDDEN, response.status_code)
# Image Member Cannot delete Image membership
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
response = requests.delete(path, headers=get_header(TENANT3))
self.assertEqual(http.FORBIDDEN, response.status_code)
# Delete Image member
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
path = self._url('/v2/images/%s/members/%s' % (image_fixture[3]['id'],
TENANT3))
response = requests.delete(path, headers=get_header('tenant1'))
self.assertEqual(http.NO_CONTENT, response.status_code)
# Now the image has no members
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.OK, response.status_code)
body = jsonutils.loads(response.text)
self.assertEqual(0, len(body['members']))
# Adding 11 image members should fail since configured limit is 10
path = self._url('/v2/images/%s/members' % image_fixture[1]['id'])
path = self._url('/v2/images/%s/members' % image_fixture[3]['id'])
for i in range(10):
body = jsonutils.dumps({'member': str(uuid.uuid4())})
response = requests.post(path, headers=get_header('tenant1'),
@ -3516,17 +3632,41 @@ class TestImageMembers(functional.FunctionalTest):
self.assertEqual(http.REQUEST_ENTITY_TOO_LARGE, response.status_code)
# Get Image member should return not found for public image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[2]['id'],
TENANT3))
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.NOT_FOUND, response.status_code)
# Get Image member should return not found for community image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[0]['id'],
TENANT3))
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.NOT_FOUND, response.status_code)
# Get Image member should return not found for private image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
TENANT3))
response = requests.get(path, headers=get_header('tenant1'))
self.assertEqual(http.NOT_FOUND, response.status_code)
# Delete Image member should return forbidden for public image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[2]['id'],
TENANT3))
response = requests.delete(path, headers=get_header('tenant1'))
self.assertEqual(http.FORBIDDEN, response.status_code)
# Delete Image member should return forbidden for community image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[0]['id'],
TENANT3))
response = requests.delete(path, headers=get_header('tenant1'))
self.assertEqual(http.FORBIDDEN, response.status_code)
# Delete Image member should return forbidden for private image
path = self._url('/v2/images/%s/members/%s' % (image_fixture[1]['id'],
TENANT3))
response = requests.delete(path, headers=get_header('tenant1'))
self.assertEqual(http.FORBIDDEN, response.status_code)
self.stop_servers()

View File

@ -25,7 +25,7 @@ def _fake_image(owner, is_public):
return {
'id': None,
'owner': owner,
'is_public': is_public,
'visibility': 'public' if is_public else 'shared',
}

View File

@ -95,6 +95,8 @@ def _db_fixture(id, **kwargs):
'min_ram': None,
'min_disk': None,
}
if 'visibility' in kwargs:
obj.pop('is_public')
obj.update(kwargs)
return obj
@ -248,6 +250,11 @@ class TestImageRepo(test_utils.BaseTestCase):
def test_list_private_images(self):
filters = {'visibility': 'private'}
images = self.image_repo.list(filters=filters)
self.assertEqual(0, len(images))
def test_list_shared_images(self):
filters = {'visibility': 'shared'}
images = self.image_repo.list(filters=filters)
image_ids = set([i.image_id for i in images])
self.assertEqual(set([UUID2]), image_ids)
@ -488,7 +495,7 @@ class TestImageMemberRepo(test_utils.BaseTestCase):
_db_fixture(UUID1, owner=TENANT1, name='1', size=256,
status='active'),
_db_fixture(UUID2, owner=TENANT1, name='2',
size=512, is_public=False),
size=512, visibility='shared'),
]
[self.db.image_create(None, image) for image in self.images]

View File

@ -50,7 +50,7 @@ class TestImageFactory(test_utils.BaseTestCase):
self.assertIsNotNone(image.created_at)
self.assertEqual(image.created_at, image.updated_at)
self.assertEqual('queued', image.status)
self.assertEqual('private', image.visibility)
self.assertEqual('shared', image.visibility)
self.assertIsNone(image.owner)
self.assertIsNone(image.name)
self.assertIsNone(image.size)
@ -70,7 +70,7 @@ class TestImageFactory(test_utils.BaseTestCase):
self.assertIsNotNone(image.created_at)
self.assertEqual(image.created_at, image.updated_at)
self.assertEqual('queued', image.status)
self.assertEqual('private', image.visibility)
self.assertEqual('shared', image.visibility)
self.assertEqual(TENANT1, image.owner)
self.assertEqual('image-1', image.name)
self.assertIsNone(image.size)
@ -93,7 +93,7 @@ class TestImageFactory(test_utils.BaseTestCase):
self.assertIsNotNone(image.created_at)
self.assertEqual(image.created_at, image.updated_at)
self.assertEqual('queued', image.status)
self.assertEqual('private', image.visibility)
self.assertEqual('shared', image.visibility)
self.assertIsNone(image.owner)
self.assertEqual('image-1', image.name)
self.assertIsNone(image.size)
@ -153,6 +153,8 @@ class TestImage(test_utils.BaseTestCase):
def test_visibility_enumerated(self):
self.image.visibility = 'public'
self.image.visibility = 'private'
self.image.visibility = 'shared'
self.image.visibility = 'community'
self.assertRaises(ValueError, setattr,
self.image, 'visibility', 'ellison')

View File

@ -1489,6 +1489,94 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
metadef_resource_types.name, engine)
)
def _pre_upgrade_045(self, engine):
images = db_utils.get_table(engine, 'images')
now = datetime.datetime.now()
image_members = db_utils.get_table(engine, 'image_members')
# inserting a public image record
public_temp = dict(deleted=False,
created_at=now,
status='active',
is_public=True,
min_disk=0,
min_ram=0,
id='public_id')
images.insert().values(public_temp).execute()
# inserting a non-public image record for 'shared' visibility test
shared_temp = dict(deleted=False,
created_at=now,
status='active',
is_public=False,
min_disk=0,
min_ram=0,
id='shared_id')
images.insert().values(shared_temp).execute()
# inserting a non-public image records for 'private' visbility test
private_temp = dict(deleted=False,
created_at=now,
status='active',
is_public=False,
min_disk=0,
min_ram=0,
id='private_id_1')
images.insert().values(private_temp).execute()
private_temp = dict(deleted=False,
created_at=now,
status='active',
is_public=False,
min_disk=0,
min_ram=0,
id='private_id_2')
images.insert().values(private_temp).execute()
# adding an image member for checking 'shared' visbility
temp = dict(deleted=False,
created_at=now,
image_id='shared_id',
member='fake_member_452',
can_share=True,
id=45)
image_members.insert().values(temp).execute()
# adding an image member, but marking it deleted,
# for testing 'private' visibility
temp = dict(deleted=True,
created_at=now,
image_id='private_id_2',
member='fake_member_451',
can_share=True,
id=451)
image_members.insert().values(temp).execute()
def _check_045(self, engine, data):
# check that after migration, 'visbility' column is introduced
images = db_utils.get_table(engine, 'images')
self.assertIn('visibility', images.c)
self.assertNotIn('is_public', images.c)
# tests to identify the visbilities of images created above
rows = images.select().where(
images.c.id == 'public_id').execute().fetchall()
self.assertEqual(1, len(rows))
self.assertEqual('public', rows[0][16])
rows = images.select().where(
images.c.id == 'shared_id').execute().fetchall()
self.assertEqual(1, len(rows))
self.assertEqual('shared', rows[0][16])
rows = images.select().where(
images.c.id == 'private_id_1').execute().fetchall()
self.assertEqual(1, len(rows))
self.assertEqual('private', rows[0][16])
rows = images.select().where(
images.c.id == 'private_id_2').execute().fetchall()
self.assertEqual(1, len(rows))
self.assertEqual('private', rows[0][16])
def assert_table(self, engine, table_name, indices, columns):
table = db_utils.get_table(engine, table_name)
index_data = [(index.name, index.columns.keys()) for index in

View File

@ -240,6 +240,22 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.policy.enforce.assert_called_once_with({}, "publicize_image",
image.target)
def test_communitize_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'community')
self.assertEqual('private', image.visibility)
self.policy.enforce.assert_called_once_with({}, "communitize_image",
image.target)
def test_communitize_image_allowed(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image.visibility = 'community'
self.assertEqual('community', image.visibility)
self.policy.enforce.assert_called_once_with({}, "communitize_image",
image.target)
def test_delete_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
@ -328,7 +344,7 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.policy.enforce.assert_called_once_with({}, "add_image",
image.target)
def test_new_image_visibility(self):
def test_new_image_visibility_public_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, {}, self.policy)
@ -342,6 +358,24 @@ class TestImagePolicy(test_utils.BaseTestCase):
image_factory.new_image(visibility='public')
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
def test_new_image_visibility_community_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image_factory.new_image,
visibility='community')
self.policy.enforce.assert_called_once_with({},
"communitize_image",
{})
def test_new_image_visibility_community_allowed(self):
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, {}, self.policy)
image_factory.new_image(visibility='community')
self.policy.enforce.assert_called_once_with({},
"communitize_image",
{})
def test_image_get_data_policy_enforced_with_target(self):
extra_properties = {
'test_key': 'test_4321'

View File

@ -1225,7 +1225,7 @@ class TestRegistryAPI(base.IsolatedUnitTest, test_utils.RegistryAPIMixIn):
def test_create_image(self):
"""Tests that the /images POST registry API creates the image"""
fixture = self.get_minimal_fixture()
fixture = self.get_minimal_fixture(is_public=True)
body = jsonutils.dump_as_bytes(dict(image=fixture))
res = self.get_api_response_ext(http.OK, body=body,

View File

@ -375,7 +375,7 @@ class TestRegistryV1Client(base.IsolatedUnitTest, test_utils.RegistryAPIMixIn):
def test_get_image_details(self):
"""Tests that the detailed info about public images returned"""
fixture = self.get_fixture(id=UUID2, name='fake image #2',
properties={}, size=19)
properties={}, size=19, is_public=True)
images = self.client.get_images_detailed()
@ -646,7 +646,7 @@ class TestRegistryV1Client(base.IsolatedUnitTest, test_utils.RegistryAPIMixIn):
def test_add_image_basic(self):
"""Tests that we can add image metadata and returns the new id"""
fixture = self.get_fixture()
fixture = self.get_fixture(is_public=True)
new_image = self.client.add_image(fixture)
@ -663,7 +663,8 @@ class TestRegistryV1Client(base.IsolatedUnitTest, test_utils.RegistryAPIMixIn):
def test_add_image_with_properties(self):
"""Tests that we can add image metadata with properties"""
fixture = self.get_fixture(location="file:///tmp/glance-tests/2",
properties={'distro': 'Ubuntu 10.04 LTS'})
properties={'distro': 'Ubuntu 10.04 LTS'},
is_public=True)
new_image = self.client.add_image(fixture)

View File

@ -33,7 +33,7 @@ def _db_fixture(id, **kwargs):
obj = {
'id': id,
'name': None,
'is_public': False,
'visibility': 'shared',
'properties': {},
'checksum': None,
'owner': None,
@ -90,7 +90,7 @@ class TestImageActionsController(base.IsolatedUnitTest):
self.images = [
_db_fixture(UUID1, owner=TENANT1, checksum=CHKSUM,
name='1', size=256, virtual_size=1024,
is_public=True,
visibility='public',
locations=[{'url': '%s/%s' % (BASE_URI, UUID1),
'metadata': {}, 'status': 'active'}],
disk_format='raw',

View File

@ -50,7 +50,7 @@ def _db_fixture(id, **kwargs):
obj = {
'id': id,
'name': None,
'is_public': False,
'visibility': 'shared',
'properties': {},
'checksum': None,
'owner': None,
@ -113,7 +113,7 @@ class TestImageMembersController(test_utils.BaseTestCase):
def _create_images(self):
self.images = [
_db_fixture(UUID1, owner=TENANT1, name='1', size=256,
is_public=True,
visibility='public',
locations=[{'url': '%s/%s' % (BASE_URI, UUID1),
'metadata': {}, 'status': 'active'}]),
_db_fixture(UUID2, owner=TENANT1, name='2', size=512),
@ -151,7 +151,7 @@ class TestImageMembersController(test_utils.BaseTestCase):
self.assertEqual({'members': []}, output)
def test_index_member_view(self):
# UUID3 is a private image owned by TENANT3
# UUID3 is a shared image owned by TENANT3
# UUID3 has members TENANT2 and TENANT4
# When TENANT4 lists members for UUID3, should not see TENANT2
request = unit_test_utils.get_fake_request(tenant=TENANT4)

View File

@ -60,7 +60,7 @@ def _db_fixture(id, **kwargs):
obj = {
'id': id,
'name': None,
'is_public': False,
'visibility': 'shared',
'properties': {},
'checksum': None,
'owner': None,
@ -140,7 +140,7 @@ class TestImagesController(base.IsolatedUnitTest):
self.images = [
_db_fixture(UUID1, owner=TENANT1, checksum=CHKSUM,
name='1', size=256, virtual_size=1024,
is_public=True,
visibility='public',
locations=[{'url': '%s/%s' % (BASE_URI, UUID1),
'metadata': {}, 'status': 'active'}],
disk_format='raw',
@ -148,7 +148,7 @@ class TestImagesController(base.IsolatedUnitTest):
status='active'),
_db_fixture(UUID2, owner=TENANT1, checksum=CHKSUM1,
name='2', size=512, virtual_size=2048,
is_public=True,
visibility='public',
disk_format='raw',
container_format='bare',
status='active',
@ -157,7 +157,7 @@ class TestImagesController(base.IsolatedUnitTest):
'bar': 'foo'}),
_db_fixture(UUID3, owner=TENANT3, checksum=CHKSUM1,
name='3', size=512, virtual_size=2048,
is_public=True, tags=['windows', '64bit', 'x86']),
visibility='public', tags=['windows', '64bit', 'x86']),
_db_fixture(UUID4, owner=TENANT4, name='4',
size=1024, virtual_size=3072),
]
@ -356,15 +356,29 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual(0, len(images))
def test_index_with_non_default_is_public_filter(self):
image = _db_fixture(str(uuid.uuid4()),
is_public=False,
owner=TENANT3)
self.db.image_create(None, image)
private_uuid = str(uuid.uuid4())
new_image = _db_fixture(private_uuid,
visibility='private',
owner=TENANT3)
self.db.image_create(None, new_image)
path = '/images?visibility=private'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request,
filters={'visibility': 'private'})
self.assertEqual(2, len(output['images']))
self.assertEqual(1, len(output['images']))
actual = set([image.image_id for image in output['images']])
expected = set([private_uuid])
self.assertEqual(expected, actual)
path = '/images?visibility=shared'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request,
filters={'visibility': 'shared'})
self.assertEqual(1, len(output['images']))
actual = set([image.image_id for image in output['images']])
expected = set([UUID4])
self.assertEqual(expected, actual)
def test_index_with_many_filters(self):
url = '/images?status=queued&name=3'
@ -594,7 +608,7 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual('image-1', output.name)
self.assertEqual({}, output.extra_properties)
self.assertEqual(set([]), output.tags)
self.assertEqual('private', output.visibility)
self.assertEqual('shared', output.visibility)
output_logs = self.notifier.get_logs()
self.assertEqual(1, len(output_logs))
output_log = output_logs[0]
@ -612,7 +626,7 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual('image-1', output.name)
self.assertEqual({}, output.extra_properties)
self.assertEqual(set([]), output.tags)
self.assertEqual('private', output.visibility)
self.assertEqual('shared', output.visibility)
output_logs = self.notifier.get_logs()
self.assertEqual(0, len(output_logs))
@ -626,7 +640,7 @@ class TestImagesController(base.IsolatedUnitTest):
self.assertEqual('image-1', output.name)
self.assertEqual(image_properties, output.extra_properties)
self.assertEqual(set([]), output.tags)
self.assertEqual('private', output.visibility)
self.assertEqual('shared', output.visibility)
output_logs = self.notifier.get_logs()
self.assertEqual(1, len(output_logs))
output_log = output_logs[0]
@ -2192,6 +2206,16 @@ class TestImagesControllerPolicies(base.IsolatedUnitTest):
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
request, image, extra_properties, tags)
def test_create_community_image_unauthorized(self):
rules = {"communitize_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
image = {'name': 'image-c1', 'visibility': 'community'}
extra_properties = {}
tags = []
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
request, image, extra_properties, tags)
def test_update_unauthorized(self):
rules = {"modify_image": False}
self.policy.set_rules(rules)
@ -2209,6 +2233,15 @@ class TestImagesControllerPolicies(base.IsolatedUnitTest):
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
request, UUID1, changes)
def test_update_communitize_image_unauthorized(self):
rules = {"communitize_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
changes = [{'op': 'replace', 'path': ['visibility'],
'value': 'community'}]
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
request, UUID1, changes)
def test_update_depublicize_image_unauthorized(self):
rules = {"publicize_image": False}
self.policy.set_rules(rules)
@ -2218,6 +2251,15 @@ class TestImagesControllerPolicies(base.IsolatedUnitTest):
output = self.controller.update(request, UUID1, changes)
self.assertEqual('private', output.visibility)
def test_update_decommunitize_image_unauthorized(self):
rules = {"communitize_image": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
changes = [{'op': 'replace', 'path': ['visibility'],
'value': 'private'}]
output = self.controller.update(request, UUID1, changes)
self.assertEqual('private', output.visibility)
def test_update_get_image_location_unauthorized(self):
rules = {"get_image_location": False}
self.policy.set_rules(rules)

View File

@ -57,7 +57,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
'status': 'active',
'disk_format': 'ami',
'container_format': 'ami',
'is_public': False,
'visibility': 'shared',
'created_at': uuid1_time,
'updated_at': uuid1_time,
'deleted_at': None,
@ -74,7 +74,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'visibility': 'public',
'created_at': uuid2_time,
'updated_at': uuid2_time,
'deleted_at': None,
@ -185,7 +185,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -199,7 +199,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -213,7 +213,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID5 = _gen_uuid()
extra_fixture = {'id': UUID5,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -251,7 +251,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': None,
@ -284,7 +284,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': None,
@ -317,7 +317,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': None,
'container_format': 'ovf',
'name': 'Fake image',
@ -350,7 +350,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': None,
'container_format': 'ovf',
'name': 'Fake image',
@ -383,7 +383,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': None,
'name': 'Fake image',
@ -416,7 +416,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': None,
'name': 'Fake image',
@ -465,7 +465,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -479,7 +479,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -514,7 +514,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -527,7 +527,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
extra_fixture = {'id': _gen_uuid(),
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -561,7 +561,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
"""
extra_fixture = {'id': _gen_uuid(),
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -572,7 +572,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
extra_fixture = {'id': _gen_uuid(),
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -607,7 +607,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
extra_id = _gen_uuid()
extra_fixture = {'id': extra_id,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'image-extra-1',
@ -720,7 +720,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -734,7 +734,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -748,7 +748,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID5 = _gen_uuid()
extra_fixture = {'id': UUID5,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -784,7 +784,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
@ -796,7 +796,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'xyz',
@ -807,7 +807,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID5 = _gen_uuid()
extra_fixture = {'id': UUID5,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': None,
@ -842,7 +842,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'queued',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
@ -854,7 +854,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'xyz',
@ -890,7 +890,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'ami',
'container_format': 'ami',
'name': 'asdf',
@ -904,7 +904,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vdi',
'container_format': 'ovf',
'name': 'xyz',
@ -938,7 +938,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'ami',
'container_format': 'ami',
'name': 'asdf',
@ -952,7 +952,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'iso',
'container_format': 'bare',
'name': 'xyz',
@ -985,7 +985,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'ami',
'container_format': 'ami',
'name': 'asdf',
@ -997,7 +997,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'iso',
'container_format': 'bare',
'name': 'xyz',
@ -1033,7 +1033,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -1047,7 +1047,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -1085,7 +1085,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -1099,7 +1099,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
@ -1138,7 +1138,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
@ -1152,7 +1152,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'xyz',
@ -1166,7 +1166,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID5 = _gen_uuid()
extra_fixture = {'id': UUID5,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
@ -1219,7 +1219,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
@ -1233,7 +1233,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'xyz',
@ -1247,7 +1247,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
UUID5 = _gen_uuid()
extra_fixture = {'id': UUID5,
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
@ -1320,7 +1320,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
"""Tests that the registry API creates the image"""
fixture = {'name': 'fake public image',
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf'}
@ -1346,7 +1346,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
def test_create_image_with_min_disk(self):
"""Tests that the registry API creates the image"""
fixture = {'name': 'fake public image',
'is_public': True,
'visibility': 'public',
'status': 'active',
'min_disk': 5,
'disk_format': 'vhd',
@ -1370,7 +1370,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
def test_create_image_with_min_ram(self):
"""Tests that the registry API creates the image"""
fixture = {'name': 'fake public image',
'is_public': True,
'visibility': 'public',
'status': 'active',
'min_ram': 256,
'disk_format': 'vhd',
@ -1395,7 +1395,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
"""Tests that the registry API creates the image"""
fixture = {'name': 'fake public image',
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf'}
@ -1418,7 +1418,7 @@ class TestRegistryRPC(base.IsolatedUnitTest):
"""Tests that the registry API creates the image"""
fixture = {'name': 'fake public image',
'status': 'active',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf'}

View File

@ -68,7 +68,7 @@ class TestRegistryV2Client(base.IsolatedUnitTest,
uuid2_time = uuid1_time + datetime.timedelta(seconds=5)
self.FIXTURES = [
self.get_extra_fixture(
id=UUID1, name='fake image #1', is_public=False,
id=UUID1, name='fake image #1', visibility='shared',
disk_format='ami', container_format='ami', size=13,
virtual_size=26, properties={'type': 'kernel'},
location="swift://user:passwd@acct/container/obj.tar.0",
@ -469,7 +469,8 @@ class TestRegistryV2Client(base.IsolatedUnitTest,
def test_image_get(self):
"""Tests that the detailed info about an image returned"""
fixture = self.get_fixture(id=UUID1, name='fake image #1',
is_public=False, size=13, virtual_size=26,
visibility='shared',
size=13, virtual_size=26,
disk_format='ami', container_format='ami')
data = self.client.image_get(image_id=UUID1)

View File

@ -492,17 +492,21 @@ class RegistryAPIMixIn(object):
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'visibility': 'public',
'size': 20,
'checksum': None}
if 'is_public' in kwargs:
fixture.pop('visibility')
fixture.update(kwargs)
return fixture
def get_minimal_fixture(self, **kwargs):
fixture = {'name': 'fake public image',
'is_public': True,
'visibility': 'public',
'disk_format': 'vhd',
'container_format': 'ovf'}
if 'is_public' in kwargs:
fixture.pop('visibility')
fixture.update(kwargs)
return fixture