commit 6afd6df9d478762b5027ab83e2f2f64884f128be Author: Flavio Percoco Date: Fri Jan 24 18:30:46 2014 +0100 Copying from glance diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..bda3c483 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +*.pyc +*.swp +*.log +.glance-venv +.venv +.tox +.coverage +cover/* +nosetests.xml +coverage.xml +glance.sqlite +AUTHORS +ChangeLog +build +doc/source/api +dist +*.egg +glance.egg-info +tests.sqlite +glance/versioninfo +# Files created by doc build +doc/source/api + +# IDE files +.project +.pydevproject diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..68c771a0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,176 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + diff --git a/README.rst b/README.rst new file mode 100644 index 00000000..a8becb5f --- /dev/null +++ b/README.rst @@ -0,0 +1,4 @@ +Glance Store Library +===================== + +Glance's stores library diff --git a/glance/__init__.py b/glance/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/glance/store/__init__.py b/glance/store/__init__.py new file mode 100644 index 00000000..75885000 --- /dev/null +++ b/glance/store/__init__.py @@ -0,0 +1,715 @@ +# Copyright 2010-2011 OpenStack Foundation +# All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import sys + +from oslo.config import cfg + +from glance.common import exception +from glance.common import utils +import glance.context +import glance.domain.proxy +from glance.openstack.common import importutils +import glance.openstack.common.log as logging +from glance.store import location +from glance.store import scrubber + +LOG = logging.getLogger(__name__) + +store_opts = [ + cfg.ListOpt('known_stores', + default=[ + 'glance.store.filesystem.Store', + 'glance.store.http.Store', + 'glance.store.rbd.Store', + 'glance.store.s3.Store', + 'glance.store.swift.Store', + 'glance.store.sheepdog.Store', + 'glance.store.cinder.Store', + ], + help=_('List of which store classes and store class locations ' + 'are currently known to glance at startup.')), + cfg.StrOpt('default_store', default='file', + help=_("Default scheme to use to store image data. The " + "scheme must be registered by one of the stores " + "defined by the 'known_stores' config option.")), + cfg.StrOpt('scrubber_datadir', + default='/var/lib/glance/scrubber', + help=_('Directory that the scrubber will use to track ' + 'information about what to delete. 
' + 'Make sure this is set in glance-api.conf and ' + 'glance-scrubber.conf')), + cfg.BoolOpt('delayed_delete', default=False, + help=_('Turn on/off delayed delete.')), + cfg.IntOpt('scrub_time', default=0, + help=_('The amount of time in seconds to delay before ' + 'performing a delete.')), +] + +CONF = cfg.CONF +CONF.register_opts(store_opts) + + +class BackendException(Exception): + pass + + +class UnsupportedBackend(BackendException): + pass + + +class Indexable(object): + + """ + Wrapper that allows an iterator or filelike be treated as an indexable + data structure. This is required in the case where the return value from + Store.get() is passed to Store.add() when adding a Copy-From image to a + Store where the client library relies on eventlet GreenSockets, in which + case the data to be written is indexed over. + """ + + def __init__(self, wrapped, size): + """ + Initialize the object + + :param wrappped: the wrapped iterator or filelike. + :param size: the size of data available + """ + self.wrapped = wrapped + self.size = int(size) if size else (wrapped.len + if hasattr(wrapped, 'len') else 0) + self.cursor = 0 + self.chunk = None + + def __iter__(self): + """ + Delegate iteration to the wrapped instance. + """ + for self.chunk in self.wrapped: + yield self.chunk + + def __getitem__(self, i): + """ + Index into the next chunk (or previous chunk in the case where + the last data returned was not fully consumed). + + :param i: a slice-to-the-end + """ + start = i.start if isinstance(i, slice) else i + if start < self.cursor: + return self.chunk[(start - self.cursor):] + + self.chunk = self.another() + if self.chunk: + self.cursor += len(self.chunk) + + return self.chunk + + def another(self): + """Implemented by subclasses to return the next element""" + raise NotImplementedError + + def getvalue(self): + """ + Return entire string value... used in testing + """ + return self.wrapped.getvalue() + + def __len__(self): + """ + Length accessor. 
+ """ + return self.size + + +def _get_store_class(store_entry): + store_cls = None + try: + LOG.debug("Attempting to import store %s", store_entry) + store_cls = importutils.import_class(store_entry) + except exception.NotFound: + raise BackendException('Unable to load store. ' + 'Could not find a class named %s.' + % store_entry) + return store_cls + + +def create_stores(): + """ + Registers all store modules and all schemes + from the given config. Duplicates are not re-registered. + """ + store_count = 0 + store_classes = set() + for store_entry in CONF.known_stores: + store_entry = store_entry.strip() + if not store_entry: + continue + store_cls = _get_store_class(store_entry) + try: + store_instance = store_cls() + except exception.BadStoreConfiguration as e: + LOG.warn(_("%s Skipping store driver.") % unicode(e)) + continue + schemes = store_instance.get_schemes() + if not schemes: + raise BackendException('Unable to register store %s. ' + 'No schemes associated with it.' + % store_cls) + else: + if store_cls not in store_classes: + LOG.debug("Registering store %s with schemes %s", + store_cls, schemes) + store_classes.add(store_cls) + scheme_map = {} + for scheme in schemes: + loc_cls = store_instance.get_store_location_class() + scheme_map[scheme] = { + 'store_class': store_cls, + 'location_class': loc_cls, + } + location.register_scheme_map(scheme_map) + store_count += 1 + else: + LOG.debug("Store %s already registered", store_cls) + return store_count + + +def verify_default_store(): + scheme = cfg.CONF.default_store + context = glance.context.RequestContext() + try: + get_store_from_scheme(context, scheme) + except exception.UnknownScheme: + msg = _("Store for scheme %s not found") % scheme + raise RuntimeError(msg) + + +def get_known_schemes(): + """Returns list of known schemes""" + return location.SCHEME_TO_CLS_MAP.keys() + + +def get_store_from_scheme(context, scheme, loc=None): + """ + Given a scheme, return the appropriate store object + for 
handling that scheme. + """ + if scheme not in location.SCHEME_TO_CLS_MAP: + raise exception.UnknownScheme(scheme=scheme) + scheme_info = location.SCHEME_TO_CLS_MAP[scheme] + store = scheme_info['store_class'](context, loc) + return store + + +def get_store_from_uri(context, uri, loc=None): + """ + Given a URI, return the store object that would handle + operations on the URI. + + :param uri: URI to analyze + """ + scheme = uri[0:uri.find('/') - 1] + store = get_store_from_scheme(context, scheme, loc) + return store + + +def get_from_backend(context, uri, **kwargs): + """Yields chunks of data from backend specified by uri""" + + loc = location.get_location_from_uri(uri) + store = get_store_from_uri(context, uri, loc) + + try: + return store.get(loc) + except NotImplementedError: + raise exception.StoreGetNotSupported + + +def get_size_from_backend(context, uri): + """Retrieves image size from backend specified by uri""" + + loc = location.get_location_from_uri(uri) + store = get_store_from_uri(context, uri, loc) + + return store.get_size(loc) + + +def delete_from_backend(context, uri, **kwargs): + """Removes chunks of data from backend specified by uri""" + loc = location.get_location_from_uri(uri) + store = get_store_from_uri(context, uri, loc) + + try: + return store.delete(loc) + except NotImplementedError: + raise exception.StoreDeleteNotSupported + + +def get_store_from_location(uri): + """ + Given a location (assumed to be a URL), attempt to determine + the store from the location. We use here a simple guess that + the scheme of the parsed URL is the store... 
+ + :param uri: Location to check for the store + """ + loc = location.get_location_from_uri(uri) + return loc.store_name + + +def safe_delete_from_backend(context, uri, image_id, **kwargs): + """Given a uri, delete an image from the store.""" + try: + return delete_from_backend(context, uri, **kwargs) + except exception.NotFound: + msg = _('Failed to delete image %s in store from URI') + LOG.warn(msg % image_id) + except exception.StoreDeleteNotSupported as e: + LOG.warn(str(e)) + except UnsupportedBackend: + exc_type = sys.exc_info()[0].__name__ + msg = (_('Failed to delete image %s from store (%s)') % + (image_id, exc_type)) + LOG.error(msg) + + +def schedule_delayed_delete_from_backend(context, uri, image_id, **kwargs): + """Given a uri, schedule the deletion of an image location.""" + (file_queue, _db_queue) = scrubber.get_scrub_queues() + # NOTE(zhiyan): Defautly ask glance-api store using file based queue. + # In future we can change it using DB based queued instead, + # such as using image location's status to saving pending delete flag + # when that property be added. + file_queue.add_location(image_id, uri) + + +def delete_image_from_backend(context, store_api, image_id, uri): + if CONF.delayed_delete: + store_api.schedule_delayed_delete_from_backend(context, uri, image_id) + else: + store_api.safe_delete_from_backend(context, uri, image_id) + + +def check_location_metadata(val, key=''): + if isinstance(val, dict): + for key in val: + check_location_metadata(val[key], key=key) + elif isinstance(val, list): + ndx = 0 + for v in val: + check_location_metadata(v, key='%s[%d]' % (key, ndx)) + ndx = ndx + 1 + elif not isinstance(val, unicode): + raise BackendException(_("The image metadata key %s has an invalid " + "type of %s. Only dict, list, and unicode " + "are supported.") % (key, type(val))) + + +def store_add_to_backend(image_id, data, size, store): + """ + A wrapper around a call to each stores add() method. 
This gives glance + a common place to check the output + + :param image_id: The image add to which data is added + :param data: The data to be stored + :param size: The length of the data in bytes + :param store: The store to which the data is being added + :return: The url location of the file, + the size amount of data, + the checksum of the data + the storage systems metadata dictionary for the location + """ + (location, size, checksum, metadata) = store.add(image_id, data, size) + if metadata is not None: + if not isinstance(metadata, dict): + msg = (_("The storage driver %s returned invalid metadata %s" + "This must be a dictionary type") % + (str(store), str(metadata))) + LOG.error(msg) + raise BackendException(msg) + try: + check_location_metadata(metadata) + except BackendException as e: + e_msg = (_("A bad metadata structure was returned from the " + "%s storage driver: %s. %s.") % + (str(store), str(metadata), str(e))) + LOG.error(e_msg) + raise BackendException(e_msg) + return (location, size, checksum, metadata) + + +def add_to_backend(context, scheme, image_id, data, size): + store = get_store_from_scheme(context, scheme) + try: + return store_add_to_backend(image_id, data, size, store) + except NotImplementedError: + raise exception.StoreAddNotSupported + + +def set_acls(context, location_uri, public=False, read_tenants=[], + write_tenants=[]): + loc = location.get_location_from_uri(location_uri) + scheme = get_store_from_location(location_uri) + store = get_store_from_scheme(context, scheme, loc) + try: + store.set_acls(loc, public=public, read_tenants=read_tenants, + write_tenants=write_tenants) + except NotImplementedError: + LOG.debug(_("Skipping store.set_acls... 
not implemented.")) + + +class ImageRepoProxy(glance.domain.proxy.Repo): + + def __init__(self, image_repo, context, store_api): + self.context = context + self.store_api = store_api + proxy_kwargs = {'context': context, 'store_api': store_api} + super(ImageRepoProxy, self).__init__(image_repo, + item_proxy_class=ImageProxy, + item_proxy_kwargs=proxy_kwargs) + + def _set_acls(self, image): + public = image.visibility == 'public' + member_ids = [] + if image.locations and not public: + member_repo = image.get_member_repo() + member_ids = [m.member_id for m in member_repo.list()] + for location in image.locations: + self.store_api.set_acls(self.context, location['url'], public, + read_tenants=member_ids) + + def add(self, image): + result = super(ImageRepoProxy, self).add(image) + self._set_acls(image) + return result + + def save(self, image): + result = super(ImageRepoProxy, self).save(image) + self._set_acls(image) + return result + + +def _check_location_uri(context, store_api, uri): + """ + Check if an image location uri is valid. 
+ + :param context: Glance request context + :param store_api: store API module + :param uri: location's uri string + """ + is_ok = True + try: + size = store_api.get_size_from_backend(context, uri) + # NOTE(zhiyan): Some stores return zero when it catch exception + is_ok = size > 0 + except (exception.UnknownScheme, exception.NotFound): + is_ok = False + if not is_ok: + raise exception.BadStoreUri(_('Invalid location: %s') % uri) + + +def _check_image_location(context, store_api, location): + _check_location_uri(context, store_api, location['url']) + store_api.check_location_metadata(location['metadata']) + + +def _set_image_size(context, image, locations): + if not image.size: + for location in locations: + size_from_backend = glance.store.get_size_from_backend( + context, location['url']) + if size_from_backend: + # NOTE(flwang): This assumes all locations have the same size + image.size = size_from_backend + break + + +class ImageFactoryProxy(glance.domain.proxy.ImageFactory): + def __init__(self, factory, context, store_api): + self.context = context + self.store_api = store_api + proxy_kwargs = {'context': context, 'store_api': store_api} + super(ImageFactoryProxy, self).__init__(factory, + proxy_class=ImageProxy, + proxy_kwargs=proxy_kwargs) + + def new_image(self, **kwargs): + locations = kwargs.get('locations', []) + for l in locations: + _check_image_location(self.context, self.store_api, l) + + if locations.count(l) > 1: + raise exception.DuplicateLocation(location=l['url']) + + return super(ImageFactoryProxy, self).new_image(**kwargs) + + +class StoreLocations(collections.MutableSequence): + """ + The proxy for store location property. It takes responsibility for: + 1. Location uri correctness checking when adding a new location. + 2. Remove the image data from the store when a location is removed + from an image. 
+ """ + def __init__(self, image_proxy, value): + self.image_proxy = image_proxy + if isinstance(value, list): + self.value = value + else: + self.value = list(value) + + def append(self, location): + # NOTE(flaper87): Insert this + # location at the very end of + # the value list. + self.insert(len(self.value), location) + + def extend(self, other): + if isinstance(other, StoreLocations): + locations = other.value + else: + locations = list(other) + + for location in locations: + self.append(location) + + def insert(self, i, location): + _check_image_location(self.image_proxy.context, + self.image_proxy.store_api, location) + + if location in self.value: + raise exception.DuplicateLocation(location=location['url']) + + self.value.insert(i, location) + _set_image_size(self.image_proxy.context, + self.image_proxy, + [location]) + + def pop(self, i=-1): + location = self.value.pop(i) + try: + delete_image_from_backend(self.image_proxy.context, + self.image_proxy.store_api, + self.image_proxy.image.image_id, + location['url']) + except Exception: + self.value.insert(i, location) + raise + return location + + def count(self, location): + return self.value.count(location) + + def index(self, location, *args): + return self.value.index(location, *args) + + def remove(self, location): + if self.count(location): + self.pop(self.index(location)) + else: + self.value.remove(location) + + def reverse(self): + self.value.reverse() + + # Mutable sequence, so not hashable + __hash__ = None + + def __getitem__(self, i): + return self.value.__getitem__(i) + + def __setitem__(self, i, location): + _check_image_location(self.image_proxy.context, + self.image_proxy.store_api, location) + self.value.__setitem__(i, location) + _set_image_size(self.image_proxy.context, + self.image_proxy, + [location]) + + def __delitem__(self, i): + location = None + try: + location = self.value.__getitem__(i) + except Exception: + return self.value.__delitem__(i) + 
delete_image_from_backend(self.image_proxy.context, + self.image_proxy.store_api, + self.image_proxy.image.image_id, + location['url']) + self.value.__delitem__(i) + + def __delslice__(self, i, j): + i = max(i, 0) + j = max(j, 0) + locations = [] + try: + locations = self.value.__getslice__(i, j) + except Exception: + return self.value.__delslice__(i, j) + for location in locations: + delete_image_from_backend(self.image_proxy.context, + self.image_proxy.store_api, + self.image_proxy.image.image_id, + location['url']) + self.value.__delitem__(i) + + def __iadd__(self, other): + self.extend(other) + return self + + def __contains__(self, location): + return location in self.value + + def __len__(self): + return len(self.value) + + def __cast(self, other): + if isinstance(other, StoreLocations): + return other.value + else: + return other + + def __cmp__(self, other): + return cmp(self.value, self.__cast(other)) + + def __iter__(self): + return iter(self.value) + + +def _locations_proxy(target, attr): + """ + Make a location property proxy on the image object. + + :param target: the image object on which to add the proxy + :param attr: the property proxy we want to hook + """ + def get_attr(self): + value = getattr(getattr(self, target), attr) + return StoreLocations(self, value) + + def set_attr(self, value): + if not isinstance(value, (list, StoreLocations)): + raise exception.BadStoreUri(_('Invalid locations: %s') % value) + ori_value = getattr(getattr(self, target), attr) + if ori_value != value: + # NOTE(zhiyan): Enforced locations list was previously empty list. + if len(ori_value) > 0: + raise exception.Invalid(_('Original locations is not empty: ' + '%s') % ori_value) + # NOTE(zhiyan): Check locations are all valid. 
+ for location in value: + _check_image_location(self.context, self.store_api, + location) + + if value.count(location) > 1: + raise exception.DuplicateLocation(location=location['url']) + _set_image_size(self.context, getattr(self, target), value) + return setattr(getattr(self, target), attr, list(value)) + + def del_attr(self): + value = getattr(getattr(self, target), attr) + while len(value): + delete_image_from_backend(self.context, self.store_api, + self.image.image_id, value[0]['url']) + del value[0] + setattr(getattr(self, target), attr, value) + return delattr(getattr(self, target), attr) + + return property(get_attr, set_attr, del_attr) + + +class ImageProxy(glance.domain.proxy.Image): + + locations = _locations_proxy('image', 'locations') + + def __init__(self, image, context, store_api): + self.image = image + self.context = context + self.store_api = store_api + proxy_kwargs = { + 'context': context, + 'image': self, + 'store_api': store_api, + } + super(ImageProxy, self).__init__( + image, member_repo_proxy_class=ImageMemberRepoProxy, + member_repo_proxy_kwargs=proxy_kwargs) + + def delete(self): + self.image.delete() + if self.image.locations: + for location in self.image.locations: + self.store_api.delete_image_from_backend(self.context, + self.store_api, + self.image.image_id, + location['url']) + + def set_data(self, data, size=None): + if size is None: + size = 0 # NOTE(markwash): zero -> unknown size + location, size, checksum, loc_meta = self.store_api.add_to_backend( + self.context, CONF.default_store, + self.image.image_id, utils.CooperativeReader(data), size) + self.image.locations = [{'url': location, 'metadata': loc_meta}] + self.image.size = size + self.image.checksum = checksum + self.image.status = 'active' + + def get_data(self): + if not self.image.locations: + raise exception.NotFound(_("No image data could be found")) + err = None + for loc in self.image.locations: + try: + data, size = self.store_api.get_from_backend(self.context, + 
loc['url']) + + return data + except Exception as e: + LOG.warn(_('Get image %(id)s data from %(loc)s ' + 'failed: %(err)s.') % {'id': self.image.image_id, + 'loc': loc, 'err': e}) + err = e + # tried all locations + LOG.error(_('Glance tried all locations to get data for image %s ' + 'but all have failed.') % self.image.image_id) + raise err + + +class ImageMemberRepoProxy(glance.domain.proxy.Repo): + def __init__(self, repo, image, context, store_api): + self.repo = repo + self.image = image + self.context = context + self.store_api = store_api + super(ImageMemberRepoProxy, self).__init__(repo) + + def _set_acls(self): + public = self.image.visibility == 'public' + if self.image.locations and not public: + member_ids = [m.member_id for m in self.repo.list()] + for location in self.image.locations: + self.store_api.set_acls(self.context, location['url'], public, + read_tenants=member_ids) + + def add(self, member): + super(ImageMemberRepoProxy, self).add(member) + self._set_acls() + + def remove(self, member): + super(ImageMemberRepoProxy, self).remove(member) + self._set_acls() diff --git a/glance/store/base.py b/glance/store/base.py new file mode 100644 index 00000000..f0f89cbc --- /dev/null +++ b/glance/store/base.py @@ -0,0 +1,167 @@ +# Copyright 2011 OpenStack Foundation +# Copyright 2012 RedHat Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
"""Base class for all storage backends"""

from glance.common import exception
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.openstack.common import strutils
from glance.openstack.common import units

LOG = logging.getLogger(__name__)


def _exception_to_unicode(exc):
    """Best-effort conversion of an exception into a unicode message."""
    try:
        return unicode(exc)
    except UnicodeError:
        try:
            return strutils.safe_decode(str(exc), errors='ignore')
        except UnicodeError:
            # Fall back to naming the exception class when its message
            # cannot be decoded at all.
            msg = (_("Caught '%(exception)s' exception.") %
                   {"exception": exc.__class__.__name__})
            return strutils.safe_decode(msg, errors='ignore')


class Store(object):
    """Abstract base for storage backends.

    Subclasses implement scheme registration, reads, writes and deletes
    against a concrete backend.
    """

    CHUNKSIZE = 16 * units.Mi  # 16M

    def __init__(self, context=None, location=None):
        """
        Initialize the Store
        """
        self.store_location_class = None
        self.context = context
        self.configure()

        # A store that cannot be configured for writes stays usable for
        # reads; its add() is swapped for one that raises StoreAddDisabled.
        try:
            self.configure_add()
        except exception.BadStoreConfiguration as e:
            self.add = self.add_disabled
            msg = (_(u"Failed to configure store correctly: %s "
                     "Disabling add method.") % _exception_to_unicode(e))
            LOG.warn(msg)

    def configure(self):
        """
        Configure the Store to use the stored configuration options
        Any store that needs special configuration should implement
        this method.
        """
        pass

    def get_schemes(self):
        """
        Returns a tuple of schemes which this store can handle.
        """
        raise NotImplementedError

    def get_store_location_class(self):
        """
        Returns the store location class that is used by this store.
        """
        if not self.store_location_class:
            # Lazily resolve <store module>.StoreLocation by convention.
            cls_name = "%s.StoreLocation" % self.__module__
            LOG.debug("Late loading location class %s", cls_name)
            self.store_location_class = importutils.import_class(cls_name)
        return self.store_location_class

    def configure_add(self):
        """
        This is like `configure` except that it's specifically for
        configuring the store to accept objects.

        If the store was not able to successfully configure
        itself, it should raise `exception.BadStoreConfiguration`.
        """
        pass

    def get(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns a tuple of generator
        (for reading the image file) and image_size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        """
        raise NotImplementedError

    def get_size(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns the size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        """
        raise NotImplementedError

    def add_disabled(self, *args, **kwargs):
        """
        Add method that raises an exception because the Store was
        not able to be configured properly and therefore the add()
        method would error out.
        """
        raise exception.StoreAddDisabled

    def add(self, image_id, image_file, image_size):
        """
        Stores an image file with supplied identifier to the backend
        storage system and returns a tuple containing information
        about the stored image.

        :param image_id: The opaque image identifier
        :param image_file: The image data to write, as a file-like object
        :param image_size: The size of the image data to write, in bytes

        :retval tuple of URL in backing store, bytes written, checksum
                and a dictionary with storage system specific information
        :raises `glance.common.exception.Duplicate` if the image already
                existed
        """
        raise NotImplementedError

    def delete(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file to delete

        :location `glance.store.location.Location` object, supplied
                  from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        """
        raise NotImplementedError

    def set_acls(self, location, public=False, read_tenants=[],
                 write_tenants=[]):
        """
        Sets the read and write access control list for an image in the
        backend store.

        :location `glance.store.location.Location` object, supplied
                  from glance.store.location.get_location_from_uri()
        :public A boolean indicating whether the image should be public.
        :read_tenants A list of tenant strings which should be granted
                      read access for an image.
        :write_tenants A list of tenant strings which should be granted
                       write access for an image.
        """
        raise NotImplementedError
"""Storage backend for Cinder"""

from cinderclient import exceptions as cinder_exception
from cinderclient import service_catalog
from cinderclient.v2 import client as cinderclient

from oslo.config import cfg

from glance.common import exception
from glance.common import utils
import glance.openstack.common.log as logging
from glance.openstack.common import units
import glance.store.base
import glance.store.location

LOG = logging.getLogger(__name__)

cinder_opts = [
    cfg.StrOpt('cinder_catalog_info',
               default='volume:cinder:publicURL',
               help='Info to match when looking for cinder in the service '
                    'catalog. Format is : separated values of the form: '
                    '<service_type>:<service_name>:<endpoint_type>'),
    cfg.StrOpt('cinder_endpoint_template',
               default=None,
               help='Override service catalog lookup with template for cinder '
                    'endpoint e.g. http://localhost:8776/v1/%(project_id)s'),
    cfg.StrOpt('os_region_name',
               default=None,
               help='Region name of this node'),
    cfg.StrOpt('cinder_ca_certificates_file',
               default=None,
               help='Location of ca certicates file to use for cinder client '
                    'requests.'),
    cfg.IntOpt('cinder_http_retries',
               default=3,
               help='Number of cinderclient retries on failed http calls'),
    cfg.BoolOpt('cinder_api_insecure',
                default=False,
                help='Allow to perform insecure SSL requests to cinder'),
]

CONF = cfg.CONF
CONF.register_opts(cinder_opts)


def get_cinderclient(context):
    """Build an authenticated cinderclient for the given request context.

    The endpoint comes either from ``cinder_endpoint_template`` or from
    the service catalog carried on the context.
    """
    if CONF.cinder_endpoint_template:
        url = CONF.cinder_endpoint_template % context.to_dict()
    else:
        info = CONF.cinder_catalog_info
        service_type, service_name, endpoint_type = info.split(':')

        # extract the region if set in configuration
        if CONF.os_region_name:
            attr = 'region'
            filter_value = CONF.os_region_name
        else:
            attr = None
            filter_value = None

        # FIXME: the cinderclient ServiceCatalog object is mis-named.
        #        It actually contains the entire access blob.
        # Only needed parts of the service catalog are passed in, see
        # nova/context.py.
        compat_catalog = {
            'access': {'serviceCatalog': context.service_catalog or []}}
        sc = service_catalog.ServiceCatalog(compat_catalog)

        url = sc.url_for(attr=attr,
                         filter_value=filter_value,
                         service_type=service_type,
                         service_name=service_name,
                         endpoint_type=endpoint_type)

    LOG.debug(_('Cinderclient connection created using URL: %s') % url)

    c = cinderclient.Client(context.user,
                            context.auth_tok,
                            project_id=context.tenant,
                            auth_url=url,
                            insecure=CONF.cinder_api_insecure,
                            retries=CONF.cinder_http_retries,
                            cacert=CONF.cinder_ca_certificates_file)

    # noauth extracts user_id:project_id from auth_token
    c.client.auth_token = context.auth_tok or '%s:%s' % (context.user,
                                                         context.tenant)
    c.client.management_url = url
    return c


class StoreLocation(glance.store.location.StoreLocation):

    """Class describing a Cinder URI"""

    def process_specs(self):
        self.scheme = self.specs.get('scheme', 'cinder')
        self.volume_id = self.specs.get('volume_id')

    def get_uri(self):
        return "cinder://%s" % self.volume_id

    def parse_uri(self, uri):
        """Parse a ``cinder://<volume_id>`` URI.

        :raises exception.BadStoreUri: if the scheme is wrong or the
                                       volume id is not a UUID
        """
        if not uri.startswith('cinder://'):
            reason = _("URI must start with cinder://")
            LOG.error(reason)
            raise exception.BadStoreUri(uri, reason)

        self.scheme = 'cinder'
        self.volume_id = uri[9:]

        if not utils.is_uuid_like(self.volume_id):
            reason = _("URI contains invalid volume ID: %s") % self.volume_id
            LOG.error(reason)
            raise exception.BadStoreUri(uri, reason)


class Store(glance.store.base.Store):

    """Cinder backend store adapter."""

    EXAMPLE_URL = "cinder://volume-id"

    def get_schemes(self):
        return ('cinder',)

    def configure_add(self):
        """
        Configure the Store to use the stored configuration options
        Any store that needs special configuration should implement
        this method. If the store was not able to successfully configure
        itself, it should raise `exception.BadStoreConfiguration`
        """

        if self.context is None:
            reason = _("Cinder storage requires a context.")
            raise exception.BadStoreConfiguration(store_name="cinder",
                                                  reason=reason)
        if self.context.service_catalog is None:
            reason = _("Cinder storage requires a service catalog.")
            raise exception.BadStoreConfiguration(store_name="cinder",
                                                  reason=reason)

    def get_size(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file and returns the image size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        :rtype int
        """

        loc = location.store_location

        try:
            volume = get_cinderclient(self.context).volumes.get(loc.volume_id)
            # GB unit convert to byte
            return volume.size * units.Gi
        except cinder_exception.NotFound:
            # BUGFIX: the handler previously read self.volume_id, which
            # does not exist on Store and raised AttributeError instead
            # of the intended NotFound; the id lives on the location.
            reason = _("Failed to get image size due to "
                       "volume can not be found: %s") % loc.volume_id
            LOG.error(reason)
            raise exception.NotFound(reason)
        except Exception as e:
            LOG.exception(_("Failed to get image size due to "
                            "internal error: %s") % e)
            return 0
"""
A simple filesystem-backed store
"""

import errno
import hashlib
import os
import urlparse

from oslo.config import cfg

from glance.common import exception
from glance.common import utils
from glance.openstack.common import jsonutils
import glance.openstack.common.log as logging
import glance.store
import glance.store.base
import glance.store.location

LOG = logging.getLogger(__name__)

filesystem_opts = [
    cfg.StrOpt('filesystem_store_datadir',
               help=_('Directory to which the Filesystem backend '
                      'store writes images.')),
    cfg.StrOpt('filesystem_store_metadata_file',
               help=_("The path to a file which contains the "
                      "metadata to be returned with any location "
                      "associated with this store. The file must "
                      "contain a valid JSON dict."))]

CONF = cfg.CONF
CONF.register_opts(filesystem_opts)


class StoreLocation(glance.store.location.StoreLocation):

    """Class describing a Filesystem URI"""

    def process_specs(self):
        self.scheme = self.specs.get('scheme', 'file')
        self.path = self.specs.get('path')

    def get_uri(self):
        return "file://%s" % self.path

    def parse_uri(self, uri):
        """
        Parse URLs. This method fixes an issue where credentials specified
        in the URL are interpreted differently in Python 2.6.1+ than prior
        versions of Python.
        """
        pieces = urlparse.urlparse(uri)
        assert pieces.scheme in ('file', 'filesystem')
        self.scheme = pieces.scheme
        path = (pieces.netloc + pieces.path).strip()
        if path == '':
            reason = _("No path specified in URI: %s") % uri
            LOG.debug(reason)
            raise exception.BadStoreUri('No path specified')
        self.path = path


class ChunkedFile(object):

    """
    We send this back to the Glance API server as
    something that can iterate over a large file
    """

    CHUNKSIZE = 65536

    def __init__(self, filepath):
        self.filepath = filepath
        self.fp = open(self.filepath, 'rb')

    def __iter__(self):
        """Return an iterator over the image file"""
        try:
            if self.fp:
                while True:
                    chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
                    if chunk:
                        yield chunk
                    else:
                        break
        finally:
            self.close()

    def close(self):
        """Close the internal file pointer"""
        if self.fp:
            self.fp.close()
            self.fp = None


class Store(glance.store.base.Store):

    def get_schemes(self):
        return ('file', 'filesystem')

    def configure_add(self):
        """
        Configure the Store to use the stored configuration options
        Any store that needs special configuration should implement
        this method. If the store was not able to successfully configure
        itself, it should raise `exception.BadStoreConfiguration`
        """
        self.datadir = CONF.filesystem_store_datadir
        if self.datadir is None:
            reason = (_("Could not find %s in configuration options.") %
                      'filesystem_store_datadir')
            LOG.error(reason)
            raise exception.BadStoreConfiguration(store_name="filesystem",
                                                  reason=reason)

        if not os.path.exists(self.datadir):
            msg = _("Directory to write image files does not exist "
                    "(%s). Creating.") % self.datadir
            LOG.info(msg)
            try:
                os.makedirs(self.datadir)
            except (IOError, OSError):
                if os.path.exists(self.datadir):
                    # NOTE(markwash): If the path now exists, some other
                    # process must have beat us in the race condition. But it
                    # doesn't hurt, so we can safely ignore the error.
                    return
                reason = _("Unable to create datadir: %s") % self.datadir
                LOG.error(reason)
                raise exception.BadStoreConfiguration(store_name="filesystem",
                                                      reason=reason)

    @staticmethod
    def _resolve_location(location):
        """Return (filepath, filesize) for a location, or raise NotFound."""
        filepath = location.store_location.path

        if not os.path.exists(filepath):
            raise exception.NotFound(_("Image file %s not found") % filepath)

        filesize = os.path.getsize(filepath)
        return filepath, filesize

    def _get_metadata(self):
        """Load the configured location metadata file; {} on any failure."""
        if CONF.filesystem_store_metadata_file is None:
            return {}

        try:
            with open(CONF.filesystem_store_metadata_file, 'r') as fptr:
                metadata = jsonutils.load(fptr)
            glance.store.check_location_metadata(metadata)
            return metadata
        except glance.store.BackendException as bee:
            LOG.error(_('The JSON in the metadata file %s could not be used: '
                        '%s  An empty dictionary will be returned '
                        'to the client.')
                      % (CONF.filesystem_store_metadata_file, str(bee)))
            return {}
        except IOError as ioe:
            LOG.error(_('The path for the metadata file %s could not be '
                        'opened: %s  An empty dictionary will be returned '
                        'to the client.')
                      % (CONF.filesystem_store_metadata_file, ioe))
            return {}
        except Exception as ex:
            LOG.exception(_('An error occurred processing the storage systems '
                            'meta data file: %s.  An empty dictionary will be '
                            'returned to the client.') % str(ex))
            return {}

    def get(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns a tuple of generator
        (for reading the image file) and image_size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        """
        filepath, filesize = self._resolve_location(location)
        msg = _("Found image at %s. Returning in ChunkedFile.") % filepath
        LOG.debug(msg)
        return (ChunkedFile(filepath), filesize)

    def get_size(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file and returns the image size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        :rtype int
        """
        filepath, filesize = self._resolve_location(location)
        msg = _("Found image at %s.") % filepath
        LOG.debug(msg)
        return filesize

    def delete(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file to delete

        :location `glance.store.location.Location` object, supplied
                  from glance.store.location.get_location_from_uri()

        :raises NotFound if image does not exist
        :raises Forbidden if cannot delete because of permissions
        """
        loc = location.store_location
        fn = loc.path
        if os.path.exists(fn):
            try:
                LOG.debug(_("Deleting image at %(fn)s"), {'fn': fn})
                os.unlink(fn)
            except OSError:
                raise exception.Forbidden(_("You cannot delete file %s") % fn)
        else:
            raise exception.NotFound(_("Image file %s does not exist") % fn)

    def add(self, image_id, image_file, image_size):
        """
        Stores an image file with supplied identifier to the backend
        storage system and returns a tuple containing information
        about the stored image.

        :param image_id: The opaque image identifier
        :param image_file: The image data to write, as a file-like object
        :param image_size: The size of the image data to write, in bytes

        :retval tuple of URL in backing store, bytes written, checksum
                and a dictionary with storage system specific information
        :raises `glance.common.exception.Duplicate` if the image already
                existed

        :note By default, the backend writes the image data to a file
              `<datadir>/<image_id>`, where <datadir> is the value of
              the filesystem_store_datadir configuration option and
              <image_id> is the supplied image ID.
        """

        filepath = os.path.join(self.datadir, str(image_id))

        if os.path.exists(filepath):
            raise exception.Duplicate(_("Image file %s already exists!")
                                      % filepath)

        checksum = hashlib.md5()
        bytes_written = 0
        try:
            with open(filepath, 'wb') as f:
                for buf in utils.chunkreadable(image_file,
                                               ChunkedFile.CHUNKSIZE):
                    bytes_written += len(buf)
                    checksum.update(buf)
                    f.write(buf)
        except IOError as e:
            # On EACCES the file was never opened, so there is nothing
            # partial to clean up.
            if e.errno != errno.EACCES:
                self._delete_partial(filepath, image_id)
            exceptions = {errno.EFBIG: exception.StorageFull(),
                          errno.ENOSPC: exception.StorageFull(),
                          errno.EACCES: exception.StorageWriteDenied()}
            raise exceptions.get(e.errno, e)
        except Exception:
            self._delete_partial(filepath, image_id)
            raise

        checksum_hex = checksum.hexdigest()
        metadata = self._get_metadata()

        LOG.debug(_("Wrote %(bytes_written)d bytes to %(filepath)s with "
                    "checksum %(checksum_hex)s"),
                  {'bytes_written': bytes_written,
                   'filepath': filepath,
                   'checksum_hex': checksum_hex})
        return ('file://%s' % filepath, bytes_written, checksum_hex, metadata)

    @staticmethod
    def _delete_partial(filepath, image_id):
        """Best-effort removal of a partially written image file.

        NOTE: parameter renamed from `id`, which shadowed the builtin.
        """
        try:
            os.unlink(filepath)
        except Exception as e:
            msg = _('Unable to remove partial image data for image %s: %s')
            LOG.error(msg % (image_id, e))
"""Storage backend for GridFS"""
from __future__ import absolute_import

from oslo.config import cfg
import urlparse

from glance.common import exception
from glance.openstack.common import excutils
import glance.openstack.common.log as logging
import glance.store.base
import glance.store.location

try:
    import gridfs
    import gridfs.errors
    import pymongo
    import pymongo.uri_parser as uri_parser
except ImportError:
    pymongo = None

LOG = logging.getLogger(__name__)

gridfs_opts = [
    cfg.StrOpt('mongodb_store_uri',
               help="Hostname or IP address of the instance to connect to, "
                    "or a mongodb URI, or a list of hostnames / mongodb URIs. "
                    "If host is an IPv6 literal it must be enclosed "
                    "in '[' and ']' characters following the RFC2732 "
                    "URL syntax (e.g. '[::1]' for localhost)"),
    cfg.StrOpt('mongodb_store_db', default=None, help='Database to use'),
]

CONF = cfg.CONF
CONF.register_opts(gridfs_opts)


class StoreLocation(glance.store.location.StoreLocation):
    """
    Class describing a gridfs URI:

        gridfs://<IMAGE_ID>

    Connection information has been consciously omitted for
    security reasons, since this location will be stored in glance's
    database and can be queried from outside.

    Note(flaper87): Make connection info available if user wants so
    by adding a new configuration parameter `mongdb_store_insecure`.
    """

    def get_uri(self):
        return "gridfs://%s" % self.specs.get("image_id")

    def parse_uri(self, uri):
        """
        This method should fix any issue with the passed URI. Right now,
        it just sets image_id value in the specs dict.

        :param uri: Current set URI
        """
        parsed = urlparse.urlparse(uri)
        assert parsed.scheme in ('gridfs',)
        self.specs["image_id"] = parsed.netloc


class Store(glance.store.base.Store):
    """GridFS adapter"""

    EXAMPLE_URL = "gridfs://<IMAGE_ID>"

    def get_schemes(self):
        return ('gridfs',)

    def configure_add(self):
        """
        Configure the Store to use the stored configuration options
        Any store that needs special configuration should implement
        this method. If the store was not able to successfully configure
        itself, it should raise `exception.BadStoreConfiguration`
        """
        if pymongo is None:
            msg = _("Missing dependencies: pymongo")
            raise exception.BadStoreConfiguration(store_name="gridfs",
                                                  reason=msg)

        self.mongodb_uri = self._option_get('mongodb_store_uri')

        parsed = uri_parser.parse_uri(self.mongodb_uri)
        self.mongodb_db = self._option_get('mongodb_store_db') or \
            parsed.get("database")

        self.mongodb = pymongo.MongoClient(self.mongodb_uri)
        self.fs = gridfs.GridFS(self.mongodb[self.mongodb_db])

    def _option_get(self, param):
        """Return CONF.<param> or raise BadStoreConfiguration if unset."""
        result = getattr(CONF, param)
        if not result:
            reason = (_("Could not find %(param)s in configuration "
                        "options.") % {'param': param})
            LOG.debug(reason)
            raise exception.BadStoreConfiguration(store_name="gridfs",
                                                  reason=reason)
        return result

    def get(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns a tuple of generator
        (for reading the image file) and image_size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        :raises `glance.exception.NotFound` if image does not exist
        """
        image = self._get_file(location)
        return (image, image.length)

    def get_size(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns the image_size (or 0
        if unavailable)

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        """
        try:
            key = self._get_file(location)
            return key.length
        except Exception:
            return 0

    def _get_file(self, location):
        """Fetch the GridOut for a location; raise NotFound if absent."""
        store_location = location
        if isinstance(location, glance.store.location.Location):
            store_location = location.store_location
        try:

            parsed = urlparse.urlparse(store_location.get_uri())
            return self.fs.get(parsed.netloc)
        except gridfs.errors.NoFile:
            msg = _("Could not find %s image in GridFS") % \
                store_location.get_uri()
            LOG.debug(msg)
            raise exception.NotFound(msg)

    def add(self, image_id, image_file, image_size):
        """
        Stores an image file with supplied identifier to the backend
        storage system and returns a tuple containing information
        about the stored image.

        :param image_id: The opaque image identifier
        :param image_file: The image data to write, as a file-like object
        :param image_size: The size of the image data to write, in bytes

        :retval tuple of URL in backing store, bytes written, checksum
                and a dictionary with storage system specific information
        :raises `glance.common.exception.Duplicate` if the image already
                existed
        """
        loc = StoreLocation({'image_id': image_id})

        if self.fs.exists(image_id):
            raise exception.Duplicate(_("GridFS already has an image at "
                                        "location %s") % loc.get_uri())

        LOG.debug(_("Adding a new image to GridFS with id %s and size %s") %
                  (image_id, image_size))

        try:
            self.fs.put(image_file, _id=image_id)
            image = self._get_file(loc)
        except Exception:
            # Note(zhiyan): clean up already received data when
            # error occurs such as ImageSizeLimitExceeded exception.
            with excutils.save_and_reraise_exception():
                self.fs.delete(image_id)

        LOG.debug(_("Uploaded image %s, md5 %s, length %s to GridFS") %
                  (image._id, image.md5, image.length))

        return (loc.get_uri(), image.length, image.md5, {})

    def delete(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file to delete

        :location `glance.store.location.Location` object, supplied
                  from glance.store.location.get_location_from_uri()

        :raises NotFound if image does not exist
        """
        image = self._get_file(location)
        self.fs.delete(image._id)
        # BUGFIX: the image id argument was previously omitted, so the
        # literal '%s' placeholder was written to the log.
        LOG.debug("Deleted image %s from GridFS", image._id)
import httplib
import urlparse

from glance.common import exception
import glance.openstack.common.log as logging
import glance.store.base
import glance.store.location

LOG = logging.getLogger(__name__)


MAX_REDIRECTS = 5


class StoreLocation(glance.store.location.StoreLocation):

    """Class describing an HTTP(S) URI"""

    def process_specs(self):
        self.scheme = self.specs.get('scheme', 'http')
        self.netloc = self.specs['netloc']
        self.user = self.specs.get('user')
        self.password = self.specs.get('password')
        self.path = self.specs.get('path')

    def _get_credstring(self):
        if self.user:
            return '%s:%s@' % (self.user, self.password)
        return ''

    def get_uri(self):
        return "%s://%s%s%s" % (
            self.scheme,
            self._get_credstring(),
            self.netloc,
            self.path)

    def parse_uri(self, uri):
        """
        Parse URLs. This method fixes an issue where credentials specified
        in the URL are interpreted differently in Python 2.6.1+ than prior
        versions of Python.
        """
        pieces = urlparse.urlparse(uri)
        assert pieces.scheme in ('https', 'http')
        self.scheme = pieces.scheme
        netloc = pieces.netloc
        path = pieces.path
        try:
            if '@' in netloc:
                creds, netloc = netloc.split('@')
            else:
                creds = None
        except ValueError:
            # Python 2.6.1 compat
            # see lp659445 and Python issue7904
            if '@' in path:
                creds, path = path.split('@')
            else:
                creds = None
        if creds:
            try:
                self.user, self.password = creds.split(':')
            except ValueError:
                # BUGFIX: the computed reason was logged but never passed
                # to the exception (bare BadStoreUri()); also dropped the
                # pointless "".join() on a str.
                reason = (_("Credentials '%s' not well-formatted.")
                          % creds)
                LOG.debug(reason)
                raise exception.BadStoreUri(message=reason)
        else:
            self.user = None
        if netloc == '':
            reason = _("No address specified in HTTP URL")
            LOG.debug(reason)
            raise exception.BadStoreUri(message=reason)
        self.netloc = netloc
        self.path = path


def http_response_iterator(conn, response, size):
    """
    Return an iterator for a file-like object.

    :param conn: HTTP(S) Connection
    :param response: httplib.HTTPResponse object
    :param size: Chunk size to iterate with
    """
    chunk = response.read(size)
    while chunk:
        yield chunk
        chunk = response.read(size)
    conn.close()


class Store(glance.store.base.Store):

    """An implementation of the HTTP(S) Backend Adapter"""

    def get(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns a tuple of generator
        (for reading the image file) and image_size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        """
        conn, resp, content_length = self._query(location, 'GET')

        iterator = http_response_iterator(conn, resp, self.CHUNKSIZE)

        class ResponseIndexable(glance.store.Indexable):
            def another(self):
                try:
                    return self.wrapped.next()
                except StopIteration:
                    return ''

        return (ResponseIndexable(iterator, content_length), content_length)

    def get_schemes(self):
        return ('http', 'https')

    def get_size(self, location):
        """
        Takes a `glance.store.location.Location` object that indicates
        where to find the image file, and returns the size

        :param location `glance.store.location.Location` object, supplied
                        from glance.store.location.get_location_from_uri()
        """
        try:
            return self._query(location, 'HEAD')[2]
        except Exception:
            return 0

    def _query(self, location, verb, depth=0):
        """Issue `verb` against the location, following redirects.

        :returns: (connection, response, content_length)
        :raises exception.MaxRedirectsExceeded: after MAX_REDIRECTS hops
        :raises exception.BadStoreUri: on error status or bad redirect
        """
        if depth > MAX_REDIRECTS:
            reason = (_("The HTTP URL exceeded %s maximum "
                        "redirects.") % MAX_REDIRECTS)
            LOG.debug(reason)
            raise exception.MaxRedirectsExceeded(redirects=MAX_REDIRECTS)
        loc = location.store_location
        conn_class = self._get_conn_class(loc)
        conn = conn_class(loc.netloc)
        conn.request(verb, loc.path, "", {})
        resp = conn.getresponse()

        # Check for bad status codes
        if resp.status >= 400:
            reason = _("HTTP URL returned a %s status code.") % resp.status
            LOG.debug(reason)
            raise exception.BadStoreUri(loc.path, reason)

        location_header = resp.getheader("location")
        if location_header:
            if resp.status not in (301, 302):
                reason = (_("The HTTP URL attempted to redirect with an "
                            "invalid %s status code.") % resp.status)
                LOG.debug(reason)
                raise exception.BadStoreUri(loc.path, reason)
            location_class = glance.store.location.Location
            new_loc = location_class(location.store_name,
                                     location.store_location.__class__,
                                     uri=location_header,
                                     image_id=location.image_id,
                                     store_specs=location.store_specs)
            return self._query(new_loc, verb, depth + 1)
        content_length = int(resp.getheader('content-length', 0))
        return (conn, resp, content_length)

    def _get_conn_class(self, loc):
        """
        Returns connection class for accessing the resource. Useful
        for dependency injection and stubouts in testing...
        """
        return {'http': httplib.HTTPConnection,
                'https': httplib.HTTPSConnection}[loc.scheme]
+""" + +import urlparse + +from glance.common import exception +import glance.openstack.common.log as logging + +LOG = logging.getLogger(__name__) + +SCHEME_TO_CLS_MAP = {} + + +def get_location_from_uri(uri): + """ + Given a URI, return a Location object that has had an appropriate + store parse the URI. + + :param uri: A URI that could come from the end-user in the Location + attribute/header + + Example URIs: + https://user:pass@example.com:80/images/some-id + http://images.oracle.com/123456 + swift://example.com/container/obj-id + swift://user:account:pass@authurl.com/container/obj-id + swift+http://user:account:pass@authurl.com/container/obj-id + s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id + s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id + file:///var/lib/glance/images/1 + cinder://volume-id + """ + pieces = urlparse.urlparse(uri) + if pieces.scheme not in SCHEME_TO_CLS_MAP.keys(): + raise exception.UnknownScheme(scheme=pieces.scheme) + scheme_info = SCHEME_TO_CLS_MAP[pieces.scheme] + return Location(pieces.scheme, uri=uri, + store_location_class=scheme_info['location_class']) + + +def register_scheme_map(scheme_map): + """ + Given a mapping of 'scheme' to store_name, adds the mapping to the + known list of schemes if it does not already exist. + """ + for (k, v) in scheme_map.items(): + if k not in SCHEME_TO_CLS_MAP: + LOG.debug("Registering scheme %s with %s", k, v) + SCHEME_TO_CLS_MAP[k] = v + + +class Location(object): + + """ + Class describing the location of an image that Glance knows about + """ + + def __init__(self, store_name, store_location_class, + uri=None, image_id=None, store_specs=None): + """ + Create a new Location object. + + :param store_name: The string identifier/scheme of the storage backend + :param store_location_class: The store location class to use + for this location instance. + :param image_id: The identifier of the image in whatever storage + backend is used. 
+ :param uri: Optional URI to construct location from + :param store_specs: Dictionary of information about the location + of the image that is dependent on the backend + store + """ + self.store_name = store_name + self.image_id = image_id + self.store_specs = store_specs or {} + self.store_location = store_location_class(self.store_specs) + if uri: + self.store_location.parse_uri(uri) + + def get_store_uri(self): + """ + Returns the Glance image URI, which is the host:port of the API server + along with /images/ + """ + return self.store_location.get_uri() + + def get_uri(self): + return None + + +class StoreLocation(object): + + """ + Base class that must be implemented by each store + """ + + def __init__(self, store_specs): + self.specs = store_specs + if self.specs: + self.process_specs() + + def process_specs(self): + """ + Subclasses should implement any processing of the self.specs collection + such as storing credentials and possibly establishing connections. + """ + pass + + def get_uri(self): + """ + Subclasses should implement a method that returns an internal URI that, + when supplied to the StoreLocation instance, can be interpreted by the + StoreLocation's parse_uri() method. The URI returned from this method + shall never be public and only used internally within Glance, so it is + fine to encode credentials in this URI. + """ + raise NotImplementedError("StoreLocation subclass must implement " + "get_uri()") + + def parse_uri(self, uri): + """ + Subclasses should implement a method that accepts a string URI and + sets appropriate internal fields such that a call to get_uri() will + return a proper internal URI + """ + raise NotImplementedError("StoreLocation subclass must implement " + "parse_uri()") diff --git a/glance/store/rbd.py b/glance/store/rbd.py new file mode 100644 index 00000000..703c1aa7 --- /dev/null +++ b/glance/store/rbd.py @@ -0,0 +1,390 @@ +# Copyright 2010-2011 Josh Durgin +# All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Storage backend for RBD + (RADOS (Reliable Autonomic Distributed Object Store) Block Device)""" +from __future__ import absolute_import +from __future__ import with_statement + +import hashlib +import math +import urllib + +from oslo.config import cfg + +from glance.common import exception +from glance.common import utils +import glance.openstack.common.log as logging +from glance.openstack.common import units +import glance.store.base +import glance.store.location + +try: + import rados + import rbd +except ImportError: + rados = None + rbd = None + +DEFAULT_POOL = 'images' +DEFAULT_CONFFILE = '/etc/ceph/ceph.conf' +DEFAULT_USER = None # let librados decide based on the Ceph conf file +DEFAULT_CHUNKSIZE = 8 # in MiB +DEFAULT_SNAPNAME = 'snap' + +LOG = logging.getLogger(__name__) + +rbd_opts = [ + cfg.IntOpt('rbd_store_chunk_size', default=DEFAULT_CHUNKSIZE, + help=_('RADOS images will be chunked into objects of this size ' + '(in megabytes). For best performance, this should be ' + 'a power of two.')), + cfg.StrOpt('rbd_store_pool', default=DEFAULT_POOL, + help=_('RADOS pool in which images are stored.')), + cfg.StrOpt('rbd_store_user', default=DEFAULT_USER, + help=_('RADOS user to authenticate as (only applicable if ' + 'using Cephx. If , a default will be chosen based ' + 'on the client. 
section in rbd_store_ceph_conf)')), + cfg.StrOpt('rbd_store_ceph_conf', default=DEFAULT_CONFFILE, + help=_('Ceph configuration file path. ' + 'If , librados will locate the default config. ' + 'If using cephx authentication, this file should ' + 'include a reference to the right keyring ' + 'in a client. section')), +] + +CONF = cfg.CONF +CONF.register_opts(rbd_opts) + + +class StoreLocation(glance.store.location.StoreLocation): + """ + Class describing a RBD URI. This is of the form: + + rbd://image + + or + + rbd://fsid/pool/image/snapshot + """ + + def process_specs(self): + # convert to ascii since librbd doesn't handle unicode + for key, value in self.specs.iteritems(): + self.specs[key] = str(value) + self.fsid = self.specs.get('fsid') + self.pool = self.specs.get('pool') + self.image = self.specs.get('image') + self.snapshot = self.specs.get('snapshot') + + def get_uri(self): + if self.fsid and self.pool and self.snapshot: + # ensure nothing contains / or any other url-unsafe character + safe_fsid = urllib.quote(self.fsid, '') + safe_pool = urllib.quote(self.pool, '') + safe_image = urllib.quote(self.image, '') + safe_snapshot = urllib.quote(self.snapshot, '') + return "rbd://%s/%s/%s/%s" % (safe_fsid, safe_pool, + safe_image, safe_snapshot) + else: + return "rbd://%s" % self.image + + def parse_uri(self, uri): + prefix = 'rbd://' + if not uri.startswith(prefix): + reason = _('URI must start with rbd://') + msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri, + 'reason': reason}) + LOG.debug(msg) + raise exception.BadStoreUri(message=reason) + # convert to ascii since librbd doesn't handle unicode + try: + ascii_uri = str(uri) + except UnicodeError: + reason = _('URI contains non-ascii characters') + msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri, + 'reason': reason}) + LOG.debug(msg) + raise exception.BadStoreUri(message=reason) + pieces = ascii_uri[len(prefix):].split('/') + if len(pieces) == 1: + self.fsid, self.pool, self.image, 
self.snapshot = \ + (None, None, pieces[0], None) + elif len(pieces) == 4: + self.fsid, self.pool, self.image, self.snapshot = \ + map(urllib.unquote, pieces) + else: + reason = _('URI must have exactly 1 or 4 components') + msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri, + 'reason': reason}) + LOG.debug(msg) + raise exception.BadStoreUri(message=reason) + if any(map(lambda p: p == '', pieces)): + reason = _('URI cannot contain empty components') + msg = (_("Invalid URI: %(uri)s: %(reason)s") % {'uri': uri, + 'reason': reason}) + LOG.debug(msg) + raise exception.BadStoreUri(message=reason) + + +class ImageIterator(object): + """ + Reads data from an RBD image, one chunk at a time. + """ + + def __init__(self, name, store): + self.name = name + self.pool = store.pool + self.user = store.user + self.conf_file = store.conf_file + self.chunk_size = store.chunk_size + + def __iter__(self): + try: + with rados.Rados(conffile=self.conf_file, + rados_id=self.user) as conn: + with conn.open_ioctx(self.pool) as ioctx: + with rbd.Image(ioctx, self.name) as image: + img_info = image.stat() + size = img_info['size'] + bytes_left = size + while bytes_left > 0: + length = min(self.chunk_size, bytes_left) + data = image.read(size - bytes_left, length) + bytes_left -= len(data) + yield data + raise StopIteration() + except rbd.ImageNotFound: + raise exception.NotFound( + _('RBD image %s does not exist') % self.name) + + +class Store(glance.store.base.Store): + """An implementation of the RBD backend adapter.""" + + EXAMPLE_URL = "rbd://///" + + def get_schemes(self): + return ('rbd',) + + def configure_add(self): + """ + Configure the Store to use the stored configuration options + Any store that needs special configuration should implement + this method. 
If the store was not able to successfully configure + itself, it should raise `exception.BadStoreConfiguration` + """ + try: + self.chunk_size = CONF.rbd_store_chunk_size * units.Mi + + # these must not be unicode since they will be passed to a + # non-unicode-aware C library + self.pool = str(CONF.rbd_store_pool) + self.user = str(CONF.rbd_store_user) + self.conf_file = str(CONF.rbd_store_ceph_conf) + except cfg.ConfigFileValueError as e: + reason = _("Error in store configuration: %s") % e + LOG.error(reason) + raise exception.BadStoreConfiguration(store_name='rbd', + reason=reason) + + def get(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file, and returns a tuple of generator + (for reading the image file) and image_size + + :param location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + :raises `glance.exception.NotFound` if image does not exist + """ + loc = location.store_location + return (ImageIterator(loc.image, self), self.get_size(location)) + + def get_size(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file, and returns the size + + :param location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + :raises `glance.exception.NotFound` if image does not exist + """ + loc = location.store_location + with rados.Rados(conffile=self.conf_file, + rados_id=self.user) as conn: + with conn.open_ioctx(self.pool) as ioctx: + try: + with rbd.Image(ioctx, loc.image, + snapshot=loc.snapshot) as image: + img_info = image.stat() + return img_info['size'] + except rbd.ImageNotFound: + msg = _('RBD image %s does not exist') % loc.get_uri() + LOG.debug(msg) + raise exception.NotFound(msg) + + def _create_image(self, fsid, ioctx, image_name, size, order): + """ + Create an rbd image. 
If librbd supports it, + make it a cloneable snapshot, so that copy-on-write + volumes can be created from it. + + :param image_name Image's name + + :retval `glance.store.rbd.StoreLocation` object + """ + librbd = rbd.RBD() + if hasattr(rbd, 'RBD_FEATURE_LAYERING'): + librbd.create(ioctx, image_name, size, order, old_format=False, + features=rbd.RBD_FEATURE_LAYERING) + return StoreLocation({ + 'fsid': fsid, + 'pool': self.pool, + 'image': image_name, + 'snapshot': DEFAULT_SNAPNAME, + }) + else: + librbd.create(ioctx, image_name, size, order, old_format=True) + return StoreLocation({'image': image_name}) + + def _delete_image(self, image_name, snapshot_name=None): + """ + Delete RBD image and snapshot. + + :param image_name Image's name + :param snapshot_name Image snapshot's name + + :raises NotFound if image does not exist; + InUseByStore if image is in use or snapshot unprotect failed + """ + with rados.Rados(conffile=self.conf_file, rados_id=self.user) as conn: + with conn.open_ioctx(self.pool) as ioctx: + try: + # First remove snapshot. + if snapshot_name is not None: + with rbd.Image(ioctx, image_name) as image: + try: + image.unprotect_snap(snapshot_name) + except rbd.ImageBusy: + log_msg = _("snapshot %(image)s@%(snap)s " + "could not be unprotected because " + "it is in use") + LOG.debug(log_msg % + {'image': image_name, + 'snap': snapshot_name}) + raise exception.InUseByStore() + image.remove_snap(snapshot_name) + + # Then delete image. + rbd.RBD().remove(ioctx, image_name) + except rbd.ImageNotFound: + raise exception.NotFound( + _("RBD image %s does not exist") % image_name) + except rbd.ImageBusy: + log_msg = _("image %s could not be removed " + "because it is in use") + LOG.debug(log_msg % image_name) + raise exception.InUseByStore() + + def add(self, image_id, image_file, image_size): + """ + Stores an image file with supplied identifier to the backend + storage system and returns a tuple containing information + about the stored image. 
+ + :param image_id: The opaque image identifier + :param image_file: The image data to write, as a file-like object + :param image_size: The size of the image data to write, in bytes + + :retval tuple of URL in backing store, bytes written, checksum + and a dictionary with storage system specific information + :raises `glance.common.exception.Duplicate` if the image already + existed + """ + checksum = hashlib.md5() + image_name = str(image_id) + with rados.Rados(conffile=self.conf_file, rados_id=self.user) as conn: + fsid = None + if hasattr(conn, 'get_fsid'): + fsid = conn.get_fsid() + with conn.open_ioctx(self.pool) as ioctx: + order = int(math.log(self.chunk_size, 2)) + LOG.debug('creating image %s with order %d and size %d', + image_name, order, image_size) + if image_size == 0: + LOG.warning(_("since image size is zero we will be doing " + "resize-before-write for each chunk which " + "will be considerably slower than normal")) + + try: + loc = self._create_image(fsid, ioctx, image_name, + image_size, order) + except rbd.ImageExists: + raise exception.Duplicate( + _('RBD image %s already exists') % image_id) + try: + with rbd.Image(ioctx, image_name) as image: + bytes_written = 0 + offset = 0 + chunks = utils.chunkreadable(image_file, + self.chunk_size) + for chunk in chunks: + # If the image size provided is zero we need to do + # a resize for the amount we are writing. This will + # be slower so setting a higher chunk size may + # speed things up a bit. 
+ if image_size == 0: + chunk_length = len(chunk) + length = offset + chunk_length + bytes_written += chunk_length + LOG.debug(_("resizing image to %s KiB") % + (length / units.Ki)) + image.resize(length) + LOG.debug(_("writing chunk at offset %s") % + (offset)) + offset += image.write(chunk, offset) + checksum.update(chunk) + if loc.snapshot: + image.create_snap(loc.snapshot) + image.protect_snap(loc.snapshot) + except Exception as exc: + # Delete image if one was created + try: + self._delete_image(loc.image, loc.snapshot) + except exception.NotFound: + pass + + raise exc + + # Make sure we send back the image size whether provided or inferred. + if image_size == 0: + image_size = bytes_written + + return (loc.get_uri(), image_size, checksum.hexdigest(), {}) + + def delete(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file to delete. + + :location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + + :raises NotFound if image does not exist; + InUseByStore if image is in use or snapshot unprotect failed + """ + loc = location.store_location + self._delete_image(loc.image, loc.snapshot) diff --git a/glance/store/s3.py b/glance/store/s3.py new file mode 100644 index 00000000..59ee0306 --- /dev/null +++ b/glance/store/s3.py @@ -0,0 +1,542 @@ +# Copyright 2010 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +"""Storage backend for S3 or Storage Servers that follow the S3 Protocol""" + +import hashlib +import httplib +import re +import tempfile +import urlparse + +from oslo.config import cfg + +from glance.common import exception +from glance.common import utils +import glance.openstack.common.log as logging +import glance.store +import glance.store.base +import glance.store.location + +LOG = logging.getLogger(__name__) + +s3_opts = [ + cfg.StrOpt('s3_store_host', + help=_('The host where the S3 server is listening.')), + cfg.StrOpt('s3_store_access_key', secret=True, + help=_('The S3 query token access key.')), + cfg.StrOpt('s3_store_secret_key', secret=True, + help=_('The S3 query token secret key.')), + cfg.StrOpt('s3_store_bucket', + help=_('The S3 bucket to be used to store the Glance data.')), + cfg.StrOpt('s3_store_object_buffer_dir', + help=_('The local directory where uploads will be staged ' + 'before they are transferred into S3.')), + cfg.BoolOpt('s3_store_create_bucket_on_put', default=False, + help=_('A boolean to determine if the S3 bucket should be ' + 'created on upload if it does not exist or if ' + 'an error should be returned to the user.')), + cfg.StrOpt('s3_store_bucket_url_format', default='subdomain', + help=_('The S3 calling format used to determine the bucket. ' + 'Either subdomain or path can be used.')), +] + +CONF = cfg.CONF +CONF.register_opts(s3_opts) + + +class StoreLocation(glance.store.location.StoreLocation): + + """ + Class describing an S3 URI. 
An S3 URI can look like any of + the following: + + s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id + s3+http://accesskey:secretkey@s3.amazonaws.com/bucket/key-id + s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id + + The s3+https:// URIs indicate there is an HTTPS s3service URL + """ + + def process_specs(self): + self.scheme = self.specs.get('scheme', 's3') + self.accesskey = self.specs.get('accesskey') + self.secretkey = self.specs.get('secretkey') + s3_host = self.specs.get('s3serviceurl') + self.bucket = self.specs.get('bucket') + self.key = self.specs.get('key') + + if s3_host.startswith('https://'): + self.scheme = 's3+https' + s3_host = s3_host[8:].strip('/') + elif s3_host.startswith('http://'): + s3_host = s3_host[7:].strip('/') + self.s3serviceurl = s3_host.strip('/') + + def _get_credstring(self): + if self.accesskey: + return '%s:%s@' % (self.accesskey, self.secretkey) + return '' + + def get_uri(self): + return "%s://%s%s/%s/%s" % ( + self.scheme, + self._get_credstring(), + self.s3serviceurl, + self.bucket, + self.key) + + def parse_uri(self, uri): + """ + Parse URLs. This method fixes an issue where credentials specified + in the URL are interpreted differently in Python 2.6.1+ than prior + versions of Python. + + Note that an Amazon AWS secret key can contain the forward slash, + which is entirely retarded, and breaks urlparse miserably. + This function works around that issue. + """ + # Make sure that URIs that contain multiple schemes, such as: + # s3://accesskey:secretkey@https://s3.amazonaws.com/bucket/key-id + # are immediately rejected. + if uri.count('://') != 1: + reason = _("URI cannot contain more than one occurrence " + "of a scheme. 
If you have specified a URI like " + "s3://accesskey:secretkey@" + "https://s3.amazonaws.com/bucket/key-id" + ", you need to change it to use the " + "s3+https:// scheme, like so: " + "s3+https://accesskey:secretkey@" + "s3.amazonaws.com/bucket/key-id") + LOG.debug(_("Invalid store uri: %s") % reason) + raise exception.BadStoreUri(message=reason) + + pieces = urlparse.urlparse(uri) + assert pieces.scheme in ('s3', 's3+http', 's3+https') + self.scheme = pieces.scheme + path = pieces.path.strip('/') + netloc = pieces.netloc.strip('/') + entire_path = (netloc + '/' + path).strip('/') + + if '@' in uri: + creds, path = entire_path.split('@') + cred_parts = creds.split(':') + + try: + access_key = cred_parts[0] + secret_key = cred_parts[1] + # NOTE(jaypipes): Need to encode to UTF-8 here because of a + # bug in the HMAC library that boto uses. + # See: http://bugs.python.org/issue5285 + # See: http://trac.edgewall.org/ticket/8083 + access_key = access_key.encode('utf-8') + secret_key = secret_key.encode('utf-8') + self.accesskey = access_key + self.secretkey = secret_key + except IndexError: + reason = _("Badly formed S3 credentials %s") % creds + LOG.debug(reason) + raise exception.BadStoreUri() + else: + self.accesskey = None + path = entire_path + try: + path_parts = path.split('/') + self.key = path_parts.pop() + self.bucket = path_parts.pop() + if path_parts: + self.s3serviceurl = '/'.join(path_parts).strip('/') + else: + reason = _("Badly formed S3 URI. 
Missing s3 service URL.") + raise exception.BadStoreUri() + except IndexError: + reason = _("Badly formed S3 URI: %s") % uri + LOG.debug(reason) + raise exception.BadStoreUri() + + +class ChunkedFile(object): + + """ + We send this back to the Glance API server as + something that can iterate over a ``boto.s3.key.Key`` + """ + + CHUNKSIZE = 65536 + + def __init__(self, fp): + self.fp = fp + + def __iter__(self): + """Return an iterator over the image file""" + try: + if self.fp: + while True: + chunk = self.fp.read(ChunkedFile.CHUNKSIZE) + if chunk: + yield chunk + else: + break + finally: + self.close() + + def getvalue(self): + """Return entire string value... used in testing.""" + data = "" + self.len = 0 + for chunk in self: + read_bytes = len(chunk) + data = data + chunk + self.len = self.len + read_bytes + return data + + def close(self): + """Close the internal file pointer.""" + if self.fp: + self.fp.close() + self.fp = None + + +class Store(glance.store.base.Store): + """An implementation of the s3 adapter.""" + + EXAMPLE_URL = "s3://:@//" + + def get_schemes(self): + return ('s3', 's3+http', 's3+https') + + def configure_add(self): + """ + Configure the Store to use the stored configuration options + Any store that needs special configuration should implement + this method. If the store was not able to successfully configure + itself, it should raise `exception.BadStoreConfiguration` + """ + self.s3_host = self._option_get('s3_store_host') + access_key = self._option_get('s3_store_access_key') + secret_key = self._option_get('s3_store_secret_key') + # NOTE(jaypipes): Need to encode to UTF-8 here because of a + # bug in the HMAC library that boto uses. 
+ # See: http://bugs.python.org/issue5285 + # See: http://trac.edgewall.org/ticket/8083 + self.access_key = access_key.encode('utf-8') + self.secret_key = secret_key.encode('utf-8') + self.bucket = self._option_get('s3_store_bucket') + + self.scheme = 's3' + if self.s3_host.startswith('https://'): + self.scheme = 's3+https' + self.full_s3_host = self.s3_host + elif self.s3_host.startswith('http://'): + self.full_s3_host = self.s3_host + else: # Defaults http + self.full_s3_host = 'http://' + self.s3_host + + self.s3_store_object_buffer_dir = CONF.s3_store_object_buffer_dir + + def _option_get(self, param): + result = getattr(CONF, param) + if not result: + reason = (_("Could not find %(param)s in configuration " + "options.") % {'param': param}) + LOG.debug(reason) + raise exception.BadStoreConfiguration(store_name="s3", + reason=reason) + return result + + def get(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file, and returns a tuple of generator + (for reading the image file) and image_size + + :param location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + :raises `glance.exception.NotFound` if image does not exist + """ + key = self._retrieve_key(location) + + key.BufferSize = self.CHUNKSIZE + + class ChunkedIndexable(glance.store.Indexable): + def another(self): + return (self.wrapped.fp.read(ChunkedFile.CHUNKSIZE) + if self.wrapped.fp else None) + + return (ChunkedIndexable(ChunkedFile(key), key.size), key.size) + + def get_size(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file, and returns the image_size (or 0 + if unavailable) + + :param location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + """ + try: + key = self._retrieve_key(location) + return key.size + except Exception: + return 0 + + def 
_retrieve_key(self, location): + loc = location.store_location + from boto.s3.connection import S3Connection + + s3_conn = S3Connection(loc.accesskey, loc.secretkey, + host=loc.s3serviceurl, + is_secure=(loc.scheme == 's3+https'), + calling_format=get_calling_format()) + bucket_obj = get_bucket(s3_conn, loc.bucket) + + key = get_key(bucket_obj, loc.key) + + msg = _("Retrieved image object from S3 using (s3_host=%(s3_host)s, " + "access_key=%(accesskey)s, bucket=%(bucket)s, " + "key=%(obj_name)s)") % ({'s3_host': loc.s3serviceurl, + 'accesskey': loc.accesskey, + 'bucket': loc.bucket, + 'obj_name': loc.key}) + LOG.debug(msg) + + return key + + def add(self, image_id, image_file, image_size): + """ + Stores an image file with supplied identifier to the backend + storage system and returns a tuple containing information + about the stored image. + + :param image_id: The opaque image identifier + :param image_file: The image data to write, as a file-like object + :param image_size: The size of the image data to write, in bytes + + :retval tuple of URL in backing store, bytes written, checksum + and a dictionary with storage system specific information + :raises `glance.common.exception.Duplicate` if the image already + existed + + S3 writes the image data using the scheme: + s3://:@// + where: + = ``s3_store_user`` + = ``s3_store_key`` + = ``s3_store_host`` + = ``s3_store_bucket`` + = The id of the image being added + """ + from boto.s3.connection import S3Connection + + loc = StoreLocation({'scheme': self.scheme, + 'bucket': self.bucket, + 'key': image_id, + 's3serviceurl': self.full_s3_host, + 'accesskey': self.access_key, + 'secretkey': self.secret_key}) + + s3_conn = S3Connection(loc.accesskey, loc.secretkey, + host=loc.s3serviceurl, + is_secure=(loc.scheme == 's3+https'), + calling_format=get_calling_format()) + + create_bucket_if_missing(self.bucket, s3_conn) + + bucket_obj = get_bucket(s3_conn, self.bucket) + obj_name = str(image_id) + + def _sanitize(uri): + 
return re.sub('//.*:.*@', + '//s3_store_secret_key:s3_store_access_key@', + uri) + + key = bucket_obj.get_key(obj_name) + if key and key.exists(): + raise exception.Duplicate(_("S3 already has an image at " + "location %s") % + _sanitize(loc.get_uri())) + + msg = _("Adding image object to S3 using (s3_host=%(s3_host)s, " + "access_key=%(access_key)s, bucket=%(bucket)s, " + "key=%(obj_name)s)") % ({'s3_host': self.s3_host, + 'access_key': self.access_key, + 'bucket': self.bucket, + 'obj_name': obj_name}) + LOG.debug(msg) + + key = bucket_obj.new_key(obj_name) + + # We need to wrap image_file, which is a reference to the + # webob.Request.body_file, with a seekable file-like object, + # otherwise the call to set_contents_from_file() will die + # with an error about Input object has no method 'seek'. We + # might want to call webob.Request.make_body_seekable(), but + # unfortunately, that method copies the entire image into + # memory and results in LP Bug #818292 occurring. So, here + # we write temporary file in as memory-efficient manner as + # possible and then supply the temporary file to S3. 
We also + # take this opportunity to calculate the image checksum while + # writing the tempfile, so we don't need to call key.compute_md5() + + msg = _("Writing request body file to temporary file " + "for %s") % _sanitize(loc.get_uri()) + LOG.debug(msg) + + tmpdir = self.s3_store_object_buffer_dir + temp_file = tempfile.NamedTemporaryFile(dir=tmpdir) + checksum = hashlib.md5() + for chunk in utils.chunkreadable(image_file, self.CHUNKSIZE): + checksum.update(chunk) + temp_file.write(chunk) + temp_file.flush() + + msg = (_("Uploading temporary file to S3 for %s") % + _sanitize(loc.get_uri())) + LOG.debug(msg) + + # OK, now upload the data into the key + key.set_contents_from_file(open(temp_file.name, 'r+b'), replace=False) + size = key.size + checksum_hex = checksum.hexdigest() + + LOG.debug(_("Wrote %(size)d bytes to S3 key named %(obj_name)s " + "with checksum %(checksum_hex)s"), + {'size': size, 'obj_name': obj_name, + 'checksum_hex': checksum_hex}) + + return (loc.get_uri(), size, checksum_hex, {}) + + def delete(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file to delete + + :location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + + :raises NotFound if image does not exist + """ + loc = location.store_location + from boto.s3.connection import S3Connection + s3_conn = S3Connection(loc.accesskey, loc.secretkey, + host=loc.s3serviceurl, + is_secure=(loc.scheme == 's3+https'), + calling_format=get_calling_format()) + bucket_obj = get_bucket(s3_conn, loc.bucket) + + # Close the key when we're through. 
+ key = get_key(bucket_obj, loc.key) + + msg = _("Deleting image object from S3 using (s3_host=%(s3_host)s, " + "access_key=%(accesskey)s, bucket=%(bucket)s, " + "key=%(obj_name)s)") % ({'s3_host': loc.s3serviceurl, + 'accesskey': loc.accesskey, + 'bucket': loc.bucket, + 'obj_name': loc.key}) + LOG.debug(msg) + + return key.delete() + + +def get_bucket(conn, bucket_id): + """ + Get a bucket from an s3 connection + + :param conn: The ``boto.s3.connection.S3Connection`` + :param bucket_id: ID of the bucket to fetch + :raises ``glance.exception.NotFound`` if bucket is not found. + """ + + bucket = conn.get_bucket(bucket_id) + if not bucket: + msg = _("Could not find bucket with ID %s") % bucket_id + LOG.debug(msg) + raise exception.NotFound(msg) + + return bucket + + +def get_s3_location(s3_host): + from boto.s3.connection import Location + locations = { + 's3.amazonaws.com': Location.DEFAULT, + 's3-eu-west-1.amazonaws.com': Location.EU, + 's3-us-west-1.amazonaws.com': Location.USWest, + 's3-ap-southeast-1.amazonaws.com': Location.APSoutheast, + 's3-ap-northeast-1.amazonaws.com': Location.APNortheast, + } + # strip off scheme and port if present + key = re.sub('^(https?://)?(?P[^:]+)(:[0-9]+)?$', + '\g', + s3_host) + return locations.get(key, Location.DEFAULT) + + +def create_bucket_if_missing(bucket, s3_conn): + """ + Creates a missing bucket in S3 if the + ``s3_store_create_bucket_on_put`` option is set. 
+ + :param bucket: Name of bucket to create + :param s3_conn: Connection to S3 + """ + from boto.exception import S3ResponseError + try: + s3_conn.get_bucket(bucket) + except S3ResponseError as e: + if e.status == httplib.NOT_FOUND: + if CONF.s3_store_create_bucket_on_put: + location = get_s3_location(CONF.s3_store_host) + try: + s3_conn.create_bucket(bucket, location=location) + except S3ResponseError as e: + msg = (_("Failed to add bucket to S3.\n" + "Got error from S3: %(e)s") % {'e': e}) + raise glance.store.BackendException(msg) + else: + msg = (_("The bucket %(bucket)s does not exist in " + "S3. Please set the " + "s3_store_create_bucket_on_put option " + "to add bucket to S3 automatically.") + % {'bucket': bucket}) + raise glance.store.BackendException(msg) + + +def get_key(bucket, obj): + """ + Get a key from a bucket + + :param bucket: The ``boto.s3.Bucket`` + :param obj: Object to get the key for + :raises ``glance.exception.NotFound`` if key is not found. + """ + + key = bucket.get_key(obj) + if not key or not key.exists(): + msg = (_("Could not find key %(obj)s in bucket %(bucket)s") % + {'obj': obj, 'bucket': bucket}) + LOG.debug(msg) + raise exception.NotFound(msg) + return key + + +def get_calling_format(bucket_format=None): + import boto.s3.connection + if bucket_format is None: + bucket_format = CONF.s3_store_bucket_url_format + if bucket_format.lower() == 'path': + return boto.s3.connection.OrdinaryCallingFormat() + else: + return boto.s3.connection.SubdomainCallingFormat() diff --git a/glance/store/scrubber.py b/glance/store/scrubber.py new file mode 100644 index 00000000..547a07a0 --- /dev/null +++ b/glance/store/scrubber.py @@ -0,0 +1,523 @@ +# Copyright 2010 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import calendar +import eventlet +import os +import time + +from oslo.config import cfg + +from glance.common import crypt +from glance.common import exception +from glance.common import utils +from glance import context +from glance.openstack.common import lockutils +import glance.openstack.common.log as logging +import glance.registry.client.v1.api as registry + +LOG = logging.getLogger(__name__) + +scrubber_opts = [ + cfg.StrOpt('scrubber_datadir', + default='/var/lib/glance/scrubber', + help=_('Directory that the scrubber will use to track ' + 'information about what to delete. ' + 'Make sure this is set in glance-api.conf and ' + 'glance-scrubber.conf')), + cfg.IntOpt('scrub_time', default=0, + help=_('The amount of time in seconds to delay before ' + 'performing a delete.')), + cfg.BoolOpt('cleanup_scrubber', default=False, + help=_('A boolean that determines if the scrubber should ' + 'clean up the files it uses for taking data. Only ' + 'one server in your deployment should be designated ' + 'the cleanup host.')), + cfg.IntOpt('cleanup_scrubber_time', default=86400, + help=_('Items must have a modified time that is older than ' + 'this value in order to be candidates for cleanup.')) +] + +CONF = cfg.CONF +CONF.register_opts(scrubber_opts) +CONF.import_opt('metadata_encryption_key', 'glance.common.config') + + +class ScrubQueue(object): + """Image scrub queue base class. + + The queue contains image's location which need to delete from backend. 
+ """ + def __init__(self): + registry.configure_registry_client() + registry.configure_registry_admin_creds() + self.registry = registry.get_registry_client(context.RequestContext()) + + @abc.abstractmethod + def add_location(self, image_id, uri): + """Adding image location to scrub queue. + + :param image_id: The opaque image identifier + :param uri: The opaque image location uri + """ + pass + + @abc.abstractmethod + def get_all_locations(self): + """Returns a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + pass + + @abc.abstractmethod + def pop_all_locations(self): + """Pop out a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + pass + + @abc.abstractmethod + def has_image(self, image_id): + """Returns whether the queue contains an image or not. + :param image_id: The opaque image identifier + + :retval a boolean value to inform including or not + """ + pass + + +class ScrubFileQueue(ScrubQueue): + """File-based image scrub queue class.""" + def __init__(self): + super(ScrubFileQueue, self).__init__() + self.scrubber_datadir = CONF.scrubber_datadir + utils.safe_mkdirs(self.scrubber_datadir) + self.scrub_time = CONF.scrub_time + self.metadata_encryption_key = CONF.metadata_encryption_key + + def _read_queue_file(self, file_path): + """Reading queue file to loading deleted location and timestamp out. 
+ + :param file_path: Queue file full path + + :retval a list of image location timestamp tuple from queue file + """ + uris = [] + delete_times = [] + + try: + with open(file_path, 'r') as f: + while True: + uri = f.readline().strip() + if uri: + uris.append(uri) + delete_times.append(int(f.readline().strip())) + else: + break + except Exception: + LOG.error(_("%s file can not be read.") % file_path) + + return uris, delete_times + + def _update_queue_file(self, file_path, remove_record_idxs): + """Updating queue file to remove such queue records. + + :param file_path: Queue file full path + :param remove_record_idxs: A list of record index those want to remove + """ + try: + with open(file_path, 'r') as f: + lines = f.readlines() + # NOTE(zhiyan) we need bottom up removing to + # keep record index be valid. + remove_record_idxs.sort(reverse=True) + for record_idx in remove_record_idxs: + # Each record has two lines + line_no = (record_idx + 1) * 2 - 1 + del lines[line_no:line_no + 2] + with open(file_path, 'w') as f: + f.write(''.join(lines)) + os.chmod(file_path, 0o600) + except Exception: + LOG.error(_("%s file can not be wrote.") % file_path) + + def add_location(self, image_id, uri): + """Adding image location to scrub queue. + + :param image_id: The opaque image identifier + :param uri: The opaque image location uri + """ + with lockutils.lock("scrubber-%s" % image_id, + lock_file_prefix='glance-', external=True): + + # NOTE(zhiyan): make sure scrubber does not cleanup + # 'pending_delete' images concurrently before the code + # get lock and reach here. 
+ try: + image = self.registry.get_image(image_id) + if image['status'] == 'deleted': + return + except exception.NotFound as e: + LOG.error(_("Failed to find image to delete: " + "%(e)s"), {'e': e}) + return + + delete_time = time.time() + self.scrub_time + file_path = os.path.join(self.scrubber_datadir, str(image_id)) + + if self.metadata_encryption_key is not None: + uri = crypt.urlsafe_encrypt(self.metadata_encryption_key, + uri, 64) + + if os.path.exists(file_path): + # Append the uri of location to the queue file + with open(file_path, 'a') as f: + f.write('\n') + f.write('\n'.join([uri, str(int(delete_time))])) + else: + # NOTE(zhiyan): Protect the file before we write any data. + open(file_path, 'w').close() + os.chmod(file_path, 0o600) + with open(file_path, 'w') as f: + f.write('\n'.join([uri, str(int(delete_time))])) + os.utime(file_path, (delete_time, delete_time)) + + def _walk_all_locations(self, remove=False): + """Returns a list of image id and location tuple from scrub queue. + + :param remove: Whether remove location from queue or not after walk + + :retval a list of image image_id and location tuple from scrub queue + """ + if not os.path.exists(self.scrubber_datadir): + LOG.info(_("%s directory does not exist.") % self.scrubber_datadir) + return [] + + ret = [] + for root, dirs, files in os.walk(self.scrubber_datadir): + for image_id in files: + if not utils.is_uuid_like(image_id): + continue + with lockutils.lock("scrubber-%s" % image_id, + lock_file_prefix='glance-', external=True): + file_path = os.path.join(self.scrubber_datadir, image_id) + uris, delete_times = self._read_queue_file(file_path) + + remove_record_idxs = [] + skipped = False + for (record_idx, delete_time) in enumerate(delete_times): + if delete_time > time.time(): + skipped = True + continue + else: + ret.append((image_id, uris[record_idx])) + remove_record_idxs.append(record_idx) + if remove: + if skipped: + # NOTE(zhiyan): remove location records from + # the queue file. 
+ self._update_queue_file(file_path, + remove_record_idxs) + else: + utils.safe_remove(file_path) + return ret + + def get_all_locations(self): + """Returns a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations() + + def pop_all_locations(self): + """Pop out a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations(remove=True) + + def has_image(self, image_id): + """Returns whether the queue contains an image or not. + + :param image_id: The opaque image identifier + + :retval a boolean value to inform including or not + """ + return os.path.exists(os.path.join(self.scrubber_datadir, + str(image_id))) + + +class ScrubDBQueue(ScrubQueue): + """Database-based image scrub queue class.""" + def __init__(self): + super(ScrubDBQueue, self).__init__() + self.cleanup_scrubber_time = CONF.cleanup_scrubber_time + + def add_location(self, image_id, uri): + """Adding image location to scrub queue. + + :param image_id: The opaque image identifier + :param uri: The opaque image location uri + """ + raise NotImplementedError + + def _walk_all_locations(self, remove=False): + """Returns a list of image id and location tuple from scrub queue. 
+ + :param remove: Whether remove location from queue or not after walk + + :retval a list of image id and location tuple from scrub queue + """ + filters = {'deleted': True, + 'is_public': 'none', + 'status': 'pending_delete'} + ret = [] + for image in self.registry.get_images_detailed(filters=filters): + deleted_at = image.get('deleted_at') + if not deleted_at: + continue + + # NOTE: Strip off microseconds which may occur after the last '.,' + # Example: 2012-07-07T19:14:34.974216 + date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0] + delete_time = calendar.timegm(time.strptime(date_str, + "%Y-%m-%dT%H:%M:%S")) + + if delete_time + self.cleanup_scrubber_time > time.time(): + continue + + ret.extend([(image['id'], location['uri']) + for location in image['location_data']]) + + if remove: + self.registry.update_image(image['id'], {'status': 'deleted'}) + return ret + + def get_all_locations(self): + """Returns a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations() + + def pop_all_locations(self): + """Pop out a list of image id and location tuple from scrub queue. + + :retval a list of image id and location tuple from scrub queue + """ + return self._walk_all_locations(remove=True) + + def has_image(self, image_id): + """Returns whether the queue contains an image or not. 
+ + :param image_id: The opaque image identifier + + :retval a boolean value to inform including or not + """ + try: + image = self.registry.get_image(image_id) + return image['status'] == 'pending_delete' + except exception.NotFound as e: + return False + + +_file_queue = None +_db_queue = None + + +def get_scrub_queues(): + global _file_queue, _db_queue + if not _file_queue: + _file_queue = ScrubFileQueue() + if not _db_queue: + _db_queue = ScrubDBQueue() + return (_file_queue, _db_queue) + + +class Daemon(object): + def __init__(self, wakeup_time=300, threads=1000): + LOG.info(_("Starting Daemon: wakeup_time=%(wakeup_time)s " + "threads=%(threads)s"), + {'wakeup_time': wakeup_time, 'threads': threads}) + self.wakeup_time = wakeup_time + self.event = eventlet.event.Event() + self.pool = eventlet.greenpool.GreenPool(threads) + + def start(self, application): + self._run(application) + + def wait(self): + try: + self.event.wait() + except KeyboardInterrupt: + msg = _("Daemon Shutdown on KeyboardInterrupt") + LOG.info(msg) + + def _run(self, application): + LOG.debug(_("Running application")) + self.pool.spawn_n(application.run, self.pool, self.event) + eventlet.spawn_after(self.wakeup_time, self._run, application) + LOG.debug(_("Next run scheduled in %s seconds") % self.wakeup_time) + + +class Scrubber(object): + def __init__(self, store_api): + LOG.info(_("Initializing scrubber with configuration: %s") % + unicode({'scrubber_datadir': CONF.scrubber_datadir, + 'cleanup': CONF.cleanup_scrubber, + 'cleanup_time': CONF.cleanup_scrubber_time, + 'registry_host': CONF.registry_host, + 'registry_port': CONF.registry_port})) + + utils.safe_mkdirs(CONF.scrubber_datadir) + + self.store_api = store_api + + registry.configure_registry_client() + registry.configure_registry_admin_creds() + self.registry = registry.get_registry_client(context.RequestContext()) + + (self.file_queue, self.db_queue) = get_scrub_queues() + + def _get_delete_jobs(self, queue, pop): + try: + if pop: + 
image_id_uri_list = queue.pop_all_locations() + else: + image_id_uri_list = queue.get_all_locations() + except Exception: + LOG.error(_("Can not %s scrub jobs from queue.") % + 'pop' if pop else 'get') + return None + + delete_jobs = {} + for image_id, image_uri in image_id_uri_list: + if image_id not in delete_jobs: + delete_jobs[image_id] = [] + delete_jobs[image_id].append((image_id, image_uri)) + return delete_jobs + + def run(self, pool, event=None): + delete_jobs = self._get_delete_jobs(self.file_queue, True) + if delete_jobs: + for image_id, jobs in delete_jobs.iteritems(): + self._scrub_image(pool, image_id, jobs) + + if CONF.cleanup_scrubber: + self._cleanup(pool) + + def _scrub_image(self, pool, image_id, delete_jobs): + if len(delete_jobs) == 0: + return + + LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") % + {'id': image_id, 'count': len(delete_jobs)}) + # NOTE(bourke): The starmap must be iterated to do work + list(pool.starmap(self._delete_image_from_backend, delete_jobs)) + + image = self.registry.get_image(image_id) + if (image['status'] == 'pending_delete' and + not self.file_queue.has_image(image_id)): + self.registry.update_image(image_id, {'status': 'deleted'}) + + def _delete_image_from_backend(self, image_id, uri): + if CONF.metadata_encryption_key is not None: + uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri) + + try: + LOG.debug(_("Deleting URI from image %(image_id)s.") % + {'image_id': image_id}) + + # Here we create a request context with credentials to support + # delayed delete when using multi-tenant backend storage + admin_tenant = CONF.admin_tenant_name + auth_token = self.registry.auth_tok + admin_context = context.RequestContext(user=CONF.admin_user, + tenant=admin_tenant, + auth_tok=auth_token) + + self.store_api.delete_from_backend(admin_context, uri) + except Exception: + msg = _("Failed to delete URI from image %(image_id)s") + LOG.error(msg % {'image_id': image_id}) + + def 
_read_cleanup_file(self, file_path): + """Reading cleanup to get latest cleanup timestamp. + + :param file_path: Cleanup status file full path + + :retval latest cleanup timestamp + """ + try: + if not os.path.exists(file_path): + msg = _("%s file is not exists.") % unicode(file_path) + raise Exception(msg) + atime = int(os.path.getatime(file_path)) + mtime = int(os.path.getmtime(file_path)) + if atime != mtime: + msg = _("%s file contains conflicting cleanup " + "timestamp.") % unicode(file_path) + raise Exception(msg) + return atime + except Exception as e: + LOG.error(e) + return None + + def _update_cleanup_file(self, file_path, cleanup_time): + """Update latest cleanup timestamp to cleanup file. + + :param file_path: Cleanup status file full path + :param cleanup_time: The Latest cleanup timestamp + """ + try: + open(file_path, 'w').close() + os.chmod(file_path, 0o600) + os.utime(file_path, (cleanup_time, cleanup_time)) + except Exception: + LOG.error(_("%s file can not be created.") % unicode(file_path)) + + def _cleanup(self, pool): + now = time.time() + cleanup_file = os.path.join(CONF.scrubber_datadir, ".cleanup") + if not os.path.exists(cleanup_file): + self._update_cleanup_file(cleanup_file, now) + return + + last_cleanup_time = self._read_cleanup_file(cleanup_file) + cleanup_time = last_cleanup_time + CONF.cleanup_scrubber_time + if cleanup_time > now: + return + + LOG.info(_("Getting images deleted before " + "%s") % CONF.cleanup_scrubber_time) + self._update_cleanup_file(cleanup_file, now) + + delete_jobs = self._get_delete_jobs(self.db_queue, False) + if not delete_jobs: + return + + for image_id, jobs in delete_jobs.iteritems(): + with lockutils.lock("scrubber-%s" % image_id, + lock_file_prefix='glance-', external=True): + if not self.file_queue.has_image(image_id): + # NOTE(zhiyan): scrubber should not cleanup this image + # since a queue file be created for this 'pending_delete' + # image concurrently before the code get lock and + # reach here. 
The checking only be worth if glance-api and + # glance-scrubber service be deployed on a same host. + self._scrub_image(pool, image_id, jobs) diff --git a/glance/store/sheepdog.py b/glance/store/sheepdog.py new file mode 100644 index 00000000..11293f0e --- /dev/null +++ b/glance/store/sheepdog.py @@ -0,0 +1,307 @@ +# Copyright 2013 Taobao Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Storage backend for Sheepdog storage system""" + +import hashlib + +from oslo.config import cfg + +from glance.common import exception +from glance.openstack.common import excutils +import glance.openstack.common.log as logging +from glance.openstack.common import processutils +from glance.openstack.common import units +import glance.store +import glance.store.base +import glance.store.location + + +LOG = logging.getLogger(__name__) + +DEFAULT_ADDR = 'localhost' +DEFAULT_PORT = 7000 +DEFAULT_CHUNKSIZE = 64 # in MiB + +LOG = logging.getLogger(__name__) + +sheepdog_opts = [ + cfg.IntOpt('sheepdog_store_chunk_size', default=DEFAULT_CHUNKSIZE, + help=_('Images will be chunked into objects of this size ' + '(in megabytes). 
For best performance, this should be ' + 'a power of two.')), + cfg.IntOpt('sheepdog_store_port', default=DEFAULT_PORT, + help=_('Port of sheep daemon.')), + cfg.StrOpt('sheepdog_store_address', default=DEFAULT_ADDR, + help=_('IP address of sheep daemon.')) +] + +CONF = cfg.CONF +CONF.register_opts(sheepdog_opts) + + +class SheepdogImage: + """Class describing an image stored in Sheepdog storage.""" + + def __init__(self, addr, port, name, chunk_size): + self.addr = addr + self.port = port + self.name = name + self.chunk_size = chunk_size + + def _run_command(self, command, data, *params): + cmd = ("collie vdi %(command)s -a %(addr)s -p %(port)d %(name)s " + "%(params)s" % + {"command": command, + "addr": self.addr, + "port": self.port, + "name": self.name, + "params": " ".join(map(str, params))}) + + try: + return processutils.execute( + cmd, process_input=data, shell=True)[0] + except processutils.ProcessExecutionError as exc: + LOG.error(exc) + raise glance.store.BackendException(exc) + + def get_size(self): + """ + Return the size of the this iamge + + Sheepdog Usage: collie vdi list -r -a address -p port image + """ + out = self._run_command("list -r", None) + return long(out.split(' ')[3]) + + def read(self, offset, count): + """ + Read up to 'count' bytes from this image starting at 'offset' and + return the data. + + Sheepdog Usage: collie vdi read -a address -p port image offset len + """ + return self._run_command("read", None, str(offset), str(count)) + + def write(self, data, offset, count): + """ + Write up to 'count' bytes from the data to this image starting at + 'offset' + + Sheepdog Usage: collie vdi write -a address -p port image offset len + """ + self._run_command("write", data, str(offset), str(count)) + + def create(self, size): + """ + Create this image in the Sheepdog cluster with size 'size'. 
+ + Sheepdog Usage: collie vdi create -a address -p port image size + """ + self._run_command("create", None, str(size)) + + def delete(self): + """ + Delete this image in the Sheepdog cluster + + Sheepdog Usage: collie vdi delete -a address -p port image + """ + self._run_command("delete", None) + + def exist(self): + """ + Check if this image exists in the Sheepdog cluster via 'list' command + + Sheepdog Usage: collie vdi list -r -a address -p port image + """ + out = self._run_command("list -r", None) + if not out: + return False + else: + return True + + +class StoreLocation(glance.store.location.StoreLocation): + """ + Class describing a Sheepdog URI. This is of the form: + + sheepdog://image + + """ + + def process_specs(self): + self.image = self.specs.get('image') + + def get_uri(self): + return "sheepdog://%s" % self.image + + def parse_uri(self, uri): + if not uri.startswith('sheepdog://'): + raise exception.BadStoreUri(uri, "URI must start with %s://" % + 'sheepdog') + self.image = uri[11:] + + +class ImageIterator(object): + """ + Reads data from an Sheepdog image, one chunk at a time. + """ + + def __init__(self, image): + self.image = image + + def __iter__(self): + image = self.image + total = left = image.get_size() + while left > 0: + length = min(image.chunk_size, left) + data = image.read(total - left, length) + left -= len(data) + yield data + raise StopIteration() + + +class Store(glance.store.base.Store): + """Sheepdog backend adapter.""" + + EXAMPLE_URL = "sheepdog://image" + + def get_schemes(self): + return ('sheepdog',) + + def configure_add(self): + """ + Configure the Store to use the stored configuration options + Any store that needs special configuration should implement + this method. 
If the store was not able to successfully configure + itself, it should raise `exception.BadStoreConfiguration` + """ + + try: + self.chunk_size = CONF.sheepdog_store_chunk_size * units.Mi + self.addr = CONF.sheepdog_store_address + self.port = CONF.sheepdog_store_port + except cfg.ConfigFileValueError as e: + reason = _("Error in store configuration: %s") % e + LOG.error(reason) + raise exception.BadStoreConfiguration(store_name='sheepdog', + reason=reason) + + try: + processutils.execute("collie", shell=True) + except processutils.ProcessExecutionError as exc: + reason = _("Error in store configuration: %s") % exc + LOG.error(reason) + raise exception.BadStoreConfiguration(store_name='sheepdog', + reason=reason) + + def get(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file, and returns a generator for reading + the image file + + :param location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + :raises `glance.exception.NotFound` if image does not exist + """ + + loc = location.store_location + image = SheepdogImage(self.addr, self.port, loc.image, + self.chunk_size) + if not image.exist(): + raise exception.NotFound(_("Sheepdog image %s does not exist") + % image.name) + return (ImageIterator(image), image.get_size()) + + def get_size(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file and returns the image size + + :param location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + :raises `glance.exception.NotFound` if image does not exist + :rtype int + """ + + loc = location.store_location + image = SheepdogImage(self.addr, self.port, loc.image, + self.chunk_size) + if not image.exist(): + raise exception.NotFound(_("Sheepdog image %s does not exist") + % image.name) + return image.get_size() + + def add(self, 
image_id, image_file, image_size): + """ + Stores an image file with supplied identifier to the backend + storage system and returns a tuple containing information + about the stored image. + + :param image_id: The opaque image identifier + :param image_file: The image data to write, as a file-like object + :param image_size: The size of the image data to write, in bytes + + :retval tuple of URL in backing store, bytes written, and checksum + :raises `glance.common.exception.Duplicate` if the image already + existed + """ + + image = SheepdogImage(self.addr, self.port, image_id, + self.chunk_size) + if image.exist(): + raise exception.Duplicate(_("Sheepdog image %s already exists") + % image_id) + + location = StoreLocation({'image': image_id}) + checksum = hashlib.md5() + + image.create(image_size) + + try: + total = left = image_size + while left > 0: + length = min(self.chunk_size, left) + data = image_file.read(length) + image.write(data, total - left, length) + left -= length + checksum.update(data) + except Exception: + # Note(zhiyan): clean up already received data when + # error occurs such as ImageSizeLimitExceeded exception. 
+ with excutils.save_and_reraise_exception(): + image.delete() + + return (location.get_uri(), image_size, checksum.hexdigest(), {}) + + def delete(self, location): + """ + Takes a `glance.store.location.Location` object that indicates + where to find the image file to delete + + :location `glance.store.location.Location` object, supplied + from glance.store.location.get_location_from_uri() + + :raises NotFound if image does not exist + """ + + loc = location.store_location + image = SheepdogImage(self.addr, self.port, loc.image, + self.chunk_size) + if not image.exist(): + raise exception.NotFound(_("Sheepdog image %s does not exist") % + loc.image) + image.delete() diff --git a/glance/store/swift.py b/glance/store/swift.py new file mode 100644 index 00000000..ec67f83a --- /dev/null +++ b/glance/store/swift.py @@ -0,0 +1,687 @@ +# Copyright 2010-2011 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
"""Storage backend for SWIFT"""

from __future__ import absolute_import

import hashlib
import httplib
import math
import urllib
import urlparse

from oslo.config import cfg

from glance.common import auth
from glance.common import exception
from glance.openstack.common import excutils
import glance.openstack.common.log as logging
import glance.store
import glance.store.base
import glance.store.location

try:
    import swiftclient
except ImportError:
    # swiftclient is an optional dependency: the module must stay importable
    # without it. NOTE(review): if the import failed, any later reference to
    # swiftclient raises NameError rather than a friendlier error — presumably
    # acceptable since the store is unusable anyway; confirm.
    pass

LOG = logging.getLogger(__name__)

DEFAULT_CONTAINER = 'glance'
DEFAULT_LARGE_OBJECT_SIZE = 5 * 1024  # 5GB
DEFAULT_LARGE_OBJECT_CHUNK_SIZE = 200  # 200M
# NOTE(review): deliberately 1000 * 1024, not 1024 * 1024 — kept as-is since
# changing it would silently alter the chunking thresholds. TODO confirm this
# mixed unit is intentional (it matches the glance code this was copied from).
ONE_MB = 1000 * 1024

swift_opts = [
    cfg.BoolOpt('swift_enable_snet', default=False,
                help=_('Whether to use ServiceNET to communicate with the '
                       'Swift storage servers.')),
    cfg.StrOpt('swift_store_auth_address',
               help=_('The address where the Swift authentication service '
                      'is listening.')),
    cfg.StrOpt('swift_store_user', secret=True,
               help=_('The user to authenticate against the Swift '
                      'authentication service')),
    cfg.StrOpt('swift_store_key', secret=True,
               help=_('Auth key for the user authenticating against the '
                      'Swift authentication service.')),
    cfg.StrOpt('swift_store_auth_version', default='2',
               help=_('Version of the authentication service to use. '
                      'Valid versions are 2 for keystone and 1 for swauth '
                      'and rackspace')),
    cfg.BoolOpt('swift_store_auth_insecure', default=False,
                help=_('If True, swiftclient won\'t check for a valid SSL '
                       'certificate when authenticating.')),
    cfg.StrOpt('swift_store_region',
               help=_('The region of the swift endpoint to be used for '
                      'single tenant. This setting is only necessary if the '
                      'tenant has multiple swift endpoints.')),
    cfg.StrOpt('swift_store_endpoint_type', default='publicURL',
               help=_('A string giving the endpoint type of the swift '
                      'service to use (publicURL, adminURL or internalURL). '
                      'This setting is only used if swift_store_auth_version '
                      'is 2.')),
    cfg.StrOpt('swift_store_service_type', default='object-store',
               help=_('A string giving the service type of the swift service '
                      'to use. This setting is only used if '
                      'swift_store_auth_version is 2.')),
    cfg.StrOpt('swift_store_container',
               default=DEFAULT_CONTAINER,
               help=_('Container within the account that the account should '
                      'use for storing images in Swift.')),
    cfg.IntOpt('swift_store_large_object_size',
               default=DEFAULT_LARGE_OBJECT_SIZE,
               help=_('The size, in MB, that Glance will start chunking image '
                      'files and do a large object manifest in Swift')),
    cfg.IntOpt('swift_store_large_object_chunk_size',
               default=DEFAULT_LARGE_OBJECT_CHUNK_SIZE,
               help=_('The amount of data written to a temporary disk buffer '
                      'during the process of chunking the image file.')),
    cfg.BoolOpt('swift_store_create_container_on_put', default=False,
                help=_('A boolean value that determines if we create the '
                       'container if it does not exist.')),
    cfg.BoolOpt('swift_store_multi_tenant', default=False,
                help=_('If set to True, enables multi-tenant storage '
                       'mode which causes Glance images to be stored in '
                       'tenant specific Swift accounts.')),
    cfg.ListOpt('swift_store_admin_tenants', default=[],
                help=_('A list of tenants that will be granted read/write '
                       'access on all Swift containers created by Glance in '
                       'multi-tenant mode.')),
    cfg.BoolOpt('swift_store_ssl_compression', default=True,
                help=_('If set to False, disables SSL layer compression of '
                       'https swift requests. Setting to False may improve '
                       'performance for images which are already in a '
                       'compressed format, eg qcow2.')),
]

CONF = cfg.CONF
CONF.register_opts(swift_opts)


class StoreLocation(glance.store.location.StoreLocation):

    """
    Class describing a Swift URI. A Swift URI can look like any of
    the following:

        swift://user:pass@authurl.com/container/obj-id
        swift://account:user:pass@authurl.com/container/obj-id
        swift+http://user:pass@authurl.com/container/obj-id
        swift+https://user:pass@authurl.com/container/obj-id

    When using multi-tenant a URI might look like this (a storage URL):

        swift+https://example.com/container/obj-id

    The swift+http:// URIs indicate there is an HTTP authentication URL.
    The default for Swift is an HTTPS authentication URL, so swift:// and
    swift+https:// are the same...
    """

    def process_specs(self):
        """Populate location attributes from the ``specs`` dict."""
        self.scheme = self.specs.get('scheme', 'swift+https')
        self.user = self.specs.get('user')
        self.key = self.specs.get('key')
        self.auth_or_store_url = self.specs.get('auth_or_store_url')
        self.container = self.specs.get('container')
        self.obj = self.specs.get('obj')

    def _get_credstring(self):
        """Return the URL-quoted 'user:key@' prefix, or '' without creds."""
        if self.user and self.key:
            return '%s:%s@' % (urllib.quote(self.user), urllib.quote(self.key))
        return ''

    def get_uri(self):
        """Reassemble the swift URI string from this location's parts."""
        auth_or_store_url = self.auth_or_store_url
        # The scheme of the stored URL is implied by self.scheme, so strip
        # any explicit http(s) prefix before re-joining.
        if auth_or_store_url.startswith('http://'):
            auth_or_store_url = auth_or_store_url[len('http://'):]
        elif auth_or_store_url.startswith('https://'):
            auth_or_store_url = auth_or_store_url[len('https://'):]

        credstring = self._get_credstring()
        auth_or_store_url = auth_or_store_url.strip('/')
        container = self.container.strip('/')
        obj = self.obj.strip('/')

        return '%s://%s%s/%s/%s' % (self.scheme, credstring, auth_or_store_url,
                                    container, obj)

    def parse_uri(self, uri):
        """
        Parse URLs. This method fixes an issue where credentials specified
        in the URL are interpreted differently in Python 2.6.1+ than prior
        versions of Python. It also deals with the peculiarity that new-style
        Swift URIs have where a username can contain a ':', like so:

            swift://account:user:pass@authurl.com/container/obj

        :raises exception.BadStoreUri: if the URI is malformed
        """
        # Make sure that URIs that contain multiple schemes, such as:
        # swift://user:pass@http://authurl.com/v1/container/obj
        # are immediately rejected.
        if uri.count('://') != 1:
            reason = _("URI cannot contain more than one occurrence "
                       "of a scheme. If you have specified a URI like "
                       "swift://user:pass@http://authurl.com/v1/container/obj"
                       ", you need to change it to use the "
                       "swift+http:// scheme, like so: "
                       "swift+http://user:pass@authurl.com/v1/container/obj")
            LOG.debug(_("Invalid store URI: %(reason)s"), {'reason': reason})
            raise exception.BadStoreUri(message=reason)

        pieces = urlparse.urlparse(uri)
        assert pieces.scheme in ('swift', 'swift+http', 'swift+https')
        self.scheme = pieces.scheme
        netloc = pieces.netloc
        path = pieces.path.lstrip('/')
        if netloc != '':
            # > Python 2.6.1
            if '@' in netloc:
                creds, netloc = netloc.split('@')
            else:
                creds = None
        else:
            # Python 2.6.1 compat
            # see lp659445 and Python issue7904
            if '@' in path:
                creds, path = path.split('@')
            else:
                creds = None
            netloc = path[0:path.find('/')].strip('/')
            path = path[path.find('/'):].strip('/')
        if creds:
            cred_parts = creds.split(':')
            if len(cred_parts) != 2:
                reason = (_("Badly formed credentials in Swift URI."))
                LOG.debug(reason)
                # Pass the reason along for consistency with the other
                # BadStoreUri raises in this method.
                raise exception.BadStoreUri(message=reason)
            user, key = cred_parts
            self.user = urllib.unquote(user)
            self.key = urllib.unquote(key)
        else:
            self.user = None
            self.key = None
        path_parts = path.split('/')
        try:
            self.obj = path_parts.pop()
            self.container = path_parts.pop()
            if not netloc.startswith('http'):
                # push hostname back into the remaining to build full authurl
                path_parts.insert(0, netloc)
                self.auth_or_store_url = '/'.join(path_parts)
        except IndexError:
            reason = _("Badly formed Swift URI.")
            LOG.debug(reason)
            raise exception.BadStoreUri(message=reason)

    @property
    def swift_url(self):
        """
        Creates a fully-qualified auth url that the Swift client library can
        use. The scheme for the auth_url is determined using the scheme
        included in the `location` field.

        HTTPS is assumed, unless 'swift+http' is specified.
        """
        if self.auth_or_store_url.startswith('http'):
            return self.auth_or_store_url
        else:
            if self.scheme in ('swift+https', 'swift'):
                auth_scheme = 'https://'
            else:
                auth_scheme = 'http://'

            return ''.join([auth_scheme, self.auth_or_store_url])


def Store(context=None, loc=None):
    """Factory: pick the multi-tenant or single-tenant store implementation.

    Multi-tenant mode is used only when it is enabled in configuration AND
    the location carries no embedded credentials.
    """
    if (CONF.swift_store_multi_tenant and
            (loc is None or loc.store_location.user is None)):
        return MultiTenantStore(context, loc)
    return SingleTenantStore(context, loc)


class BaseStore(glance.store.base.Store):
    """Common Swift store behaviour shared by both tenancy modes."""

    # Size of the streaming read buffer used when downloading objects.
    CHUNKSIZE = 65536

    def get_schemes(self):
        """Return the URI schemes this store handles."""
        return ('swift+https', 'swift', 'swift+http')

    def configure(self):
        """Cache configuration values; sizes are converted from MB to bytes."""
        _obj_size = self._option_get('swift_store_large_object_size')
        self.large_object_size = _obj_size * ONE_MB
        _chunk_size = self._option_get('swift_store_large_object_chunk_size')
        self.large_object_chunk_size = _chunk_size * ONE_MB
        self.admin_tenants = CONF.swift_store_admin_tenants
        self.region = CONF.swift_store_region
        self.service_type = CONF.swift_store_service_type
        self.endpoint_type = CONF.swift_store_endpoint_type
        self.snet = CONF.swift_enable_snet
        self.insecure = CONF.swift_store_auth_insecure
        self.ssl_compression = CONF.swift_store_ssl_compression

    def get(self, location, connection=None):
        """Fetch an image object from Swift.

        :param location: glance location wrapping a StoreLocation
        :param connection: optional pre-built swiftclient connection
        :returns: tuple of (iterable yielding image data, content length)
        :raises exception.NotFound: if the object does not exist
        """
        location = location.store_location
        if not connection:
            connection = self.get_connection(location)

        try:
            resp_headers, resp_body = connection.get_object(
                container=location.container, obj=location.obj,
                resp_chunk_size=self.CHUNKSIZE)
        except swiftclient.ClientException as e:
            if e.http_status == httplib.NOT_FOUND:
                msg = _("Swift could not find object %s.") % location.obj
                LOG.warn(msg)
                raise exception.NotFound(msg)
            else:
                raise

        class ResponseIndexable(glance.store.Indexable):
            def another(self):
                try:
                    return self.wrapped.next()
                except StopIteration:
                    return ''

        length = int(resp_headers.get('content-length', 0))
        return (ResponseIndexable(resp_body, length), length)

    def get_size(self, location, connection=None):
        """Return the stored object's size in bytes, or 0 on any failure."""
        location = location.store_location
        if not connection:
            connection = self.get_connection(location)
        try:
            resp_headers = connection.head_object(
                container=location.container, obj=location.obj)
            return int(resp_headers.get('content-length', 0))
        except Exception:
            # Best-effort: size is advisory, so swallow errors and report 0.
            return 0

    def _option_get(self, param):
        """Return a required CONF value or raise BadStoreConfiguration."""
        result = getattr(CONF, param)
        if not result:
            reason = (_("Could not find %(param)s in configuration "
                        "options.") % {'param': param})
            LOG.error(reason)
            raise exception.BadStoreConfiguration(store_name="swift",
                                                  reason=reason)
        return result

    def _delete_stale_chunks(self, connection, container, chunk_list):
        """Best-effort removal of segments left behind by a failed upload."""
        for chunk in chunk_list:
            LOG.debug(_("Deleting chunk %s") % chunk)
            try:
                connection.delete_object(container, chunk)
            except Exception:
                msg = _("Failed to delete orphaned chunk %s/%s")
                LOG.exception(msg, container, chunk)

    def add(self, image_id, image_file, image_size, connection=None):
        """Upload an image to Swift.

        Small images (known size below ``large_object_size``) are stored with
        a single PUT; anything larger, or of unknown size (image_size == 0),
        is uploaded as numbered segments plus an X-Object-Manifest object.

        :param image_id: opaque image identifier, used as the object name
        :param image_file: file-like object to read image data from
        :param image_size: size in bytes, or 0 when unknown
        :param connection: optional pre-built swiftclient connection
        :returns: tuple of (location URI, size, checksum, metadata dict)
        :raises exception.Duplicate: if Swift already has this object
        :raises glance.store.BackendException: on other Swift errors
        """
        location = self.create_location(image_id)
        if not connection:
            connection = self.get_connection(location)

        self._create_container_if_missing(location.container, connection)

        LOG.debug(_("Adding image object '%(obj_name)s' "
                    "to Swift") % dict(obj_name=location.obj))
        try:
            if image_size > 0 and image_size < self.large_object_size:
                # Image size is known, and is less than large_object_size.
                # Send to Swift with regular PUT.
                obj_etag = connection.put_object(location.container,
                                                 location.obj, image_file,
                                                 content_length=image_size)
            else:
                # Write the image into Swift in chunks.
                chunk_id = 1
                if image_size > 0:
                    total_chunks = str(int(
                        math.ceil(float(image_size) /
                                  float(self.large_object_chunk_size))))
                else:
                    # image_size == 0 is when we don't know the size
                    # of the image. This can occur with older clients
                    # that don't inspect the payload size.
                    LOG.debug(_("Cannot determine image size. Adding as a "
                                "segmented object to Swift."))
                    total_chunks = '?'

                checksum = hashlib.md5()
                written_chunks = []
                combined_chunks_size = 0
                while True:
                    chunk_size = self.large_object_chunk_size
                    if image_size == 0:
                        content_length = None
                    else:
                        left = image_size - combined_chunks_size
                        if left == 0:
                            break
                        if chunk_size > left:
                            chunk_size = left
                        content_length = chunk_size

                    chunk_name = "%s-%05d" % (location.obj, chunk_id)
                    reader = ChunkReader(image_file, checksum, chunk_size)
                    try:
                        chunk_etag = connection.put_object(
                            location.container, chunk_name, reader,
                            content_length=content_length)
                        written_chunks.append(chunk_name)
                    except Exception:
                        # Delete orphaned segments from swift backend
                        with excutils.save_and_reraise_exception():
                            LOG.exception(_("Error during chunked upload to "
                                            "backend, deleting stale chunks"))
                            self._delete_stale_chunks(connection,
                                                      location.container,
                                                      written_chunks)

                    bytes_read = reader.bytes_read
                    msg = (_("Wrote chunk %(chunk_name)s (%(chunk_id)d/"
                             "%(total_chunks)s) of length %(bytes_read)d "
                             "to Swift returning MD5 of content: "
                             "%(chunk_etag)s") %
                           {'chunk_name': chunk_name,
                            'chunk_id': chunk_id,
                            'total_chunks': total_chunks,
                            'bytes_read': bytes_read,
                            'chunk_etag': chunk_etag})
                    LOG.debug(msg)

                    if bytes_read == 0:
                        # Delete the last chunk, because it's of zero size.
                        # This will happen if size == 0.
                        LOG.debug(_("Deleting final zero-length chunk"))
                        connection.delete_object(location.container,
                                                 chunk_name)
                        break

                    chunk_id += 1
                    combined_chunks_size += bytes_read

                # In the case we have been given an unknown image size,
                # set the size to the total size of the combined chunks.
                if image_size == 0:
                    image_size = combined_chunks_size

                # Now we write the object manifest and return the
                # manifest's etag...
                manifest = "%s/%s-" % (location.container, location.obj)
                headers = {'ETag': hashlib.md5("").hexdigest(),
                           'X-Object-Manifest': manifest}

                # The ETag returned for the manifest is actually the
                # MD5 hash of the concatenated checksums of the strings
                # of each chunk...so we ignore this result in favour of
                # the MD5 of the entire image file contents, so that
                # users can verify the image file contents accordingly
                connection.put_object(location.container, location.obj,
                                      None, headers=headers)
                obj_etag = checksum.hexdigest()

            # NOTE: We return the user and key here! Have to because
            # location is used by the API server to return the actual
            # image data. We *really* should consider NOT returning
            # the location attribute from GET /images/ and
            # GET /images/details

            return (location.get_uri(), image_size, obj_etag, {})
        except swiftclient.ClientException as e:
            if e.http_status == httplib.CONFLICT:
                raise exception.Duplicate(_("Swift already has an image at "
                                            "this location"))
            msg = (_("Failed to add object to Swift.\n"
                     "Got error from Swift: %(e)s") % {'e': e})
            LOG.error(msg)
            raise glance.store.BackendException(msg)

    def delete(self, location, connection=None):
        """Delete an image object, including any segments of a large object.

        :raises exception.NotFound: if the object does not exist
        """
        location = location.store_location
        if not connection:
            connection = self.get_connection(location)

        try:
            # We request the manifest for the object. If one exists,
            # that means the object was uploaded in chunks/segments,
            # and we need to delete all the chunks as well as the
            # manifest.
            manifest = None
            try:
                headers = connection.head_object(
                    location.container, location.obj)
                manifest = headers.get('x-object-manifest')
            except swiftclient.ClientException as e:
                if e.http_status != httplib.NOT_FOUND:
                    raise
            if manifest:
                # Delete all the chunks before the object manifest itself
                obj_container, obj_prefix = manifest.split('/', 1)
                segments = connection.get_container(
                    obj_container, prefix=obj_prefix)[1]
                for segment in segments:
                    # TODO(jaypipes): This would be an easy area to parallelize
                    # since we're simply sending off parallelizable requests
                    # to Swift to delete stuff. It's not like we're going to
                    # be hogging up network or file I/O here...
                    connection.delete_object(obj_container,
                                             segment['name'])

            # Delete object (or, in segmented case, the manifest)
            connection.delete_object(location.container, location.obj)

        except swiftclient.ClientException as e:
            if e.http_status == httplib.NOT_FOUND:
                msg = _("Swift could not find image at URI.")
                raise exception.NotFound(msg)
            else:
                raise

    def _create_container_if_missing(self, container, connection):
        """
        Creates a missing container in Swift if the
        ``swift_store_create_container_on_put`` option is set.

        :param container: Name of container to create
        :param connection: Connection to swift service
        """
        try:
            connection.head_container(container)
        except swiftclient.ClientException as e:
            if e.http_status == httplib.NOT_FOUND:
                if CONF.swift_store_create_container_on_put:
                    try:
                        connection.put_container(container)
                    except swiftclient.ClientException as e:
                        msg = (_("Failed to add container to Swift.\n"
                                 "Got error from Swift: %(e)s") % {'e': e})
                        raise glance.store.BackendException(msg)
                else:
                    # NOTE(review): added the missing space between "option"
                    # and "to" in this message.
                    msg = (_("The container %(container)s does not exist in "
                             "Swift. Please set the "
                             "swift_store_create_container_on_put option "
                             "to add container to Swift automatically.") %
                           {'container': container})
                    raise glance.store.BackendException(msg)
            else:
                raise

    def get_connection(self, location):
        """Subclass hook: return a swiftclient connection for *location*."""
        # NOTE(review): was ``raise NotImplemented()`` — NotImplemented is a
        # non-callable singleton, so that raised TypeError instead of the
        # intended NotImplementedError. Signature aligned with subclasses.
        raise NotImplementedError()

    def create_location(self, image_id):
        """Subclass hook: build a StoreLocation for *image_id*."""
        raise NotImplementedError()


class SingleTenantStore(BaseStore):
    """Store that writes all images into one configured Swift account."""

    EXAMPLE_URL = "swift://:@//"

    def configure(self):
        super(SingleTenantStore, self).configure()
        self.auth_version = self._option_get('swift_store_auth_version')

    def configure_add(self):
        """Validate and cache the options needed for uploads."""
        self.auth_address = self._option_get('swift_store_auth_address')
        # The location scheme mirrors the auth endpoint's transport.
        if self.auth_address.startswith('http://'):
            self.scheme = 'swift+http'
        else:
            self.scheme = 'swift+https'
        self.container = CONF.swift_store_container
        self.user = self._option_get('swift_store_user')
        self.key = self._option_get('swift_store_key')

    def create_location(self, image_id):
        specs = {'scheme': self.scheme,
                 'container': self.container,
                 'obj': str(image_id),
                 'auth_or_store_url': self.auth_address,
                 'user': self.user,
                 'key': self.key}
        return StoreLocation(specs)

    def get_connection(self, location):
        """Build an authenticated swiftclient connection from credentials
        embedded in *location*.

        :raises exception.BadStoreUri: on missing or malformed credentials
        """
        if not location.user:
            reason = (_("Location is missing user:password information."))
            LOG.debug(reason)
            raise exception.BadStoreUri(message=reason)

        auth_url = location.swift_url
        if not auth_url.endswith('/'):
            auth_url += '/'

        if self.auth_version == '2':
            # Keystone auth: the user field carries "tenant:user".
            try:
                tenant_name, user = location.user.split(':')
            except ValueError:
                reason = (_("Badly formed tenant:user '%(user)s' in "
                            "Swift URI") % {'user': location.user})
                LOG.debug(reason)
                raise exception.BadStoreUri()
        else:
            tenant_name = None
            user = location.user

        os_options = {}
        if self.region:
            os_options['region_name'] = self.region
        os_options['endpoint_type'] = self.endpoint_type
        os_options['service_type'] = self.service_type

        return swiftclient.Connection(
            auth_url, user, location.key, insecure=self.insecure,
            tenant_name=tenant_name, snet=self.snet,
            auth_version=self.auth_version, os_options=os_options,
            ssl_compression=self.ssl_compression)


class MultiTenantStore(BaseStore):
    """Store that writes each image into the owning tenant's own account,
    using the request context's token and service catalog."""

    EXAMPLE_URL = "swift:////"

    def configure_add(self):
        """Resolve the tenant's storage URL from the service catalog."""
        self.container = CONF.swift_store_container
        if self.context is None:
            reason = _("Multi-tenant Swift storage requires a context.")
            raise exception.BadStoreConfiguration(store_name="swift",
                                                  reason=reason)
        if self.context.service_catalog is None:
            reason = _("Multi-tenant Swift storage requires "
                       "a service catalog.")
            raise exception.BadStoreConfiguration(store_name="swift",
                                                  reason=reason)
        self.storage_url = auth.get_endpoint(
            self.context.service_catalog, service_type=self.service_type,
            endpoint_region=self.region, endpoint_type=self.endpoint_type)
        if self.storage_url.startswith('http://'):
            self.scheme = 'swift+http'
        else:
            self.scheme = 'swift+https'

    def delete(self, location, connection=None):
        """Delete the image and its dedicated per-image container."""
        if not connection:
            connection = self.get_connection(location.store_location)
        super(MultiTenantStore, self).delete(location, connection)
        connection.delete_container(location.store_location.container)

    def set_acls(self, location, public=False, read_tenants=None,
                 write_tenants=None, connection=None):
        """Apply container read/write ACLs for the image.

        :param public: if True, world-readable (.r:*,.rlistings)
        :param read_tenants: tenants granted read access
        :param write_tenants: tenants granted write access (admin tenants
                              are always added)
        :raises exception.NotFound: if the container does not exist
        """
        location = location.store_location
        if not connection:
            connection = self.get_connection(location)

        if read_tenants is None:
            read_tenants = []
        if write_tenants is None:
            write_tenants = []

        headers = {}
        if public:
            headers['X-Container-Read'] = ".r:*,.rlistings"
        elif read_tenants:
            headers['X-Container-Read'] = ','.join('%s:*' % i
                                                   for i in read_tenants)
        else:
            headers['X-Container-Read'] = ''

        write_tenants.extend(self.admin_tenants)
        if write_tenants:
            headers['X-Container-Write'] = ','.join('%s:*' % i
                                                    for i in write_tenants)
        else:
            headers['X-Container-Write'] = ''

        try:
            connection.post_container(location.container, headers=headers)
        except swiftclient.ClientException as e:
            if e.http_status == httplib.NOT_FOUND:
                msg = _("Swift could not find image at URI.")
                raise exception.NotFound(msg)
            else:
                raise

    def create_location(self, image_id):
        # One container per image: "<base>_<image_id>".
        specs = {'scheme': self.scheme,
                 'container': self.container + '_' + str(image_id),
                 'obj': str(image_id),
                 'auth_or_store_url': self.storage_url}
        return StoreLocation(specs)

    def get_connection(self, location):
        """Build a pre-authenticated connection using the context's token."""
        return swiftclient.Connection(
            None, self.context.user, None,
            preauthurl=location.swift_url,
            preauthtoken=self.context.auth_tok,
            tenant_name=self.context.tenant,
            auth_version='2', snet=self.snet, insecure=self.insecure,
            ssl_compression=self.ssl_compression)


class ChunkReader(object):
    """File-like wrapper that caps reads at *total* bytes while feeding a
    running checksum with everything it reads."""

    def __init__(self, fd, checksum, total):
        self.fd = fd                # underlying file-like object
        self.checksum = checksum    # e.g. a hashlib.md5() object, updated
        self.total = total          # maximum number of bytes to read
        self.bytes_read = 0         # bytes consumed so far

    def read(self, i):
        """Read up to ``i`` bytes, never exceeding the ``total`` budget."""
        left = self.total - self.bytes_read
        if i > left:
            i = left
        result = self.fd.read(i)
        self.bytes_read += len(result)
        self.checksum.update(result)
        return result
newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..70c2b3f3 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python +# Copyright (c) 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT +import setuptools + +setuptools.setup( + setup_requires=['pbr'], + pbr=True)