Storwize/SVC: Change volume copy task to async

If Cinder crashes during a migration or retype (where data is moved and
the operation can take a long time), the storage ends up with multiple
copies of the same volume which requires storage admin intervention.

This patch maintains a list of pending operations that is backed up in
the volume admin metadata; a periodic task reviews the list and removes
volume copies whose copy operation has completed. When Cinder starts up,
it checks the admin metadata and rebuilds the list.

Change-Id: I6549712bb0083996faced89c2207a4c438ae953d
Closes-Bug: #1278035
This commit is contained in:
LarryLiu 2014-02-14 15:46:07 +08:00 committed by Li Min Liu
parent 182e071a11
commit 4cac1bd322
6 changed files with 213 additions and 112 deletions

View File

@ -25,9 +25,11 @@ import re
from cinder import context
from cinder import exception
from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import processutils
from cinder import test
from cinder.tests import utils as testutils
from cinder import units
from cinder import utils
from cinder.volume import configuration as conf
@ -38,17 +40,6 @@ from cinder.volume import volume_types
LOG = logging.getLogger(__name__)
class StorwizeSVCFakeDB:
    """Trivial in-memory stand-in for the Cinder DB API used in tests.

    Holds exactly one volume record; ``volume_get`` always returns it,
    ignoring the requested id.
    """

    def __init__(self):
        self.volume = None

    def volume_get(self, ctxt, vol_id):
        # The id argument is ignored on purpose -- tests store one volume.
        return self.volume

    def volume_set(self, vol):
        self.volume = vol
class StorwizeSVCManagementSimulator:
def __init__(self, pool_name):
self._flags = {'storwize_svc_volpool_name': pool_name}
@ -1564,7 +1555,10 @@ class StorwizeSVCDriverTestCase(test.TestCase):
self._connector = utils.brick_get_connector_properties()
self._reset_flags()
self.driver.db = StorwizeSVCFakeDB()
self.ctxt = context.get_admin_context()
db_driver = self.driver.configuration.db_driver
self.db = importutils.import_module(db_driver)
self.driver.db = self.db
self.driver.do_setup(None)
self.driver.check_for_setup_error()
self.sleeppatch = mock.patch('eventlet.greenthread.sleep')
@ -1688,8 +1682,17 @@ class StorwizeSVCDriverTestCase(test.TestCase):
'volume_type_id': None,
'mdisk_grp_name': 'openstack'}
def _create_volume(self, **kwargs):
    """Build a test volume record and provision it through the driver."""
    new_vol = testutils.create_volume(self.ctxt, **kwargs)
    self.driver.create_volume(new_vol)
    return new_vol
def _delete_volume(self, volume):
    """Remove the volume from the backend, then from the test database."""
    self.driver.delete_volume(volume)
    self.db.volume_destroy(self.ctxt, volume['id'])
