Merge "Add a new URL parameter to allow for async cleanup of SLO segments"

This commit is contained in:
Zuul 2020-11-18 00:50:54 +00:00 committed by Gerrit Code Review
commit cd228fafad
14 changed files with 635 additions and 46 deletions

View File

@ -34,6 +34,7 @@ use = egg:swift#dlo
[filter:slo]
use = egg:swift#slo
allow_async_delete = True
[filter:container_sync]
use = egg:swift#container_sync

View File

@ -1041,6 +1041,13 @@ use = egg:swift#slo
# clients may request that Swift send whitespace ahead of the final response
# body. This whitespace will be yielded at most every yield_frequency seconds.
# yield_frequency = 10
#
# Since SLOs may have thousands of segments, clients may request that the
# object-expirer handle the deletion of segments using query params like
# `?multipart-manifest=delete&async=on`. You may want to keep this off if it
# negatively impacts your expirers; in that case, the deletes will still
# be done as part of the client request.
# allow_async_delete = false
# Note: Put after auth and staticweb in the pipeline.
# If you don't put it in the pipeline, it will be inserted for you.

View File

@ -60,7 +60,7 @@ def make_delete_jobs(account, container, objects, timestamp):
'name': build_task_obj(
timestamp, account, container,
obj.decode('utf8') if six.PY2 and isinstance(obj, str)
else obj),
else obj, high_precision=True),
'deleted': 0,
'created_at': timestamp.internal,
'etag': MD5_OF_EMPTY_STRING,

View File

@ -1486,7 +1486,15 @@ class S3Request(swob.Request):
if version is not None:
query['version-id'] = version
resp = self.get_response(app, 'HEAD', obj=obj, query=query)
return {'multipart-manifest': 'delete'} if resp.is_slo else {}
if not resp.is_slo:
return {}
elif resp.sysmeta_headers.get(sysmeta_header('object', 'etag')):
# Even if allow_async_delete is turned off, SLO will just handle
# the delete synchronously, so we don't need to check before
# setting async=on
return {'multipart-manifest': 'delete', 'async': 'on'}
else:
return {'multipart-manifest': 'delete'}
def set_acl_handler(self, handler):
pass

View File

