Merge "Allow concurrent bulk deletes"

Jenkins 2016-05-25 03:50:43 +00:00 committed by Gerrit Code Review
commit d4a7ad0a3c
6 changed files with 247 additions and 69 deletions

etc/proxy-server.conf-sample

@ -618,19 +618,23 @@ use = egg:swift#bulk
# max_failed_extractions = 1000
# max_deletes_per_request = 10000
# max_failed_deletes = 1000
#
# In order to keep a connection active during a potentially long bulk request,
# Swift may return whitespace prepended to the actual response body. This
# whitespace will be yielded no more than every yield_frequency seconds.
# yield_frequency = 10
#
# Note: The following parameter is used during a bulk delete of objects and
# their container. The container delete would frequently fail because it is
# very likely that not all replicas of an object have been deleted by the
# time the middleware gets a successful response, so the delete is retried.
# The number of retries can be configured; the wait before retry number N
# is 1.5**N seconds.
# delete_container_retry_count = 0
#
# To speed up the bulk delete process, multiple deletes may be executed in
# parallel. Avoid setting this too high, as it gives clients a force multiplier
# which may be used in DoS attacks. The suggested range is between 2 and 10.
# delete_concurrency = 2
# Note: Put after auth and staticweb in the pipeline.
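For illustration, the 1.5**retry rule above produces the following backoff
schedule (a plain Python sketch, not part of this change):

retry_interval = 1.5
for retry in range(1, 5):
    # seconds to wait before retry number N is retry_interval ** N
    print('retry %d: sleep %.4f s' % (retry, retry_interval ** retry))
# retry 1: sleep 1.5000 s
# retry 2: sleep 2.2500 s
# retry 3: sleep 3.3750 s
# retry 4: sleep 5.0625 s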
[filter:slo]
@ -651,6 +655,12 @@ use = egg:swift#slo
#
# Time limit on GET requests (seconds)
# max_get_time = 86400
#
# When deleting with ?multipart-manifest=delete, multiple deletes may be
# executed in parallel. Avoid setting this too high, as it gives clients a
# force multiplier which may be used in DoS attacks. The suggested range is
# between 2 and 10.
# delete_concurrency = 2
# Note: Put after auth and staticweb in the pipeline.
# If you don't put it in the pipeline, it will be inserted for you.
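Since both middlewares may emit keep-alive whitespace ahead of the real body
(per yield_frequency above), clients must strip it before parsing. A minimal
Python 3 sketch; the proxy URL, token, and object names are made up:

import json
import urllib.request

req = urllib.request.Request(
    'http://proxy.example.com/v1/AUTH_test?bulk-delete',
    data=b'/c/o1\n/c/o2\n',
    headers={'X-Auth-Token': 'hypothetical-token',
             'Accept': 'application/json',
             'Content-Type': 'text/plain'},
    method='POST')
with urllib.request.urlopen(req) as resp:
    body = resp.read().lstrip()  # drop any keep-alive whitespace
result = json.loads(body)
# the real outcome lives in the body; the 200 status was committed early
print(result['Response Status'], result['Number Deleted'])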

swift/common/middleware/bulk.py

@ -201,7 +201,8 @@ from swift.common.swob import Request, HTTPBadGateway, \
    HTTPCreated, HTTPBadRequest, HTTPNotFound, HTTPUnauthorized, HTTPOk, \
    HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPNotAcceptable, \
    HTTPLengthRequired, HTTPException, HTTPServerError, wsgify
from swift.common.utils import get_logger, register_swift_info, \
    StreamingPile
from swift.common import constraints
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND, HTTP_CONFLICT
@ -274,8 +275,9 @@ class Bulk(object):
    def __init__(self, app, conf, max_containers_per_extraction=10000,
                 max_failed_extractions=1000, max_deletes_per_request=10000,
                 max_failed_deletes=1000, yield_frequency=10,
                 delete_concurrency=2, retry_count=0, retry_interval=1.5,
                 logger=None):
        self.app = app
        self.logger = logger or get_logger(conf, log_route='bulk')
        self.max_containers = max_containers_per_extraction
@ -283,6 +285,7 @@ class Bulk(object):
        self.max_failed_deletes = max_failed_deletes
        self.max_deletes_per_request = max_deletes_per_request
        self.yield_frequency = yield_frequency
        self.delete_concurrency = min(1000, max(1, delete_concurrency))
        self.retry_count = retry_count
        self.retry_interval = retry_interval
        self.max_path_length = constraints.MAX_OBJECT_NAME_LENGTH \
@ -397,39 +400,74 @@ class Bulk(object):
            objs_to_delete = self.get_objs_to_delete(req)
            failed_file_response = {'type': HTTPBadRequest}
            req.environ['eventlet.minimum_write_chunk_size'] = 0

            def delete_filter(predicate, objs_to_delete):
                for obj_to_delete in objs_to_delete:
                    obj_name = obj_to_delete['name']
                    if not obj_name:
                        continue
                    if not predicate(obj_name):
                        continue
                    if obj_to_delete.get('error'):
                        if obj_to_delete['error']['code'] == HTTP_NOT_FOUND:
                            resp_dict['Number Not Found'] += 1
                        else:
                            failed_files.append([
                                quote(obj_name),
                                obj_to_delete['error']['message']])
                        continue
                    delete_path = '/'.join(['', vrs, account,
                                            obj_name.lstrip('/')])
                    if not constraints.check_utf8(delete_path):
                        failed_files.append([quote(obj_name),
                                             HTTPPreconditionFailed().status])
                        continue
                    yield (obj_name, delete_path)
            def objs_then_containers(objs_to_delete):
                # process all objects first
                yield delete_filter(lambda name: '/' in name.strip('/'),
                                    objs_to_delete)
                # followed by containers
                yield delete_filter(lambda name: '/' not in name.strip('/'),
                                    objs_to_delete)
            def do_delete(obj_name, delete_path):
                new_env = req.environ.copy()
                new_env['PATH_INFO'] = delete_path
                del(new_env['wsgi.input'])
                new_env['CONTENT_LENGTH'] = 0
                new_env['REQUEST_METHOD'] = 'DELETE'
                new_env['HTTP_USER_AGENT'] = '%s %s' % (
                    req.environ.get('HTTP_USER_AGENT'), user_agent)
                new_env['swift.source'] = swift_source
                delete_obj_req = Request.blank(delete_path, new_env)
                return (delete_obj_req.get_response(self.app), obj_name, 0)
            with StreamingPile(self.delete_concurrency) as pile:
                for names_to_delete in objs_then_containers(objs_to_delete):
                    for resp, obj_name, retry in pile.asyncstarmap(
                            do_delete, names_to_delete):
                        if last_yield + self.yield_frequency < time():
                            separator = '\r\n\r\n'
                            last_yield = time()
                            yield ' '
                        self._process_delete(resp, pile, obj_name,
                                             resp_dict, failed_files,
                                             failed_file_response, retry)
                        if len(failed_files) >= self.max_failed_deletes:
                            # Abort, but drain off the in-progress deletes
                            for resp, obj_name, retry in pile:
                                if last_yield + self.yield_frequency < time():
                                    separator = '\r\n\r\n'
                                    last_yield = time()
                                    yield ' '
                                # Don't pass in the pile, as we shouldn't retry
                                self._process_delete(
                                    resp, None, obj_name, resp_dict,
                                    failed_files, failed_file_response, retry)
                            msg = 'Max delete failures exceeded'
                            raise HTTPBadRequest(msg)

            if failed_files:
                resp_dict['Response Status'] = \
@ -603,10 +641,8 @@ class Bulk(object):
            yield separator + get_response_body(
                out_content_type, resp_dict, failed_files)

    def _process_delete(self, resp, pile, obj_name, resp_dict,
                        failed_files, failed_file_response, retry=0):
        if resp.status_int // 100 == 2:
            resp_dict['Number Deleted'] += 1
        elif resp.status_int == HTTP_NOT_FOUND:
@ -614,13 +650,16 @@ class Bulk(object):
        elif resp.status_int == HTTP_UNAUTHORIZED:
            failed_files.append([quote(obj_name),
                                 HTTPUnauthorized().status])
        elif resp.status_int == HTTP_CONFLICT and pile and \
                self.retry_count > 0 and self.retry_count > retry:
            retry += 1
            sleep(self.retry_interval ** retry)
            delete_obj_req = Request.blank(resp.environ['PATH_INFO'],
                                           resp.environ)

            def _retry(req, app, obj_name, retry):
                return req.get_response(app), obj_name, retry

            pile.spawn(_retry, delete_obj_req, self.app, obj_name, retry)
        else:
            if resp.status_int // 100 == 5:
                failed_file_response['type'] = HTTPBadGateway
@ -664,6 +703,8 @@ def filter_factory(global_conf, **local_conf):
    max_deletes_per_request = int(conf.get('max_deletes_per_request', 10000))
    max_failed_deletes = int(conf.get('max_failed_deletes', 1000))
    yield_frequency = int(conf.get('yield_frequency', 10))
    delete_concurrency = min(1000, max(1, int(
        conf.get('delete_concurrency', 2))))
    retry_count = int(conf.get('delete_container_retry_count', 0))
    retry_interval = 1.5
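This is the same clamp applied in __init__. To illustrate the bounds (a
throwaway sketch, not code from this change):

def clamped(value):
    return min(1000, max(1, int(value)))

assert clamped('-1') == 1       # raised to the floor of 1
assert clamped('0') == 1
assert clamped('2') == 2        # in range, passed through
assert clamped('1001') == 1000  # capped at 1000
# clamped('1.5') raises ValueError: the value must parse as an integer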
@ -684,6 +725,7 @@ def filter_factory(global_conf, **local_conf):
            max_deletes_per_request=max_deletes_per_request,
            max_failed_deletes=max_failed_deletes,
            yield_frequency=yield_frequency,
            delete_concurrency=delete_concurrency,
            retry_count=retry_count,
            retry_interval=retry_interval)
    return bulk_filter

swift/common/middleware/slo.py

@ -784,7 +784,9 @@ class StaticLargeObject(object):
            'rate_limit_after_segment', '10'))
        self.rate_limit_segments_per_sec = int(self.conf.get(
            'rate_limit_segments_per_sec', '1'))
        delete_concurrency = int(self.conf.get('delete_concurrency', '2'))
        self.bulk_deleter = Bulk(
            app, {}, delete_concurrency=delete_concurrency, logger=self.logger)
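With this wiring, the knob can be set on the slo filter section directly;
e.g., in proxy-server.conf (values illustrative):

[filter:slo]
use = egg:swift#slo
delete_concurrency = 4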
    def handle_multipart_get_or_head(self, req, start_response):
        """

swift/common/utils.py

@ -2606,6 +2606,48 @@ class GreenAsyncPile(object):
    __next__ = next


class StreamingPile(GreenAsyncPile):
    """
    Runs jobs in a pool of green threads, spawning more jobs as results are
    retrieved and worker threads become available.

    When used as a context manager, has the same worker-killing properties as
    :class:`ContextPool`.
    """
    def __init__(self, size):
        """:param size: number of worker threads to use"""
        self.pool = ContextPool(size)
        super(StreamingPile, self).__init__(self.pool)

    def asyncstarmap(self, func, args_iter):
        """
        This is the same as :func:`itertools.starmap`, except that *func* is
        executed in a separate green thread for each item, and results won't
        necessarily have the same order as inputs.
        """
        args_iter = iter(args_iter)

        # Initialize the pile
        for args in itertools.islice(args_iter, self.pool.size):
            self.spawn(func, *args)

        # Keep populating the pile as greenthreads become available
        for args in args_iter:
            yield next(self)
            self.spawn(func, *args)

        # Drain the pile
        for result in self:
            yield result

    def __enter__(self):
        self.pool.__enter__()
        return self

    def __exit__(self, type, value, traceback):
        self.pool.__exit__(type, value, traceback)
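A minimal usage sketch (assuming swift.common.utils from this change is
importable; the worker function and inputs are invented):

from swift.common.utils import StreamingPile

def work(item, factor):
    # stand-in for an I/O-bound job such as an object DELETE
    return item * factor

with StreamingPile(3) as pile:
    # results stream back as workers finish; order is not guaranteed
    for result in pile.asyncstarmap(work, ((i, 10) for i in range(6))):
        print(result)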
class ModifiedParseResult(ParseResult):
    "Parse results class for urlparse."

test/unit/common/middleware/test_bulk.py

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import Counter
import numbers
from six.moves import urllib
import unittest
@ -611,10 +612,11 @@ class TestUntar(unittest.TestCase):
class TestDelete(unittest.TestCase):
    conf = {'delete_concurrency': 1}  # default to old single-threaded behavior

    def setUp(self):
        self.app = FakeApp()
        self.bulk = bulk.filter_factory(self.conf)(self.app)

    def tearDown(self):
        self.app.calls = 0
@ -729,10 +731,10 @@ class TestDelete(unittest.TestCase):
        req.method = 'POST'
        resp_body = self.handle_delete_and_iter(req)
        self.assertEqual(
            Counter(self.app.delete_paths),
            Counter(['/delete_works/AUTH_Acc/c/f',
                     '/delete_works/AUTH_Acc/c/f404',
                     '/delete_works/AUTH_Acc/c/%25']))
        self.assertEqual(self.app.calls, 3)
        resp_data = utils.json.loads(resp_body)
        self.assertEqual(resp_data['Number Deleted'], 2)
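The switch from list equality to Counter is what tolerates concurrency: with
delete_concurrency > 1 the DELETEs may complete in any order, but the
multiset of paths is stable. A one-line illustration:

from collections import Counter

assert ['/a', '/b'] != ['/b', '/a']                    # order-sensitive
assert Counter(['/a', '/b']) == Counter(['/b', '/a'])  # order-insensitive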
@ -756,19 +758,20 @@ class TestDelete(unittest.TestCase):
        req.method = 'POST'
        resp_body = self.handle_delete_and_iter(req)
        self.assertEqual(
            Counter(self.app.delete_paths),
            Counter(['/delete_works/AUTH_Acc/c/ obj \xe2\x99\xa1',
                     '/delete_works/AUTH_Acc/c/ objbadutf8']))
        self.assertEqual(self.app.calls, 2)
        resp_data = utils.json.loads(resp_body)
        self.assertEqual(resp_data['Number Deleted'], 1)
        self.assertEqual(len(resp_data['Errors']), 2)
        self.assertEqual(
            Counter(map(tuple, resp_data['Errors'])),
            Counter([(urllib.parse.quote('c/ objbadutf8'),
                      '412 Precondition Failed'),
                     (urllib.parse.quote('/c/f\xdebadutf8'),
                      '412 Precondition Failed')]))
    def test_bulk_delete_no_body(self):
        req = Request.blank('/unauth/AUTH_acc/')
@ -798,8 +801,9 @@ class TestDelete(unittest.TestCase):
        resp_body = self.handle_delete_and_iter(req)
        resp_data = utils.json.loads(resp_body)
        self.assertEqual(
            Counter(map(tuple, resp_data['Errors'])),
            Counter([('/c/f', '500 Internal Error'),
                     ('c/f2', '500 Internal Error')]))
        self.assertEqual(resp_data['Response Status'], '502 Bad Gateway')

    def test_bulk_delete_bad_path(self):
@ -879,19 +883,91 @@ class TestDelete(unittest.TestCase):
        self.assertTrue('400 Bad Request' in resp_body)

    def test_bulk_delete_max_failures(self):
        body = '\n'.join([
            '/c/f1', '/c/f2', '/c/f3', '/c/f4', '/c/f5', '/c/f6',
        ])
        req = Request.blank('/unauth/AUTH_Acc', body=body,
                            headers={'Accept': 'application/json'})
        req.method = 'POST'
        with patch.object(self.bulk, 'max_failed_deletes', 2):
            resp_body = self.handle_delete_and_iter(req)
            # We know there should be at least max_failed_deletes, but there
            # may be more as we clean up in-progress requests.
            self.assertGreaterEqual(self.app.calls,
                                    self.bulk.max_failed_deletes)
            # As we're pulling things off the pile, we:
            # - get delete result,
            # - process the result,
            # - check max_failed_deletes,
            # - spawn another delete, repeat.
            # As a result, we know our app calls should be *strictly* less
            # than max_failed_deletes + delete_concurrency.
            # Note this means that when delete_concurrency is one,
            # self.app.calls will exactly equal self.bulk.max_failed_deletes.
            self.assertLess(self.app.calls,
                            self.bulk.max_failed_deletes +
                            self.bulk.delete_concurrency)
            resp_data = utils.json.loads(resp_body)
            self.assertEqual(resp_data['Response Status'], '400 Bad Request')
            self.assertEqual(resp_data['Response Body'],
                             'Max delete failures exceeded')
            self.assertIn(['/c/f1', '401 Unauthorized'], resp_data['Errors'])
            self.assertIn(['/c/f2', '401 Unauthorized'], resp_data['Errors'])
class TestConcurrentDelete(TestDelete):
    conf = {'delete_concurrency': 3}

    def test_concurrency_set(self):
        self.assertEqual(self.bulk.delete_concurrency, 3)
class TestConfig(unittest.TestCase):
    def test_defaults(self):
        expected_defaults = {
            'delete_concurrency': 2,
            'max_containers': 10000,
            'max_deletes_per_request': 10000,
            'max_failed_deletes': 1000,
            'max_failed_extractions': 1000,
            'retry_count': 0,
            'retry_interval': 1.5,
            'yield_frequency': 10,
        }

        filter_app = bulk.filter_factory({})(FakeApp())
        self.assertEqual(expected_defaults, {k: getattr(filter_app, k)
                                             for k in expected_defaults})

        filter_app = bulk.Bulk(FakeApp(), None)
        self.assertEqual(expected_defaults, {k: getattr(filter_app, k)
                                             for k in expected_defaults})

    def test_delete_concurrency(self):
        # Must be an integer
        conf = {'delete_concurrency': '1.5'}
        self.assertRaises(ValueError, bulk.filter_factory, conf)

        conf = {'delete_concurrency': 'asdf'}
        self.assertRaises(ValueError, bulk.filter_factory, conf)

        # Will be at least one
        conf = {'delete_concurrency': '-1'}
        filter_app = bulk.filter_factory(conf)(FakeApp())
        self.assertEqual(1, filter_app.delete_concurrency)

        conf = {'delete_concurrency': '0'}
        filter_app = bulk.filter_factory(conf)(FakeApp())
        self.assertEqual(1, filter_app.delete_concurrency)

        # But if you want to set it stupid-high, we won't stop you
        conf = {'delete_concurrency': '1000'}
        filter_app = bulk.filter_factory(conf)(FakeApp())
        self.assertEqual(1000, filter_app.delete_concurrency)

        # ...unless it's extra-stupid-high, in which case we cap it
        conf = {'delete_concurrency': '1001'}
        filter_app = bulk.filter_factory(conf)(FakeApp())
        self.assertEqual(1000, filter_app.delete_concurrency)


class TestSwiftInfo(unittest.TestCase):

test/unit/common/middleware/test_slo.py

@ -917,15 +917,17 @@ class TestSloDeleteManifest(SloTestCase):
        status, headers, body = self.call_slo(req)
        resp_data = json.loads(body)
        self.assertEqual(
            set(self.app.calls),
            set([('GET', '/v1/AUTH_test/deltest/' +
                  'manifest-missing-submanifest?multipart-manifest=get'),
                 ('DELETE', '/v1/AUTH_test/deltest/' +
                  'a_1?multipart-manifest=delete'),
                 ('GET', '/v1/AUTH_test/deltest/' +
                  'missing-submanifest?multipart-manifest=get'),
                 ('DELETE', '/v1/AUTH_test/deltest/' +
                  'd_3?multipart-manifest=delete'),
                 ('DELETE', '/v1/AUTH_test/deltest/' +
                  'manifest-missing-submanifest?multipart-manifest=delete')]))
        self.assertEqual(resp_data['Response Status'], '200 OK')
        self.assertEqual(resp_data['Response Body'], '')
        self.assertEqual(resp_data['Number Deleted'], 3)
@ -2652,6 +2654,10 @@ class TestSloBulkLogger(unittest.TestCase):
        slo_mware = slo.filter_factory({})('fake app')
        self.assertTrue(slo_mware.logger is slo_mware.bulk_deleter.logger)

    def test_passes_through_concurrency(self):
        slo_mware = slo.filter_factory({'delete_concurrency': 5})('fake app')
        self.assertEqual(5, slo_mware.bulk_deleter.delete_concurrency)


class TestSwiftInfo(unittest.TestCase):
    def setUp(self):