def _create_test_vol(self, opts):
ctxt = context.get_admin_context()
ctxt = testutils.get_test_admin_context()
type_ref = volume_types.create(ctxt, 'testtype', opts)
volume = self._generate_vol_info(None, None)
type_id = type_ref['id']
@ -1704,9 +1707,7 @@ class StorwizeSVCDriverTestCase(test.TestCase):
return attrs
def test_storwize_svc_snapshots(self):
vol1 = self._generate_vol_info(None, None)
self.driver.create_volume(vol1)
self.driver.db.volume_set(vol1)
vol1 = self._create_volume()
snap1 = self._generate_vol_info(vol1['name'], vol1['id'])
# Test timeout and volume cleanup
@ -1749,9 +1750,7 @@ class StorwizeSVCDriverTestCase(test.TestCase):
self.driver.delete_snapshot(snap1)
def test_storwize_svc_create_volfromsnap_clone(self):
vol1 = self._generate_vol_info(None, None)
self.driver.create_volume(vol1)
self.driver.db.volume_set(vol1)
vol1 = self._create_volume()
snap1 = self._generate_vol_info(vol1['name'], vol1['id'])
self.driver.create_snapshot(snap1)
vol2 = self._generate_vol_info(None, None)
@ -2193,9 +2192,7 @@ class StorwizeSVCDriverTestCase(test.TestCase):
def test_storwize_svc_delete_volume_snapshots(self):
# Create a volume with two snapshots
master = self._generate_vol_info(None, None)
self.driver.create_volume(master)
self.driver.db.volume_set(master)
master = self._create_volume()
# Fail creating a snapshot - will force delete the snapshot
if self.USESIM and False:
@ -2286,9 +2283,7 @@ class StorwizeSVCDriverTestCase(test.TestCase):
self.assertAlmostEqual(stats['free_capacity_gb'], 3287.5)
def test_storwize_svc_extend_volume(self):
volume = self._generate_vol_info(None, None)
self.driver.db.volume_set(volume)
self.driver.create_volume(volume)
volume = self._create_volume()
self.driver.extend_volume(volume, '13')
attrs = self.driver._helpers.get_vdisk_attributes(volume['name'])
vol_size = int(attrs['capacity']) / units.GiB
@ -2320,39 +2315,18 @@ class StorwizeSVCDriverTestCase(test.TestCase):
cap = {'location_info': 'StorwizeSVCDriver:foo:bar'}
self._check_loc_info(cap, {'moved': False, 'model_update': None})
def test_storwize_svc_migrate_same_extent_size(self):
def test_storwize_svc_volume_migrate(self):
# Make sure we don't call migrate_volume_vdiskcopy
with mock.patch.object(self.driver._helpers,
'migrate_volume_vdiskcopy') as migr_vdiskcopy:
migr_vdiskcopy.side_effect = KeyError
self.driver.do_setup(None)
loc = ('StorwizeSVCDriver:' + self.driver._state['system_id'] +
':openstack2')
cap = {'location_info': loc, 'extent_size': '256'}
host = {'host': 'foo', 'capabilities': cap}
ctxt = context.get_admin_context()
volume = self._generate_vol_info(None, None)
volume['volume_type_id'] = None
self.driver.create_volume(volume)
self.driver.migrate_volume(ctxt, volume, host)
self.driver.delete_volume(volume)
def test_storwize_svc_migrate_diff_extent_size(self):
self.driver.do_setup(None)
loc = ('StorwizeSVCDriver:' + self.driver._state['system_id'] +
':openstack3')
cap = {'location_info': loc, 'extent_size': '128'}
':openstack2')
cap = {'location_info': loc, 'extent_size': '256'}
host = {'host': 'foo', 'capabilities': cap}
ctxt = context.get_admin_context()
volume = self._generate_vol_info(None, None)
volume = self._create_volume()
volume['volume_type_id'] = None
self.driver.create_volume(volume)
self.assertNotEqual(cap['extent_size'],
self.driver._state['extent_size'])
self.driver.migrate_volume(ctxt, volume, host)
attrs = self.driver._helpers.get_vdisk_attributes(volume['name'])
self.assertIn('openstack3', attrs['mdisk_grp_name'])
self.driver.delete_volume(volume)
self._delete_volume(volume)
def test_storwize_svc_retype_no_copy(self):
self.driver.do_setup(None)
@ -2446,8 +2420,29 @@ class StorwizeSVCDriverTestCase(test.TestCase):
def test_set_storage_code_level_success(self):
res = self.driver._helpers.get_system_info()
self.assertEqual((7, 2, 0, 0), res['code_level'],
'Get code level error')
if self.USESIM:
self.assertEqual((7, 2, 0, 0), res['code_level'],
'Get code level error')
def test_storwize_vdisk_copy_ops(self):
    """Verify add/check/remove of pending vdisk copy operations.

    The driver must mirror the in-memory pending-operation list into the
    volume's admin metadata (key 'vdiskcopyops') and clear the key once
    the copy operation has been removed.
    """
    ctxt = testutils.get_test_admin_context()
    volume = self._create_volume()
    driver = self.driver
    dest_pool = driver.configuration.storwize_svc_volpool_name
    new_ops = driver._helpers.add_vdisk_copy(volume['name'], dest_pool,
                                             None, driver._state,
                                             driver.configuration)
    driver._add_vdisk_copy_op(ctxt, volume, new_ops)
    admin_metadata = self.db.volume_admin_metadata_get(ctxt, volume['id'])
    # new_ops is a (orig_copy_id, new_copy_id) tuple of strings, so it
    # can be joined directly -- no generator expression needed.
    self.assertEqual(":".join(new_ops),
                     admin_metadata['vdiskcopyops'],
                     'Storwize driver add vdisk copy error.')
    driver._check_volume_copy_ops()
    driver._rm_vdisk_copy_op(ctxt, volume, new_ops[0], new_ops[1])
    admin_metadata = self.db.volume_admin_metadata_get(ctxt, volume['id'])
    # assertIsNone gives a clearer failure message than assertEqual(None, x).
    self.assertIsNone(admin_metadata.get('vdiskcopyops', None),
                      'Storwize driver delete vdisk copy error')
    self._delete_volume(volume)
