From 41754fd57f27bba646d1d6e26388c7f5f4c2dc4e Mon Sep 17 00:00:00 2001 From: Gorka Eguileor Date: Wed, 13 Sep 2017 19:46:17 +0200 Subject: [PATCH] Run backup compression on native thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backup data compression is a CPU bound operation that will not yield to other greenthreads, so given enough simultaneous backup operations they will lead to other threads' starvation. This is really problematic for DB connections, since starvation will lead to connections getting dropped with errors such as "Lost connection to MySQL server during query". Detailed information on why these connections get dropped can be found in comment "[31 Aug 2007 9:21] Magnus Blåudd" on this MySQL bug [1]. These DB issues may result in backups unnecessarily ending in an "error" state. This patch fixes this by moving the compression to a native thread so the cooperative multitasking in Cinder Backup can continue switching threads. [1] https://bugs.mysql.com/bug.php?id=28359 Closes-Bug: #1692775 Closes-Bug: #1719580 Change-Id: I1946dc0ad9cb7a68072a39816fa9fa224c2eb6a5 (cherry picked from commit af0f00bc52f79d9395adfe0575b0dbe353e18bbe) (cherry picked from commit 439f90da8e4c1cfa1acfe36b5ea8a6c5bae6139f) (cherry picked from commit b241f93267646a6501e3a5fb14521f05e38a2ae9) --- cinder/backup/chunkeddriver.py | 5 ++++- .../unit/backup/drivers/test_backup_google.py | 16 +++++++++++++++- .../tests/unit/backup/drivers/test_backup_nfs.py | 16 +++++++++++++++- .../unit/backup/drivers/test_backup_swift.py | 16 +++++++++++++++- 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/cinder/backup/chunkeddriver.py b/cinder/backup/chunkeddriver.py index 595986ace8e..aac2d85f415 100644 --- a/cinder/backup/chunkeddriver.py +++ b/cinder/backup/chunkeddriver.py @@ -344,7 +344,10 @@ class ChunkedBackupDriver(driver.BackupDriver): if self.compressor is None: return 'none', data data_size_bytes = len(data) - compressed_data
= self.compressor.compress(data) + # Execute compression in native thread so it doesn't prevent + # cooperative greenthread switching. + compressed_data = eventlet.tpool.execute(self.compressor.compress, + data) comp_size_bytes = len(compressed_data) algorithm = CONF.backup_compression_algorithm.lower() if comp_size_bytes >= data_size_bytes: diff --git a/cinder/tests/unit/backup/drivers/test_backup_google.py b/cinder/tests/unit/backup/drivers/test_backup_google.py index 7e4d223f86e..98505f98678 100644 --- a/cinder/tests/unit/backup/drivers/test_backup_google.py +++ b/cinder/tests/unit/backup/drivers/test_backup_google.py @@ -25,6 +25,7 @@ import hashlib import os import shutil import tempfile +import threading import zlib import mock @@ -547,14 +548,27 @@ class GoogleBackupDriverTestCase(test.TestCase): @gcs_client def test_prepare_output_data_effective_compression(self): + """Test compression works on a native thread.""" + # Use dictionary to share data between threads + thread_dict = {} + original_compress = zlib.compress + + def my_compress(data, *args, **kwargs): + thread_dict['compress'] = threading.current_thread() + return original_compress(data) + service = google_dr.GoogleBackupDriver(self.ctxt) # Set up buffer of 128 zeroed bytes fake_data = b'\0' * 128 - result = service._prepare_output_data(fake_data) + with mock.patch.object(service.compressor, 'compress', + side_effect=my_compress): + result = service._prepare_output_data(fake_data) self.assertEqual('zlib', result[0]) self.assertGreater(len(fake_data), len(result[1])) + self.assertNotEqual(threading.current_thread(), + thread_dict['compress']) @gcs_client def test_prepare_output_data_no_compresssion(self): diff --git a/cinder/tests/unit/backup/drivers/test_backup_nfs.py b/cinder/tests/unit/backup/drivers/test_backup_nfs.py index 9d97fe575bb..c68634234ae 100644 --- a/cinder/tests/unit/backup/drivers/test_backup_nfs.py +++ b/cinder/tests/unit/backup/drivers/test_backup_nfs.py @@ -22,6 +22,7 @@ import 
hashlib import os import shutil import tempfile +import threading import zlib import mock @@ -630,12 +631,25 @@ class BackupNFSSwiftBasedTestCase(test.TestCase): return fake_data def test_prepare_output_data_effective_compression(self): + """Test compression works on a native thread.""" + # Use dictionary to share data between threads + thread_dict = {} + original_compress = zlib.compress + + def my_compress(data, *args, **kwargs): + thread_dict['compress'] = threading.current_thread() + return original_compress(data) + service = nfs.NFSBackupDriver(self.ctxt) fake_data = self.create_buffer(128) - result = service._prepare_output_data(fake_data) + with mock.patch.object(service.compressor, 'compress', + side_effect=my_compress): + result = service._prepare_output_data(fake_data) self.assertEqual('zlib', result[0]) self.assertGreater(len(fake_data), len(result[1])) + self.assertNotEqual(threading.current_thread(), + thread_dict['compress']) def test_prepare_output_data_no_compresssion(self): self.flags(backup_compression_algorithm='none') diff --git a/cinder/tests/unit/backup/drivers/test_backup_swift.py b/cinder/tests/unit/backup/drivers/test_backup_swift.py index f24d1be4928..c288cf89cdd 100644 --- a/cinder/tests/unit/backup/drivers/test_backup_swift.py +++ b/cinder/tests/unit/backup/drivers/test_backup_swift.py @@ -24,6 +24,7 @@ import hashlib import os import shutil import tempfile +import threading import zlib import mock @@ -798,14 +799,27 @@ class BackupSwiftTestCase(test.TestCase): self.assertRaises(ValueError, service._get_compressor, 'fake') def test_prepare_output_data_effective_compression(self): + """Test compression works on a native thread.""" + # Use dictionary to share data between threads + thread_dict = {} + original_compress = zlib.compress + + def my_compress(data, *args, **kwargs): + thread_dict['compress'] = threading.current_thread() + return original_compress(data) + service = swift_dr.SwiftBackupDriver(self.ctxt) # Set up buffer of 128 
zeroed bytes fake_data = b'\0' * 128 - result = service._prepare_output_data(fake_data) + with mock.patch.object(service.compressor, 'compress', + side_effect=my_compress): + result = service._prepare_output_data(fake_data) self.assertEqual('zlib', result[0]) self.assertGreater(len(fake_data), len(result[1])) + self.assertNotEqual(threading.current_thread(), + thread_dict['compress']) def test_prepare_output_data_no_compresssion(self): self.flags(backup_compression_algorithm='none')