Retry large object manifest upload

Failure to upload the final manifest can leave the already uploaded
segments lying around, unused and unloved. Remove those.

Change-Id: I1756cfb3038c2312afedbc188161dd8d8459e4a7
This commit is contained in:
David Shrewsbury 2019-08-07 11:07:31 -04:00
parent 09cdcc0020
commit 5f5353a791
2 changed files with 303 additions and 11 deletions

View File

@ -554,24 +554,55 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
self._add_etag_to_manifest(segment_results, manifest)
if use_slo:
return self._finish_large_object_slo(endpoint, headers, manifest)
else:
return self._finish_large_object_dlo(endpoint, headers)
# If the final manifest upload fails, remove the segments we've
# already uploaded.
try:
if use_slo:
return self._finish_large_object_slo(endpoint, headers,
manifest)
else:
return self._finish_large_object_dlo(endpoint, headers)
except Exception:
try:
segment_prefix = endpoint.split('/')[-1]
self.log.debug(
"Failed to upload large object manifest for %s. "
"Removing segment uploads.", segment_prefix)
self.delete_autocreated_image_objects(
segment_prefix=segment_prefix)
except Exception:
self.log.exception(
"Failed to cleanup image objects for %s:",
segment_prefix)
raise
def _finish_large_object_slo(self, endpoint, headers, manifest):
# TODO(mordred) send an etag of the manifest, which is the md5sum
# of the concatenation of the etags of the results
headers = headers.copy()
return self._object_store_client.put(
endpoint,
params={'multipart-manifest': 'put'},
headers=headers, data=json.dumps(manifest))
retries = 3
while True:
try:
return self._object_store_client.put(
endpoint,
params={'multipart-manifest': 'put'},
headers=headers, data=json.dumps(manifest))
except Exception:
retries -= 1
if retries == 0:
raise
def _finish_large_object_dlo(self, endpoint, headers):
headers = headers.copy()
headers['X-Object-Manifest'] = endpoint
return self._object_store_client.put(endpoint, headers=headers)
retries = 3
while True:
try:
return self._object_store_client.put(endpoint, headers=headers)
except Exception:
retries -= 1
if retries == 0:
raise
def update_object(self, container, name, metadata=None, **headers):
"""Update the metadata of an object
@ -668,7 +699,8 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
except exc.OpenStackCloudHTTPError:
return False
def delete_autocreated_image_objects(self, container=None):
def delete_autocreated_image_objects(self, container=None,
segment_prefix=None):
"""Delete all objects autocreated for image uploads.
This method should generally not be needed, as shade should clean up
@ -676,6 +708,11 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
goes wrong and it is found that there are leaked objects, this method
can be used to delete any objects that shade has created on the user's
behalf in service of image uploads.
:param str container: Name of the container. Defaults to 'images'.
:param str segment_prefix: Prefix for the image segment names to
delete. If not given, all image upload segments present are
deleted.
"""
if container is None:
container = self._OBJECT_AUTOCREATE_CONTAINER
@ -684,7 +721,7 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
return False
deleted = False
for obj in self.list_objects(container):
for obj in self.list_objects(container, prefix=segment_prefix):
meta = self.get_object_metadata(container, obj['name'])
if meta.get(
self._OBJECT_AUTOCREATE_KEY, meta.get(

View File

@ -841,6 +841,261 @@ class TestObjectUploads(BaseTestObject):
},
], self.adapter.request_history[-1].json())
def test_slo_manifest_retry(self):
"""
Uploading the SLO manifest file should be retried up to 3 times before
giving up. This test should succeed on the 3rd and final attempt.
"""
max_file_size = 25
min_file_size = 1
uris_to_mock = [
dict(method='GET', uri='https://object-store.example.com/info',
json=dict(
swift={'max_file_size': max_file_size},
slo={'min_segment_size': min_file_size})),
dict(method='HEAD',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
]
uris_to_mock.extend([
dict(method='PUT',
uri='{endpoint}/{container}/{object}/{index:0>6}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object,
index=index),
status_code=201,
headers=dict(Etag='etag{index}'.format(index=index)))
for index, offset in enumerate(
range(0, len(self.content), max_file_size))
])
# manifest file upload calls
uris_to_mock.extend([
dict(method='PUT',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=400,
validate=dict(
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-sdk-md5': self.md5,
'x-object-meta-x-sdk-sha256': self.sha256,
})),
dict(method='PUT',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=400,
validate=dict(
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-sdk-md5': self.md5,
'x-object-meta-x-sdk-sha256': self.sha256,
})),
dict(method='PUT',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=201,
validate=dict(
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-sdk-md5': self.md5,
'x-object-meta-x-sdk-sha256': self.sha256,
})),
])
self.register_uris(uris_to_mock)
self.cloud.create_object(
container=self.container, name=self.object,
filename=self.object_file.name, use_slo=True)
# After call 3, order become indeterminate because of thread pool
self.assert_calls(stop_after=3)
for key, value in self.calls[-1]['headers'].items():
self.assertEqual(
value, self.adapter.request_history[-1].headers[key],
'header mismatch in manifest call')
base_object = '/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)
self.assertEqual([
{
'path': "{base_object}/000000".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag0',
},
{
'path': "{base_object}/000001".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag1',
},
{
'path': "{base_object}/000002".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag2',
},
{
'path': "{base_object}/000003".format(
base_object=base_object),
'size_bytes': len(self.object) - 75,
'etag': 'etag3',
},
], self.adapter.request_history[-1].json())
def test_slo_manifest_fail(self):
"""
Uploading the SLO manifest file should be retried up to 3 times before
giving up. This test fails all 3 attempts and should verify that we
delete uploaded segments that begin with the object prefix.
"""
max_file_size = 25
min_file_size = 1
uris_to_mock = [
dict(method='GET', uri='https://object-store.example.com/info',
json=dict(
swift={'max_file_size': max_file_size},
slo={'min_segment_size': min_file_size})),
dict(method='HEAD',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
]
uris_to_mock.extend([
dict(method='PUT',
uri='{endpoint}/{container}/{object}/{index:0>6}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object,
index=index),
status_code=201,
headers=dict(Etag='etag{index}'.format(index=index)))
for index, offset in enumerate(
range(0, len(self.content), max_file_size))
])
# manifest file upload calls
uris_to_mock.extend([
dict(method='PUT',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=400,
validate=dict(
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-sdk-md5': self.md5,
'x-object-meta-x-sdk-sha256': self.sha256,
})),
dict(method='PUT',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=400,
validate=dict(
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-sdk-md5': self.md5,
'x-object-meta-x-sdk-sha256': self.sha256,
})),
dict(method='PUT',
uri='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=400,
validate=dict(
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-sdk-md5': self.md5,
'x-object-meta-x-sdk-sha256': self.sha256,
})),
])
# Cleaning up image upload segments involves calling the
# delete_autocreated_image_objects() API method which will list
# objects (LIST), get the object metadata (HEAD), then delete the
# object (DELETE).
uris_to_mock.extend([
dict(method='GET',
uri='{endpoint}/images?format=json&prefix={prefix}'.format(
endpoint=self.endpoint,
prefix=self.object),
complete_qs=True,
json=[{
'content_type': 'application/octet-stream',
'bytes': 1437258240,
'hash': '249219347276c331b87bf1ac2152d9af',
'last_modified': '2015-02-16T17:50:05.289600',
'name': self.object
}]),
dict(method='HEAD',
uri='{endpoint}/images/{object}'.format(
endpoint=self.endpoint,
object=self.object),
headers={
'X-Timestamp': '1429036140.50253',
'X-Trans-Id': 'txbbb825960a3243b49a36f-005a0dadaedfw1',
'Content-Length': '1290170880',
'Last-Modified': 'Tue, 14 Apr 2015 18:29:01 GMT',
'x-object-meta-x-sdk-autocreated': 'true',
'X-Object-Meta-X-Shade-Sha256': 'does not matter',
'X-Object-Meta-X-Shade-Md5': 'does not matter',
'Date': 'Thu, 16 Nov 2017 15:24:30 GMT',
'Accept-Ranges': 'bytes',
'Content-Type': 'application/octet-stream',
'Etag': '249219347276c331b87bf1ac2152d9af',
}),
dict(method='DELETE',
uri='{endpoint}/images/{object}'.format(
endpoint=self.endpoint, object=self.object))
])
self.register_uris(uris_to_mock)
# image_api_use_tasks needs to be set to True in order for the API
# method delete_autocreated_image_objects() to do the cleanup.
self.cloud.image_api_use_tasks = True
self.assertRaises(
exc.OpenStackCloudException,
self.cloud.create_object,
container=self.container, name=self.object,
filename=self.object_file.name, use_slo=True)
# After call 3, order become indeterminate because of thread pool
self.assert_calls(stop_after=3)
def test_object_segment_retry_failure(self):
max_file_size = 25