class CLIResponseTestCase(test.TestCase):

View File

@ -137,6 +137,7 @@ class VolumeDriver(object):
def __init__(self, execute=utils.execute, *args, **kwargs):
    """Initialize the driver with manager-supplied collaborators.

    :param execute: command execution helper (defaults to utils.execute)
    """
    # NOTE(vish): db is set by Manager
    self.db = kwargs.get('db')
    # Host identifier supplied by the manager; lets drivers look up the
    # volumes they own (e.g. to rebuild state after a restart).
    self.host = kwargs.get('host')
    self.configuration = kwargs.get('configuration', None)
    if self.configuration:
        self.configuration.append_config_values(volume_opts)

View File

@ -40,6 +40,7 @@ from cinder import context
from cinder import exception
from cinder.openstack.common import excutils
from cinder.openstack.common import log as logging
from cinder.openstack.common import loopingcall
from cinder import units
from cinder import utils
from cinder.volume.drivers.ibm.storwize_svc import helpers as storwize_helpers
@ -114,21 +115,24 @@ class StorwizeSVCDriver(san.SanDriver):
1.2.3 - Fix Fibre Channel connectivity: bug #1279758 (add delim to
lsfabric, clear unused data from connections, ensure matching
WWPNs by comparing lower case
1.2.4 - Fix bug #1278035 (async migration/retype)
"""
VERSION = "1.2.3"
VERSION = "1.2.4"
VDISKCOPYOPS_INTERVAL = 600
def __init__(self, *args, **kwargs):
    """Initialize driver configuration, helpers, and copy-op tracking."""
    super(StorwizeSVCDriver, self).__init__(*args, **kwargs)
    self.configuration.append_config_values(storwize_svc_opts)
    self._helpers = storwize_helpers.StorwizeHelpers(self._run_ssh)
    # Map of volume id -> list of (orig_copy_id, new_copy_id) tuples for
    # vdisk copy operations still syncing on the backend.
    self._vdiskcopyops = {}
    # LoopingCall that periodically checks pending copies; created
    # lazily when the first operation is registered.
    self._vdiskcopyops_loop = None
    self._state = {'storage_nodes': {},
                   'enabled_protocols': set(),
                   'compression_enabled': False,
                   'available_iogrps': [],
                   'system_name': None,
                   'system_id': None,
                   'extent_size': None,
                   'code_level': None,
                   }
@ -142,11 +146,10 @@ class StorwizeSVCDriver(san.SanDriver):
# Validate that the pool exists
pool = self.configuration.storwize_svc_volpool_name
try:
attributes = self._helpers.get_pool_attrs(pool)
self._helpers.get_pool_attrs(pool)
except exception.VolumeBackendAPIException:
msg = _('Failed getting details for pool %s') % pool
raise exception.InvalidInput(reason=msg)
self._state['extent_size'] = attributes['extent_size']
# Check if compression is supported
self._state['compression_enabled'] = \
@ -184,6 +187,28 @@ class StorwizeSVCDriver(san.SanDriver):
msg = _('do_setup: No configured nodes.')
LOG.error(msg)
raise exception.VolumeDriverException(message=msg)
# Build the list of in-progress vdisk copy operations
if ctxt is None:
admin_context = context.get_admin_context()
else:
admin_context = ctxt.elevated()
volumes = self.db.volume_get_all_by_host(admin_context, self.host)
for volume in volumes:
metadata = self.db.volume_admin_metadata_get(admin_context,
volume['id'])
curr_ops = metadata.get('vdiskcopyops', None)
if curr_ops:
ops = [tuple(x.split(':')) for x in curr_ops.split(';')]
self._vdiskcopyops[volume['id']] = ops
# if vdiskcopy exists in database, start the looping call
if len(self._vdiskcopyops) >= 1:
self._vdiskcopyops_loop = loopingcall.LoopingCall(
self._check_volume_copy_ops)
self._vdiskcopyops_loop.start(interval=self.VDISKCOPYOPS_INTERVAL)
LOG.debug(_('leave: do_setup'))
def check_for_setup_error(self):
@ -197,9 +222,6 @@ class StorwizeSVCDriver(san.SanDriver):
if self._state['system_id'] is None:
exception_msg = (_('Unable to determine system id'))
raise exception.VolumeBackendAPIException(data=exception_msg)
if self._state['extent_size'] is None:
exception_msg = (_('Unable to determine pool extent size'))
raise exception.VolumeBackendAPIException(data=exception_msg)
required_flags = ['san_ip', 'san_ssh_port', 'san_login',
'storwize_svc_volpool_name']
@ -285,7 +307,6 @@ class StorwizeSVCDriver(san.SanDriver):
'conn': str(connector)})
vol_opts = self._get_vdisk_params(volume['volume_type_id'])
host_name = connector['host']
volume_name = volume['name']
# Delete irrelevant connection information that later could result
@ -453,7 +474,12 @@ class StorwizeSVCDriver(san.SanDriver):
def create_snapshot(self, snapshot):
ctxt = context.get_admin_context()
source_vol = self.db.volume_get(ctxt, snapshot['volume_id'])
try:
source_vol = self.db.volume_get(ctxt, snapshot['volume_id'])
except Exception:
msg = (_('create_snapshot: get source volume failed.'))
LOG.error(msg)
raise exception.VolumeDriverException(message=msg)
opts = self._get_vdisk_params(source_vol['volume_type_id'])
self._helpers.create_copy(snapshot['volume_name'], snapshot['name'],
snapshot['volume_id'], self.configuration,
@ -500,13 +526,112 @@ class StorwizeSVCDriver(san.SanDriver):
self._helpers.extend_vdisk(volume['name'], extend_amt)
LOG.debug(_('leave: extend_volume: volume %s') % volume['id'])
def _add_vdisk_copy_op(self, ctxt, volume, new_op):
    """Register an in-progress vdisk copy operation for a volume.

    The operation is recorded both in the in-memory map and in the
    volume's admin metadata (key 'vdiskcopyops', format
    'orig:new;orig:new;...') so it can be rebuilt after a restart.
    Starts the periodic checker when the first operation is added.

    :param ctxt: request context (elevated for the metadata update)
    :param volume: volume dict the copy operation belongs to
    :param new_op: (orig_copy_id, new_copy_id) tuple of string ids
    """
    metadata = self.db.volume_admin_metadata_get(ctxt.elevated(),
                                                 volume['id'])
    curr_ops = metadata.get('vdiskcopyops', None)
    if curr_ops:
        new_ops_list = [tuple(x.split(':')) for x in curr_ops.split(';')]
        # BUGFIX: list.append() mutates in place and returns None, so
        # its result must not be assigned back to the list variable.
        new_ops_list.append(new_op)
    else:
        new_ops_list = [new_op]
    new_ops_str = ';'.join([':'.join(x) for x in new_ops_list])
    self.db.volume_admin_metadata_update(ctxt.elevated(), volume['id'],
                                         {'vdiskcopyops': new_ops_str},
                                         False)
    if volume['id'] in self._vdiskcopyops:
        self._vdiskcopyops[volume['id']].append(new_op)
    else:
        self._vdiskcopyops[volume['id']] = [new_op]

    # We added the first copy operation, so start the looping call
    if len(self._vdiskcopyops) == 1:
        self._vdiskcopyops_loop = loopingcall.LoopingCall(
            self._check_volume_copy_ops)
        self._vdiskcopyops_loop.start(interval=self.VDISKCOPYOPS_INTERVAL)
def _rm_vdisk_copy_op(self, ctxt, volume, orig_copy_id, new_copy_id):
    """Deregister a completed vdisk copy operation.

    Removes the (orig_copy_id, new_copy_id) pair from both the in-memory
    map and the volume's 'vdiskcopyops' admin metadata, and stops the
    periodic checker once no operations remain anywhere.
    """
    try:
        self._vdiskcopyops[volume['id']].remove((orig_copy_id,
                                                 new_copy_id))
        if not len(self._vdiskcopyops[volume['id']]):
            del self._vdiskcopyops[volume['id']]
        if not len(self._vdiskcopyops):
            self._vdiskcopyops_loop.stop()
            self._vdiskcopyops_loop = None
    except KeyError:
        # BUGFIX: a missing volume id raises KeyError from the dict
        # lookup, not IndexError as previously caught.
        msg = (_('_rm_vdisk_copy_op: Volume %s does not have any '
                 'registered vdisk copy operations.') % volume['id'])
        LOG.error(msg)
        return
    except ValueError:
        msg = (_('_rm_vdisk_copy_op: Volume %(vol)s does not have the '
                 'specified vdisk copy operation: orig=%(orig)s '
                 'new=%(new)s.')
               % {'vol': volume['id'], 'orig': orig_copy_id,
                  'new': new_copy_id})
        LOG.error(msg)
        return

    metadata = self.db.volume_admin_metadata_get(ctxt.elevated(),
                                                 volume['id'])
    curr_ops = metadata.get('vdiskcopyops', None)
    if not curr_ops:
        msg = (_('_rm_vdisk_copy_op: Volume metadata %s does not have any '
                 'registered vdisk copy operations.') % volume['id'])
        LOG.error(msg)
        return
    curr_ops_list = [tuple(x.split(':')) for x in curr_ops.split(';')]
    try:
        curr_ops_list.remove((orig_copy_id, new_copy_id))
    except ValueError:
        msg = (_('_rm_vdisk_copy_op: Volume %(vol)s metadata does not '
                 'have the specified vdisk copy operation: orig=%(orig)s '
                 'new=%(new)s.')
               % {'vol': volume['id'], 'orig': orig_copy_id,
                  'new': new_copy_id})
        LOG.error(msg)
        return

    if len(curr_ops_list):
        new_ops_str = ';'.join([':'.join(x) for x in curr_ops_list])
        self.db.volume_admin_metadata_update(ctxt.elevated(), volume['id'],
                                             {'vdiskcopyops': new_ops_str},
                                             False)
    else:
        self.db.volume_admin_metadata_delete(ctxt.elevated(), volume['id'],
                                             'vdiskcopyops')
def _check_volume_copy_ops(self):
    """Periodic task: delete original vdisk copies whose new copy synced.

    For every registered (orig, new) copy-id pair, if the new copy has
    finished synchronizing, remove the original copy from the backend
    and deregister the operation.
    """
    LOG.debug(_("enter: update volume copy status"))
    ctxt = context.get_admin_context()
    # BUGFIX: snapshot both levels before iterating --
    # _rm_vdisk_copy_op mutates self._vdiskcopyops and the per-volume
    # lists, and mutating a list while iterating it skips elements.
    copy_items = list(self._vdiskcopyops.items())
    for vol_id, copy_ops in copy_items:
        volume = self.db.volume_get(ctxt, vol_id)
        for copy_op in list(copy_ops):
            try:
                synced = self._helpers.is_vdisk_copy_synced(volume['name'],
                                                            copy_op[1])
            except Exception:
                msg = (_('_check_volume_copy_ops: Volume %(vol)s does not '
                         'have the specified vdisk copy operation: '
                         'orig=%(orig)s new=%(new)s.')
                       % {'vol': volume['id'], 'orig': copy_op[0],
                          'new': copy_op[1]})
                LOG.info(msg)
            else:
                if synced:
                    self._helpers.rm_vdisk_copy(volume['name'], copy_op[0])
                    self._rm_vdisk_copy_op(ctxt, volume, copy_op[0],
                                           copy_op[1])
    LOG.debug(_("exit: update volume copy status"))
def migrate_volume(self, ctxt, volume, host):
"""Migrate directly if source and dest are managed by same storage.
The method uses the migratevdisk method, which returns almost
immediately, if the source and target pools have the same extent_size.
Otherwise, it uses addvdiskcopy and rmvdiskcopy, which require waiting
for the copy operation to complete.
We create a new vdisk copy in the desired pool, and add the original
vdisk copy to the admin_metadata of the volume to be deleted. The
deletion will occur using a periodic task once the new copy is synced.
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
@ -522,24 +647,17 @@ class StorwizeSVCDriver(san.SanDriver):
if dest_pool is None:
return false_ret
if 'extent_size' not in host['capabilities']:
return false_ret
if host['capabilities']['extent_size'] == self._state['extent_size']:
# If source and dest pools have the same extent size, migratevdisk
self._helpers.migrate_vdisk(volume['name'], dest_pool)
ctxt = context.get_admin_context()
if volume['volume_type_id'] is not None:
volume_type_id = volume['volume_type_id']
vol_type = volume_types.get_volume_type(ctxt, volume_type_id)
else:
# If source and dest pool extent size differ, add/delete vdisk copy
ctxt = context.get_admin_context()
if volume['volume_type_id'] is not None:
volume_type_id = volume['volume_type_id']
vol_type = volume_types.get_volume_type(ctxt, volume_type_id)
else:
vol_type = None
self._helpers.migrate_volume_vdiskcopy(volume['name'], dest_pool,
vol_type,
self._state,
self.configuration)
vol_type = None
new_op = self._helpers.add_vdisk_copy(volume['name'], dest_pool,
vol_type, self._state,
self.configuration)
self._add_vdisk_copy_op(ctxt, volume, new_op)
LOG.debug(_('leave: migrate_volume: id=%(id)s, host=%(host)s') %
{'id': volume['id'], 'host': host['host']})
return (True, None)
@ -590,10 +708,10 @@ class StorwizeSVCDriver(san.SanDriver):
if dest_pool is None:
return False
self._helpers.migrate_volume_vdiskcopy(volume['name'], dest_pool,
new_type,
self._state,
self.configuration)
new_op = self._helpers.add_vdisk_copy(volume['name'], dest_pool,
new_type, self._state,
self.configuration)
self._add_vdisk_copy_op(ctxt, volume, new_op)
else:
self._helpers.change_vdisk_options(volume['name'], vdisk_changes,
new_opts, self._state)
@ -650,7 +768,6 @@ class StorwizeSVCDriver(san.SanDriver):
units.GiB)
data['easytier_support'] = attributes['easy_tier'] in ['on', 'auto']
data['compression_support'] = self._state['compression_enabled']
data['extent_size'] = self._state['extent_size']
data['location_info'] = ('StorwizeSVCDriver:%(sys_id)s:%(pool)s' %
{'sys_id': self._state['system_id'],
'pool': pool})

