Run backup compression on native thread
Backup data compression is a CPU bound operation that will not yield to other greenthreads, so given enough simultaneous backup operations they will lead to other threads' starvation. This is really problematic for DB connections, since starvation will lead to connections getting dropped with errors such as "Lost connection to MySQL server during query". Detailed information on why these connections get dropped can be found in comment "[31 Aug 2007 9:21] Magnus Blåudd" on this MySQL bug [1]. These DB issues may result in backups unnecessarily ending in an "error" state. This patch fixes this by moving the compression to a native thread so the cooperative multitasking in Cinder Backup can continue switching threads. [1] https://bugs.mysql.com/bug.php?id=28359 Closes-Bug: #1692775 Closes-Bug: #1719580 Change-Id: I1946dc0ad9cb7a68072a39816fa9fa224c2eb6a5 (cherry picked from commit af0f00bc52) (cherry picked from commit 439f90da8e)
This commit is contained in:
parent
b1232cdde8
commit
173d4d0a46
|
@ -344,7 +344,10 @@ class ChunkedBackupDriver(driver.BackupDriver):
|
|||
if self.compressor is None:
|
||||
return 'none', data
|
||||
data_size_bytes = len(data)
|
||||
compressed_data = self.compressor.compress(data)
|
||||
# Execute compression in native thread so it doesn't prevent
|
||||
# cooperative greenthread switching.
|
||||
compressed_data = eventlet.tpool.execute(self.compressor.compress,
|
||||
data)
|
||||
comp_size_bytes = len(compressed_data)
|
||||
algorithm = CONF.backup_compression_algorithm.lower()
|
||||
if comp_size_bytes >= data_size_bytes:
|
||||
|
|
|
@ -25,6 +25,7 @@ import hashlib
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import zlib
|
||||
|
||||
import mock
|
||||
|
@ -547,14 +548,27 @@ class GoogleBackupDriverTestCase(test.TestCase):
|
|||
|
||||
@gcs_client
def test_prepare_output_data_effective_compression(self):
    """Test that compression runs on a native thread and shrinks the data.

    The driver is expected to hand compression off to a native thread
    (via eventlet.tpool) so it does not block greenthread switching.
    We verify this by recording which thread actually executes
    ``compress`` and asserting it is not the test's own thread.
    """
    # Use a dictionary to share data between threads: the mocked
    # compressor stores the thread it ran on, the test reads it back.
    thread_dict = {}
    original_compress = zlib.compress

    def my_compress(data, *args, **kwargs):
        # Record the thread that performs the compression, then
        # delegate to the real zlib so the result is still valid.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)

    service = google_dr.GoogleBackupDriver(self.ctxt)
    # Set up buffer of 128 zeroed bytes — highly compressible input.
    fake_data = b'\0' * 128

    # Call _prepare_output_data only with the compressor patched;
    # an earlier unpatched call would just discard its result and
    # run a pointless extra compression pass.
    with mock.patch.object(service.compressor, 'compress',
                           side_effect=my_compress):
        result = service._prepare_output_data(fake_data)

    self.assertEqual('zlib', result[0])
    self.assertGreater(len(fake_data), len(result[1]))
    # Compression must have happened on a thread other than this one.
    self.assertNotEqual(threading.current_thread(),
                        thread_dict['compress'])
|
||||
|
||||
@gcs_client
|
||||
def test_prepare_output_data_no_compresssion(self):
|
||||
|
|
|
@ -22,6 +22,7 @@ import hashlib
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import zlib
|
||||
|
||||
import mock
|
||||
|
@ -677,12 +678,25 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
|
|||
return fake_data
|
||||
|
||||
def test_prepare_output_data_effective_compression(self):
    """Test that compression runs on a native thread and shrinks the data.

    The driver is expected to hand compression off to a native thread
    (via eventlet.tpool) so it does not block greenthread switching.
    We verify this by recording which thread actually executes
    ``compress`` and asserting it is not the test's own thread.
    """
    # Use a dictionary to share data between threads: the mocked
    # compressor stores the thread it ran on, the test reads it back.
    thread_dict = {}
    original_compress = zlib.compress

    def my_compress(data, *args, **kwargs):
        # Record the thread that performs the compression, then
        # delegate to the real zlib so the result is still valid.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)

    service = nfs.NFSBackupDriver(self.ctxt)
    fake_data = self.create_buffer(128)

    # Call _prepare_output_data only with the compressor patched;
    # an earlier unpatched call would just discard its result and
    # run a pointless extra compression pass.
    with mock.patch.object(service.compressor, 'compress',
                           side_effect=my_compress):
        result = service._prepare_output_data(fake_data)

    self.assertEqual('zlib', result[0])
    self.assertGreater(len(fake_data), len(result[1]))
    # Compression must have happened on a thread other than this one.
    self.assertNotEqual(threading.current_thread(),
                        thread_dict['compress'])
|
||||
|
||||
def test_prepare_output_data_no_compresssion(self):
|
||||
self.flags(backup_compression_algorithm='none')
|
||||
|
|
|
@ -24,6 +24,7 @@ import hashlib
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import zlib
|
||||
|
||||
import mock
|
||||
|
@ -798,14 +799,27 @@ class BackupSwiftTestCase(test.TestCase):
|
|||
self.assertRaises(ValueError, service._get_compressor, 'fake')
|
||||
|
||||
def test_prepare_output_data_effective_compression(self):
    """Test that compression runs on a native thread and shrinks the data.

    The driver is expected to hand compression off to a native thread
    (via eventlet.tpool) so it does not block greenthread switching.
    We verify this by recording which thread actually executes
    ``compress`` and asserting it is not the test's own thread.
    """
    # Use a dictionary to share data between threads: the mocked
    # compressor stores the thread it ran on, the test reads it back.
    thread_dict = {}
    original_compress = zlib.compress

    def my_compress(data, *args, **kwargs):
        # Record the thread that performs the compression, then
        # delegate to the real zlib so the result is still valid.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)

    service = swift_dr.SwiftBackupDriver(self.ctxt)
    # Set up buffer of 128 zeroed bytes — highly compressible input.
    fake_data = b'\0' * 128

    # Call _prepare_output_data only with the compressor patched;
    # an earlier unpatched call would just discard its result and
    # run a pointless extra compression pass.
    with mock.patch.object(service.compressor, 'compress',
                           side_effect=my_compress):
        result = service._prepare_output_data(fake_data)

    self.assertEqual('zlib', result[0])
    self.assertGreater(len(fake_data), len(result[1]))
    # Compression must have happened on a thread other than this one.
    self.assertNotEqual(threading.current_thread(),
                        thread_dict['compress'])
|
||||
|
||||
def test_prepare_output_data_no_compresssion(self):
|
||||
self.flags(backup_compression_algorithm='none')
|
||||
|
|
Loading…
Reference in New Issue