@ -290,6 +290,16 @@ A ``DELETE`` with a query parameter::
will delete all the segments referenced in the manifest and then the manifest
itself. The failure response will be similar to the bulk delete middleware.
A ``DELETE`` with the query parameters::
?multipart-manifest=delete&async=yes
will schedule all the segments referenced in the manifest to be deleted
asynchronously and then delete the manifest itself. Note that segments will
continue to appear in listings and be counted for quotas until they are
cleaned up by the object-expirer. This option is only available when all
segments are in the same container and none of them are nested SLOs.
------------------------
Modifying a Large Object
------------------------
@ -324,6 +334,7 @@ from hashlib import md5
import six
from swift.cli.container_deleter import make_delete_jobs
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.middleware.listing_formats import \
MAX_CONTAINER_LISTING_CONTENT_LENGTH
@ -332,20 +343,22 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \
HTTPUnauthorized, HTTPConflict, HTTPUnprocessableEntity, \
HTTPServiceUnavailable, Response, Range, normalize_etag, \
RESPONSE_REASONS, str_to_wsgi, wsgi_to_str, wsgi_quote
RESPONSE_REASONS, str_to_wsgi, bytes_to_wsgi, wsgi_to_str, wsgi_quote
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
closing_if_possible, LRUCache, StreamingPile, strict_b64decode, \
Timestamp
Timestamp, drain_and_close, get_expirer_container
from swift.common.request_helpers import SegmentedIterable, \
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
get_container_update_override_key, update_ignore_range_header
from swift.common.constraints import check_utf8
from swift.common.constraints import check_utf8, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
from swift.common.wsgi import WSGIContext, make_subrequest
from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \
make_pre_authed_request
from swift.common.middleware.bulk import get_response_body, \
ACCEPTABLE_FORMATS, Bulk
from swift.proxy.controllers.base import get_container_info
DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 ** 2 # 1 MiB
@ -1086,13 +1099,15 @@ class StaticLargeObject(object):
def __init__(self, app, conf,
max_manifest_segments=DEFAULT_MAX_MANIFEST_SEGMENTS,
max_manifest_size=DEFAULT_MAX_MANIFEST_SIZE,
yield_frequency=DEFAULT_YIELD_FREQUENCY):
yield_frequency=DEFAULT_YIELD_FREQUENCY,
allow_async_delete=False):
self.conf = conf
self.app = app
self.logger = get_logger(conf, log_route='slo')
self.max_manifest_segments = max_manifest_segments
self.max_manifest_size = max_manifest_size
self.yield_frequency = yield_frequency
self.allow_async_delete = allow_async_delete
self.max_get_time = int(self.conf.get('max_get_time', 86400))
self.rate_limit_under_size = int(self.conf.get(
'rate_limit_under_size', DEFAULT_RATE_LIMIT_UNDER_SIZE))
@ -1110,6 +1125,17 @@ class StaticLargeObject(object):
delete_concurrency=delete_concurrency,
logger=self.logger)
# Need to know how to expire things to do async deletes
if conf.get('auto_create_account_prefix'):
# proxy app will log about how this should get moved to swift.conf
prefix = conf['auto_create_account_prefix']
else:
prefix = AUTO_CREATE_ACCOUNT_PREFIX
self.expiring_objects_account = prefix + (
conf.get('expiring_objects_account_name') or 'expiring_objects')
self.expiring_objects_container_divisor = int(
conf.get('expiring_objects_container_divisor', 86400))
def handle_multipart_get_or_head(self, req, start_response):
"""
Handles the GET or HEAD of a SLO manifest.
@ -1511,6 +1537,83 @@ class StaticLargeObject(object):
else:
raise HTTPServerError('Unable to load SLO manifest or segment.')
def handle_async_delete(self, req):
if not check_utf8(wsgi_to_str(req.path_info)):
raise HTTPPreconditionFailed(
request=req, body='Invalid UTF8 or contains NULL')
vrs, account, container, obj = req.split_path(4, 4, True)
if six.PY2:
obj_path = ('/%s/%s' % (container, obj)).decode('utf-8')
else:
obj_path = '/%s/%s' % (wsgi_to_str(container), wsgi_to_str(obj))
segments = [seg for seg in self.get_slo_segments(obj_path, req)
if 'data' not in seg]
if not segments:
# Degenerate case: just delete the manifest
return self.app
segment_containers, segment_objects = zip(*(
split_path(seg['name'], 2, 2, True) for seg in segments))
segment_containers = set(segment_containers)
if len(segment_containers) > 1:
container_csv = ', '.join(
'"%s"' % quote(c) for c in segment_containers)
raise HTTPBadRequest('All segments must be in one container. '
'Found segments in %s' % container_csv)
if any(seg.get('sub_slo') for seg in segments):
raise HTTPBadRequest('No segments may be large objects.')
# Auth checks
segment_container = segment_containers.pop()
if 'swift.authorize' in req.environ:
container_info = get_container_info(
req.environ, self.app, swift_source='SLO')
req.acl = container_info.get('write_acl')
aresp = req.environ['swift.authorize'](req)
req.acl = None
if aresp:
return aresp
if bytes_to_wsgi(segment_container.encode('utf-8')) != container:
path = '/%s/%s/%s' % (vrs, account, bytes_to_wsgi(
segment_container.encode('utf-8')))
seg_container_info = get_container_info(
make_env(req.environ, path=path, swift_source='SLO'),
self.app, swift_source='SLO')
req.acl = seg_container_info.get('write_acl')
aresp = req.environ['swift.authorize'](req)
req.acl = None
if aresp:
return aresp
# Did our sanity checks; schedule segments to be deleted
ts = req.ensure_x_timestamp()
expirer_jobs = make_delete_jobs(
wsgi_to_str(account), segment_container, segment_objects, ts)
expirer_cont = get_expirer_container(
ts, self.expiring_objects_container_divisor,
wsgi_to_str(account), wsgi_to_str(container), wsgi_to_str(obj))
enqueue_req = make_pre_authed_request(
req.environ,
method='UPDATE',
path="/v1/%s/%s" % (self.expiring_objects_account, expirer_cont),
body=json.dumps(expirer_jobs),
headers={'Content-Type': 'application/json',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Allow-Private-Methods': 'True'},
)
resp = enqueue_req.get_response(self.app)
if not resp.is_success:
self.logger.error(
'Failed to enqueue expiration entries: %s\n%s',
resp.status, resp.body)
return HTTPServiceUnavailable()
# consume the response (should be short)
drain_and_close(resp)
# Finally, delete the manifest
return self.app
def handle_multipart_delete(self, req):
"""
Will delete all the segments in the SLO manifest and then, if
@ -1519,6 +1622,10 @@ class StaticLargeObject(object):
:param req: a :class:`~swift.common.swob.Request` with an obj in path
:returns: swob.Response whose app_iter set to Bulk.handle_delete_iter
"""
if self.allow_async_delete and config_true_value(
req.params.get('async')):
return self.handle_async_delete(req)
req.headers['Content-Type'] = None # Ignore content-type from client
resp = HTTPOk(request=req)
try:
@ -1609,6 +1716,8 @@ def filter_factory(global_conf, **local_conf):
DEFAULT_MAX_MANIFEST_SIZE))
yield_frequency = int(conf.get('yield_frequency',
DEFAULT_YIELD_FREQUENCY))
allow_async_delete = config_true_value(conf.get('allow_async_delete',
'false'))
register_swift_info('slo',
max_manifest_segments=max_manifest_segments,
@ -1616,12 +1725,14 @@ def filter_factory(global_conf, **local_conf):
yield_frequency=yield_frequency,
# this used to be configurable; report it as 1 for
# clients that might still care
min_segment_size=1)
min_segment_size=1,
allow_async_delete=allow_async_delete)
def slo_filter(app):
return StaticLargeObject(
app, conf,
max_manifest_segments=max_manifest_segments,
max_manifest_size=max_manifest_size,
yield_frequency=yield_frequency)
yield_frequency=yield_frequency,
allow_async_delete=allow_async_delete)
return slo_filter

View File

@ -1508,15 +1508,16 @@ def last_modified_date_to_timestamp(last_modified_date_str):
return Timestamp(delta.total_seconds())
def normalize_delete_at_timestamp(timestamp):
def normalize_delete_at_timestamp(timestamp, high_precision=False):
"""
Format a timestamp (string or numeric) into a standardized
xxxxxxxxxx (10) format.
xxxxxxxxxx (10) or xxxxxxxxxx.xxxxx (10.5) format.
Note that timestamps less than 0000000000 are raised to
0000000000 and values greater than November 20th, 2286 at
17:46:39 UTC will be capped at that date and time, resulting in
no return value exceeding 9999999999.
no return value exceeding 9999999999.99999 (or 9999999999 if
using low-precision).
This cap is because the expirer is already working through a
sorted list of strings that were all a length of 10. Adding
@ -1528,7 +1529,8 @@ def normalize_delete_at_timestamp(timestamp):
:param timestamp: unix timestamp
:returns: normalized timestamp as a string
"""
return '%010d' % min(max(0, float(timestamp)), 9999999999)
fmt = '%016.5f' if high_precision else '%010d'
return fmt % min(max(0, float(timestamp)), 9999999999.99999)
def mkdirs(path):
@ -4428,7 +4430,7 @@ def quote(value, safe='/'):
def get_expirer_container(x_delete_at, expirer_divisor, acc, cont, obj):
"""
Returns an expiring object container name for given X-Delete-At and
a/c/o.
(native string) a/c/o.
"""
shard_int = int(hash_path(acc, cont, obj), 16) % 100
return normalize_delete_at_timestamp(

View File

@ -42,14 +42,14 @@ ASYNC_DELETE_TYPE = 'application/async-deleted'
def build_task_obj(timestamp, target_account, target_container,
target_obj):
target_obj, high_precision=False):
"""
:return: a task object name in format of
"<timestamp>-<target_account>/<target_container>/<target_obj>"
"""
timestamp = Timestamp(timestamp)
return '%s-%s/%s/%s' % (
normalize_delete_at_timestamp(timestamp),
normalize_delete_at_timestamp(timestamp, high_precision),
target_account, target_container, target_obj)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import random
import time
import uuid
@ -338,6 +339,69 @@ class TestObjectExpirer(ReplProbeTest):
def test_expirer_delete_returns_outdated_412(self):
self._test_expirer_delete_outdated_object_version(object_exists=True)
def test_slo_async_delete(self):
if not self.cluster_info.get('slo', {}).get('allow_async_delete'):
raise unittest.SkipTest('allow_async_delete not enabled')
segment_container = self.container_name + '_segments'
client.put_container(self.url, self.token, self.container_name, {})
client.put_container(self.url, self.token, segment_container, {})
client.put_object(self.url, self.token,
segment_container, 'segment_1', b'1234')
client.put_object(self.url, self.token,
segment_container, 'segment_2', b'5678')
client.put_object(
self.url, self.token, self.container_name, 'slo', json.dumps([
{'path': segment_container + '/segment_1'},
{'data': 'Cg=='},
{'path': segment_container + '/segment_2'},
]), query_string='multipart-manifest=put')
_, body = client.get_object(self.url, self.token,
self.container_name, 'slo')
self.assertEqual(body, b'1234\n5678')
client.delete_object(
self.url, self.token, self.container_name, 'slo',
query_string='multipart-manifest=delete&async=true')
# Object's deleted
_, objects = client.get_container(self.url, self.token,
self.container_name)
self.assertEqual(objects, [])
with self.assertRaises(client.ClientException) as caught:
client.get_object(self.url, self.token, self.container_name, 'slo')
self.assertEqual(404, caught.exception.http_status)
# But segments are still around and accessible
_, objects = client.get_container(self.url, self.token,
segment_container)
self.assertEqual([o['name'] for o in objects],
['segment_1', 'segment_2'])
_, body = client.get_object(self.url, self.token,
segment_container, 'segment_1')
self.assertEqual(body, b'1234')
_, body = client.get_object(self.url, self.token,
segment_container, 'segment_2')
self.assertEqual(body, b'5678')
# make sure auto-created expirer-queue containers get in the account
# listing so the expirer can find them
Manager(['container-updater']).once()
self.expirer.once()
# Now the expirer has cleaned up the segments
_, objects = client.get_container(self.url, self.token,
segment_container)
self.assertEqual(objects, [])
with self.assertRaises(client.ClientException) as caught:
client.get_object(self.url, self.token,
segment_container, 'segment_1')
self.assertEqual(404, caught.exception.http_status)
with self.assertRaises(client.ClientException) as caught:
client.get_object(self.url, self.token,
segment_container, 'segment_2')
self.assertEqual(404, caught.exception.http_status)
if __name__ == "__main__":
unittest.main()

View File

@ -70,14 +70,14 @@ class TestContainerDeleter(unittest.TestCase):
container_deleter.make_delete_jobs(
'acct', 'cont', ['obj1', 'obj2'],
utils.Timestamp(ts)),
[{'name': ts.split('.')[0] + '-acct/cont/obj1',
[{'name': ts + '-acct/cont/obj1',
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'},
{'name': ts.split('.')[0] + '-acct/cont/obj2',
{'name': ts + '-acct/cont/obj2',
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
@ -99,14 +99,14 @@ class TestContainerDeleter(unittest.TestCase):
self.assertEqual(
container_deleter.make_delete_jobs(
acct, cont, [obj1, obj2], utils.Timestamp(ts)),
[{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], uacct, ucont, uobj1),
[{'name': u'%s-%s/%s/%s' % (ts, uacct, ucont, uobj1),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'},
{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], uacct, ucont, uobj2),
{'name': u'%s-%s/%s/%s' % (ts, uacct, ucont, uobj2),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
@ -123,14 +123,14 @@ class TestContainerDeleter(unittest.TestCase):
self.assertEqual(
container_deleter.make_delete_jobs(
acct, cont, [obj1, obj2], utils.Timestamp(ts)),
[{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], acct, cont, obj1),
[{'name': u'%s-%s/%s/%s' % (ts, acct, cont, obj1),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,
'size': 0,
'storage_policy_index': 0,
'content_type': 'application/async-deleted'},
{'name': u'%s-%s/%s/%s' % (ts.split('.')[0], acct, cont, obj2),
{'name': u'%s-%s/%s/%s' % (ts, acct, cont, obj2),
'deleted': 0,
'created_at': ts,
'etag': utils.MD5_OF_EMPTY_STRING,

View File

@ -75,10 +75,12 @@ class FakeSwift(object):
A good-enough fake Swift proxy server to use in testing middleware.
"""
ALLOWED_METHODS = [
'PUT', 'POST', 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'REPLICATE']
'PUT', 'POST', 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'REPLICATE',
'UPDATE']
def __init__(self):
self._calls = []
self.req_bodies = []
self._unclosed_req_keys = defaultdict(int)
self._unread_req_paths = defaultdict(int)
self.req_method_paths = []
@ -146,19 +148,23 @@ class FakeSwift(object):
raise KeyError("Didn't find %r in allowed responses" % (
(method, path),))
req_body = None # generally, we don't care and let eventlet discard()
if (cont and not obj and method == 'UPDATE') or (
obj and method == 'PUT'):
req_body = b''.join(iter(env['wsgi.input'].read, b''))
# simulate object PUT
if method == 'PUT' and obj:
put_body = b''.join(iter(env['wsgi.input'].read, b''))
if 'swift.callback.update_footers' in env:
footers = HeaderKeyDict()
env['swift.callback.update_footers'](footers)
req.headers.update(footers)
etag = md5(put_body).hexdigest()
etag = md5(req_body).hexdigest()
headers.setdefault('Etag', etag)
headers.setdefault('Content-Length', len(put_body))
headers.setdefault('Content-Length', len(req_body))
# keep it for subsequent GET requests later
self.uploaded[path] = (dict(req.headers), put_body)
self.uploaded[path] = (dict(req.headers), req_body)
if "CONTENT_TYPE" in env:
self.uploaded[path][0]['Content-Type'] = env["CONTENT_TYPE"]
@ -186,6 +192,7 @@ class FakeSwift(object):
# so we deliberately use a HeaderKeyDict
self._calls.append(
FakeSwiftCall(method, path, HeaderKeyDict(req.headers)))
self.req_bodies.append(req_body)
# Apply conditional etag overrides
conditional_etag = resolve_etag_is_at_header(req, headers)