View File

@ -679,13 +679,8 @@ class StorwizeHelpers(object):
def extend_vdisk(self, vdisk, amount):
    """Grow the vdisk by the given amount via svctask expandvdisksize."""
    self.ssh.expandvdisksize(vdisk, amount)
def migrate_volume_vdiskcopy(self, vdisk, dest_pool, volume_type,
state, config):
"""Migrate a volume using addvdiskcopy and rmvdiskcopy.
This will add a vdisk copy with the given volume type in the given
pool, wait until it syncs, and delete the original copy.
"""
def add_vdisk_copy(self, vdisk, dest_pool, volume_type, state, config):
"""Add a vdisk copy in the given pool."""
this_pool = config.storwize_svc_volpool_name
resp = self.ssh.lsvdiskcopy(vdisk)
orig_copy_id = None
@ -694,7 +689,7 @@ class StorwizeHelpers(object):
orig_copy_id = copy_id
if orig_copy_id is None:
msg = (_('migrate_volume started without a vdisk copy in the '
msg = (_('add_vdisk_copy started without a vdisk copy in the '
'expected pool.'))
LOG.error(msg)
raise exception.VolumeDriverException(message=msg)
@ -706,19 +701,16 @@ class StorwizeHelpers(object):
volume_type=volume_type)
params = self._get_vdisk_create_params(opts)
new_copy_id = self.ssh.addvdiskcopy(vdisk, dest_pool, params)
return (orig_copy_id, new_copy_id)
sync = False
while not sync:
sync = self.ssh.lsvdiskcopy(vdisk, copy_id=new_copy_id)[0]['sync']
if sync == 'yes':
sync = True
else:
greenthread.sleep(10)
def is_vdisk_copy_synced(self, vdisk, copy_id):
    """Return True if the given vdisk copy has finished synchronizing."""
    copy_info = self.ssh.lsvdiskcopy(vdisk, copy_id=copy_id)[0]
    return copy_info['sync'] == 'yes'
self.ssh.rmvdiskcopy(vdisk, orig_copy_id)
def migrate_vdisk(self, vdisk, dest_pool):
self.ssh.migratevdisk(vdisk, dest_pool)
def rm_vdisk_copy(self, vdisk, copy_id):
    """Delete the specified copy of the vdisk (svctask rmvdiskcopy)."""
    self.ssh.rmvdiskcopy(vdisk, copy_id)
@staticmethod
def can_migrate_to_host(host, state):

View File

@ -231,11 +231,6 @@ class StorwizeSSH(object):
'-unit', 'gb', vdisk])
self.run_ssh_assert_no_output(ssh_cmd)
def migratevdisk(self, vdisk, dest_pool):
ssh_cmd = ['svctask', 'migratevdisk', '-mdiskgrp', dest_pool,
'-vdisk', vdisk]
self.run_ssh_assert_no_output(ssh_cmd)
def mkfcmap(self, source, target, full_copy):
ssh_cmd = ['svctask', 'mkfcmap', '-source', source, '-target',
target, '-autodelete']

View File

@ -189,7 +189,8 @@ class VolumeManager(manager.SchedulerDependentManager):
self.driver = importutils.import_object(
volume_driver,
configuration=self.configuration,
db=self.db)
db=self.db,
host=self.host)
def _add_to_threadpool(self, func, *args, **kwargs):
    # Run func asynchronously on the manager's greenthread pool.
    self._tp.spawn_n(func, *args, **kwargs)