diff --git a/openstack/cloud/_normalize.py b/openstack/cloud/_normalize.py
index ea7687fa7..598d982ad 100644
--- a/openstack/cloud/_normalize.py
+++ b/openstack/cloud/_normalize.py
@@ -1165,3 +1165,37 @@ class Normalizer(object):
             location=self._get_identity_location(),
             properties={},
         )
+
+    def _normalize_containers(self, containers):
+        """Normalize Swift Containers"""
+        ret = []
+        for container in containers:
+            ret.append(self._normalize_container(container))
+        return ret
+
+    def _normalize_container(self, container):
+        """Normalize Swift Container."""
+
+        return munch.Munch(
+            name=container.get('name'),
+            bytes=container.get('bytes'),
+            count=container.get('count'),
+        )
+
+    def _normalize_objects(self, objects):
+        """Normalize Swift Objects"""
+        ret = []
+        for object in objects:
+            ret.append(self._normalize_object(object))
+        return ret
+
+    def _normalize_object(self, object):
+        """Normalize Swift Object."""
+
+        return munch.Munch(
+            name=object.get('name'),
+            bytes=object.get('_bytes'),
+            content_type=object.get('content_type'),
+            hash=object.get('_hash'),
+            last_modified=object.get('_last_modified'),
+        )
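
Illustrative note, not part of the patch: the new normalizers reduce each raw Swift listing entry to a fixed munch.Munch shape. A minimal sketch of the container case, with made-up values:

    import munch

    raw = {'name': 'photos', 'bytes': 2048, 'count': 3, 'x-extra': 'dropped'}

    # Same field selection as _normalize_container(): only name/bytes/count
    # survive; unrelated keys are dropped.
    normalized = munch.Munch(
        name=raw.get('name'),
        bytes=raw.get('bytes'),
        count=raw.get('count'),
    )

    # Munch allows both attribute and key access.
    assert normalized.name == 'photos'
    assert normalized['count'] == 3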
""" try: - exceptions.raise_from_response(self.object_store.delete(name)) + self.object_store.delete_container(name, ignore_missing=False) self._container_cache.pop(name, None) return True except exc.OpenStackCloudHTTPError as e: @@ -7161,8 +7154,11 @@ class _OpenStackCloudMixin(_normalize.Normalizer): :param dict headers: Key/Value headers to set on the container. """ - exceptions.raise_from_response( - self.object_store.post(name, headers=headers)) + self.object_store.set_container_metadata( + name, + refresh=False, + **headers + ) def set_container_access(self, name, access): """Set the access control list on a container. @@ -7178,8 +7174,10 @@ class _OpenStackCloudMixin(_normalize.Normalizer): raise exc.OpenStackCloudException( "Invalid container access specified: %s. Must be one of %s" % (access, list(OBJECT_CONTAINER_ACLS.keys()))) - header = {'x-container-read': OBJECT_CONTAINER_ACLS[access]} - self.update_container(name, header) + self.object_store.set_container_metadata( + name, + refresh=False, + read_ACL=OBJECT_CONTAINER_ACLS[access]) def get_container_access(self, name): """Get the control list from a container. @@ -7189,7 +7187,7 @@ class _OpenStackCloudMixin(_normalize.Normalizer): container = self.get_container(name, skip_cache=True) if not container: raise exc.OpenStackCloudException("Container not found: %s" % name) - acl = container.get('x-container-read', '') + acl = container.read_ACL or '' for key, value in OBJECT_CONTAINER_ACLS.items(): # Convert to string for the comparison because swiftclient # returns byte values as bytes sometimes and apparently == @@ -7229,43 +7227,11 @@ class _OpenStackCloudMixin(_normalize.Normalizer): The object-storage service publishes a set of capabilities that include metadata about maximum values and thresholds. """ - # The endpoint in the catalog has version and project-id in it - # To get capabilities, we have to disassemble and reassemble the URL - # This logic is taken from swiftclient - endpoint = urllib.parse.urlparse(self.object_store.get_endpoint()) - url = "{scheme}://{netloc}/info".format( - scheme=endpoint.scheme, netloc=endpoint.netloc) - - return _adapter._json_response(self.object_store.get(url)) + return self.object_store.get_info() def get_object_segment_size(self, segment_size): """Get a segment size that will work given capabilities""" - if segment_size is None: - segment_size = DEFAULT_OBJECT_SEGMENT_SIZE - min_segment_size = 0 - try: - caps = self.get_object_capabilities() - except exc.OpenStackCloudHTTPError as e: - if e.response.status_code in (404, 412): - # Clear the exception so that it doesn't linger - # and get reported as an Inner Exception later - _utils._exc_clear() - server_max_file_size = DEFAULT_MAX_FILE_SIZE - self.log.info( - "Swift capabilities not supported. " - "Using default max file size.") - else: - raise - else: - server_max_file_size = caps.get('swift', {}).get('max_file_size', - 0) - min_segment_size = caps.get('slo', {}).get('min_segment_size', 0) - - if segment_size > server_max_file_size: - return server_max_file_size - if segment_size < min_segment_size: - return min_segment_size - return segment_size + return self.object_store.get_object_segment_size(segment_size) def is_object_stale( self, container, name, filename, file_md5=None, file_sha256=None): @@ -7281,35 +7247,8 @@ class _OpenStackCloudMixin(_normalize.Normalizer): Pre-calculated sha256 of the file contents. Defaults to None which means calculate locally. 
""" - metadata = self.get_object_metadata(container, name) - if not metadata: - self.log.debug( - "swift stale check, no object: {container}/{name}".format( - container=container, name=name)) - return True - - if not (file_md5 or file_sha256): - (file_md5, file_sha256) = self._get_file_hashes(filename) - md5_key = metadata.get( - self._OBJECT_MD5_KEY, metadata.get(self._SHADE_OBJECT_MD5_KEY, '')) - sha256_key = metadata.get( - self._OBJECT_SHA256_KEY, metadata.get( - self._SHADE_OBJECT_SHA256_KEY, '')) - up_to_date = self._hashes_up_to_date( - md5=file_md5, sha256=file_sha256, - md5_key=md5_key, sha256_key=sha256_key) - - if not up_to_date: - self.log.debug( - "swift checksum mismatch: " - " %(filename)s!=%(container)s/%(name)s", - {'filename': filename, 'container': container, 'name': name}) - return True - - self.log.debug( - "swift object up to date: %(container)s/%(name)s", - {'container': container, 'name': name}) - return False + return self.object_store.is_object_stale(container, name, filename, + file_md5, file_sha256) def create_directory_marker_object(self, container, name, **headers): """Create a zero-byte directory marker object @@ -7379,105 +7318,12 @@ class _OpenStackCloudMixin(_normalize.Normalizer): :raises: ``OpenStackCloudException`` on operation error. """ - if data is not None and filename: - raise ValueError( - "Both filename and data given. Please choose one.") - if data is not None and not name: - raise ValueError( - "name is a required parameter when data is given") - if data is not None and generate_checksums: - raise ValueError( - "checksums cannot be generated with data parameter") - if generate_checksums is None: - if data is not None: - generate_checksums = False - else: - generate_checksums = True - - if not metadata: - metadata = {} - - if not filename and data is None: - filename = name - - if generate_checksums and (md5 is None or sha256 is None): - (md5, sha256) = self._get_file_hashes(filename) - if md5: - headers[self._OBJECT_MD5_KEY] = md5 or '' - if sha256: - headers[self._OBJECT_SHA256_KEY] = sha256 or '' - for (k, v) in metadata.items(): - headers['x-object-meta-' + k] = v - - endpoint = '{container}/{name}'.format(container=container, name=name) - - if data is not None: - self.log.debug( - "swift uploading data to %(endpoint)s", - {'endpoint': endpoint}) - - return self._upload_object_data(endpoint, data, headers) - - # segment_size gets used as a step value in a range call, so needs - # to be an int - if segment_size: - segment_size = int(segment_size) - segment_size = self.get_object_segment_size(segment_size) - file_size = os.path.getsize(filename) - - if self.is_object_stale(container, name, filename, md5, sha256): - - self.log.debug( - "swift uploading %(filename)s to %(endpoint)s", - {'filename': filename, 'endpoint': endpoint}) - - if file_size <= segment_size: - self._upload_object(endpoint, filename, headers) - else: - self._upload_large_object( - endpoint, filename, headers, - file_size, segment_size, use_slo) - - def _upload_object_data(self, endpoint, data, headers): - return _adapter._json_response(self.object_store.put( - endpoint, headers=headers, data=data)) - - def _upload_object(self, endpoint, filename, headers): - return _adapter._json_response(self.object_store.put( - endpoint, headers=headers, data=open(filename, 'rb'))) - - def _get_file_segments(self, endpoint, filename, file_size, segment_size): - # Use an ordered dict here so that testing can replicate things - segments = collections.OrderedDict() - for (index, offset) in 
enumerate(range(0, file_size, segment_size)): - remaining = file_size - (index * segment_size) - segment = _utils.FileSegment( - filename, offset, - segment_size if segment_size < remaining else remaining) - name = '{endpoint}/{index:0>6}'.format( - endpoint=endpoint, index=index) - segments[name] = segment - return segments - - def _object_name_from_url(self, url): - '''Get container_name/object_name from the full URL called. - - Remove the Swift endpoint from the front of the URL, and remove - the leaving / that will leave behind.''' - endpoint = self.object_store.get_endpoint() - object_name = url.replace(endpoint, '') - if object_name.startswith('/'): - object_name = object_name[1:] - return object_name - - def _add_etag_to_manifest(self, segment_results, manifest): - for result in segment_results: - if 'Etag' not in result.headers: - continue - name = self._object_name_from_url(result.url) - for entry in manifest: - if entry['path'] == '/{name}'.format(name=name): - entry['etag'] = result.headers['Etag'] + return self.object_store.create_object( + container, name, + filename=filename, md5=md5, sha256=sha256, + segment_size=segment_size, use_slo=use_slo, metadata=metadata, + generate_checksums=generate_checksums, data=data, + **headers) @property def _pool_executor(self): @@ -7515,84 +7361,6 @@ class _OpenStackCloudMixin(_normalize.Normalizer): retries.append(completed.result()) return results, retries - def _upload_large_object( - self, endpoint, filename, - headers, file_size, segment_size, use_slo): - # If the object is big, we need to break it up into segments that - # are no larger than segment_size, upload each of them individually - # and then upload a manifest object. The segments can be uploaded in - # parallel, so we'll use the async feature of the TaskManager. - - segment_futures = [] - segment_results = [] - retry_results = [] - retry_futures = [] - manifest = [] - - # Get an OrderedDict with keys being the swift location for the - # segment, the value a FileSegment file-like object that is a - # slice of the data for the segment. - segments = self._get_file_segments( - endpoint, filename, file_size, segment_size) - - # Schedule the segments for upload - for name, segment in segments.items(): - # Async call to put - schedules execution and returns a future - segment_future = self._pool_executor.submit( - self.object_store.put, - name, headers=headers, data=segment, - raise_exc=False) - segment_futures.append(segment_future) - # TODO(mordred) Collect etags from results to add to this manifest - # dict. Then sort the list of dicts by path. - manifest.append(dict( - path='/{name}'.format(name=name), - size_bytes=segment.length)) - - # Try once and collect failed results to retry - segment_results, retry_results = self._wait_for_futures( - segment_futures, raise_on_error=False) - - self._add_etag_to_manifest(segment_results, manifest) - - for result in retry_results: - # Grab the FileSegment for the failed upload so we can retry - name = self._object_name_from_url(result.url) - segment = segments[name] - segment.seek(0) - # Async call to put - schedules execution and returns a future - segment_future = self._pool_executor.submit( - self.object_store.put, - name, headers=headers, data=segment) - # TODO(mordred) Collect etags from results to add to this manifest - # dict. Then sort the list of dicts by path. 
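
Illustrative note, not part of the patch: the cloud layer keeps its public signatures and simply forwards to the object_store proxy. A hedged usage sketch, assuming a configured cloud named 'example' (placeholder) and an existing local file:

    import openstack

    conn = openstack.connect(cloud='example')  # 'example' is a placeholder

    conn.create_container('backups')
    # Forwards to conn.object_store.create_object(); segment_size is
    # optional and is discovered from /info when omitted.
    conn.create_object(
        'backups', 'db.dump',
        filename='/tmp/db.dump')  # or data=b'...' for in-memory content

    # list_objects() now returns normalized munch records.
    for obj in conn.list_objects('backups'):
        print(obj.name, obj.bytes)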
diff --git a/openstack/object_store/v1/_base.py b/openstack/object_store/v1/_base.py
index 21372406f..779f1e92d 100644
--- a/openstack/object_store/v1/_base.py
+++ b/openstack/object_store/v1/_base.py
@@ -41,14 +41,15 @@ class BaseResource(resource.Resource):
                 headers[header] = metadata[key]
         return headers
 
-    def set_metadata(self, session, metadata):
+    def set_metadata(self, session, metadata, refresh=True):
         request = self._prepare_request()
         response = session.post(
             request.url,
             headers=self._calculate_headers(metadata))
         self._translate_response(response, has_body=False)
-        response = session.head(request.url)
-        self._translate_response(response, has_body=False)
+        if refresh:
+            response = session.head(request.url)
+            self._translate_response(response, has_body=False)
         return self
 
     def delete_metadata(self, session, keys):
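
Illustrative note, not part of the patch: the new refresh flag lets callers skip the follow-up HEAD that re-reads the resource after the POST. A sketch of the two call patterns, with hypothetical session and metadata values:

    # refresh=True (default): POST the new metadata, then HEAD to
    # re-read the server-side state into the resource object.
    container.set_metadata(session, {'web-index': 'index.html'})

    # refresh=False: POST only - one request instead of two. The local
    # object may be stale until the next explicit fetch or head call,
    # which is why the cloud layer passes refresh=False and re-fetches
    # only when it actually needs the result.
    container.set_metadata(session, {'web-index': 'index.html'}, refresh=False)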
diff --git a/openstack/object_store/v1/_proxy.py b/openstack/object_store/v1/_proxy.py
index 629a5f733..5221b303d 100644
--- a/openstack/object_store/v1/_proxy.py
+++ b/openstack/object_store/v1/_proxy.py
@@ -10,10 +10,22 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import collections
+import json
+import os
+
 from openstack.object_store.v1 import account as _account
 from openstack.object_store.v1 import container as _container
 from openstack.object_store.v1 import obj as _obj
+from openstack.object_store.v1 import info as _info
+from openstack import _adapter
+from openstack import exceptions
+from openstack import _log
 from openstack import proxy
+from openstack.cloud import _utils
+
+DEFAULT_OBJECT_SEGMENT_SIZE = 1073741824  # 1GB
+DEFAULT_MAX_FILE_SIZE = (5 * 1024 * 1024 * 1024 + 2) / 2
 
 
 class Proxy(proxy.Proxy):
@@ -24,6 +36,8 @@ class Proxy(proxy.Proxy):
     Container = _container.Container
     Object = _obj.Object
 
+    log = _log.setup_logging('openstack')
+
     def get_account_metadata(self):
         """Get metadata for this account.
@@ -105,12 +119,13 @@ class Proxy(proxy.Proxy):
         """
         return self._head(_container.Container, container)
 
-    def set_container_metadata(self, container, **metadata):
+    def set_container_metadata(self, container, refresh=True, **metadata):
         """Set metadata for a container.
 
         :param container: The value can be the name of a container or a
             :class:`~openstack.object_store.v1.container.Container`
             instance.
+        :param refresh: Flag to trigger a re-fetch of the container object
+            after the metadata has been set.
         :param kwargs metadata: Key/value pairs to be set as metadata
             on the container. Both custom and system metadata can be set.
             Custom metadata are keys
@@ -128,7 +143,7 @@ class Proxy(proxy.Proxy):
             - `sync_key`
         """
         res = self._get_resource(_container.Container, container)
-        res.set_metadata(self, metadata)
+        res.set_metadata(self, metadata, refresh=refresh)
         return res
 
     def delete_container_metadata(self, container, keys):
@@ -160,7 +175,7 @@ class Proxy(proxy.Proxy):
         for obj in self._list(
                 _obj.Object, container=container,
-                paginated=True, **query):
+                paginated=True, format='json', **query):
             obj.container = container
             yield obj
@@ -232,25 +247,110 @@ class Proxy(proxy.Proxy):
             _obj.Object, obj, container=container_name, **attrs)
         return obj.stream(self, chunk_size=chunk_size)
 
-    def create_object(self, container, name, **attrs):
-        """Upload a new object from attributes
+    def create_object(
+            self, container, name, filename=None,
+            md5=None, sha256=None, segment_size=None,
+            use_slo=True, metadata=None,
+            generate_checksums=None, data=None,
+            **headers):
+        """Create a file object.
 
-        :param container: The value can be the name of a container or a
-            :class:`~openstack.object_store.v1.container.Container`
-            instance.
-        :param name: Name of the object to create.
-        :param dict attrs: Keyword arguments which will be used to create
-            a :class:`~openstack.object_store.v1.obj.Object`,
-            comprised of the properties on the Object class.
+        Automatically uses large-object segments if needed.
 
-        :returns: The results of object creation
-        :rtype: :class:`~openstack.object_store.v1.container.Container`
+        :param container: The name of the container to store the file in.
+            This container will be created if it does not exist already.
+        :param name: Name for the object within the container.
+        :param filename: The path to the local file whose contents will be
+            uploaded. Mutually exclusive with data.
+        :param data: The content to upload to the object. Mutually exclusive
+            with filename.
+        :param md5: A hexadecimal md5 of the file. (Optional), if it is known
+            and can be passed here, it will save repeating the expensive md5
+            process. It is assumed to be accurate.
+        :param sha256: A hexadecimal sha256 of the file. (Optional) See md5.
+        :param segment_size: Break the uploaded object into segments of this
+            many bytes. (Optional) SDK will attempt to discover the maximum
+            value for this from the server if it is not specified, or will use
+            a reasonable default.
+        :param headers: These will be passed through to the object creation
+            API as HTTP Headers.
+        :param use_slo: If the object is large enough to need to be a Large
+            Object, use a static rather than dynamic object. Static Objects
+            will delete segment objects when the manifest object is deleted.
+            (optional, defaults to True)
+        :param generate_checksums: Whether to generate checksums on the client
+            side that get added to headers for later prevention of double
+            uploads of identical data. (optional, defaults to True)
+        :param metadata: This dict will get changed into headers that set
+            metadata of the object
+
+        :raises: ``OpenStackCloudException`` on operation error.
         """
-        # TODO(mordred) Add ability to stream data from a file
-        # TODO(mordred) Use create_object from OpenStackCloud
+        if data is not None and filename:
+            raise ValueError(
+                "Both filename and data given. Please choose one.")
+        if data is not None and not name:
+            raise ValueError(
+                "name is a required parameter when data is given")
+        if data is not None and generate_checksums:
+            raise ValueError(
+                "checksums cannot be generated with data parameter")
+        if generate_checksums is None:
+            if data is not None:
+                generate_checksums = False
+            else:
+                generate_checksums = True
+
+        if not metadata:
+            metadata = {}
+
+        if not filename and data is None:
+            filename = name
+
+        if generate_checksums and (md5 is None or sha256 is None):
+            (md5, sha256) = self._connection._get_file_hashes(filename)
+        if md5:
+            headers[self._connection._OBJECT_MD5_KEY] = md5 or ''
+        if sha256:
+            headers[self._connection._OBJECT_SHA256_KEY] = sha256 or ''
+        for (k, v) in metadata.items():
+            headers['x-object-meta-' + k] = v
+
         container_name = self._get_container_name(container=container)
-        return self._create(
-            _obj.Object, container=container_name, name=name, **attrs)
+        endpoint = '{container}/{name}'.format(container=container_name,
+                                               name=name)
+
+        if data is not None:
+            self.log.debug(
+                "swift uploading data to %(endpoint)s",
+                {'endpoint': endpoint})
+            # TODO(gtema): custom headers need to be somehow injected
+            return self._create(
+                _obj.Object, container=container_name,
+                name=name, data=data, **headers)
+
+        # segment_size gets used as a step value in a range call, so needs
+        # to be an int
+        if segment_size:
+            segment_size = int(segment_size)
+        segment_size = self.get_object_segment_size(segment_size)
+        file_size = os.path.getsize(filename)
+
+        if self.is_object_stale(container_name, name, filename, md5, sha256):
+
+            self._connection.log.debug(
+                "swift uploading %(filename)s to %(endpoint)s",
+                {'filename': filename, 'endpoint': endpoint})
+
+            if file_size <= segment_size:
+                # TODO(gtema): replace with regular resource put, but
+                # custom headers need to be somehow injected
+                self._upload_object(endpoint, filename, headers)
+            else:
+                self._upload_large_object(
+                    endpoint, filename, headers,
+                    file_size, segment_size, use_slo)
 
     # Backwards compat
     upload_object = create_object
@@ -341,3 +441,202 @@ class Proxy(proxy.Proxy):
         res = self._get_resource(_obj.Object, obj, container=container_name)
         res.delete_metadata(self, keys)
         return res
+
+    def is_object_stale(
+            self, container, name, filename, file_md5=None, file_sha256=None):
+        """Check to see if an object matches the hashes of a file.
+
+        :param container: Name of the container.
+        :param name: Name of the object.
+        :param filename: Path to the file.
+        :param file_md5:
+            Pre-calculated md5 of the file contents. Defaults to None which
+            means calculate locally.
+        :param file_sha256:
+            Pre-calculated sha256 of the file contents. Defaults to None which
+            means calculate locally.
+        """
+        metadata = self._connection.get_object_metadata(container, name)
+        if not metadata:
+            self._connection.log.debug(
+                "swift stale check, no object: {container}/{name}".format(
+                    container=container, name=name))
+            return True
+
+        if not (file_md5 or file_sha256):
+            (file_md5, file_sha256) = \
+                self._connection._get_file_hashes(filename)
+        md5_key = metadata.get(
+            self._connection._OBJECT_MD5_KEY,
+            metadata.get(self._connection._SHADE_OBJECT_MD5_KEY, ''))
+        sha256_key = metadata.get(
+            self._connection._OBJECT_SHA256_KEY, metadata.get(
+                self._connection._SHADE_OBJECT_SHA256_KEY, ''))
+        up_to_date = self._connection._hashes_up_to_date(
+            md5=file_md5, sha256=file_sha256,
+            md5_key=md5_key, sha256_key=sha256_key)
+
+        if not up_to_date:
+            self._connection.log.debug(
+                "swift checksum mismatch: "
+                " %(filename)s!=%(container)s/%(name)s",
+                {'filename': filename, 'container': container, 'name': name})
+            return True
+
+        self._connection.log.debug(
+            "swift object up to date: %(container)s/%(name)s",
+            {'container': container, 'name': name})
+        return False
+
+    def _upload_large_object(
+            self, endpoint, filename,
+            headers, file_size, segment_size, use_slo):
+        # If the object is big, we need to break it up into segments that
+        # are no larger than segment_size, upload each of them individually
+        # and then upload a manifest object. The segments can be uploaded in
+        # parallel, so we'll use the connection's thread pool executor.
+
+        segment_futures = []
+        segment_results = []
+        retry_results = []
+        retry_futures = []
+        manifest = []
+
+        # Get an OrderedDict with keys being the swift location for the
+        # segment, the value a FileSegment file-like object that is a
+        # slice of the data for the segment.
+        segments = self._get_file_segments(
+            endpoint, filename, file_size, segment_size)
+
+        # Schedule the segments for upload
+        for name, segment in segments.items():
+            # Async call to put - schedules execution and returns a future
+            segment_future = self._connection._pool_executor.submit(
+                self.put,
+                name, headers=headers, data=segment,
+                raise_exc=False)
+            segment_futures.append(segment_future)
+            # TODO(mordred) Collect etags from results to add to this manifest
+            # dict. Then sort the list of dicts by path.
+            manifest.append(dict(
+                path='/{name}'.format(name=name),
+                size_bytes=segment.length))
+
+        # Try once and collect failed results to retry
+        segment_results, retry_results = self._connection._wait_for_futures(
+            segment_futures, raise_on_error=False)
+
+        self._add_etag_to_manifest(segment_results, manifest)
+
+        for result in retry_results:
+            # Grab the FileSegment for the failed upload so we can retry
+            name = self._object_name_from_url(result.url)
+            segment = segments[name]
+            segment.seek(0)
+            # Async call to put - schedules execution and returns a future
+            segment_future = self._connection._pool_executor.submit(
+                self.put,
+                name, headers=headers, data=segment)
+            # TODO(mordred) Collect etags from results to add to this manifest
+            # dict. Then sort the list of dicts by path.
+            retry_futures.append(segment_future)
+
+        # If any segments fail the second time, just throw the error
+        segment_results, retry_results = self._connection._wait_for_futures(
+            retry_futures, raise_on_error=True)
+
+        self._add_etag_to_manifest(segment_results, manifest)
+
+        if use_slo:
+            return self._finish_large_object_slo(endpoint, headers, manifest)
+        else:
+            return self._finish_large_object_dlo(endpoint, headers)
+
+    def _finish_large_object_slo(self, endpoint, headers, manifest):
+        # TODO(mordred) send an etag of the manifest, which is the md5sum
+        # of the concatenation of the etags of the results
+        headers = headers.copy()
+        return self.put(
+            endpoint,
+            params={'multipart-manifest': 'put'},
+            headers=headers, data=json.dumps(manifest))
+
+    def _finish_large_object_dlo(self, endpoint, headers):
+        headers = headers.copy()
+        headers['X-Object-Manifest'] = endpoint
+        return self.put(endpoint, headers=headers)
+
+    def _upload_object(self, endpoint, filename, headers):
+        with open(filename, 'rb') as dt:
+            return _adapter._json_response(self.put(
+                endpoint, headers=headers, data=dt))
+
+    def _get_file_segments(self, endpoint, filename, file_size, segment_size):
+        # Use an ordered dict here so that testing can replicate things
+        segments = collections.OrderedDict()
+        for (index, offset) in enumerate(range(0, file_size, segment_size)):
+            remaining = file_size - (index * segment_size)
+            segment = _utils.FileSegment(
+                filename, offset,
+                segment_size if segment_size < remaining else remaining)
+            name = '{endpoint}/{index:0>6}'.format(
+                endpoint=endpoint, index=index)
+            segments[name] = segment
+        return segments
+
+    def get_object_segment_size(self, segment_size):
+        """Get a segment size that will work given capabilities"""
+        if segment_size is None:
+            segment_size = DEFAULT_OBJECT_SEGMENT_SIZE
+        min_segment_size = 0
+        try:
+            caps = self.get_info()
+        except exceptions.SDKException as e:
+            if e.response.status_code in (404, 412):
+                # Clear the exception so that it doesn't linger
+                # and get reported as an Inner Exception later
+                _utils._exc_clear()
+                server_max_file_size = DEFAULT_MAX_FILE_SIZE
+                self._connection.log.info(
+                    "Swift capabilities not supported. "
+                    "Using default max file size.")
+            else:
+                raise
+        else:
+            server_max_file_size = caps.swift.get('max_file_size', 0)
+            min_segment_size = caps.slo.get('min_segment_size', 0)
+
+        if segment_size > server_max_file_size:
+            return server_max_file_size
+        if segment_size < min_segment_size:
+            return min_segment_size
+        return segment_size
+
+    def _object_name_from_url(self, url):
+        '''Get container_name/object_name from the full URL called.
+
+        Remove the Swift endpoint from the front of the URL, and remove
+        the leading / that will be left behind.'''
+        endpoint = self.get_endpoint()
+        object_name = url.replace(endpoint, '')
+        if object_name.startswith('/'):
+            object_name = object_name[1:]
+        return object_name
+
+    def _add_etag_to_manifest(self, segment_results, manifest):
+        for result in segment_results:
+            if 'Etag' not in result.headers:
+                continue
+            name = self._object_name_from_url(result.url)
+            for entry in manifest:
+                if entry['path'] == '/{name}'.format(name=name):
+                    entry['etag'] = result.headers['Etag']
+
+    def get_info(self):
+        """Get information about the object-storage service
+
+        The object-storage service publishes a set of capabilities that
+        include metadata about maximum values and thresholds.
+        """
+        return self._get(_info.Info)
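
Illustrative note, not part of the patch: get_object_segment_size() clamps the requested size into the window the server advertises. A standalone sketch of the same clamping arithmetic, with assumed capability values:

    DEFAULT_OBJECT_SEGMENT_SIZE = 1073741824  # 1GB

    def clamp_segment_size(requested, server_max_file_size, min_segment_size):
        # Same decision order as Proxy.get_object_segment_size()
        if requested is None:
            requested = DEFAULT_OBJECT_SEGMENT_SIZE
        if requested > server_max_file_size:
            return server_max_file_size
        if requested < min_segment_size:
            return min_segment_size
        return requested

    # e.g. a cluster advertising max_file_size=5368709122, min_segment_size=1:
    assert clamp_segment_size(None, 5368709122, 1) == DEFAULT_OBJECT_SEGMENT_SIZE
    assert clamp_segment_size(10 * 2 ** 30, 5368709122, 1) == 5368709122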
+ """ + return self._get(_info.Info) diff --git a/openstack/object_store/v1/container.py b/openstack/object_store/v1/container.py index 04c0d45e4..1275b4a54 100644 --- a/openstack/object_store/v1/container.py +++ b/openstack/object_store/v1/container.py @@ -38,7 +38,7 @@ class Container(_base.BaseResource): allow_head = True _query_mapping = resource.QueryParameters( - 'prefix', + 'prefix', 'format' ) # Container body data (when id=None) diff --git a/openstack/object_store/v1/info.py b/openstack/object_store/v1/info.py new file mode 100644 index 000000000..f5fbde39b --- /dev/null +++ b/openstack/object_store/v1/info.py @@ -0,0 +1,75 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may + +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from openstack import exceptions +from openstack import resource + +from six.moves import urllib + + +class Info(resource.Resource): + + base_path = "/info" + + allow_fetch = True + + _query_mapping = resource.QueryParameters( + 'swiftinfo_sig', 'swiftinfo_expires' + ) + + # Properties + swift = resource.Body("swift", type=dict) + slo = resource.Body("slo", type=dict) + staticweb = resource.Body("staticweb", type=dict) + tempurl = resource.Body("tempurl", type=dict) + + def fetch(self, session, requires_id=False, + base_path=None, error_message=None): + """Get a remote resource based on this instance. + + :param session: The session to use for making this request. + :type session: :class:`~keystoneauth1.adapter.Adapter` + :param boolean requires_id: A boolean indicating whether resource ID + should be part of the requested URI. + :param str base_path: Base part of the URI for fetching resources, if + different from + :data:`~openstack.resource.Resource.base_path`. + :param str error_message: An Error message to be returned if + requested object does not exist. + :return: This :class:`Resource` instance. + :raises: :exc:`~openstack.exceptions.MethodNotSupported` if + :data:`Resource.allow_fetch` is not set to ``True``. + :raises: :exc:`~openstack.exceptions.ResourceNotFound` if + the resource was not found. 
+ """ + if not self.allow_fetch: + raise exceptions.MethodNotSupported(self, "fetch") + + # The endpoint in the catalog has version and project-id in it + # To get capabilities, we have to disassemble and reassemble the URL + # This logic is taken from swiftclient + + session = self._get_session(session) + endpoint = urllib.parse.urlparse(session.get_endpoint()) + url = "{scheme}://{netloc}/info".format( + scheme=endpoint.scheme, netloc=endpoint.netloc) + + microversion = self._get_microversion_for(session, 'fetch') + response = session.get(url, microversion=microversion) + kwargs = {} + if error_message: + kwargs['error_message'] = error_message + + self.microversion = microversion + self._translate_response(response, **kwargs) + return self diff --git a/openstack/object_store/v1/obj.py b/openstack/object_store/v1/obj.py index 2ef16529f..b2c6d8bf0 100644 --- a/openstack/object_store/v1/obj.py +++ b/openstack/object_store/v1/obj.py @@ -40,7 +40,7 @@ class Object(_base.BaseResource): allow_head = True _query_mapping = resource.QueryParameters( - 'prefix', + 'prefix', 'format' ) # Data to be passed during a POST call to create an object on the server. @@ -295,3 +295,26 @@ class Object(_base.BaseResource): headers=request.headers) self._translate_response(response, has_body=False) return self + + def _raw_delete(self, session): + if not self.allow_delete: + raise exceptions.MethodNotSupported(self, "delete") + + request = self._prepare_request() + session = self._get_session(session) + microversion = self._get_microversion_for(session, 'delete') + + if self.is_static_large_object is None: + # Fetch metadata to determine SLO flag + self.head(session) + + headers = { + 'Accept': "" + } + if self.is_static_large_object: + headers['multipart-manifest'] = 'delete' + + return session.delete( + request.url, + headers=headers, + microversion=microversion) diff --git a/openstack/tests/unit/cloud/test__utils.py b/openstack/tests/unit/cloud/test__utils.py index bcb04dc12..2aea81158 100644 --- a/openstack/tests/unit/cloud/test__utils.py +++ b/openstack/tests/unit/cloud/test__utils.py @@ -12,9 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. 
diff --git a/openstack/tests/unit/cloud/test_object.py b/openstack/tests/unit/cloud/test_object.py
index 90654547c..6eea2eb3d 100644
--- a/openstack/tests/unit/cloud/test_object.py
+++ b/openstack/tests/unit/cloud/test_object.py
@@ -20,6 +20,7 @@ import openstack.cloud
 import openstack.cloud.openstackcloud as oc_oc
 from openstack.cloud import exc
 from openstack.tests.unit import base
+from openstack.object_store.v1 import _proxy
 
 
 class BaseTestObject(base.TestCase):
@@ -443,7 +444,7 @@ class TestObject(BaseTestObject):
         self.register_uris([
             dict(method='GET', uri='https://object-store.example.com/info',
                  status_code=404, reason='Not Found')])
-        self.assertEqual(oc_oc.DEFAULT_OBJECT_SEGMENT_SIZE,
+        self.assertEqual(_proxy.DEFAULT_OBJECT_SEGMENT_SIZE,
                          self.cloud.get_object_segment_size(None))
         self.assert_calls()
@@ -452,7 +453,7 @@ class TestObject(BaseTestObject):
             dict(method='GET', uri='https://object-store.example.com/info',
                  status_code=412, reason='Precondition failed')])
         self.assertEqual(
-            oc_oc.DEFAULT_OBJECT_SEGMENT_SIZE,
+            _proxy.DEFAULT_OBJECT_SEGMENT_SIZE,
             self.cloud.get_object_segment_size(None))
         self.assert_calls()
diff --git a/openstack/tests/unit/object_store/v1/test_proxy.py b/openstack/tests/unit/object_store/v1/test_proxy.py
index 8c56328ec..4d4a2ba48 100644
--- a/openstack/tests/unit/object_store/v1/test_proxy.py
+++ b/openstack/tests/unit/object_store/v1/test_proxy.py
@@ -10,17 +10,20 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import random
 import six
+import string
+import tempfile
 
 from openstack.object_store.v1 import _proxy
 from openstack.object_store.v1 import account
 from openstack.object_store.v1 import container
 from openstack.object_store.v1 import obj
 from openstack.tests.unit.cloud import test_object as base_test_object
-from openstack.tests.unit import test_proxy_base2
+from openstack.tests.unit import test_proxy_base
 
 
-class TestObjectStoreProxy(test_proxy_base2.TestProxyBase):
+class TestObjectStoreProxy(test_proxy_base.TestProxyBase):
 
     kwargs_to_path_args = False
 
@@ -51,8 +54,12 @@ class TestObjectStoreProxy(test_proxy_base2.TestProxyBase):
                          expected_kwargs={'name': 'container_name',
                                           "x": 1, "y": 2, "z": 3})
 
     def test_object_metadata_get(self):
-        self.verify_head(self.proxy.get_object_metadata, obj.Object,
-                         value="object", container="container")
+        self._verify2("openstack.proxy.Proxy._head",
+                      self.proxy.get_object_metadata,
+                      method_args=['object'],
+                      method_kwargs={'container': 'container'},
+                      expected_args=[obj.Object, 'object'],
+                      expected_kwargs={'container': 'container'})
 
     def _test_object_delete(self, ignore):
         expected_kwargs = {
@@ -303,3 +310,28 @@ class Test_copy_object(TestObjectStoreProxy):
 
     def test_copy_object(self):
         self.assertRaises(NotImplementedError, self.proxy.copy_object)
+
+
+class Test_utils(TestObjectStoreProxy):
+    def test_file_segment(self):
+        file_size = 4200
+        content = ''.join(random.SystemRandom().choice(
+            string.ascii_uppercase + string.digits)
+            for _ in range(file_size)).encode('latin-1')
+        self.imagefile = tempfile.NamedTemporaryFile(delete=False)
+        self.imagefile.write(content)
+        self.imagefile.close()
+
+        segments = self.proxy._get_file_segments(
+            endpoint='test_container/test_image',
+            filename=self.imagefile.name,
+            file_size=file_size,
+            segment_size=1000)
+        self.assertEqual(len(segments), 5)
+        segment_content = b''
+        for (index, (name, segment)) in enumerate(segments.items()):
+            self.assertEqual(
+                'test_container/test_image/{index:0>6}'.format(index=index),
+                name)
+            segment_content += segment.read()
+        self.assertEqual(content, segment_content)
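
Illustrative note, not part of the patch: the relocated test pins down the segmentation arithmetic - a 4200-byte file with segment_size=1000 yields ceil(4200/1000) = 5 segments named with zero-padded indexes. A standalone sketch of the same offset/length math:

    file_size, segment_size = 4200, 1000

    segments = []
    for index, offset in enumerate(range(0, file_size, segment_size)):
        # Each segment covers [offset, offset + length); the final segment
        # holds whatever remains of the file.
        remaining = file_size - index * segment_size
        length = segment_size if segment_size < remaining else remaining
        segments.append(
            ('test_container/test_image/{index:0>6}'.format(index=index),
             length))

    assert len(segments) == 5
    assert segments[-1] == ('test_container/test_image/000004', 200)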