Merge "Fix backup/restore error for ceph rbd backend"

This commit is contained in:
Zuul 2018-07-06 18:47:21 +00:00 committed by Gerrit Code Review
commit 8f7bc14005
2 changed files with 149 additions and 71 deletions

View File

@ -112,7 +112,8 @@ class VolumeMetadataBackup(object):
@property
def exists(self):
meta_obj = rados.Object(self._client.ioctx, self.name)
meta_obj = eventlet.tpool.Proxy(rados.Object(self._client.ioctx,
self.name))
return self._exists(meta_obj)
def _exists(self, obj):
@ -129,7 +130,8 @@ class VolumeMetadataBackup(object):
This should only be called once per backup. Raises
VolumeMetadataBackupExists if the object already exists.
"""
meta_obj = rados.Object(self._client.ioctx, self.name)
meta_obj = eventlet.tpool.Proxy(rados.Object(self._client.ioctx,
self.name))
if self._exists(meta_obj):
msg = _("Metadata backup object '%s' already exists") % self.name
raise exception.VolumeMetadataBackupExists(msg)
@ -141,7 +143,8 @@ class VolumeMetadataBackup(object):
Returns None if the object does not exist.
"""
meta_obj = rados.Object(self._client.ioctx, self.name)
meta_obj = eventlet.tpool.Proxy(rados.Object(self._client.ioctx,
self.name))
if not self._exists(meta_obj):
LOG.debug("Metadata backup object %s does not exist", self.name)
return None
@ -149,7 +152,8 @@ class VolumeMetadataBackup(object):
return meta_obj.read()
def remove_if_exists(self):
meta_obj = rados.Object(self._client.ioctx, self.name)
meta_obj = eventlet.tpool.Proxy(rados.Object(self._client.ioctx,
self.name))
try:
meta_obj.remove()
except rados.ObjectNotFound:
@ -287,8 +291,9 @@ class CephBackupDriver(driver.BackupDriver):
def _connect_to_rados(self, pool=None):
"""Establish connection to the backup Ceph cluster."""
client = self.rados.Rados(rados_id=self._ceph_backup_user,
conffile=self._ceph_backup_conf)
client = eventlet.tpool.Proxy(self.rados.Rados(
rados_id=self._ceph_backup_user,
conffile=self._ceph_backup_conf))
try:
client.connect()
pool_to_open = utils.convert_str(pool or self._ceph_backup_pool)
@ -332,7 +337,7 @@ class CephBackupDriver(driver.BackupDriver):
LOG.debug("Discarding %(length)s bytes from offset %(offset)s",
{'length': length, 'offset': offset})
if self._file_is_rbd(volume):
volume.rbd_image.discard(offset, length)
eventlet.tpool.Proxy(volume.rbd_image).discard(offset, length)
else:
zeroes = '\0' * self.chunk_size
chunks = int(length / self.chunk_size)
@ -340,8 +345,6 @@ class CephBackupDriver(driver.BackupDriver):
LOG.debug("Writing zeroes chunk %d", chunk)
volume.write(zeroes)
volume.flush()
# yield to any other pending backups
eventlet.sleep(0)
rem = int(length % self.chunk_size)
if rem:
@ -380,9 +383,6 @@ class CephBackupDriver(driver.BackupDriver):
'chunks': chunks,
'rate': rate})
# yield to any other pending backups
eventlet.sleep(0)
rem = int(length % self.chunk_size)
if rem:
LOG.debug("Transferring remaining %s bytes", rem)
@ -393,8 +393,6 @@ class CephBackupDriver(driver.BackupDriver):
else:
dest.write(data)
dest.flush()
# yield to any other pending backups
eventlet.sleep(0)
def _create_base_image(self, name, size, rados_client):
"""Create a base backup image.
@ -403,13 +401,14 @@ class CephBackupDriver(driver.BackupDriver):
"""
LOG.debug("Creating base image '%s'", name)
old_format, features = self._get_rbd_support()
self.rbd.RBD().create(ioctx=rados_client.ioctx,
name=name,
size=size,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
eventlet.tpool.Proxy(self.rbd.RBD()).create(
ioctx=rados_client.ioctx,
name=name,
size=size,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
def _delete_backup_snapshot(self, rados_client, base_name, backup_id):
"""Delete snapshot associated with this backup if one exists.
@ -423,7 +422,8 @@ class CephBackupDriver(driver.BackupDriver):
Returns tuple(deleted_snap_name, num_of_remaining_snaps).
"""
remaining_snaps = 0
base_rbd = self.rbd.Image(rados_client.ioctx, base_name)
base_rbd = eventlet.tpool.Proxy(self.rbd.Image(rados_client.ioctx,
base_name))
try:
snap_name = self._get_backup_snap_name(base_rbd, base_name,
backup_id)
@ -472,7 +472,8 @@ class CephBackupDriver(driver.BackupDriver):
"backup base image of volume %(volume)s.",
{'basename': base_name, 'volume': volume_id})
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
backup.container)) as client:
rbd_exists, base_name = \
self._rbd_image_exists(base_name, volume_id, client,
try_diff_format=try_diff_format)
@ -497,7 +498,8 @@ class CephBackupDriver(driver.BackupDriver):
{'basename': base_name, 'volume': volume_id})
# Delete base if no more snapshots
try:
self.rbd.RBD().remove(client.ioctx, base_name)
eventlet.tpool.Proxy(self.rbd.RBD()).remove(
client.ioctx, base_name)
except self.rbd.ImageBusy:
# Allow a retry if the image is busy
if retries > 0:
@ -507,7 +509,6 @@ class CephBackupDriver(driver.BackupDriver):
{'retries': retries,
'delay': delay,
'volume': volume_id})
eventlet.sleep(delay)
else:
LOG.error("Max retries reached deleting backup "
"%(basename)s image of volume %(volume)s.",
@ -525,11 +526,13 @@ class CephBackupDriver(driver.BackupDriver):
# Since we have deleted the base image we can delete the source
# volume backup snapshot.
src_name = utils.convert_str(volume_id)
if src_name in self.rbd.RBD().list(client.ioctx):
if src_name in eventlet.tpool.Proxy(
self.rbd.RBD()).list(client.ioctx):
LOG.debug("Deleting source volume snapshot '%(snapshot)s' "
"for backup %(basename)s.",
{'snapshot': snap, 'basename': base_name})
src_rbd = self.rbd.Image(client.ioctx, src_name)
src_rbd = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
src_name))
try:
src_rbd.remove_snap(snap)
finally:
@ -609,7 +612,7 @@ class CephBackupDriver(driver.BackupDriver):
def _rbd_image_exists(self, name, volume_id, client,
try_diff_format=False):
"""Return tuple (exists, name)."""
rbds = self.rbd.RBD().list(client.ioctx)
rbds = eventlet.tpool.Proxy(self.rbd.RBD()).list(client.ioctx)
if name not in rbds:
LOG.debug("Image '%s' not found - trying diff format name", name)
if try_diff_format:
@ -624,7 +627,8 @@ class CephBackupDriver(driver.BackupDriver):
def _snap_exists(self, base_name, snap_name, client):
"""Return True if snapshot exists in base image."""
base_rbd = self.rbd.Image(client.ioctx, base_name, read_only=True)
base_rbd = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
base_name, read_only=True))
try:
snaps = base_rbd.list_snaps()
finally:
@ -644,19 +648,21 @@ class CephBackupDriver(driver.BackupDriver):
rbd_user = volume_file.rbd_user
rbd_pool = volume_file.rbd_pool
rbd_conf = volume_file.rbd_conf
source_rbd_image = volume_file.rbd_image
source_rbd_image = eventlet.tpool.Proxy(volume_file.rbd_image)
volume_id = backup.volume_id
updates = {}
base_name = self._get_backup_base_name(volume_id, diff_format=True)
image_created = False
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
backup.container)) as client:
# If from_snap does not exist at the destination (and the
# destination exists), this implies a previous backup has failed.
# In this case we will force a full backup.
#
# TODO(dosaboy): find a way to repair the broken backup
#
if base_name not in self.rbd.RBD().list(ioctx=client.ioctx):
if base_name not in eventlet.tpool.Proxy(self.rbd.RBD()).list(
ioctx=client.ioctx):
src_vol_snapshots = self.get_backup_snaps(source_rbd_image)
if src_vol_snapshots:
# If there are source volume snapshots but base does not
@ -747,30 +753,34 @@ class CephBackupDriver(driver.BackupDriver):
volume_id = backup.volume_id
backup_name = self._get_backup_base_name(volume_id, backup.id)
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
backup.container)) as client:
# First create base backup image
old_format, features = self._get_rbd_support()
LOG.debug("Creating backup base image='%(name)s' for volume "
"%(volume)s.",
{'name': backup_name, 'volume': volume_id})
self.rbd.RBD().create(ioctx=client.ioctx,
name=backup_name,
size=length,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
eventlet.tpool.Proxy(self.rbd.RBD()).create(
ioctx=client.ioctx,
name=backup_name,
size=length,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
LOG.debug("Copying data from volume %s.", volume_id)
dest_rbd = self.rbd.Image(client.ioctx, backup_name)
dest_rbd = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
backup_name))
try:
rbd_meta = linuxrbd.RBDImageMetadata(dest_rbd,
backup.container,
self._ceph_backup_user,
self._ceph_backup_conf)
rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
self._transfer_data(src_volume, src_name, rbd_fd, backup_name,
length)
self._transfer_data(src_volume, src_name,
eventlet.tpool.Proxy(rbd_fd),
backup_name, length)
finally:
dest_rbd.close()
@ -887,7 +897,8 @@ class CephBackupDriver(driver.BackupDriver):
LOG.debug("Backing up metadata for volume %s.", backup.volume_id)
try:
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
backup.container)) as client:
vol_meta_backup = VolumeMetadataBackup(client, backup.id)
vol_meta_backup.set(json_meta)
except exception.VolumeMetadataBackupExists as e:
@ -959,7 +970,8 @@ class CephBackupDriver(driver.BackupDriver):
This will result in all extents being copied from source to
destination.
"""
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
backup.container)) as client:
# If a source snapshot is provided we assume the base is diff
# format.
if src_snap:
@ -972,16 +984,18 @@ class CephBackupDriver(driver.BackupDriver):
diff_format=diff_format)
# Retrieve backup volume
src_rbd = self.rbd.Image(client.ioctx, backup_name,
snapshot=src_snap, read_only=True)
src_rbd = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
backup_name,
snapshot=src_snap,
read_only=True))
try:
rbd_meta = linuxrbd.RBDImageMetadata(src_rbd,
backup.container,
self._ceph_backup_user,
self._ceph_backup_conf)
rbd_fd = linuxrbd.RBDVolumeIOWrapper(rbd_meta)
self._transfer_data(rbd_fd, backup_name, dest_file, dest_name,
length)
self._transfer_data(eventlet.tpool.Proxy(rbd_fd), backup_name,
dest_file, dest_name, length)
finally:
src_rbd.close()
@ -996,11 +1010,12 @@ class CephBackupDriver(driver.BackupDriver):
backup_base = self._get_backup_base_name(backup.volume_id,
diff_format=True)
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
backup.container)) as client:
adjust_size = 0
base_image = self.rbd.Image(client.ioctx,
utils.convert_str(backup_base),
read_only=True)
base_image = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
utils.convert_str(backup_base),
read_only=True))
try:
if restore_length != base_image.size():
adjust_size = restore_length
@ -1008,9 +1023,11 @@ class CephBackupDriver(driver.BackupDriver):
base_image.close()
if adjust_size:
with rbd_driver.RADOSClient(self, src_pool) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
src_pool)) as client:
restore_vol_encode = utils.convert_str(restore_vol)
dest_image = self.rbd.Image(client.ioctx, restore_vol_encode)
dest_image = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
restore_vol_encode))
try:
LOG.debug("Adjusting restore vol size")
dest_image.resize(adjust_size)
@ -1058,8 +1075,10 @@ class CephBackupDriver(driver.BackupDriver):
base has no snapshots/restore points), None is returned. Otherwise, the
restore point associated with backup_id is returned.
"""
with rbd_driver.RADOSClient(self, self._ceph_backup_pool) as client:
base_rbd = self.rbd.Image(client.ioctx, base_name, read_only=True)
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self,
self._ceph_backup_pool)) as client:
base_rbd = eventlet.tpool.Proxy(self.rbd.Image(client.ioctx,
base_name, read_only=True))
try:
restore_point = self._get_backup_snap_name(base_rbd, base_name,
backup_id)
@ -1157,7 +1176,8 @@ class CephBackupDriver(driver.BackupDriver):
base_name = self._get_backup_base_name(backup.volume_id,
diff_format=True)
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(
self, backup.container)) as client:
diff_allowed, restore_point = \
self._diff_restore_allowed(base_name, backup, volume,
volume_file, client)
@ -1187,7 +1207,7 @@ class CephBackupDriver(driver.BackupDriver):
otherwise do nothing.
"""
try:
with rbd_driver.RADOSClient(self) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(self)) as client:
meta_bak = VolumeMetadataBackup(client, backup.id)
meta = meta_bak.get()
if meta is not None:
@ -1252,7 +1272,8 @@ class CephBackupDriver(driver.BackupDriver):
has_pool = False
if has_pool:
with rbd_driver.RADOSClient(self, backup.container) as client:
with eventlet.tpool.Proxy(rbd_driver.RADOSClient(
self, backup.container)) as client:
VolumeMetadataBackup(client, backup.id).remove_if_exists()
if delete_failed:

View File

@ -17,6 +17,7 @@
import hashlib
import os
import tempfile
import threading
import uuid
import ddt
@ -402,9 +403,11 @@ class BackupCephTestCase(test.TestCase):
@common_mocks
def test_backup_volume_from_file(self):
checksum = hashlib.sha256()
thread_dict = {}
def mock_write_data(data, offset):
checksum.update(data)
thread_dict['thread'] = threading.current_thread()
test_file.write(data)
self.service.rbd.Image.return_value.write.side_effect = mock_write_data
@ -418,6 +421,7 @@ class BackupCephTestCase(test.TestCase):
self.assertEqual(checksum.digest(), self.checksum.digest())
self.assertTrue(self.service.rbd.Image.return_value.write.called)
self.assertNotEqual(threading.current_thread(), thread_dict['thread'])
@common_mocks
def test_get_backup_base_name(self):
@ -843,7 +847,10 @@ class BackupCephTestCase(test.TestCase):
self.mock_rbd.RBD.return_value.list.return_value = [backup_name]
thread_dict = {}
def mock_read_data(offset, length):
thread_dict['thread'] = threading.current_thread()
return self.volume_file.read(self.data_length)
self.mock_rbd.Image.return_value.read.side_effect = mock_read_data
@ -874,14 +881,22 @@ class BackupCephTestCase(test.TestCase):
self.assertTrue(mock_discard_bytes.called)
self.assertTrue(self.service.rbd.Image.return_value.read.called)
self.assertNotEqual(threading.current_thread(), thread_dict['thread'])
@common_mocks
def test_discard_bytes(self):
# Lower the chunksize to a memory manageable number
thread_dict = {}
self.service.chunk_size = 1024
image = self.mock_rbd.Image.return_value
wrapped_rbd = self._get_wrapped_rbd_io(image)
def mock_discard(offset, length):
thread_dict['thread'] = threading.current_thread()
return self.mock_rbd.Image.discard(offset, length)
self.mock_rbd.Image.return_value.discard.side_effect = mock_discard
self.service._discard_bytes(wrapped_rbd, 0, 0)
self.assertEqual(0, image.discard.call_count)
@ -903,6 +918,8 @@ class BackupCephTestCase(test.TestCase):
zeroes = '\0' * self.service.chunk_size
image.write.assert_has_calls([mock.call(zeroes, 0),
mock.call(zeroes, self.chunk_size)])
self.assertNotEqual(threading.current_thread(),
thread_dict['thread'])
image.reset_mock()
image.write.reset_mock()
@ -931,6 +948,13 @@ class BackupCephTestCase(test.TestCase):
base_name = self.service._get_backup_base_name(self.volume_id,
diff_format=True)
self.mock_rbd.RBD.remove_snap = mock.Mock()
thread_dict = {}
def mock_side_effect(snap):
thread_dict['thread'] = threading.current_thread()
self.mock_rbd.Image.return_value.remove_snap.side_effect = \
mock_side_effect
with mock.patch.object(self.service, '_get_backup_snap_name') as \
mock_get_backup_snap_name:
@ -945,6 +969,8 @@ class BackupCephTestCase(test.TestCase):
self.assertTrue(mock_get_backup_snap_name.called)
self.assertTrue(mock_get_backup_snaps.called)
self.assertEqual((snap_name, 0), rem)
self.assertNotEqual(threading.current_thread(),
thread_dict['thread'])
@common_mocks
@mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
@ -970,12 +996,18 @@ class BackupCephTestCase(test.TestCase):
def test_try_delete_base_image(self, mock_meta_backup):
backup_name = self.service._get_backup_base_name(self.volume_id,
self.backup_id)
thread_dict = {}
def mock_side_effect(ioctx, base_name):
thread_dict['thread'] = threading.current_thread()
self.mock_rbd.RBD.return_value.list.return_value = [backup_name]
self.mock_rbd.RBD.return_value.remove.side_effect = mock_side_effect
with mock.patch.object(self.service, 'get_backup_snaps'):
self.service.delete_backup(self.backup)
self.assertTrue(self.mock_rbd.RBD.return_value.remove.called)
self.assertNotEqual(threading.current_thread(),
thread_dict['thread'])
@common_mocks
def test_try_delete_base_image_busy(self):
@ -998,13 +1030,6 @@ class BackupCephTestCase(test.TestCase):
self.assertTrue(rbd.remove.called)
self.assertIn(MockImageBusyException, RAISED_EXCEPTIONS)
@common_mocks
@mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
def test_delete(self, mock_meta_backup):
with mock.patch.object(self.service, '_try_delete_base_image'):
self.service.delete_backup(self.backup)
self.assertEqual([], RAISED_EXCEPTIONS)
@common_mocks
@mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
def test_delete_image_not_found(self, mock_meta_backup):
@ -1067,6 +1092,7 @@ class BackupCephTestCase(test.TestCase):
with mock.patch.object(self.service, '_rbd_image_exists') as \
mock_rbd_image_exists:
mock_rbd_image_exists.return_value = (True, backup_base)
with mock.patch.object(self.service, '_get_restore_point') as \
mock_get_restore_point:
mock_get_restore_point.return_value = None
@ -1343,11 +1369,21 @@ class BackupCephTestCase(test.TestCase):
@common_mocks
def test__snap_exists(self, snapshots, snap_exist):
client = mock.Mock()
thread_dict = {}
with mock.patch.object(self.service.rbd.Image(),
'list_snaps') as snaps:
snaps.return_value = snapshots
def mock_side_effect():
thread_dict['thread'] = threading.current_thread()
return snaps.return_value
snaps.side_effect = mock_side_effect
exist = self.service._snap_exists(None, 'fake', client)
self.assertEqual(snap_exist, exist)
self.assertNotEqual(thread_dict['thread'],
threading.current_thread())
def common_meta_backup_mocks(f):
@ -1384,10 +1420,17 @@ class VolumeMetadataBackupTestCase(test.TestCase):
@common_meta_backup_mocks
def test_exists(self):
thread_dict = {}
def mock_side_effect():
thread_dict['thread'] = threading.current_thread()
# True
self.mock_rados.Object.return_value.stat.side_effect = mock_side_effect
self.assertTrue(self.mb.exists)
self.assertTrue(self.mock_rados.Object.return_value.stat.called)
self.mock_rados.Object.return_value.reset_mock()
self.assertNotEqual(thread_dict['thread'], threading.current_thread())
# False
self.mock_rados.Object.return_value.stat.side_effect = (
@ -1400,6 +1443,7 @@ class VolumeMetadataBackupTestCase(test.TestCase):
def test_set(self):
obj_data = []
called = []
thread_dict = {}
def mock_read(*args):
called.append('read')
@ -1409,6 +1453,7 @@ class VolumeMetadataBackupTestCase(test.TestCase):
def _mock_write(data):
obj_data.append(data)
called.append('write')
thread_dict['thread'] = threading.current_thread()
self.mb.get = mock.Mock()
self.mb.get.side_effect = mock_read
@ -1424,14 +1469,19 @@ class VolumeMetadataBackupTestCase(test.TestCase):
self.mb._exists.return_value = True
# use the unmocked set() method.
self.assertRaises(exception.VolumeMetadataBackupExists, self.mb.set,
{'doo': 'dah'})
self.assertRaises(exception.VolumeMetadataBackupExists,
self.mb.set, {'doo': 'dah'})
# check the meta obj state has not changed.
self.assertEqual({'foo': 'bar'}, self.mb.get())
self.assertEqual(['write', 'read', 'read'], called)
self.mb._exists.return_value = False
self.mb.set({'doo': 'dah'})
self.assertNotEqual(thread_dict['thread'],
threading.current_thread)
@common_meta_backup_mocks
def test_get(self):
self.mock_rados.Object.return_value.stat.side_effect = (
@ -1443,12 +1493,19 @@ class VolumeMetadataBackupTestCase(test.TestCase):
@common_meta_backup_mocks
def remove_if_exists(self):
thread_dict = {}
def mock_side_effect():
thread_dict['thread'] = threading.current_thread()
with mock.patch.object(self.mock_rados.Object, 'remove') as \
mock_remove:
mock_remove.side_effect = self.mock_rados.ObjectNotFound
self.mb.remove_if_exists()
self.assertEqual([MockObjectNotFoundException], RAISED_EXCEPTIONS)
self.mock_rados.Object.remove.side_effect = None
self.mock_rados.Object.remove.side_effect = mock_side_effect
self.mb.remove_if_exists()
self.assertEqual([], RAISED_EXCEPTIONS)
self.assertNotEqual(thread_dict['thread'],
threading.current_thread)