From b7ea6c7150cad5e17404edb5401d5eead827041d Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Thu, 29 Dec 2016 07:37:06 -1000 Subject: [PATCH] Replace SwiftService with direct REST uploads SwiftService uploads large objects using a thread pool. (The pool defaults to 5 and we're not currently configuring it larger or smaller) Instead of using that, spin up upload threads on our own so that we can get rid of the swiftclient dependency. A few notes: - We're using the new async feature of the Adapter wrapper, which rate limits at the _start_ of a REST call. This is sane as far as we can tell, but also might not be what someone is expecting. - We'll skip the thread pool uploader for objects that are smaller than the default max segment size. - In splitting the file into segments, we'd like to avoid reading all of the segments into RAM when we don't need to - so there is a file-like wrapper class which can be passed to requests. This implements a read-view of a portion of the file. In a pathological case, this could be slower due to disk seeking on the read side. However, let's go back and deal with buffering when we have a problem - I imagine that the REST upload will be the bottleneck long before the overhead of interleaved disk seeks will be. 
Change-Id: Id9258980d2e0782e4e3c0ac26c7f11dc4db80354 --- .../removed-swiftclient-aff22bfaeee5f59f.yaml | 5 + requirements.txt | 1 - shade/_adapter.py | 21 +- shade/_tasks.py | 5 - shade/_utils.py | 37 + shade/openstackcloud.py | 205 ++++- shade/task_manager.py | 31 + shade/tests/functional/test_object.py | 41 +- shade/tests/unit/base.py | 4 +- shade/tests/unit/test__utils.py | 27 + shade/tests/unit/test_image.py | 31 +- shade/tests/unit/test_object.py | 743 +++++++++++++++++- 12 files changed, 1067 insertions(+), 84 deletions(-) create mode 100644 releasenotes/notes/removed-swiftclient-aff22bfaeee5f59f.yaml diff --git a/releasenotes/notes/removed-swiftclient-aff22bfaeee5f59f.yaml b/releasenotes/notes/removed-swiftclient-aff22bfaeee5f59f.yaml new file mode 100644 index 000000000..4927c1e68 --- /dev/null +++ b/releasenotes/notes/removed-swiftclient-aff22bfaeee5f59f.yaml @@ -0,0 +1,5 @@ +--- +upgrade: + - Removed swiftclient as a dependency. All swift operations + are now performed with direct REST calls using keystoneauth + Adapter. 
diff --git a/requirements.txt b/requirements.txt index 000072795..c0187fe74 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,7 +17,6 @@ python-cinderclient>=1.3.1 python-neutronclient>=2.3.10 python-troveclient>=1.2.0 python-ironicclient>=0.10.0 -python-swiftclient>=2.5.0 python-heatclient>=1.0.0 python-designateclient>=2.1.0 python-magnumclient>=2.1.0 diff --git a/shade/_adapter.py b/shade/_adapter.py index 5d91ef81a..e8549b5ea 100644 --- a/shade/_adapter.py +++ b/shade/_adapter.py @@ -91,7 +91,15 @@ class ShadeAdapter(adapter.Adapter): def _munch_response(self, response, result_key=None): exc.raise_from_response(response) # Glance image downloads just return the data in the body - if response.headers.get('Content-Type') == 'application/octet-stream': + if response.headers.get('Content-Type') in ( + 'text/plain', + 'application/octet-stream'): + return response + elif response.headers.get('X-Static-Large-Object'): + # Workaround what seems to be a bug in swift where SLO objects + # return Content-Type application/json but contain + # application/octet-stream + # Bug filed: https://bugs.launchpad.net/swift/+bug/1658295 return response else: if not response.content: @@ -100,12 +108,12 @@ class ShadeAdapter(adapter.Adapter): try: result_json = response.json() except Exception: - self.shade_logger.debug( + raise exc.OpenStackCloudHTTPError( "Problems decoding json from response." 
" Reponse: {code} {reason}".format( code=response.status_code, - reason=response.reason)) - raise + reason=response.reason), + response=response) request_id = response.headers.get('x-openstack-request-id') @@ -154,4 +162,7 @@ class ShadeAdapter(adapter.Adapter): return request_method(**self.args) response = self.manager.submit_task(RequestTask(**kwargs)) - return self._munch_response(response) + if run_async: + return response + else: + return self._munch_response(response) diff --git a/shade/_tasks.py b/shade/_tasks.py index b46eab4d8..cfb13cc87 100644 --- a/shade/_tasks.py +++ b/shade/_tasks.py @@ -493,11 +493,6 @@ class FloatingIPPoolList(task_manager.Task): return client.nova_client.floating_ip_pools.list() -class ObjectCreate(task_manager.Task): - def main(self, client): - return client.swift_service.upload(**self.args) - - class SubnetCreate(task_manager.Task): def main(self, client): return client.neutron_client.create_subnet(**self.args) diff --git a/shade/_utils.py b/shade/_utils.py index 7ab9bfe29..bfcfbd416 100644 --- a/shade/_utils.py +++ b/shade/_utils.py @@ -616,3 +616,40 @@ def generate_patches_from_kwargs(operation, **kwargs): 'path': '/%s' % k} patches.append(patch) return patches + + +class FileSegment(object): + """File-like object to pass to requests.""" + + def __init__(self, filename, offset, length): + self.filename = filename + self.offset = offset + self.length = length + self.pos = 0 + self._file = open(filename, 'rb') + self.seek(0) + + def tell(self): + return self._file.tell() - self.offset + + def seek(self, offset, whence=0): + if whence == 0: + self._file.seek(self.offset + offset, whence) + elif whence == 1: + self._file.seek(offset, whence) + elif whence == 2: + self._file.seek(self.offset + self.length - offset, 0) + + def read(self, size=-1): + remaining = self.length - self.pos + if remaining <= 0: + return b'' + + to_read = remaining if size < 0 else min(size, remaining) + chunk = self._file.read(to_read) + self.pos += 
len(chunk) + + return chunk + + def reset(self): + self._file.seek(self.offset, 0) diff --git a/shade/openstackcloud.py b/shade/openstackcloud.py index e695c33d7..549e848f4 100644 --- a/shade/openstackcloud.py +++ b/shade/openstackcloud.py @@ -10,6 +10,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import functools import hashlib import ipaddress @@ -40,7 +41,6 @@ import magnumclient.client import neutronclient.neutron.client import novaclient.client import novaclient.exceptions as nova_exceptions -import swiftclient.service import troveclient.client import designateclient.client @@ -297,12 +297,6 @@ class OpenStackCloud(_normalize.Normalizer): self._keystone_client = None self._neutron_client = None self._nova_client = None - self._swift_service = None - # Lock used to reset swift client. Since swift client does not - # support keystone sessions, we we have to make a new client - # in order to get new auth prior to operations, otherwise - # long-running sessions will fail. - self._swift_service_lock = threading.Lock() self._trove_client = None self._designate_client = None self._magnum_client = None @@ -1077,19 +1071,25 @@ class OpenStackCloud(_normalize.Normalizer): @property def swift_service(self): - with self._swift_service_lock: - if self._swift_service is None: - with _utils.shade_exceptions("Error constructing " - "swift client"): - endpoint = self.get_session_endpoint( - service_key='object-store') - options = dict(os_auth_token=self.auth_token, - os_storage_url=endpoint, - os_region_name=self.region_name) - options.update(self._get_swift_kwargs()) - self._swift_service = swiftclient.service.SwiftService( - options=options) - return self._swift_service + warnings.warn( + 'Using shade to get a swift object is deprecated. 
If you' + ' need a raw swiftclient.service.SwiftService object, please use' + ' make_legacy_client in os-client-config instead') + try: + import swiftclient.service + except ImportError: + self.log.error( + 'swiftclient is no longer a dependency of shade. You need to' + ' install python-swiftclient directly.') + with _utils.shade_exceptions("Error constructing " + "swift client"): + endpoint = self.get_session_endpoint( + service_key='object-store') + options = dict(os_auth_token=self.auth_token, + os_storage_url=endpoint, + os_region_name=self.region_name) + options.update(self._get_swift_kwargs()) + return swiftclient.service.SwiftService(options=options) @property def cinder_client(self): @@ -3432,10 +3432,6 @@ class OpenStackCloud(_normalize.Normalizer): parameters = image_kwargs.pop('parameters', {}) image_kwargs.update(parameters) - # get new client sessions - with self._swift_service_lock: - self._swift_service = None - self.create_object( container, name, filename, md5=md5, sha256=sha256) @@ -5758,34 +5754,148 @@ class OpenStackCloud(_normalize.Normalizer): if not filename: filename = name + # segment_size gets used as a step value in a range call, so needs + # to be an int + if segment_size: + segment_size = int(segment_size) segment_size = self.get_object_segment_size(segment_size) + file_size = os.path.getsize(filename) if not (md5 or sha256): (md5, sha256) = self._get_file_hashes(filename) headers[OBJECT_MD5_KEY] = md5 or '' headers[OBJECT_SHA256_KEY] = sha256 or '' - header_list = sorted([':'.join([k, v]) for (k, v) in headers.items()]) for (k, v) in metadata.items(): - header_list.append(':'.join(['x-object-meta-' + k, v])) + headers['x-object-meta-' + k] = v # On some clouds this is not necessary. On others it is. I'm confused. 
self.create_container(container) if self.is_object_stale(container, name, filename, md5, sha256): + + endpoint = '{container}/{name}'.format( + container=container, name=name) self.log.debug( - "swift uploading %(filename)s to %(container)s/%(name)s", - {'filename': filename, 'container': container, 'name': name}) - upload = swiftclient.service.SwiftUploadObject( - source=filename, object_name=name) - for r in self.manager.submit_task(_tasks.ObjectCreate( - container=container, objects=[upload], - options=dict( - header=header_list, - segment_size=segment_size, - use_slo=use_slo))): - if not r['success']: - raise OpenStackCloudException( - 'Failed at action ({action}) [{error}]:'.format(**r)) + "swift uploading %(filename)s to %(endpoint)s", + {'filename': filename, 'endpoint': endpoint}) + + if file_size <= segment_size: + self._upload_object(endpoint, filename, headers) + else: + self._upload_large_object( + endpoint, filename, headers, + file_size, segment_size, use_slo) + + def _upload_object(self, endpoint, filename, headers): + return self._object_store_client.put( + endpoint, headers=headers, data=open(filename, 'r')) + + def _get_file_segments(self, endpoint, filename, file_size, segment_size): + # Use an ordered dict here so that testing can replicate things + segments = collections.OrderedDict() + for (index, offset) in enumerate(range(0, file_size, segment_size)): + remaining = file_size - (index * segment_size) + segment = _utils.FileSegment( + filename, offset, + segment_size if segment_size < remaining else remaining) + name = '{endpoint}/{index:0>6}'.format( + endpoint=endpoint, index=index) + segments[name] = segment + return segments + + def _object_name_from_url(self, url): + '''Get container_name/object_name from the full URL called. 
+ + Remove the Swift endpoint from the front of the URL, and remove + the leaving / that will leave behind.''' + endpoint = self._object_store_client.get_endpoint() + object_name = url.replace(endpoint, '') + if object_name.startswith('/'): + object_name = object_name[1:] + return object_name + + def _add_etag_to_manifest(self, segment_results, manifest): + for result in segment_results: + if 'Etag' not in result.headers: + continue + name = self._object_name_from_url(result.url) + for entry in manifest: + if entry['path'] == '/{name}'.format(name=name): + entry['etag'] = result.headers['Etag'] + + def _upload_large_object( + self, endpoint, filename, headers, file_size, segment_size, use_slo): + # If the object is big, we need to break it up into segments that + # are no larger than segment_size, upload each of them individually + # and then upload a manifest object. The segments can be uploaded in + # parallel, so we'll use the async feature of the TaskManager. + + segment_futures = [] + segment_results = [] + retry_results = [] + retry_futures = [] + manifest = [] + + # Get an OrderedDict with keys being the swift location for the + # segment, the value a FileSegment file-like object that is a + # slice of the data for the segment. + segments = self._get_file_segments( + endpoint, filename, file_size, segment_size) + + # Schedule the segments for upload + for name, segment in segments.items(): + # Async call to put - schedules execution and returns a future + segment_future = self._object_store_client.put( + name, headers=headers, data=segment, run_async=True) + segment_futures.append(segment_future) + # TODO(mordred) Collect etags from results to add to this manifest + # dict. Then sort the list of dicts by path. 
+ manifest.append(dict( + path='/{name}'.format(name=name), + size_bytes=segment.length)) + + # Try once and collect failed results to retry + segment_results, retry_results = task_manager.wait_for_futures( + segment_futures, raise_on_error=False) + + self._add_etag_to_manifest(segment_results, manifest) + + for result in retry_results: + # Grab the FileSegment for the failed upload so we can retry + name = self._object_name_from_url(result.url) + segment = segments[name] + segment.seek(0) + # Async call to put - schedules execution and returns a future + segment_future = self._object_store_client.put( + name, headers=headers, data=segment, run_async=True) + # TODO(mordred) Collect etags from results to add to this manifest + # dict. Then sort the list of dicts by path. + retry_futures.append(segment_future) + + # If any segments fail the second time, just throw the error + segment_results, retry_results = task_manager.wait_for_futures( + retry_futures, raise_on_error=True) + + self._add_etag_to_manifest(segment_results, manifest) + + if use_slo: + return self._finish_large_object_slo(endpoint, headers, manifest) + else: + return self._finish_large_object_dlo(endpoint, headers) + + def _finish_large_object_slo(self, endpoint, headers, manifest): + # TODO(mordred) send an etag of the manifest, which is the md5sum + # of the concatenation of the etags of the results + headers = headers.copy() + return self._object_store_client.put( + endpoint, + params={'multipart-manifest': 'put'}, + headers=headers, json=manifest) + + def _finish_large_object_dlo(self, endpoint, headers): + headers = headers.copy() + headers['X-Object-Manifest'] = endpoint + return self._object_store_client.put(endpoint, headers=headers) def update_object(self, container, name, metadata=None, **headers): """Update the metadata of an object @@ -5837,10 +5947,25 @@ class OpenStackCloud(_normalize.Normalizer): :raises: OpenStackCloudException on operation error. 
""" + # TODO(mordred) DELETE for swift returns status in text/plain format + # like so: + # Number Deleted: 15 + # Number Not Found: 0 + # Response Body: + # Response Status: 200 OK + # Errors: + # We should ultimately do something with that try: + meta = self.get_object_metadata(container, name) + if not meta: + return False + params = {} + if meta.get('X-Static-Large-Object', None) == 'True': + params['multipart-manifest'] = 'delete' self._object_store_client.delete( '{container}/{object}'.format( - container=container, object=name)) + container=container, object=name), + params=params) return True except OpenStackCloudHTTPError: return False diff --git a/shade/task_manager.py b/shade/task_manager.py index 5144af01f..bfad5a241 100644 --- a/shade/task_manager.py +++ b/shade/task_manager.py @@ -26,6 +26,7 @@ import simplejson import six from shade import _log +from shade import exc from shade import meta @@ -287,3 +288,33 @@ class TaskManager(object): task_class = generate_task_class(method, name, result_filter_cb) return self._executor.submit_task(task_class(**kwargs)) + + +def wait_for_futures(futures, raise_on_error=True, log=None): + '''Collect results or failures from a list of running future tasks.''' + + results = [] + retries = [] + + # Check on each result as its thread finishes + for completed in concurrent.futures.as_completed(futures): + try: + result = completed.result() + # We have to do this here because munch_response doesn't + # get called on async job results + exc.raise_from_response(result) + results.append(result) + except (keystoneauth1.exceptions.RetriableConnectionFailure, + exc.OpenStackCloudException) as e: + if log: + log.debug( + "Exception processing async task: {e}".format( + e=str(e)), + exc_info=True) + # If we get an exception, put the result into a list so we + # can try again + if raise_on_error: + raise + else: + retries.append(result) + return results, retries diff --git a/shade/tests/functional/test_object.py 
b/shade/tests/functional/test_object.py index f6d5d9c16..f32802e74 100644 --- a/shade/tests/functional/test_object.py +++ b/shade/tests/functional/test_object.py @@ -17,10 +17,13 @@ test_object Functional tests for `shade` object methods. """ +import random +import string import tempfile from testtools import content +from shade import exc from shade.tests.functional import base @@ -40,24 +43,29 @@ class TestObject(base.BaseFunctionalTestCase): self.assertEqual(container_name, self.demo_cloud.list_containers()[0]['name']) sizes = ( - (64 * 1024, 1), # 64K, one segment - (50 * 1024 ** 2, 5) # 50MB, 5 segments + (64 * 1024, 1), # 64K, one segment + (64 * 1024, 5) # 64MB, 5 segments ) for size, nseg in sizes: - segment_size = round(size / nseg) - with tempfile.NamedTemporaryFile() as sparse_file: - sparse_file.seek(size) - sparse_file.write("\0") - sparse_file.flush() + segment_size = int(round(size / nseg)) + with tempfile.NamedTemporaryFile() as fake_file: + fake_content = ''.join(random.SystemRandom().choice( + string.ascii_uppercase + string.digits) + for _ in range(size)).encode('latin-1') + + fake_file.write(fake_content) + fake_file.flush() name = 'test-%d' % size + self.addCleanup( + self.demo_cloud.delete_object, container_name, name) self.demo_cloud.create_object( container_name, name, - sparse_file.name, + fake_file.name, segment_size=segment_size, metadata={'foo': 'bar'}) self.assertFalse(self.demo_cloud.is_object_stale( container_name, name, - sparse_file.name + fake_file.name ) ) self.assertEqual( @@ -70,12 +78,21 @@ class TestObject(base.BaseFunctionalTestCase): 'testv', self.demo_cloud.get_object_metadata( container_name, name)['x-object-meta-testk'] ) - self.assertIsNotNone( - self.demo_cloud.get_object(container_name, name)) + try: + self.assertIsNotNone( + self.demo_cloud.get_object(container_name, name)) + except exc.OpenStackCloudException as e: + self.addDetail( + 'failed_response', + content.text_content(str(e.response.headers))) + 
self.addDetail( + 'failed_response', + content.text_content(e.response.text)) self.assertEqual( name, self.demo_cloud.list_objects(container_name)[0]['name']) - self.demo_cloud.delete_object(container_name, name) + self.assertTrue( + self.demo_cloud.delete_object(container_name, name)) self.assertEqual([], self.demo_cloud.list_objects(container_name)) self.assertEqual(container_name, self.demo_cloud.list_containers()[0]['name']) diff --git a/shade/tests/unit/base.py b/shade/tests/unit/base.py index 3bff06361..14683c19b 100644 --- a/shade/tests/unit/base.py +++ b/shade/tests/unit/base.py @@ -171,10 +171,12 @@ class RequestsMockTestCase(BaseTestCase): dict(method='GET', url='https://image.example.com/'), ] - def assert_calls(self): + def assert_calls(self, stop_after=None): self.assertEqual(len(self.calls), len(self.adapter.request_history)) for (x, (call, history)) in enumerate( zip(self.calls, self.adapter.request_history)): + if stop_after and x > stop_after: + break self.assertEqual( call['method'], history.method, 'Method mismatch on call {index}'.format(index=x)) diff --git a/shade/tests/unit/test__utils.py b/shade/tests/unit/test__utils.py index 997ae874d..bfe566bd4 100644 --- a/shade/tests/unit/test__utils.py +++ b/shade/tests/unit/test__utils.py @@ -10,6 +10,10 @@ # License for the specific language governing permissions and limitations # under the License. 
+import random +import string +import tempfile + import testtools from shade import _utils @@ -230,3 +234,26 @@ class TestUtils(base.TestCase): "Invalid range value: <>100" ): _utils.range_filter(RANGE_DATA, "key1", "<>100") + + def test_file_segment(self): + file_size = 4200 + content = ''.join(random.SystemRandom().choice( + string.ascii_uppercase + string.digits) + for _ in range(file_size)).encode('latin-1') + self.imagefile = tempfile.NamedTemporaryFile(delete=False) + self.imagefile.write(content) + self.imagefile.close() + + segments = self.cloud._get_file_segments( + endpoint='test_container/test_image', + filename=self.imagefile.name, + file_size=file_size, + segment_size=1000) + self.assertEqual(len(segments), 5) + segment_content = b'' + for (index, (name, segment)) in enumerate(segments.items()): + self.assertEqual( + 'test_container/test_image/{index:0>6}'.format(index=index), + name) + segment_content += segment.read() + self.assertEqual(content, segment_content) diff --git a/shade/tests/unit/test_image.py b/shade/tests/unit/test_image.py index cd0ae847e..43dab0302 100644 --- a/shade/tests/unit/test_image.py +++ b/shade/tests/unit/test_image.py @@ -215,9 +215,7 @@ class TestImage(BaseTestImage): self.assert_calls() self.assertEqual(self.adapter.request_history[5].text.read(), b'\x00') - @mock.patch.object(shade.OpenStackCloud, 'swift_service') - def test_create_image_task(self, - swift_service_mock): + def test_create_image_task(self): self.cloud.image_api_use_tasks = True image_name = 'name-99' container_name = 'image_upload_v2_test_container' @@ -262,6 +260,12 @@ class TestImage(BaseTestImage): container=container_name, object=image_name), status_code=404) + self.adapter.put( + '{endpoint}/{container}/{object}'.format( + endpoint=endpoint, + container=container_name, object=image_name), + status_code=201) + task_id = str(uuid.uuid4()) args = dict( id=task_id, @@ -304,18 +308,6 @@ class TestImage(BaseTestImage): image_name, self.imagefile.name, 
wait=True, timeout=1, is_public=False, container=container_name) - args = { - 'header': [ - 'x-object-meta-x-shade-md5:{md5}'.format(md5=NO_MD5), - 'x-object-meta-x-shade-sha256:{sha}'.format(sha=NO_SHA256), - ], - 'segment_size': 1000, - 'use_slo': True} - swift_service_mock.upload.assert_called_with( - container='image_upload_v2_test_container', - objects=mock.ANY, - options=args) - self.calls += [ dict(method='GET', url='https://image.example.com/v2/images'), dict(method='GET', url='https://object-store.example.com/info'), @@ -339,6 +331,15 @@ class TestImage(BaseTestImage): url='{endpoint}/{container}/{object}'.format( endpoint=endpoint, container=container_name, object=image_name)), + dict( + method='PUT', + url='{endpoint}/{container}/{object}'.format( + endpoint=endpoint, + container=container_name, object=image_name), + headers={ + 'x-object-meta-x-shade-md5': NO_MD5, + 'x-object-meta-x-shade-sha256': NO_SHA256, + }), dict(method='GET', url='https://image.example.com/v2/images'), dict( method='POST', diff --git a/shade/tests/unit/test_object.py b/shade/tests/unit/test_object.py index 5d2ac6304..a99d53177 100644 --- a/shade/tests/unit/test_object.py +++ b/shade/tests/unit/test_object.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. 
+import tempfile + import testtools import shade @@ -20,10 +22,10 @@ from shade import exc from shade.tests.unit import base -class TestObject(base.RequestsMockTestCase): +class BaseTestObject(base.RequestsMockTestCase): def setUp(self): - super(TestObject, self).setUp() + super(BaseTestObject, self).setUp() self.container = self.getUniqueString() self.object = self.getUniqueString() @@ -33,6 +35,9 @@ class TestObject(base.RequestsMockTestCase): self.object_endpoint = '{endpoint}/{object}'.format( endpoint=self.container_endpoint, object=self.object) + +class TestObject(BaseTestObject): + def test_create_container(self): """Test creating a (private) container""" self.adapter.head( @@ -331,22 +336,31 @@ class TestObject(base.RequestsMockTestCase): self.cloud.list_objects, self.container) def test_delete_object(self): + self.adapter.head( + self.object_endpoint, headers={'X-Object-Meta': 'foo'}) self.adapter.delete(self.object_endpoint, status_code=204) self.assertTrue(self.cloud.delete_object(self.container, self.object)) self.calls += [ - dict(method='DELETE', url=self.object_endpoint), + dict( + method='HEAD', + url=self.object_endpoint), + dict( + method='DELETE', + url=self.object_endpoint), ] self.assert_calls() def test_delete_object_not_found(self): - self.adapter.delete(self.object_endpoint, status_code=404) + self.adapter.head(self.object_endpoint, status_code=404) self.assertFalse(self.cloud.delete_object(self.container, self.object)) self.calls += [ - dict(method='DELETE', url=self.object_endpoint), + dict( + method='HEAD', + url=self.object_endpoint), ] self.assert_calls() @@ -439,3 +453,722 @@ class TestObject(base.RequestsMockTestCase): reason='Precondition failed') self.assertEqual(shade.openstackcloud.DEFAULT_OBJECT_SEGMENT_SIZE, self.cloud.get_object_segment_size(None)) + + +class TestObjectUploads(BaseTestObject): + + def setUp(self): + super(TestObjectUploads, self).setUp() + + self.content = self.getUniqueString().encode('latin-1') + 
self.object_file = tempfile.NamedTemporaryFile(delete=False) + self.object_file.write(self.content) + self.object_file.close() + (self.md5, self.sha256) = self.cloud._get_file_hashes( + self.object_file.name) + self.endpoint = self.cloud._object_store_client.get_endpoint() + + def test_create_object(self): + + self.adapter.get( + 'https://object-store.example.com/info', + json=dict( + swift={'max_file_size': 1000}, + slo={'min_segment_size': 500})) + + self.adapter.put( + '{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container,), + status_code=201, + headers={ + 'Date': 'Fri, 16 Dec 2016 18:21:20 GMT', + 'Content-Length': '0', + 'Content-Type': 'text/html; charset=UTF-8', + }) + self.adapter.head( + '{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container), + [ + dict(status_code=404), + dict(headers={ + 'Content-Length': '0', + 'X-Container-Object-Count': '0', + 'Accept-Ranges': 'bytes', + 'X-Storage-Policy': 'Policy-0', + 'Date': 'Fri, 16 Dec 2016 18:29:05 GMT', + 'X-Timestamp': '1481912480.41664', + 'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1', + 'X-Container-Bytes-Used': '0', + 'Content-Type': 'text/plain; charset=utf-8'}), + ]) + self.adapter.head( + '{endpoint}/{container}/{object}'.format( + endpoint=self.endpoint, + container=self.container, object=self.object), + status_code=404) + + self.adapter.put( + '{endpoint}/{container}/{object}'.format( + endpoint=self.endpoint, + container=self.container, object=self.object), + status_code=201) + + self.cloud.create_object( + container=self.container, name=self.object, + filename=self.object_file.name) + + self.calls += [ + dict(method='GET', url='https://object-store.example.com/info'), + dict( + method='HEAD', + url='{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container)), + dict( + method='PUT', + url='{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container)), + dict( + method='HEAD', 
+ url='{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container)), + dict( + method='HEAD', + url='{endpoint}/{container}/{object}'.format( + endpoint=self.endpoint, + container=self.container, object=self.object)), + dict( + method='PUT', + url='{endpoint}/{container}/{object}'.format( + endpoint=self.endpoint, + container=self.container, object=self.object), + headers={ + 'x-object-meta-x-shade-md5': self.md5, + 'x-object-meta-x-shade-sha256': self.sha256, + }), + ] + + self.assert_calls() + + def test_create_dynamic_large_object(self): + + max_file_size = 2 + min_file_size = 1 + + self.adapter.get( + 'https://object-store.example.com/info', + json=dict( + swift={'max_file_size': max_file_size}, + slo={'min_segment_size': min_file_size})) + + self.adapter.put( + '{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container,), + status_code=201, + headers={ + 'Date': 'Fri, 16 Dec 2016 18:21:20 GMT', + 'Content-Length': '0', + 'Content-Type': 'text/html; charset=UTF-8', + }) + self.adapter.head( + '{endpoint}/{container}'.format( + endpoint=self.endpoint, + container=self.container), + [ + dict(status_code=404), + dict(headers={ + 'Content-Length': '0', + 'X-Container-Object-Count': '0', + 'Accept-Ranges': 'bytes', + 'X-Storage-Policy': 'Policy-0', + 'Date': 'Fri, 16 Dec 2016 18:29:05 GMT', + 'X-Timestamp': '1481912480.41664', + 'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1', + 'X-Container-Bytes-Used': '0', + 'Content-Type': 'text/plain; charset=utf-8'}), + ]) + self.adapter.head( + '{endpoint}/{container}/{object}'.format( + endpoint=self.endpoint, + container=self.container, object=self.object), + status_code=404) + + self.adapter.put( + '{endpoint}/{container}/{object}'.format( + endpoint=self.endpoint, + container=self.container, object=self.object), + status_code=201) + + self.calls += [ + dict(method='GET', url='https://object-store.example.com/info'), + dict( + method='HEAD', + 
def test_create_static_large_object(self):
    """Upload a file as a static large object (SLO).

    /info advertises a tiny max_file_size (25 bytes), forcing
    create_object(use_slo=True) to split ``self.content`` into 25-byte
    segments, PUT each segment, and finally PUT a JSON manifest that
    lists every segment's path, size and etag.  Segment uploads run in
    a thread pool, so only the first six calls are order-checked.
    """
    max_file_size = 25
    min_file_size = 1

    self.adapter.get(
        'https://object-store.example.com/info',
        json=dict(
            swift={'max_file_size': max_file_size},
            slo={'min_segment_size': min_file_size}))

    self.adapter.put(
        '{endpoint}/{container}'.format(
            endpoint=self.endpoint,
            container=self.container),
        status_code=201,
        headers={
            'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
            'Content-Length': '0',
            'Content-Type': 'text/html; charset=UTF-8',
        })
    # First HEAD 404s so the container gets created; the second
    # (after the PUT) reports an empty container.
    self.adapter.head(
        '{endpoint}/{container}'.format(
            endpoint=self.endpoint,
            container=self.container),
        [
            dict(status_code=404),
            dict(headers={
                'Content-Length': '0',
                'X-Container-Object-Count': '0',
                'Accept-Ranges': 'bytes',
                'X-Storage-Policy': 'Policy-0',
                'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
                'X-Timestamp': '1481912480.41664',
                'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
                'X-Container-Bytes-Used': '0',
                'Content-Type': 'text/plain; charset=utf-8'}),
        ])
    self.adapter.head(
        '{endpoint}/{container}/{object}'.format(
            endpoint=self.endpoint,
            container=self.container, object=self.object),
        status_code=404)

    self.adapter.put(
        '{endpoint}/{container}/{object}'.format(
            endpoint=self.endpoint,
            container=self.container, object=self.object),
        status_code=201)

    self.calls += [
        dict(method='GET', url='https://object-store.example.com/info'),
        dict(
            method='HEAD',
            url='{endpoint}/{container}'.format(
                endpoint=self.endpoint,
                container=self.container)),
        dict(
            method='PUT',
            url='{endpoint}/{container}'.format(
                endpoint=self.endpoint,
                container=self.container)),
        dict(
            method='HEAD',
            url='{endpoint}/{container}'.format(
                endpoint=self.endpoint,
                container=self.container)),
        dict(
            method='HEAD',
            url='{endpoint}/{container}/{object}'.format(
                endpoint=self.endpoint,
                container=self.container, object=self.object)),
    ]

    # One mock (returning a distinct Etag) and one expected call per
    # 25-byte segment; segment names are zero-padded indexes.
    for index, offset in enumerate(
            range(0, len(self.content), max_file_size)):

        self.adapter.put(
            '{endpoint}/{container}/{object}/{index:0>6}'.format(
                endpoint=self.endpoint,
                container=self.container,
                object=self.object,
                index=index),
            status_code=201,
            headers=dict(Etag='etag{index}'.format(index=index)))

        self.calls += [
            dict(
                method='PUT',
                url='{endpoint}/{container}/{object}/{index:0>6}'.format(
                    endpoint=self.endpoint,
                    container=self.container,
                    object=self.object,
                    index=index))]

    self.calls += [
        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}'.format(
                endpoint=self.endpoint,
                container=self.container, object=self.object),
            # BUG FIX: this was the set literal
            # ``{'multipart-manifest', 'put'}``.  The SLO manifest is
            # uploaded with the query parameter multipart-manifest=put,
            # so the expected params must be a key->value mapping.
            params={'multipart-manifest': 'put'},
            headers={
                'x-object-meta-x-shade-md5': self.md5,
                'x-object-meta-x-shade-sha256': self.sha256,
            }),
    ]

    self.cloud.create_object(
        container=self.container, name=self.object,
        filename=self.object_file.name, use_slo=True)

    # After call 6, order becomes indeterminate because the segment
    # uploads run in a thread pool.
    self.assert_calls(stop_after=6)

    for key, value in self.calls[-1]['headers'].items():
        self.assertEqual(
            value, self.adapter.request_history[-1].headers[key],
            'header mismatch in manifest call')

    # Manifest entries reference segments by endpoint-relative path.
    # (The original also passed an unused endpoint= kwarg here;
    # str.format ignores extra keyword arguments, so dropping it is
    # behavior-neutral.)
    base_object = '/{container}/{object}'.format(
        container=self.container,
        object=self.object)

    self.assertEqual([
        {
            'path': "{base_object}/000000".format(
                base_object=base_object),
            'size_bytes': 25,
            'etag': 'etag0',
        },
        {
            'path': "{base_object}/000001".format(
                base_object=base_object),
            'size_bytes': 25,
            'etag': 'etag1',
        },
        {
            'path': "{base_object}/000002".format(
                base_object=base_object),
            'size_bytes': 25,
            'etag': 'etag2',
        },
        {
            'path': "{base_object}/000003".format(
                base_object=base_object),
            'size_bytes': 5,
            'etag': 'etag3',
        },
    ], self.adapter.request_history[-1].json())
def test_object_segment_retry_failure(self):
    """A segment PUT that keeps failing aborts the SLO upload.

    Segment 000003 always answers 501.  After the single retry also
    fails, create_object raises OpenStackCloudException and the
    manifest PUT (mocked but never expected) is not made.
    """
    max_file_size = 25
    min_file_size = 1

    # Hoist the two URLs every mock and expectation is built from.
    container_url = '{endpoint}/{container}'.format(
        endpoint=self.endpoint, container=self.container)
    object_url = '{endpoint}/{container}/{object}'.format(
        endpoint=self.endpoint, container=self.container,
        object=self.object)

    self.adapter.get(
        'https://object-store.example.com/info',
        json=dict(
            swift={'max_file_size': max_file_size},
            slo={'min_segment_size': min_file_size}))

    self.adapter.put(
        container_url,
        status_code=201,
        headers={
            'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
            'Content-Length': '0',
            'Content-Type': 'text/html; charset=UTF-8',
        })
    # Container is absent on the first HEAD, present on the second.
    self.adapter.head(
        container_url,
        [
            dict(status_code=404),
            dict(headers={
                'Content-Length': '0',
                'X-Container-Object-Count': '0',
                'Accept-Ranges': 'bytes',
                'X-Storage-Policy': 'Policy-0',
                'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
                'X-Timestamp': '1481912480.41664',
                'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
                'X-Container-Bytes-Used': '0',
                'Content-Type': 'text/plain; charset=utf-8'}),
        ])
    self.adapter.head(object_url, status_code=404)

    # The first three segments succeed; 000003 always fails with 501.
    for segment, code in (
            ('000000', 201),
            ('000001', 201),
            ('000002', 201),
            ('000003', 501)):
        self.adapter.put(
            '{url}/{segment}'.format(url=object_url, segment=segment),
            status_code=code)

    self.adapter.put(object_url, status_code=201)

    self.calls += [
        dict(method='GET', url='https://object-store.example.com/info'),
        dict(method='HEAD', url=container_url),
        dict(method='PUT', url=container_url),
        dict(method='HEAD', url=container_url),
        dict(method='HEAD', url=object_url),
        dict(method='PUT', url='{url}/000000'.format(url=object_url)),
        dict(method='PUT', url='{url}/000001'.format(url=object_url)),
        dict(method='PUT', url='{url}/000002'.format(url=object_url)),
        # 000003 appears twice: the initial attempt plus one retry.
        dict(method='PUT', url='{url}/000003'.format(url=object_url)),
        dict(method='PUT', url='{url}/000003'.format(url=object_url)),
    ]

    self.assertRaises(
        exc.OpenStackCloudException,
        self.cloud.create_object,
        container=self.container, name=self.object,
        filename=self.object_file.name, use_slo=True)

    # After call 6, order becomes indeterminate because the segment
    # uploads run in a thread pool.
    self.assert_calls(stop_after=6)
def test_object_segment_retries(self):
    """A transiently failing segment PUT is retried and succeeds.

    Segment 000003 answers 501 on the first attempt and 201 (with an
    etag) on the retry; the upload then completes and the SLO manifest
    is PUT with the etags collected from all four segments.
    """
    max_file_size = 25
    min_file_size = 1

    self.adapter.get(
        'https://object-store.example.com/info',
        json=dict(
            swift={'max_file_size': max_file_size},
            slo={'min_segment_size': min_file_size}))

    self.adapter.put(
        '{endpoint}/{container}'.format(
            endpoint=self.endpoint,
            container=self.container),
        status_code=201,
        headers={
            'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
            'Content-Length': '0',
            'Content-Type': 'text/html; charset=UTF-8',
        })
    # First HEAD 404s so the container gets created; second succeeds.
    self.adapter.head(
        '{endpoint}/{container}'.format(
            endpoint=self.endpoint,
            container=self.container),
        [
            dict(status_code=404),
            dict(headers={
                'Content-Length': '0',
                'X-Container-Object-Count': '0',
                'Accept-Ranges': 'bytes',
                'X-Storage-Policy': 'Policy-0',
                'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
                'X-Timestamp': '1481912480.41664',
                'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
                'X-Container-Bytes-Used': '0',
                'Content-Type': 'text/plain; charset=utf-8'}),
        ])
    self.adapter.head(
        '{endpoint}/{container}/{object}'.format(
            endpoint=self.endpoint,
            container=self.container, object=self.object),
        status_code=404)

    self.adapter.put(
        '{endpoint}/{container}/{object}/000000'.format(
            endpoint=self.endpoint,
            container=self.container,
            object=self.object),
        headers={'etag': 'etag0'},
        status_code=201)

    self.adapter.put(
        '{endpoint}/{container}/{object}/000001'.format(
            endpoint=self.endpoint,
            container=self.container,
            object=self.object),
        headers={'etag': 'etag1'},
        status_code=201)

    self.adapter.put(
        '{endpoint}/{container}/{object}/000002'.format(
            endpoint=self.endpoint,
            container=self.container,
            object=self.object),
        headers={'etag': 'etag2'},
        status_code=201)

    # 000003 fails once, then succeeds on the retry.
    self.adapter.put(
        '{endpoint}/{container}/{object}/000003'.format(
            endpoint=self.endpoint,
            container=self.container,
            object=self.object), [
            dict(status_code=501),
            dict(status_code=201, headers={'etag': 'etag3'}),
        ])

    self.adapter.put(
        '{endpoint}/{container}/{object}'.format(
            endpoint=self.endpoint,
            container=self.container, object=self.object),
        status_code=201)

    self.calls += [
        dict(method='GET', url='https://object-store.example.com/info'),
        dict(
            method='HEAD',
            url='{endpoint}/{container}'.format(
                endpoint=self.endpoint,
                container=self.container)),
        dict(
            method='PUT',
            url='{endpoint}/{container}'.format(
                endpoint=self.endpoint,
                container=self.container)),
        dict(
            method='HEAD',
            url='{endpoint}/{container}'.format(
                endpoint=self.endpoint,
                container=self.container)),
        dict(
            method='HEAD',
            url='{endpoint}/{container}/{object}'.format(
                endpoint=self.endpoint,
                container=self.container, object=self.object)),

        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}/000000'.format(
                endpoint=self.endpoint,
                container=self.container,
                object=self.object)),

        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}/000001'.format(
                endpoint=self.endpoint,
                container=self.container,
                object=self.object)),

        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}/000002'.format(
                endpoint=self.endpoint,
                container=self.container,
                object=self.object)),

        # 000003 appears twice: the failed attempt and the retry.
        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}/000003'.format(
                endpoint=self.endpoint,
                container=self.container,
                object=self.object)),

        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}/000003'.format(
                endpoint=self.endpoint,
                container=self.container,
                object=self.object)),

        dict(
            method='PUT',
            url='{endpoint}/{container}/{object}'.format(
                endpoint=self.endpoint,
                container=self.container, object=self.object),
            # BUG FIX: this was the set literal
            # ``{'multipart-manifest', 'put'}``.  The SLO manifest is
            # uploaded with the query parameter multipart-manifest=put,
            # so the expected params must be a key->value mapping.
            params={'multipart-manifest': 'put'},
            headers={
                'x-object-meta-x-shade-md5': self.md5,
                'x-object-meta-x-shade-sha256': self.sha256,
            }),
    ]

    self.cloud.create_object(
        container=self.container, name=self.object,
        filename=self.object_file.name, use_slo=True)

    # After call 6, order becomes indeterminate because the segment
    # uploads run in a thread pool.
    self.assert_calls(stop_after=6)

    for key, value in self.calls[-1]['headers'].items():
        self.assertEqual(
            value, self.adapter.request_history[-1].headers[key],
            'header mismatch in manifest call')

    # Manifest entries reference segments by endpoint-relative path.
    # (The original also passed an unused endpoint= kwarg here;
    # str.format ignores extra keyword arguments, so dropping it is
    # behavior-neutral.)
    base_object = '/{container}/{object}'.format(
        container=self.container,
        object=self.object)

    self.assertEqual([
        {
            'path': "{base_object}/000000".format(
                base_object=base_object),
            'size_bytes': 25,
            'etag': 'etag0',
        },
        {
            'path': "{base_object}/000001".format(
                base_object=base_object),
            'size_bytes': 25,
            'etag': 'etag1',
        },
        {
            'path': "{base_object}/000002".format(
                base_object=base_object),
            'size_bytes': 25,
            'etag': 'etag2',
        },
        {
            'path': "{base_object}/000003".format(
                base_object=base_object),
            'size_bytes': 1,
            'etag': 'etag3',
        },
    ], self.adapter.request_history[-1].json())