View File

@ -1535,6 +1535,41 @@ class TestS3ApiObj(S3ApiTestCase):
key, arg = q.split('=')
query[key] = arg
self.assertEqual(query['multipart-manifest'], 'delete')
# HEAD did not indicate that it was an S3 MPU, so no async delete
self.assertNotIn('async', query)
self.assertNotIn('Content-Type', headers)
@s3acl
def test_slo_object_async_DELETE(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket/object',
swob.HTTPOk,
{'x-static-large-object': 'True',
'x-object-sysmeta-s3api-etag': 's3-style-etag'},
None)
self.swift.register('DELETE', '/v1/AUTH_test/bucket/object',
swob.HTTPOk, {}, '<SLO delete results>')
req = Request.blank('/bucket/object',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Content-Type': 'foo/bar'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '204')
self.assertEqual(body, b'')
self.assertIn(('HEAD', '/v1/AUTH_test/bucket/object?symlink=get'),
self.swift.calls)
self.assertIn(('DELETE', '/v1/AUTH_test/bucket/object'
'?async=on&multipart-manifest=delete'),
self.swift.calls)
_, path, headers = self.swift.calls_with_headers[-1]
path, query_string = path.split('?', 1)
query = {}
for q in query_string.split('&'):
key, arg = q.split('=')
query[key] = arg
self.assertEqual(query['multipart-manifest'], 'delete')
self.assertEqual(query['async'], 'on')
self.assertNotIn('Content-Type', headers)
def _test_object_for_s3acl(self, method, account):

View File

@ -28,9 +28,11 @@ from io import BytesIO
from swift.common import swob, utils
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.middleware import slo
from swift.common.swob import Request, HTTPException, str_to_wsgi
from swift.common.swob import Request, HTTPException, str_to_wsgi, \
bytes_to_wsgi
from swift.common.utils import quote, closing_if_possible, close_if_possible, \
parse_content_type, iter_multipart_mime_documents, parse_mime_headers
parse_content_type, iter_multipart_mime_documents, parse_mime_headers, \
Timestamp, get_expirer_container
from test.unit.common.middleware.helpers import FakeSwift
@ -1138,13 +1140,35 @@ class TestSloDeleteManifest(SloTestCase):
json.dumps([{'name': '/deltest/b_2', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest/c_3', 'hash': 'b', 'bytes': '2'}]).
encode('ascii'))
self.app.register(
'GET', '/v1/AUTH_test-un\xc3\xafcode',
swob.HTTPOk, {}, None)
self.app.register(
'GET', '/v1/AUTH_test-un\xc3\xafcode/deltest', swob.HTTPOk, {
'X-Container-Read': 'diff read',
'X-Container-Write': 'diff write',
}, None)
self.app.register(
'GET', '/v1/AUTH_test-un\xc3\xafcode/\xe2\x98\x83', swob.HTTPOk, {
'X-Container-Read': 'same read',
'X-Container-Write': 'same write',
}, None)
self.app.register(
'GET', '/v1/AUTH_test-un\xc3\xafcode/deltest/man-all-there',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/deltest/b_2', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest/c_3', 'hash': 'b', 'bytes': '2'}]).
encode('ascii'))
json.dumps([
{'name': u'/\N{SNOWMAN}/b_2', 'hash': 'a', 'bytes': '1'},
{'name': u'/\N{SNOWMAN}/c_3', 'hash': 'b', 'bytes': '2'},
]).encode('ascii'))
self.app.register(
'GET', '/v1/AUTH_test-un\xc3\xafcode/\xe2\x98\x83/same-container',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([
{'name': u'/\N{SNOWMAN}/b_2', 'hash': 'a', 'bytes': '1'},
{'name': u'/\N{SNOWMAN}/c_3', 'hash': 'b', 'bytes': '2'},
]).encode('ascii'))
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/man-all-there',
swob.HTTPNoContent, {}, None)
@ -1170,10 +1194,14 @@ class TestSloDeleteManifest(SloTestCase):
'DELETE', '/v1/AUTH_test-un\xc3\xafcode/deltest/man-all-there',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test-un\xc3\xafcode/deltest/b_2',
'DELETE',
'/v1/AUTH_test-un\xc3\xafcode/\xe2\x98\x83/same-container',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test-un\xc3\xafcode/deltest/c_3',
'DELETE', '/v1/AUTH_test-un\xc3\xafcode/\xe2\x98\x83/b_2',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test-un\xc3\xafcode/\xe2\x98\x83/c_3',
swob.HTTPNoContent, {}, None)
self.app.register(
@ -1330,12 +1358,11 @@ class TestSloDeleteManifest(SloTestCase):
('DELETE', ('/v1/AUTH_test/deltest/man-all-there'))]))
def test_handle_multipart_delete_non_ascii(self):
if six.PY2:
acct = u'AUTH_test-un\u00efcode'.encode('utf-8')
else:
acct = str_to_wsgi(u'AUTH_test-un\u00efcode')
unicode_acct = u'AUTH_test-un\u00efcode'
wsgi_acct = bytes_to_wsgi(unicode_acct.encode('utf-8'))
req = Request.blank(
'/v1/%s/deltest/man-all-there?multipart-manifest=delete' % acct,
'/v1/%s/deltest/man-all-there?'
'multipart-manifest=delete' % wsgi_acct,
environ={'REQUEST_METHOD': 'DELETE'})
status, _, body = self.call_slo(req)
self.assertEqual('200 OK', status)
@ -1351,10 +1378,11 @@ class TestSloDeleteManifest(SloTestCase):
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/%s/deltest/man-all-there?multipart-manifest=get' % acct),
('DELETE', '/v1/%s/deltest/b_2' % acct),
('DELETE', '/v1/%s/deltest/c_3' % acct),
('DELETE', ('/v1/%s/deltest/man-all-there' % acct))]))
'/v1/%s/deltest/man-all-there'
'?multipart-manifest=get' % wsgi_acct),
('DELETE', '/v1/%s/\xe2\x98\x83/b_2' % wsgi_acct),
('DELETE', '/v1/%s/\xe2\x98\x83/c_3' % wsgi_acct),
('DELETE', ('/v1/%s/deltest/man-all-there' % wsgi_acct))]))
def test_handle_multipart_delete_nested(self):
req = Request.blank(
@ -1523,6 +1551,268 @@ class TestSloDeleteManifest(SloTestCase):
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/man-all-there')]))
def test_handle_async_delete_whole_404(self):
self.slo.allow_async_delete = True
req = Request.blank(
'/v1/AUTH_test/deltest/man_404?async=t&multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
status, headers, body = self.call_slo(req)
self.assertEqual('404 Not Found', status)
self.assertEqual(
self.app.calls,
[('GET',
'/v1/AUTH_test/deltest/man_404?multipart-manifest=get')])
def test_handle_async_delete_turned_off(self):
self.slo.allow_async_delete = False
req = Request.blank(
'/v1/AUTH_test/deltest/man-all-there?'
'multipart-manifest=delete&async=on&heartbeat=on',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Accept': 'application/json'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '200 OK')
resp_data = json.loads(body)
self.assertEqual(resp_data["Number Deleted"], 3)
self.assertEqual(set(self.app.calls), set([
('GET',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=get'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/man-all-there')]))
def test_handle_async_delete_whole(self):
self.slo.allow_async_delete = True
now = Timestamp(time.time())
exp_obj_cont = get_expirer_container(
int(now), 86400, 'AUTH_test', 'deltest', 'man-all-there')
self.app.register(
'UPDATE', '/v1/.expiring_objects/%s' % exp_obj_cont,
swob.HTTPNoContent, {}, None)
req = Request.blank(
'/v1/AUTH_test/deltest/man-all-there'
'?async=true&multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
with patch('swift.common.utils.Timestamp.now', return_value=now):
status, headers, body = self.call_slo(req)
self.assertEqual('204 No Content', status)
self.assertEqual(b'', body)
self.assertEqual(self.app.calls, [
('GET',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=get'),
('UPDATE', '/v1/.expiring_objects/%s'
'?async=true&multipart-manifest=delete' % exp_obj_cont),
('DELETE', '/v1/AUTH_test/deltest/man-all-there'
'?async=true&multipart-manifest=delete'),
])
for header, expected in (
('Content-Type', 'application/json'),
('X-Backend-Storage-Policy-Index', '0'),
('X-Backend-Allow-Private-Methods', 'True'),
):
self.assertIn(header, self.app.calls_with_headers[1].headers)
value = self.app.calls_with_headers[1].headers[header]
msg = 'Expected %s header to be %r, not %r'
self.assertEqual(value, expected, msg % (header, expected, value))
self.assertEqual(json.loads(self.app.req_bodies[1]), [
{'content_type': 'application/async-deleted',
'created_at': now.internal,
'deleted': 0,
'etag': 'd41d8cd98f00b204e9800998ecf8427e',
'name': '%s-AUTH_test/deltest/b_2' % now.internal,
'size': 0,
'storage_policy_index': 0},
{'content_type': 'application/async-deleted',
'created_at': now.internal,
'deleted': 0,
'etag': 'd41d8cd98f00b204e9800998ecf8427e',
'name': '%s-AUTH_test/deltest/c_3' % now.internal,
'size': 0,
'storage_policy_index': 0},
])
def test_handle_async_delete_non_ascii(self):
self.slo.allow_async_delete = True
unicode_acct = u'AUTH_test-un\u00efcode'
wsgi_acct = bytes_to_wsgi(unicode_acct.encode('utf-8'))
now = Timestamp(time.time())
exp_obj_cont = get_expirer_container(
int(now), 86400, unicode_acct, 'deltest', 'man-all-there')
self.app.register(
'UPDATE', '/v1/.expiring_objects/%s' % exp_obj_cont,
swob.HTTPNoContent, {}, None)
authorize_calls = []
def authorize(req):
authorize_calls.append((req.method, req.acl))
req = Request.blank(
'/v1/%s/deltest/man-all-there?'
'async=1&multipart-manifest=delete&heartbeat=1' % wsgi_acct,
environ={'REQUEST_METHOD': 'DELETE', 'swift.authorize': authorize})
with patch('swift.common.utils.Timestamp.now', return_value=now):
status, _, body = self.call_slo(req)
# Every async delete should only need to make 3 requests during the
# client request/response cycle, so no need to support heart-beating
self.assertEqual('204 No Content', status)
self.assertEqual(b'', body)
self.assertEqual(self.app.calls, [
('GET',
'/v1/%s/deltest/man-all-there?'
'multipart-manifest=get' % wsgi_acct),
('HEAD', '/v1/%s' % wsgi_acct),
('HEAD', '/v1/%s/deltest' % wsgi_acct),
('HEAD', '/v1/%s/\xe2\x98\x83' % wsgi_acct),
('UPDATE',
'/v1/.expiring_objects/%s'
'?async=1&heartbeat=1&multipart-manifest=delete' % exp_obj_cont),
('DELETE',
'/v1/%s/deltest/man-all-there'
'?async=1&heartbeat=1&multipart-manifest=delete' % wsgi_acct),
])
self.assertEqual(authorize_calls, [
('GET', None), # Original GET
('DELETE', 'diff write'),
('DELETE', 'same write'),
('DELETE', None), # Final DELETE
])
for header, expected in (
('Content-Type', 'application/json'),
('X-Backend-Storage-Policy-Index', '0'),
('X-Backend-Allow-Private-Methods', 'True'),
):
self.assertIn(header, self.app.calls_with_headers[-2].headers)
value = self.app.calls_with_headers[-2].headers[header]
msg = 'Expected %s header to be %r, not %r'
self.assertEqual(value, expected, msg % (header, expected, value))
self.assertEqual(json.loads(self.app.req_bodies[-2]), [
{'content_type': 'application/async-deleted',
'created_at': now.internal,
'deleted': 0,
'etag': 'd41d8cd98f00b204e9800998ecf8427e',
'name': u'%s-%s/\N{SNOWMAN}/b_2' % (now.internal, unicode_acct),
'size': 0,
'storage_policy_index': 0},
{'content_type': 'application/async-deleted',
'created_at': now.internal,
'deleted': 0,
'etag': 'd41d8cd98f00b204e9800998ecf8427e',
'name': u'%s-%s/\N{SNOWMAN}/c_3' % (now.internal, unicode_acct),
'size': 0,
'storage_policy_index': 0},
])
def test_handle_async_delete_non_ascii_same_container(self):
self.slo.allow_async_delete = True
unicode_acct = u'AUTH_test-un\u00efcode'
wsgi_acct = bytes_to_wsgi(unicode_acct.encode('utf-8'))
now = Timestamp(time.time())
exp_obj_cont = get_expirer_container(
int(now), 86400, unicode_acct, u'\N{SNOWMAN}', 'same-container')
self.app.register(
'UPDATE', '/v1/.expiring_objects/%s' % exp_obj_cont,
swob.HTTPNoContent, {}, None)
authorize_calls = []
def authorize(req):
authorize_calls.append((req.method, req.acl))
req = Request.blank(
'/v1/%s/\xe2\x98\x83/same-container?'
'async=yes&multipart-manifest=delete' % wsgi_acct,
environ={'REQUEST_METHOD': 'DELETE', 'swift.authorize': authorize})
with patch('swift.common.utils.Timestamp.now', return_value=now):
status, _, body = self.call_slo(req)
self.assertEqual('204 No Content', status)
self.assertEqual(b'', body)
self.assertEqual(self.app.calls, [
('GET',
'/v1/%s/\xe2\x98\x83/same-container?'
'multipart-manifest=get' % wsgi_acct),
('HEAD', '/v1/%s' % wsgi_acct),
('HEAD', '/v1/%s/\xe2\x98\x83' % wsgi_acct),
('UPDATE',
'/v1/.expiring_objects/%s'
'?async=yes&multipart-manifest=delete' % exp_obj_cont),
('DELETE',
'/v1/%s/\xe2\x98\x83/same-container'
'?async=yes&multipart-manifest=delete' % wsgi_acct),
])
self.assertEqual(authorize_calls, [
('GET', None), # Original GET
('DELETE', 'same write'), # Only need one auth check
('DELETE', None), # Final DELETE
])
for header, expected in (
('Content-Type', 'application/json'),
('X-Backend-Storage-Policy-Index', '0'),
('X-Backend-Allow-Private-Methods', 'True'),
):
self.assertIn(header, self.app.calls_with_headers[-2].headers)
value = self.app.calls_with_headers[-2].headers[header]
msg = 'Expected %s header to be %r, not %r'
self.assertEqual(value, expected, msg % (header, expected, value))
self.assertEqual(json.loads(self.app.req_bodies[-2]), [
{'content_type': 'application/async-deleted',
'created_at': now.internal,
'deleted': 0,
'etag': 'd41d8cd98f00b204e9800998ecf8427e',
'name': u'%s-%s/\N{SNOWMAN}/b_2' % (now.internal, unicode_acct),
'size': 0,
'storage_policy_index': 0},
{'content_type': 'application/async-deleted',
'created_at': now.internal,
'deleted': 0,
'etag': 'd41d8cd98f00b204e9800998ecf8427e',
'name': u'%s-%s/\N{SNOWMAN}/c_3' % (now.internal, unicode_acct),
'size': 0,
'storage_policy_index': 0},
])
def test_handle_async_delete_nested(self):
self.slo.allow_async_delete = True
req = Request.blank(
'/v1/AUTH_test/deltest/manifest-with-submanifest' +
'?async=on&multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
status, _, body = self.call_slo(req)
self.assertEqual('400 Bad Request', status)
self.assertEqual(b'No segments may be large objects.', body)
self.assertEqual(self.app.calls, [
('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest?multipart-manifest=get')])
def test_handle_async_delete_too_many_containers(self):
self.slo.allow_async_delete = True
self.app.register(
'GET', '/v1/AUTH_test/deltest/man',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/cont1/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/cont2/b_2', 'hash': 'b', 'bytes': '2'}]).
encode('ascii'))
req = Request.blank(
'/v1/AUTH_test/deltest/man?async=on&multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
status, _, body = self.call_slo(req)
self.assertEqual('400 Bad Request', status)
expected = b'All segments must be in one container. Found segments in '
self.assertEqual(expected, body[:len(expected)])
self.assertEqual(self.app.calls, [
('GET', '/v1/AUTH_test/deltest/man?multipart-manifest=get')])
class TestSloHeadOldManifest(SloTestCase):
slo_etag = md5hex("seg01-hashseg02-hash")
@ -4215,6 +4505,7 @@ class TestSwiftInfo(unittest.TestCase):
self.assertEqual(swift_info['slo'].get('min_segment_size'), 1)
self.assertEqual(swift_info['slo'].get('max_manifest_size'),
mware.max_manifest_size)
self.assertIs(swift_info['slo'].get('allow_async_delete'), False)
self.assertEqual(1000, mware.max_manifest_segments)
self.assertEqual(8388608, mware.max_manifest_size)
self.assertEqual(1048576, mware.rate_limit_under_size)
@ -4223,19 +4514,21 @@ class TestSwiftInfo(unittest.TestCase):
self.assertEqual(10, mware.yield_frequency)
self.assertEqual(2, mware.concurrency)
self.assertEqual(2, mware.bulk_deleter.delete_concurrency)
self.assertIs(False, mware.allow_async_delete)
def test_registered_non_defaults(self):
conf = dict(
max_manifest_segments=500, max_manifest_size=1048576,
rate_limit_under_size=2097152, rate_limit_after_segment=20,
rate_limit_segments_per_sec=2, yield_frequency=5, concurrency=1,
delete_concurrency=3)
delete_concurrency=3, allow_async_delete='y')
mware = slo.filter_factory(conf)('have to pass in an app')
swift_info = utils.get_swift_info()
self.assertTrue('slo' in swift_info)
self.assertEqual(swift_info['slo'].get('max_manifest_segments'), 500)
self.assertEqual(swift_info['slo'].get('min_segment_size'), 1)
self.assertEqual(swift_info['slo'].get('max_manifest_size'), 1048576)
self.assertIs(swift_info['slo'].get('allow_async_delete'), True)
self.assertEqual(500, mware.max_manifest_segments)
self.assertEqual(1048576, mware.max_manifest_size)
self.assertEqual(2097152, mware.rate_limit_under_size)
@ -4244,6 +4537,7 @@ class TestSwiftInfo(unittest.TestCase):
self.assertEqual(5, mware.yield_frequency)
self.assertEqual(1, mware.concurrency)
self.assertEqual(3, mware.bulk_deleter.delete_concurrency)
self.assertIs(True, mware.allow_async_delete)
if __name__ == '__main__':

View File

@ -1216,8 +1216,56 @@ class TestUtils(unittest.TestCase):
self.assertEqual(
utils.normalize_delete_at_timestamp('71253327593.67890'),
'9999999999')
self.assertRaises(ValueError, utils.normalize_timestamp, '')
self.assertRaises(ValueError, utils.normalize_timestamp, 'abc')
with self.assertRaises(TypeError):
utils.normalize_delete_at_timestamp(None)
with self.assertRaises(ValueError):
utils.normalize_delete_at_timestamp('')
with self.assertRaises(ValueError):
utils.normalize_delete_at_timestamp('abc')
def test_normalize_delete_at_timestamp_high_precision(self):
self.assertEqual(
utils.normalize_delete_at_timestamp(1253327593, True),
'1253327593.00000')
self.assertEqual(
utils.normalize_delete_at_timestamp(1253327593.67890, True),
'1253327593.67890')
self.assertEqual(
utils.normalize_delete_at_timestamp('1253327593', True),
'1253327593.00000')
self.assertEqual(
utils.normalize_delete_at_timestamp('1253327593.67890', True),
'1253327593.67890')
self.assertEqual(
utils.normalize_delete_at_timestamp(-1253327593, True),
'0000000000.00000')
self.assertEqual(
utils.normalize_delete_at_timestamp(-1253327593.67890, True),
'0000000000.00000')
self.assertEqual(
utils.normalize_delete_at_timestamp('-1253327593', True),
'0000000000.00000')
self.assertEqual(
utils.normalize_delete_at_timestamp('-1253327593.67890', True),
'0000000000.00000')
self.assertEqual(
utils.normalize_delete_at_timestamp(71253327593, True),
'9999999999.99999')
self.assertEqual(
utils.normalize_delete_at_timestamp(71253327593.67890, True),
'9999999999.99999')
self.assertEqual(
utils.normalize_delete_at_timestamp('71253327593', True),
'9999999999.99999')
self.assertEqual(
utils.normalize_delete_at_timestamp('71253327593.67890', True),
'9999999999.99999')
with self.assertRaises(TypeError):
utils.normalize_delete_at_timestamp(None, True)
with self.assertRaises(ValueError):
utils.normalize_delete_at_timestamp('', True)
with self.assertRaises(ValueError):
utils.normalize_delete_at_timestamp('abc', True)
def test_last_modified_date_to_timestamp(self):
expectations = {

View File

@ -1029,6 +1029,18 @@ class TestObjectExpirer(TestCase):
args = (ts, a, c, o)
self.assertEqual(args, expirer.parse_task_obj(
expirer.build_task_obj(ts, a, c, o)))
self.assertEqual(args, expirer.parse_task_obj(
expirer.build_task_obj(ts, a, c, o, high_precision=True)))
ts = Timestamp(next(self.ts), delta=1234)
a = u'\N{SNOWMAN}'
c = u'\N{SNOWFLAKE}'
o = u'\U0001F334'
args = (ts, a, c, o)
self.assertNotEqual(args, expirer.parse_task_obj(
expirer.build_task_obj(ts, a, c, o)))
self.assertEqual(args, expirer.parse_task_obj(
expirer.build_task_obj(ts, a, c, o, high_precision=True)))
if __name__ == '__main__':