diff --git a/swiftclient/service.py b/swiftclient/service.py index ce0c4fc..223641b 100644 --- a/swiftclient/service.py +++ b/swiftclient/service.py @@ -43,7 +43,8 @@ from swiftclient.command_helpers import ( ) from swiftclient.utils import ( config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG, - parse_api_response, report_traceback, n_groups, split_request_headers + parse_api_response, report_traceback, n_groups, split_request_headers, + n_at_a_time ) from swiftclient.exceptions import ClientException from swiftclient.multithreading import MultiThreadingManager @@ -2151,11 +2152,15 @@ class SwiftService(object): rq = Queue() obj_dels = {} - if self._should_bulk_delete(objects): - for obj_slice in n_groups( - objects, self._options['object_dd_threads']): - self._bulk_delete(container, obj_slice, options, - obj_dels) + bulk_page_size = self._bulk_delete_page_size(objects) + if bulk_page_size > 1: + page_at_a_time = n_at_a_time(objects, bulk_page_size) + for page_slice in page_at_a_time: + for obj_slice in n_groups( + page_slice, + self._options['object_dd_threads']): + self._bulk_delete(container, obj_slice, options, + obj_dels) else: self._per_item_delete(container, objects, options, obj_dels, rq) @@ -2208,23 +2213,36 @@ class SwiftService(object): and not res['success']): cancelled = True - def _should_bulk_delete(self, objects): - if len(objects) < 2 * self._options['object_dd_threads']: + def _bulk_delete_page_size(self, objects): + ''' + Given the iterable 'objects', will return how many items should be + deleted at a time. + + :param objects: An iterable that supports 'len()' + :returns: The bulk delete page size (i.e. the max number of + objects that can be bulk deleted at once, as reported by + the cluster). 
If bulk delete is disabled, return 1 + ''' + if len(objects) <= 2 * self._options['object_dd_threads']: # Not many objects; may as well delete one-by-one - return False + return 1 try: cap_result = self.capabilities() if not cap_result['success']: # This shouldn't actually happen, but just in case we start # being more nuanced about our capabilities result... - return False + return 1 except ClientException: # Old swift, presumably; assume no bulk middleware - return False + return 1 swift_info = cap_result['capabilities'] - return 'bulk_delete' in swift_info + if 'bulk_delete' in swift_info: + return swift_info['bulk_delete'].get( + 'max_deletes_per_request', 10000) + else: + return 1 def _per_item_delete(self, container, objects, options, rdict, rq): for obj in objects: diff --git a/tests/unit/test_service.py b/tests/unit/test_service.py index 378b70b..260f1cb 100644 --- a/tests/unit/test_service.py +++ b/tests/unit/test_service.py @@ -474,6 +474,40 @@ class TestServiceDelete(_TestServiceBase): self.assertLessEqual(r['error_timestamp'], after) self.assertIn('Traceback', r['traceback']) + @mock.patch.object(swiftclient.service.SwiftService, 'capabilities', + lambda *a: {'action': 'capabilities', + 'timestamp': time.time(), + 'success': True, + 'capabilities': { + 'bulk_delete': + {'max_deletes_per_request': 10}} + }) + def test_bulk_delete_page_size(self): + # make a list of 100 objects + obj_list = ['x%02d' % i for i in range(100)] + errors = [] + + # _bulk_delete_page_size uses 2x the number of threads to determine + # if there are "many" objects to delete or not + + # format is: [(thread_count, expected result), ...] 
+ obj_threads_exp = [ + (10, 10), # something small + (49, 10), # just under the bounds + (50, 1), # cutover point + (51, 1), # just over bounds + (100, 1), # something big + ] + for thread_count, exp in obj_threads_exp: + s = SwiftService(options={'object_dd_threads': thread_count}) + res = s._bulk_delete_page_size(obj_list) + if res != exp: + msg = 'failed for thread_count %d: got %r expected %r' % \ + (thread_count, res, exp) + errors.append(msg) + if errors: + self.fail('_bulk_delete_page_size() failed\n' + '\n'.join(errors)) + class TestSwiftError(unittest.TestCase): diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py index f7e9c1a..481aedd 100644 --- a/tests/unit/test_shell.py +++ b/tests/unit/test_shell.py @@ -903,8 +903,8 @@ class TestShell(unittest.TestCase): 'x-object-meta-mtime': mock.ANY}, response_dict={}) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 0) @mock.patch('swiftclient.service.Connection') def test_delete_bad_threads(self, mock_connection): mock_connection.return_value.get_container.return_value = (None, []) @@ -934,8 +934,8 @@ class TestShell(unittest.TestCase): check_good(["--object-threads", "1"]) check_good(["--container-threads", "1"]) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 1) @mock.patch('swiftclient.service.Connection') def test_delete_account(self, connection): connection.return_value.get_account.side_effect = [ @@ -971,8 +971,8 @@ class TestShell(unittest.TestCase): mock.call('container2', response_dict={}, headers={}), mock.call('empty_container', response_dict={}, headers={})]) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: True) + 
@mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 10) @mock.patch('swiftclient.service.Connection') def test_delete_bulk_account(self, connection): connection.return_value.get_account.side_effect = [ @@ -1085,8 +1085,80 @@ class TestShell(unittest.TestCase): self.assertEqual(connection.return_value.get_capabilities.mock_calls, [mock.call(None)]) # only one /info request - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch('swiftclient.service.Connection') + def test_delete_bulk_account_with_capabilities_and_pages(self, connection): + connection.return_value.get_capabilities.return_value = { + 'bulk_delete': { + 'max_deletes_per_request': 2, + 'max_failed_deletes': 1000, + }, + } + connection.return_value.get_account.side_effect = [ + [None, [{'name': 'container'}]], + [None, [{'name': 'container2'}]], + [None, [{'name': 'empty_container'}]], + [None, []], + ] + connection.return_value.get_container.side_effect = [ + [None, [{'name': 'object'}, {'name': 'obj\xe9ct2'}, + {'name': 'z_object'}, {'name': 'z_obj\xe9ct2'}]], + [None, []], + [None, [{'name': 'object'}, {'name': 'obj\xe9ct2'}, + {'name': 'z_object'}, {'name': 'z_obj\xe9ct2'}]], + [None, []], + [None, []], + ] + connection.return_value.attempts = 0 + argv = ["", "delete", "--all", "--object-threads", "1"] + connection.return_value.post_account.return_value = {}, ( + b'{"Number Not Found": 0, "Response Status": "200 OK", ' + b'"Errors": [], "Number Deleted": 1, "Response Body": ""}') + swiftclient.shell.main(argv) + # check that each bulk call was only called with 2 objects + self.assertEqual( + connection.return_value.post_account.mock_calls, [ + mock.call(query_string='bulk-delete', + data=b''.join([ + b'/container/object\n', + b'/container/obj%C3%A9ct2\n', + ]), + headers={'Content-Type': 'text/plain', + 'Accept': 'application/json'}, + response_dict={}), + mock.call(query_string='bulk-delete', + 
data=b''.join([ + b'/container/z_object\n', + b'/container/z_obj%C3%A9ct2\n' + ]), + headers={'Content-Type': 'text/plain', + 'Accept': 'application/json'}, + response_dict={}), + mock.call(query_string='bulk-delete', + data=b''.join([ + b'/container2/object\n', + b'/container2/obj%C3%A9ct2\n', + ]), + headers={'Content-Type': 'text/plain', + 'Accept': 'application/json'}, + response_dict={}), + mock.call(query_string='bulk-delete', + data=b''.join([ + b'/container2/z_object\n', + b'/container2/z_obj%C3%A9ct2\n' + ]), + headers={'Content-Type': 'text/plain', + 'Accept': 'application/json'}, + response_dict={})]) + self.assertEqual( + connection.return_value.delete_container.mock_calls, [ + mock.call('container', response_dict={}, headers={}), + mock.call('container2', response_dict={}, headers={}), + mock.call('empty_container', response_dict={}, headers={})]) + self.assertEqual(connection.return_value.get_capabilities.mock_calls, + [mock.call(None)]) # only one /info request + + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 1) @mock.patch('swiftclient.service.Connection') def test_delete_container(self, connection): connection.return_value.get_container.side_effect = [ @@ -1103,8 +1175,8 @@ class TestShell(unittest.TestCase): 'container', 'object', query_string=None, response_dict={}, headers={}) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 1) @mock.patch('swiftclient.service.Connection') def test_delete_container_headers(self, connection): connection.return_value.get_container.side_effect = [ @@ -1122,8 +1194,8 @@ class TestShell(unittest.TestCase): 'container', 'object', query_string=None, response_dict={}, headers={'Skip-Middleware': 'Test'}) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: True) + 
@mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 10) @mock.patch('swiftclient.service.Connection') def test_delete_bulk_container(self, connection): connection.return_value.get_container.side_effect = [ @@ -1176,8 +1248,8 @@ class TestShell(unittest.TestCase): self.assertTrue(out.out.find( 't\u00e9st_c [after 2 attempts]') >= 0, out) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 1) @mock.patch('swiftclient.service.Connection') def test_delete_per_object(self, connection): argv = ["", "delete", "container", "object"] @@ -1188,8 +1260,8 @@ class TestShell(unittest.TestCase): 'container', 'object', query_string=None, response_dict={}, headers={}) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: True) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 10) @mock.patch('swiftclient.service.Connection') def test_delete_bulk_object(self, connection): argv = ["", "delete", "container", "object"] @@ -2714,8 +2786,8 @@ class TestCrossAccountObjectAccess(TestBase, MockHttpTest): return status return on_request - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 1) @mock.patch('swiftclient.service.Connection') def test_upload_bad_threads(self, mock_connection): mock_connection.return_value.put_object.return_value = EMPTY_ETAG @@ -2897,8 +2969,8 @@ class TestCrossAccountObjectAccess(TestBase, MockHttpTest): self.assertIn(expected_err, out.err) self.assertEqual('', out) - @mock.patch.object(swiftclient.service.SwiftService, '_should_bulk_delete', - lambda *a: False) + @mock.patch.object(swiftclient.service.SwiftService, + '_bulk_delete_page_size', lambda *a: 1) 
@mock.patch('swiftclient.service.Connection') def test_download_bad_threads(self, mock_connection): mock_connection.return_value.get_object.return_value = [{}, '']