diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index ac7d497adb..67c879b1ac 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -148,6 +148,12 @@ use = egg:swift#object # 'keep_cache_private' is false. # keep_cache_slo_manifest = false # +# cooperative_period defines how frequent object server GET request will +# perform the cooperative yielding during iterating the disk chunks. For +# example, value of '5' will insert one sleep() after every 5 disk_chunk_size +# chunk reads. A value of '0' (the default) will turn off cooperative yielding. +# cooperative_period = 0 +# # on PUTs, sync data every n MB # mb_per_sync = 512 # diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index 52d235b5c7..a735e85a63 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -6506,18 +6506,20 @@ class CooperativeIterator(ClosingIterator): :param iterable: iterator to wrap. :param period: number of items yielded from this iterator between calls to - ``sleep()``. + ``sleep()``; a negative value or 0 mean that cooperative sleep will be + disabled. """ __slots__ = ('period', 'count') def __init__(self, iterable, period=5): super(CooperativeIterator, self).__init__(iterable) self.count = 0 - self.period = period + self.period = max(0, period or 0) def _get_next_item(self): - if self.count >= self.period: - self.count = 0 - sleep() - self.count += 1 + if self.period: + if self.count >= self.period: + self.count = 0 + sleep() + self.count += 1 return super(CooperativeIterator, self)._get_next_item() diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index d593c0d686..d59c486252 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -66,7 +66,7 @@ from swift.common.utils import mkdirs, Timestamp, \ MD5_OF_EMPTY_STRING, link_fd_to_path, \ O_TMPFILE, makedirs_count, replace_partition_in_path, remove_directory, \ md5, is_file_older, non_negative_float, config_fallocate_value, \ - fs_has_free_space + fs_has_free_space, CooperativeIterator from swift.common.splice import splice, tee from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \ DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \ @@ -2110,11 +2110,13 @@ class BaseDiskFileReader(object): :param pipe_size: size of pipe buffer used in zero-copy operations :param diskfile: the diskfile creating this DiskFileReader instance :param keep_cache: should resulting reads be kept in the buffer cache + :param cooperative_period: the period parameter when does cooperative + yielding during file read """ def __init__(self, fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, - keep_cache=False): + keep_cache=False, cooperative_period=0): # Parameter tracking self._fp = fp self._data_file = data_file @@ -2133,6 +2135,7 @@ class BaseDiskFileReader(object): self._keep_cache = obj_size < keep_cache_size else: self._keep_cache = False + self._cooperative_period = cooperative_period # Internal Attributes self._iter_etag = None @@ -2157,6 +2160,10 @@ class BaseDiskFileReader(object): self._iter_etag.update(chunk) def __iter__(self): + return CooperativeIterator( + self._inner_iter(), period=self._cooperative_period) + + def _inner_iter(self): """Returns an iterator over the data file.""" try: dropped_cache = 0 @@ -2975,7 +2982,7 @@ class BaseDiskFile(object): with self.open(current_time=current_time): return self.get_metadata() - def reader(self, keep_cache=False, + def reader(self, keep_cache=False, cooperative_period=0, _quarantine_hook=lambda m: None): """ Return a :class:`swift.common.swob.Response` class compatible @@ -2987,6 +2994,8 @@ class BaseDiskFile(object): :param keep_cache: caller's preference for keeping data read in the OS buffer cache + :param cooperative_period: the period parameter for cooperative + yielding during file read :param _quarantine_hook: 1-arg callable called when obj quarantined; the arg is the reason for quarantine. Default is to ignore it. @@ -2998,7 +3007,8 @@ class BaseDiskFile(object): self._metadata['ETag'], self._disk_chunk_size, self._manager.keep_cache_size, self._device_path, self._logger, use_splice=self._use_splice, quarantine_hook=_quarantine_hook, - pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache) + pipe_size=self._pipe_size, diskfile=self, keep_cache=keep_cache, + cooperative_period=cooperative_period) # At this point the reader object is now responsible for closing # the file pointer. self._fp = None @@ -3161,11 +3171,12 @@ class ECDiskFileReader(BaseDiskFileReader): def __init__(self, fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, quarantine_hook, use_splice, pipe_size, diskfile, - keep_cache=False): + keep_cache=False, cooperative_period=0): super(ECDiskFileReader, self).__init__( fp, data_file, obj_size, etag, disk_chunk_size, keep_cache_size, device_path, logger, - quarantine_hook, use_splice, pipe_size, diskfile, keep_cache) + quarantine_hook, use_splice, pipe_size, diskfile, keep_cache, + cooperative_period) self.frag_buf = None self.frag_offset = 0 self.frag_size = self._diskfile.policy.fragment_size diff --git a/swift/obj/mem_diskfile.py b/swift/obj/mem_diskfile.py index fa72372fe1..3dee2c1354 100644 --- a/swift/obj/mem_diskfile.py +++ b/swift/obj/mem_diskfile.py @@ -426,13 +426,14 @@ class DiskFile(object): with self.open(current_time=current_time): return self.get_metadata() - def reader(self, keep_cache=False): + def reader(self, keep_cache=False, cooperative_period=0): """ Return a swift.common.swob.Response class compatible "app_iter" object. The responsibility of closing the open file is passed to the DiskFileReader object. :param keep_cache: + :param cooperative_period: """ dr = DiskFileReader(self._name, self._fp, int(self._metadata['Content-Length']), diff --git a/swift/obj/server.py b/swift/obj/server.py index 343cf624f8..5853eac9a9 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -152,6 +152,7 @@ class ObjectController(BaseStorageServer): config_true_value(conf.get('keep_cache_private', 'false')) self.keep_cache_slo_manifest = \ config_true_value(conf.get('keep_cache_slo_manifest', 'false')) + self.cooperative_period = int(conf.get("cooperative_period", 0)) default_allowed_headers = ''' content-disposition, @@ -1097,10 +1098,15 @@ class ObjectController(BaseStorageServer): ) ) conditional_etag = resolve_etag_is_at_header(request, metadata) + app_iter = disk_file.reader( + keep_cache=keep_cache, + cooperative_period=self.cooperative_period, + ) response = Response( - app_iter=disk_file.reader(keep_cache=keep_cache), - request=request, conditional_response=True, - conditional_etag=conditional_etag) + app_iter=app_iter, request=request, + conditional_response=True, + conditional_etag=conditional_etag, + ) response.headers['Content-Type'] = metadata.get( 'Content-Type', 'application/octet-stream') for key, value in metadata.items(): diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 0414d92cf1..5742f550e4 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -9821,7 +9821,7 @@ class TestCooperativeIterator(unittest.TestCase): it.close() self.assertTrue(closeable.close.called) - def test_next(self): + def test_sleeps(self): def do_test(it, period): results = [] for i in range(period): @@ -9852,8 +9852,21 @@ class TestCooperativeIterator(unittest.TestCase): self.assertEqual(list(range(7)), actual) actual = do_test(utils.CooperativeIterator(itertools.count(), 1), 1) self.assertEqual(list(range(3)), actual) - actual = do_test(utils.CooperativeIterator(itertools.count(), 0), 0) - self.assertEqual(list(range(2)), actual) + + def test_no_sleeps(self): + def do_test(period): + it = utils.CooperativeIterator(itertools.count(), period) + results = [] + with mock.patch('swift.common.utils.sleep') as mock_sleep: + for i in range(100): + results.append(next(it)) + self.assertFalse(mock_sleep.called, i) + self.assertEqual(list(range(100)), results) + + do_test(0) + do_test(-1) + do_test(-111) + do_test(None) class TestContextPool(unittest.TestCase): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 52ab294585..3c27c0b61a 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -4346,7 +4346,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4369,7 +4370,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have 'X-Storage-Token'. @@ -4379,7 +4381,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have both 'X-Auth-Token' and 'X-Storage-Token'. @@ -4390,7 +4393,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) def test_GET_keep_cache_private_config_false(self): @@ -4418,7 +4422,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4441,7 +4446,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have 'X-Storage-Token'. @@ -4451,7 +4457,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) # Request headers have both 'X-Auth-Token' and 'X-Storage-Token'. @@ -4462,7 +4469,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) def test_GET_keep_cache_slo_manifest_no_config(self): @@ -4492,7 +4500,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4537,7 +4546,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4582,7 +4592,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=True) + reader_mock.assert_called_with( + keep_cache=True, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4626,7 +4637,8 @@ class TestObjectController(BaseTestCase): reader_mock = mock.Mock(keep_cache=False) with mock.patch('swift.obj.diskfile.BaseDiskFile.reader', reader_mock): resp = req.get_response(obj_controller) - reader_mock.assert_called_with(keep_cache=False) + reader_mock.assert_called_with( + keep_cache=False, cooperative_period=0) self.assertEqual(resp.status_int, 200) etag = '"%s"' % md5(b'VERIFY', usedforsecurity=False).hexdigest() self.assertEqual(dict(resp.headers), { @@ -4642,6 +4654,107 @@ class TestObjectController(BaseTestCase): gmtime(math.ceil(float(timestamp)))), }) + def test_GET_cooperative_period_config(self): + # Test config of 'cooperative_period' gets passed to DiskFile reader. + conf = {'devices': self.testdir, 'mount_check': 'false', + 'container_update_timeout': 0.0, + 'cooperative_period': '99'} + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + timestamp = normalize_timestamp(time()) + req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers={'X-Timestamp': timestamp, + 'Content-Type': 'application/x-test'}) + req.body = b'7 bytes' + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 201) + + req = Request.blank('/sda1/p/a/c/o', + headers={'Content-Type': 'application/x-test', + 'X-Auth-Token': '2340lsdfhhjl02lxfjj'}) + with mock.patch( + "swift.obj.diskfile.BaseDiskFile.reader" + ) as reader_mock: + resp = req.get_response(obj_controller) + reader_mock.assert_called_with(keep_cache=False, cooperative_period=99) + self.assertEqual(resp.status_int, 200) + + # Test DiskFile reader actually sleeps when reading chunks. When + # cooperative_period is 1, disk reader sleeps once AFTER each next(). + conf['cooperative_period'] = '1' + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-6'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 206) + self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range')) + self.assertEqual(b' bytes', resp.body) + self.assertEqual(1, mock_sleep.call_count) + + # Test DiskFile reader actually sleeps when reading chunks. And verify + # number of sleeps when 'disk_chunk_size' is set. + conf['cooperative_period'] = '2' + conf['disk_chunk_size'] = 2 + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(b'7 bytes', resp.body) + self.assertEqual(2, mock_sleep.call_count) + + conf['cooperative_period'] = '2' + conf['disk_chunk_size'] = 3 + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 200) + self.assertEqual(b'7 bytes', resp.body) + self.assertEqual(1, mock_sleep.call_count) + + # Test DiskFile reader won't sleep with cooperative_period set as 0. + conf['cooperative_period'] = '0' + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-6'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 206) + self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range')) + self.assertEqual(b' bytes', resp.body) + self.assertFalse(mock_sleep.called) + + # Test DiskFile reader won't sleep with default cooperative_period + # which is also 0. + conf.pop('cooperative_period') + obj_controller = object_server.ObjectController( + conf, logger=self.logger) + + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-6'}) + with mock.patch('swift.common.utils.sleep') as mock_sleep: + resp = req.get_response(obj_controller) + self.assertEqual(resp.status_int, 206) + self.assertEqual('bytes 1-6/7', resp.headers.get('Content-Range')) + self.assertEqual(b' bytes', resp.body) + self.assertFalse(mock_sleep.called) + @mock.patch("time.time", mock_time) def test_DELETE(self): # Test swift.obj.server.ObjectController.DELETE