Run backup compression on native thread
Backup data compression is a CPU bound operation that will not yield to other greenthreads, so given enough simultaneous backup operations they will lead to other threads' starvation. This is really problematic for DB connections, since starvation will lead to connections getting dropped with errors such as "Lost connection to MySQL server during query". Detailed information on why these connections get dropped can be found in comment "[31 Aug 2007 9:21] Magnus Blåudd" on this MySQL bug [1]. These DB issues may result in backups unnecessary ending in an "error" state. This patch fixes this by moving the compression to a native thread so the cooperative multitasking in Cinder Backup can continue switching threads. [1] https://bugs.mysql.com/bug.php?id=28359 Closes-Bug: #1692775 Closes-Bug: #1719580 Change-Id: I1946dc0ad9cb7a68072a39816fa9fa224c2eb6a5 (cherry picked from commitaf0f00bc52
) (cherry picked from commit439f90da8e
) (cherry picked from commitb241f93267
)
This commit is contained in:
parent
ec7005ff91
commit
41754fd57f
|
@ -344,7 +344,10 @@ class ChunkedBackupDriver(driver.BackupDriver):
|
|||
if self.compressor is None:
|
||||
return 'none', data
|
||||
data_size_bytes = len(data)
|
||||
compressed_data = self.compressor.compress(data)
|
||||
# Execute compression in native thread so it doesn't prevent
|
||||
# cooperative greenthread switching.
|
||||
compressed_data = eventlet.tpool.execute(self.compressor.compress,
|
||||
data)
|
||||
comp_size_bytes = len(compressed_data)
|
||||
algorithm = CONF.backup_compression_algorithm.lower()
|
||||
if comp_size_bytes >= data_size_bytes:
|
||||
|
|
|
@ -25,6 +25,7 @@ import hashlib
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import zlib
|
||||
|
||||
import mock
|
||||
|
@ -547,14 +548,27 @@ class GoogleBackupDriverTestCase(test.TestCase):
|
|||
|
||||
@gcs_client
|
||||
def test_prepare_output_data_effective_compression(self):
|
||||
"""Test compression works on a native thread."""
|
||||
# Use dictionary to share data between threads
|
||||
thread_dict = {}
|
||||
original_compress = zlib.compress
|
||||
|
||||
def my_compress(data, *args, **kwargs):
|
||||
thread_dict['compress'] = threading.current_thread()
|
||||
return original_compress(data)
|
||||
|
||||
service = google_dr.GoogleBackupDriver(self.ctxt)
|
||||
# Set up buffer of 128 zeroed bytes
|
||||
fake_data = b'\0' * 128
|
||||
|
||||
result = service._prepare_output_data(fake_data)
|
||||
with mock.patch.object(service.compressor, 'compress',
|
||||
side_effect=my_compress):
|
||||
result = service._prepare_output_data(fake_data)
|
||||
|
||||
self.assertEqual('zlib', result[0])
|
||||
self.assertGreater(len(fake_data), len(result[1]))
|
||||
self.assertNotEqual(threading.current_thread(),
|
||||
thread_dict['compress'])
|
||||
|
||||
@gcs_client
|
||||
def test_prepare_output_data_no_compresssion(self):
|
||||
|
|
|
@ -22,6 +22,7 @@ import hashlib
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import zlib
|
||||
|
||||
import mock
|
||||
|
@ -630,12 +631,25 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
|
|||
return fake_data
|
||||
|
||||
def test_prepare_output_data_effective_compression(self):
|
||||
"""Test compression works on a native thread."""
|
||||
# Use dictionary to share data between threads
|
||||
thread_dict = {}
|
||||
original_compress = zlib.compress
|
||||
|
||||
def my_compress(data, *args, **kwargs):
|
||||
thread_dict['compress'] = threading.current_thread()
|
||||
return original_compress(data)
|
||||
|
||||
service = nfs.NFSBackupDriver(self.ctxt)
|
||||
fake_data = self.create_buffer(128)
|
||||
result = service._prepare_output_data(fake_data)
|
||||
with mock.patch.object(service.compressor, 'compress',
|
||||
side_effect=my_compress):
|
||||
result = service._prepare_output_data(fake_data)
|
||||
|
||||
self.assertEqual('zlib', result[0])
|
||||
self.assertGreater(len(fake_data), len(result[1]))
|
||||
self.assertNotEqual(threading.current_thread(),
|
||||
thread_dict['compress'])
|
||||
|
||||
def test_prepare_output_data_no_compresssion(self):
|
||||
self.flags(backup_compression_algorithm='none')
|
||||
|
|
|
@ -24,6 +24,7 @@ import hashlib
|
|||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import zlib
|
||||
|
||||
import mock
|
||||
|
@ -798,14 +799,27 @@ class BackupSwiftTestCase(test.TestCase):
|
|||
self.assertRaises(ValueError, service._get_compressor, 'fake')
|
||||
|
||||
def test_prepare_output_data_effective_compression(self):
|
||||
"""Test compression works on a native thread."""
|
||||
# Use dictionary to share data between threads
|
||||
thread_dict = {}
|
||||
original_compress = zlib.compress
|
||||
|
||||
def my_compress(data, *args, **kwargs):
|
||||
thread_dict['compress'] = threading.current_thread()
|
||||
return original_compress(data)
|
||||
|
||||
service = swift_dr.SwiftBackupDriver(self.ctxt)
|
||||
# Set up buffer of 128 zeroed bytes
|
||||
fake_data = b'\0' * 128
|
||||
|
||||
result = service._prepare_output_data(fake_data)
|
||||
with mock.patch.object(service.compressor, 'compress',
|
||||
side_effect=my_compress):
|
||||
result = service._prepare_output_data(fake_data)
|
||||
|
||||
self.assertEqual('zlib', result[0])
|
||||
self.assertGreater(len(fake_data), len(result[1]))
|
||||
self.assertNotEqual(threading.current_thread(),
|
||||
thread_dict['compress'])
|
||||
|
||||
def test_prepare_output_data_no_compresssion(self):
|
||||
self.flags(backup_compression_algorithm='none')
|
||||
|
|
Loading…
Reference in New Issue