Run backup compression on native thread

Backup data compression is a CPU bound operation that will not yield to
other greenthreads, so given enough simultaneous backup operations they
will lead to other threads' starvation.

This is really problematic for DB connections, since starvation will
lead to connections getting dropped with errors such as "Lost connection
to MySQL server during query".

Detailed information on why these connections get dropped can be found
in comment "[31 Aug 2007 9:21] Magnus Blåudd" on this MySQL bug [1].

These DB issues may result in backups unnecessarily ending in an "error"
state.

This patch fixes this by moving the compression to a native thread so
the cooperative multitasking in Cinder Backup can continue switching
threads.

[1] https://bugs.mysql.com/bug.php?id=28359

Closes-Bug: #1692775
Closes-Bug: #1719580
Change-Id: I1946dc0ad9cb7a68072a39816fa9fa224c2eb6a5
(cherry picked from commit af0f00bc52)
(cherry picked from commit 439f90da8e)
(cherry picked from commit b241f93267)
This commit is contained in:
Gorka Eguileor 2017-09-13 19:46:17 +02:00
parent ec7005ff91
commit 41754fd57f
4 changed files with 49 additions and 4 deletions

View File

@ -344,7 +344,10 @@ class ChunkedBackupDriver(driver.BackupDriver):
if self.compressor is None:
return 'none', data
data_size_bytes = len(data)
compressed_data = self.compressor.compress(data)
# Execute compression in native thread so it doesn't prevent
# cooperative greenthread switching.
compressed_data = eventlet.tpool.execute(self.compressor.compress,
data)
comp_size_bytes = len(compressed_data)
algorithm = CONF.backup_compression_algorithm.lower()
if comp_size_bytes >= data_size_bytes:

View File

@ -25,6 +25,7 @@ import hashlib
import os
import shutil
import tempfile
import threading
import zlib
import mock
@ -547,14 +548,27 @@ class GoogleBackupDriverTestCase(test.TestCase):
@gcs_client
def test_prepare_output_data_effective_compression(self):
    """Test compression works on a native thread."""
    # Shared mutable mapping: the patched compressor records which
    # thread actually executed the compression so we can assert on it.
    thread_dict = {}
    original_compress = zlib.compress

    def my_compress(data, *args, **kwargs):
        # Capture the executing thread, then delegate to real zlib.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)

    service = google_dr.GoogleBackupDriver(self.ctxt)
    # 128 zeroed bytes compress very well, guaranteeing a size win.
    fake_data = b'\0' * 128
    with mock.patch.object(service.compressor, 'compress',
                           side_effect=my_compress):
        result = service._prepare_output_data(fake_data)
    algorithm, compressed = result
    self.assertEqual('zlib', algorithm)
    self.assertGreater(len(fake_data), len(compressed))
    # Compression must NOT have run on the caller's (green)thread; it
    # is expected to be dispatched to a native tpool worker thread.
    self.assertNotEqual(threading.current_thread(),
                        thread_dict['compress'])
@gcs_client
def test_prepare_output_data_no_compresssion(self):

View File

@ -22,6 +22,7 @@ import hashlib
import os
import shutil
import tempfile
import threading
import zlib
import mock
@ -630,12 +631,25 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
return fake_data
def test_prepare_output_data_effective_compression(self):
    """Test compression works on a native thread."""
    # Dictionary shared with the patched compressor so the worker
    # thread can report back which thread ran the compression.
    thread_dict = {}
    original_compress = zlib.compress

    def my_compress(data, *args, **kwargs):
        # Note the executing thread before performing real compression.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)

    service = nfs.NFSBackupDriver(self.ctxt)
    fake_data = self.create_buffer(128)
    with mock.patch.object(service.compressor, 'compress',
                           side_effect=my_compress):
        result = service._prepare_output_data(fake_data)
    algorithm, compressed = result
    self.assertEqual('zlib', algorithm)
    self.assertGreater(len(fake_data), len(compressed))
    # The compression thread must differ from the test's own thread,
    # proving the work was handed off to a native thread.
    self.assertNotEqual(threading.current_thread(),
                        thread_dict['compress'])
def test_prepare_output_data_no_compresssion(self):
self.flags(backup_compression_algorithm='none')

View File

@ -24,6 +24,7 @@ import hashlib
import os
import shutil
import tempfile
import threading
import zlib
import mock
@ -798,14 +799,27 @@ class BackupSwiftTestCase(test.TestCase):
self.assertRaises(ValueError, service._get_compressor, 'fake')
def test_prepare_output_data_effective_compression(self):
    """Test compression works on a native thread."""
    # The patched compressor stores the thread it ran on in this
    # shared dict; the assertion below inspects it afterwards.
    thread_dict = {}
    original_compress = zlib.compress

    def my_compress(data, *args, **kwargs):
        # Record the current thread, then fall through to zlib.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)

    service = swift_dr.SwiftBackupDriver(self.ctxt)
    # 128 zeroed bytes — trivially compressible input.
    fake_data = b'\0' * 128
    with mock.patch.object(service.compressor, 'compress',
                           side_effect=my_compress):
        result = service._prepare_output_data(fake_data)
    algorithm, compressed = result
    self.assertEqual('zlib', algorithm)
    self.assertGreater(len(fake_data), len(compressed))
    # Verify the compression was offloaded: it must not have executed
    # on the thread running this test.
    self.assertNotEqual(threading.current_thread(),
                        thread_dict['compress'])
def test_prepare_output_data_no_compresssion(self):
self.flags(backup_compression_algorithm='none')