Make migration's volume source deletion async
Currently, source volume deletion during a volume migration (and therefore
during a retype with migration as well) is synchronous, which has two
problems:

1. If the deletion takes longer than rpc_response_timeout, it will fail when
   performed on in-use volumes.
2. If the deletion fails, the volume is left behind on the back-end and
   Cinder no longer has a reference to it in the DB.

This patch makes source volume deletion asynchronous, resolving both issues.

Closes-Bug: #1483155
Closes-Bug: #1483157
Change-Id: I55c3d86660f90044a56b8609bb5774d4f5481227
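For context, a minimal sketch (not part of the patch; transport wiring, topic, and function names are illustrative) of the oslo.messaging distinction the fix relies on: a blocking RPC call is bounded by rpc_response_timeout, while a cast returns as soon as the message is queued, so the back-end deletion can take as long as it needs on the volume service.

# Illustrative sketch only -- not part of this patch.
import oslo_messaging as messaging
from oslo_config import cfg

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='cinder-volume', server='host@backend')
client = messaging.RPCClient(transport, target)


def delete_source_sync(ctxt, volume_id):
    # Blocks until the volume service replies; raises
    # messaging.MessagingTimeout if the deletion takes longer than
    # rpc_response_timeout.
    client.call(ctxt, 'delete_volume', volume_id=volume_id)


def delete_source_async(ctxt, volume_id):
    # Fire-and-forget: returns immediately, the volume service performs
    # the deletion in the background.
    client.cast(ctxt, 'delete_volume', volume_id=volume_id)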
parent 122c3da26b
commit cdf5e92f10
@@ -1139,10 +1139,27 @@ def volume_data_get_for_project(context, project_id, volume_type_id=None):
 
 @require_admin_context
 def finish_volume_migration(context, src_vol_id, dest_vol_id):
-    """Copy almost all columns from dest to source."""
+    """Swap almost all columns between dest and source.
+
+    We swap fields between source and destination at the end of migration
+    because we want to keep the original volume id in the DB but now pointing
+    to the migrated volume.
+
+    Original volume will be deleted, after this method original volume will be
+    pointed by dest_vol_id, so we set its status and migration_status to
+    'deleting'. We change status here to keep it in sync with migration_status
+    which must be changed here.
+
+    :param src_vol_id: ID of the migration original volume
+    :param dest_vol_id: ID of the migration destination volume
+    :returns: Tuple with new source and destination ORM objects. Source will be
+              the migrated volume and destination will be original volume that
+              will be deleted.
+    """
     session = get_session()
     with session.begin():
         src_volume_ref = _volume_get(context, src_vol_id, session=session)
+        src_original_data = dict(src_volume_ref.iteritems())
         dest_volume_ref = _volume_get(context, dest_vol_id, session=session)
 
         # NOTE(rpodolyaka): we should copy only column values, while model
@@ -1152,15 +1169,30 @@ def finish_volume_migration(context, src_vol_id, dest_vol_id):
             return attr in inst.__class__.__table__.columns
 
         for key, value in dest_volume_ref.iteritems():
+            value_to_dst = src_original_data.get(key)
             # The implementation of update_migrated_volume will decide the
             # values for _name_id and provider_location.
-            if (key in ('id', '_name_id', 'provider_location')
+            if (key in ('id', 'provider_location')
                     or not is_column(dest_volume_ref, key)):
                 continue
+
+            # Destination must have a _name_id since the id no longer matches
+            # the volume. If it doesn't have a _name_id we set one.
+            elif key == '_name_id':
+                if not dest_volume_ref._name_id:
+                    setattr(dest_volume_ref, key, src_volume_ref.id)
+                continue
             elif key == 'migration_status':
                 value = None
+                value_to_dst = 'deleting'
+            elif key == 'display_description':
+                value_to_dst = 'migration src for ' + src_volume_ref.id
+            elif key == 'status':
+                value_to_dst = 'deleting'
 
             setattr(src_volume_ref, key, value)
+            setattr(dest_volume_ref, key, value_to_dst)
+        return src_volume_ref, dest_volume_ref
 
 
 @require_admin_context
@@ -1174,7 +1206,8 @@ def volume_destroy(context, volume_id):
         update({'status': 'deleted',
                 'deleted': True,
                 'deleted_at': now,
-                'updated_at': literal_column('updated_at')})
+                'updated_at': literal_column('updated_at'),
+                'migration_status': None})
     model_query(context, models.IscsiTarget, session=session).\
         filter_by(volume_id=volume_id).\
         update({'volume_id': None})
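To make the swap above concrete, here is a simplified, hypothetical stand-in for finish_volume_migration using plain dicts; it ignores _name_id, display_description, and session handling, and only shows how every other column trades places while both ids stay put.

# Hypothetical, simplified model of the swap; not the real DB code.
def swap_volume_records(src, dest):
    src_original = dict(src)
    for key, value in dest.items():
        # Both records keep their own id and provider_location.
        if key in ('id', 'provider_location'):
            continue
        value_to_dst = src_original[key]
        if key == 'migration_status':
            # Source is done migrating; destination is queued for deletion.
            value, value_to_dst = None, 'deleting'
        elif key == 'status':
            value_to_dst = 'deleting'
        src[key], dest[key] = value, value_to_dst
    return src, dest


src = {'id': 'orig-id', 'host': 'src-host', 'status': 'available',
       'migration_status': 'migrating', 'provider_location': 'src-loc'}
dest = {'id': 'new-id', 'host': 'dest-host', 'status': 'available',
        'migration_status': 'target:orig-id', 'provider_location': 'new-loc'}
swap_volume_records(src, dest)
# src:  {'id': 'orig-id', 'host': 'dest-host', 'status': 'available',
#        'migration_status': None, 'provider_location': 'src-loc'}
# dest: {'id': 'new-id', 'host': 'src-host', 'status': 'deleting',
#        'migration_status': 'deleting', 'provider_location': 'new-loc'}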
@@ -17,6 +17,7 @@
 
 from cinder import context
 from cinder import db
+from cinder import objects
 from cinder import test
 from cinder.tests.unit import utils as testutils
 
@@ -34,10 +35,18 @@ class FinishVolumeMigrationTestCase(test.TestCase):
         dest_volume = testutils.create_volume(ctxt, host='dest',
                                               migration_status='target:fake',
                                               status='available')
-        db.finish_volume_migration(ctxt, src_volume['id'],
-                                   dest_volume['id'])
+        db.finish_volume_migration(ctxt, src_volume['id'], dest_volume['id'])
 
-        src_volume = db.volume_get(ctxt, src_volume['id'])
-        self.assertEqual('dest', src_volume['host'])
-        self.assertEqual('available', src_volume['status'])
-        self.assertIsNone(src_volume['migration_status'])
+        # Check that we have copied destination volume DB data into source DB
+        # entry so we can keep the id
+        src_volume = objects.Volume.get_by_id(ctxt, src_volume['id'])
+        self.assertEqual('dest', src_volume.host)
+        self.assertEqual('available', src_volume.status)
+        self.assertIsNone(src_volume.migration_status)
+
+        # Check that we have copied source volume DB data into destination DB
+        # entry and we are setting it to deleting
+        dest_volume = objects.Volume.get_by_id(ctxt, dest_volume['id'])
+        self.assertEqual('src', dest_volume.host)
+        self.assertEqual('deleting', dest_volume.status)
+        self.assertEqual('deleting', dest_volume.migration_status)
@@ -3968,7 +3968,8 @@ class VolumeTestCase(BaseVolumeTestCase):
     def test_update_migrated_volume(self, volume_update):
         fake_host = 'fake_host'
         fake_new_host = 'fake_new_host'
-        fake_update = {'_name_id': None, 'provider_location': None}
+        fake_update = {'_name_id': 'updated_id',
+                       'provider_location': 'updated_location'}
         fake_elevated = 'fake_elevated'
         volume = tests_utils.create_volume(self.context, size=1,
                                            status='available',
@@ -3981,6 +3982,8 @@ class VolumeTestCase(BaseVolumeTestCase):
         fake_update_error = {'_name_id': new_volume['_name_id'],
                              'provider_location':
                              new_volume['provider_location']}
+        expected_update = {'_name_id': volume['_name_id'],
+                           'provider_location': volume['provider_location']}
         with mock.patch.object(self.volume.driver,
                                'update_migrated_volume') as \
                 migrate_update,\
@@ -3989,9 +3992,9 @@ class VolumeTestCase(BaseVolumeTestCase):
             elevated.return_value = fake_elevated
             self.volume.update_migrated_volume(self.context, volume,
                                                new_volume, 'available')
-            volume_update.assert_called_once_with(fake_elevated,
-                                                  volume['id'],
-                                                  fake_update)
+            volume_update.assert_has_calls((
+                mock.call(fake_elevated, volume['id'], fake_update),
+                mock.call(fake_elevated, new_volume['id'], expected_update)))
 
         # Test the case for update_migrated_volume not implemented
         # for the driver.
@@ -4000,9 +4003,9 @@ class VolumeTestCase(BaseVolumeTestCase):
             migrate_update.side_effect = NotImplementedError
             self.volume.update_migrated_volume(self.context, volume,
                                                new_volume, 'available')
-            volume_update.assert_called_once_with(fake_elevated,
-                                                  volume['id'],
-                                                  fake_update_error)
+            volume_update.assert_has_calls((
+                mock.call(fake_elevated, volume['id'], fake_update_error),
+                mock.call(fake_elevated, new_volume['id'], expected_update)))
 
     def test_list_availability_zones_enabled_service(self):
         services = [
@@ -4137,7 +4140,7 @@ class VolumeTestCase(BaseVolumeTestCase):
     @mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume')
     @mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')
     def test_migrate_volume_for_volume_generic(self, create_volume,
-                                               delete_volume,
+                                               rpc_delete_volume,
                                                update_migrated_volume):
         fake_volume = tests_utils.create_volume(self.context, size=1,
                                                 host=CONF.host)
@@ -4150,7 +4153,9 @@ class VolumeTestCase(BaseVolumeTestCase):
         host_obj = {'host': 'newhost', 'capabilities': {}}
         with mock.patch.object(self.volume.driver, 'migrate_volume') as \
                 mock_migrate_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data'):
+                mock.patch.object(self.volume.driver, 'copy_volume_data'), \
+                mock.patch.object(self.volume.driver, 'delete_volume') as \
+                delete_volume:
             create_volume.side_effect = fake_create_volume
             self.volume.migrate_volume(self.context, fake_volume['id'],
                                        host_obj, True)
@@ -4160,6 +4165,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.assertIsNone(volume['migration_status'])
         self.assertFalse(mock_migrate_volume.called)
         self.assertFalse(delete_volume.called)
+        self.assertTrue(rpc_delete_volume.called)
         self.assertTrue(update_migrated_volume.called)
 
     def test_migrate_volume_generic_copy_error(self):
@@ -4390,12 +4396,14 @@ class VolumeTestCase(BaseVolumeTestCase):
             self.assertEqual('in-use', vol['status'])
             attachment_id = vol['volume_attachment'][0]['id']
         target_status = 'target:%s' % old_volume['id']
+        new_host = CONF.host + 'new'
         new_volume = tests_utils.create_volume(self.context, size=0,
-                                               host=CONF.host,
+                                               host=new_host,
                                                migration_status=target_status)
         with mock.patch.object(self.volume, 'detach_volume') as \
                 mock_detach_volume,\
-                mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
+                mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume') as \
+                mock_delete_volume, \
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'attach_volume') as \
                 mock_attach_volume,\
                 mock.patch.object(volume_rpcapi.VolumeAPI,
@@ -4404,6 +4412,8 @@ class VolumeTestCase(BaseVolumeTestCase):
             mock_attach_volume.side_effect = fake_attach_volume
             self.volume.migrate_volume_completion(self.context, old_volume[
                 'id'], new_volume['id'])
+            after_new_volume = db.volume_get(self.context, new_volume.id)
+            after_old_volume = db.volume_get(self.context, old_volume.id)
             if status == 'in-use':
                 mock_detach_volume.assert_called_with(self.context,
                                                       old_volume['id'],
@@ -4415,6 +4425,9 @@ class VolumeTestCase(BaseVolumeTestCase):
                 self.assertEqual(instance_uuid, attachment['instance_uuid'])
             else:
                 self.assertFalse(mock_detach_volume.called)
+            self.assertTrue(mock_delete_volume.called)
+            self.assertEqual(old_volume.host, after_new_volume.host)
+            self.assertEqual(new_volume.host, after_old_volume.host)
 
     def test_migrate_volume_completion_retype_available(self):
         self._test_migrate_volume_completion('available', retyping=True)
@@ -355,7 +355,7 @@ class API(base.Base):
                           'vol_status': volume['status']})
             raise exception.InvalidVolume(reason=msg)
 
-        if volume['migration_status'] is not None:
+        if volume['migration_status'] not in (None, 'deleting'):
             # Volume is migrating, wait until done
             LOG.info(_LI('Unable to delete volume: %s, '
                          'volume is currently migrating.'), volume['id'])
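The effect of this one-line change: a record whose migration_status is 'deleting' (the leftover source after the swap) may now be deleted through the API, while volumes still mid-migration remain protected. A tiny sketch of the predicate, with a hypothetical helper name:

def migration_blocks_delete(migration_status):
    # Mirrors the new check: only an active migration blocks deletion.
    return migration_status not in (None, 'deleting')


assert not migration_blocks_delete(None)          # normal volume
assert not migration_blocks_delete('deleting')    # swapped migration source
assert migration_blocks_delete('migrating')       # still migrating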
@@ -589,9 +589,10 @@ class VolumeManager(manager.SchedulerDependentManager):
             LOG.exception(_LE("Failed to update usages deleting volume."),
                           resource=volume_ref)
 
-        # If deleting the source volume in a migration, we should skip database
-        # update here. In other cases, continue to update database entries.
-        if not is_migrating or is_migrating_dest:
+        # If deleting the destination volume in a migration, we should skip
+        # database update here. In other cases, continue to update database
+        # entries.
+        if not is_migrating_dest:
 
             # Delete glance metadata if it exists
             self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
@@ -1373,15 +1374,14 @@ class VolumeManager(manager.SchedulerDependentManager):
             self.db.volume_update(ctxt, volume_id,
                                   {'migration_status': 'completing'})
 
-        # Delete the source volume (if it fails, don't fail the migration)
+        # Detach the source volume (if it fails, don't fail the migration)
         try:
             if orig_volume_status == 'in-use':
                 attachments = volume['volume_attachment']
                 for attachment in attachments:
                     self.detach_volume(ctxt, volume_id, attachment['id'])
-            self.delete_volume(ctxt, volume_id)
         except Exception as ex:
-            LOG.error(_LE("Delete migration source volume failed: %(err)s"),
+            LOG.error(_LE("Detach migration source volume failed: %(err)s"),
                       {'err': ex}, resource=volume)
 
         # Give driver (new_volume) a chance to update things as needed
@@ -1390,8 +1390,11 @@ class VolumeManager(manager.SchedulerDependentManager):
         # the current host and driver object is for the "existing" volume.
         rpcapi.update_migrated_volume(ctxt, volume, new_volume,
                                       orig_volume_status)
-        self.db.finish_volume_migration(ctxt, volume_id, new_volume_id)
-        self.db.volume_destroy(ctxt, new_volume_id)
+
+        # Swap src and dest DB records so we can continue using the src id and
+        # asynchronously delete the destination id
+        __, updated_new = self.db.finish_volume_migration(
+            ctxt, volume_id, new_volume_id)
         if orig_volume_status == 'in-use':
             updates = {'migration_status': 'completing',
                        'status': orig_volume_status}
@@ -1407,6 +1410,16 @@ class VolumeManager(manager.SchedulerDependentManager):
                                       attachment['attached_host'],
                                       attachment['mountpoint'],
                                       'rw')
+
+        # Asynchronous deletion of the source volume in the back-end (now
+        # pointed by the target volume id)
+        try:
+            rpcapi.delete_volume(ctxt, updated_new)
+        except Exception as ex:
+            LOG.error(_LE('Failed to request async delete of migration source '
+                          'vol %(vol)s: %(err)s'),
+                      {'vol': volume_id, 'err': ex})
+
         LOG.info(_LI("Complete-Migrate volume completed successfully."),
                  resource=volume)
         return volume['id']
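Taken together, the manager hunks reduce migrate_volume_completion to roughly the following shape (an illustrative sketch with detach handling and error paths elided; not the verbatim method):

# Condensed, illustrative flow of migrate_volume_completion after this patch.
def migrate_volume_completion(db, rpcapi, ctxt, volume_id, new_volume_id):
    # Swap DB records: the original id now points at the migrated volume,
    # and updated_new is the leftover record marked 'deleting'.
    __, updated_new = db.finish_volume_migration(ctxt, volume_id,
                                                 new_volume_id)
    # Fire-and-forget deletion of the leftover back-end volume; a slow or
    # failed delete can no longer time out or orphan a DB entry.
    rpcapi.delete_volume(ctxt, updated_new)
    return volume_id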
@@ -2683,3 +2696,9 @@ class VolumeManager(manager.SchedulerDependentManager):
         if model_update:
             self.db.volume_update(ctxt.elevated(), volume['id'],
                                   model_update)
+            # Swap keys that were changed in the source so we keep their values
+            # in the temporary volume's DB record.
+            model_update_new = {key: volume[key]
+                                for key in model_update.iterkeys()}
+            self.db.volume_update(ctxt.elevated(), new_volume['id'],
+                                  model_update_new)
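The retype path applies the same swap idea at a smaller scale: whatever keys the driver's model_update wrote to the (now swapped) source record get their pre-update values written back to the temporary volume's record. A hedged example with made-up values:

# Driver's update for the retyped volume (illustrative values):
model_update = {'provider_location': 'new-loc', '_name_id': None}
# Pre-update DB record of the volume being retyped:
volume = {'id': 'orig', 'provider_location': 'old-loc', '_name_id': 'tmp'}

# Keep the values the source record is about to give up:
model_update_new = {key: volume[key] for key in model_update}
# -> {'provider_location': 'old-loc', '_name_id': 'tmp'}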