Merge "Fix backup temp snapshot path on remote node"

This commit is contained in:
Jenkins 2017-05-25 15:17:34 +00:00 committed by Gerrit Code Review
commit 114e3bd722
6 changed files with 259 additions and 242 deletions

View File

@ -90,101 +90,10 @@ class BackupManager(manager.ThreadPoolManager):
self.service = importutils.import_module(self.driver_name)
self.az = CONF.storage_availability_zone
self.volume_managers = {}
# TODO(xyang): If backup_use_same_host is True, we'll find
# the volume backend on the backup node. This allows us
# to use a temp snapshot to backup an in-use volume if the
# driver supports it. This code should go away when we add
# support for backing up in-use volume using a temp snapshot
# on a remote node.
if CONF.backup_use_same_host:
self._setup_volume_drivers()
self.backup_rpcapi = backup_rpcapi.BackupAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
super(BackupManager, self).__init__(*args, **kwargs)
def _get_volume_backend(self, host=None, allow_null_host=False):
if host is None:
if not allow_null_host:
msg = _("NULL host not allowed for volume backend lookup.")
raise exception.BackupFailedToGetVolumeBackend(msg)
else:
LOG.debug("Checking hostname '%s' for backend info.", host)
# NOTE(xyang): If host='myhost@lvmdriver', backend='lvmdriver'
# by the logic below. This is different from extract_host.
# vol_utils.extract_host(host, 'backend')='myhost@lvmdriver'.
part = host.partition('@')
if (part[1] == '@') and (part[2] != ''):
backend = part[2]
LOG.debug("Got backend '%s'.", backend)
return backend
LOG.info("Backend not found in hostname (%s) so using default.",
host)
if 'default' not in self.volume_managers:
# For multi-backend we just pick the top of the list.
return next(iter(self.volume_managers))
return 'default'
def _get_manager(self, backend):
LOG.debug("Manager requested for volume_backend '%s'.",
backend)
if backend is None:
LOG.debug("Fetching default backend.")
backend = self._get_volume_backend(allow_null_host=True)
if backend not in self.volume_managers:
msg = (_("Volume manager for backend '%s' does not exist.") %
(backend))
raise exception.BackupFailedToGetVolumeBackend(msg)
return self.volume_managers[backend]
def _get_driver(self, backend=None):
LOG.debug("Driver requested for volume_backend '%s'.",
backend)
if backend is None:
LOG.debug("Fetching default backend.")
backend = self._get_volume_backend(allow_null_host=True)
mgr = self._get_manager(backend)
mgr.driver.db = self.db
return mgr.driver
def _setup_volume_drivers(self):
if CONF.enabled_backends:
for backend in filter(None, CONF.enabled_backends):
host = "%s@%s" % (CONF.host, backend)
mgr = importutils.import_object(CONF.volume_manager,
host=host,
service_name=backend)
config = mgr.configuration
backend_name = config.safe_get('volume_backend_name')
LOG.debug("Registering backend %(backend)s (host=%(host)s "
"backend_name=%(backend_name)s).",
{'backend': backend, 'host': host,
'backend_name': backend_name})
self.volume_managers[backend] = mgr
else:
default = importutils.import_object(CONF.volume_manager)
LOG.debug("Registering default backend %s.", default)
self.volume_managers['default'] = default
def _init_volume_driver(self, ctxt, driver):
LOG.info("Starting volume driver %(driver_name)s (%(version)s).",
{'driver_name': driver.__class__.__name__,
'version': driver.get_version()})
try:
driver.do_setup(ctxt)
driver.check_for_setup_error()
except Exception:
LOG.exception("Error encountered during initialization of "
"driver: %(name)s.",
{'name': driver.__class__.__name__})
# we don't want to continue since we failed
# to initialize the driver correctly.
return
driver.set_initialized()
@property
def driver_name(self):
"""This function maps old backup services to backup drivers."""
@ -207,9 +116,6 @@ class BackupManager(manager.ThreadPoolManager):
"""Run initialization needed for a standalone service."""
ctxt = context.get_admin_context()
for mgr in self.volume_managers.values():
self._init_volume_driver(ctxt, mgr.driver)
try:
self._cleanup_incomplete_backup_operations(ctxt)
except Exception:
@ -317,12 +223,7 @@ class BackupManager(manager.ThreadPoolManager):
try:
temp_snapshot = objects.Snapshot.get_by_id(
ctxt, backup.temp_snapshot_id)
volume = objects.Volume.get_by_id(
ctxt, backup.volume_id)
# The temp snapshot should be deleted directly through the
# volume driver, not through the volume manager.
self.volume_rpcapi.delete_snapshot(ctxt, temp_snapshot,
volume.host)
self.volume_rpcapi.delete_snapshot(ctxt, temp_snapshot)
except exception.SnapshotNotFound:
LOG.debug("Could not find temp snapshot %(snap)s to clean "
"up for backup %(backup)s.",
@ -932,18 +833,13 @@ class BackupManager(manager.ThreadPoolManager):
backup_service = self.service.get_backup_driver(context)
return backup_service.support_force_delete
def _attach_device(self, context, backup_device,
def _attach_device(self, ctxt, backup_device,
properties, is_snapshot=False):
"""Attach backup device."""
if not is_snapshot:
return self._attach_volume(context, backup_device, properties)
return self._attach_volume(ctxt, backup_device, properties)
else:
volume = self.db.volume_get(context, backup_device.volume_id)
host = volume_utils.extract_host(volume['host'], 'backend')
backend = self._get_volume_backend(host=host)
rc = self._get_driver(backend)._attach_snapshot(
context, backup_device, properties)
return rc
return self._attach_snapshot(ctxt, backup_device, properties)
def _attach_volume(self, context, volume, properties):
"""Attach a volume."""
@ -965,6 +861,24 @@ class BackupManager(manager.ThreadPoolManager):
"acceptable.",
{'volume_id', volume.id})
def _attach_snapshot(self, ctxt, snapshot, properties):
"""Attach a snapshot."""
try:
conn = self.volume_rpcapi.initialize_connection_snapshot(
ctxt, snapshot, properties)
return self._connect_device(conn)
except Exception:
with excutils.save_and_reraise_exception():
try:
self.volume_rpcapi.terminate_connection_snapshot(
ctxt, snapshot, properties, force=True)
except Exception:
LOG.warning("Failed to terminate the connection "
"of snapshot %(snapshot_id)s, but it is "
"acceptable.",
{'snapshot_id', snapshot.id})
def _connect_device(self, conn):
"""Establish connection to device."""
use_multipath = CONF.use_multipath_for_image_xfer
@ -979,20 +893,18 @@ class BackupManager(manager.ThreadPoolManager):
return {'conn': conn, 'device': vol_handle, 'connector': connector}
def _detach_device(self, context, attach_info, device,
def _detach_device(self, ctxt, attach_info, device,
properties, is_snapshot=False, force=False):
"""Disconnect the volume or snapshot from the host. """
connector = attach_info['connector']
connector.disconnect_volume(attach_info['conn']['data'],
attach_info['device'])
rpcapi = self.volume_rpcapi
if not is_snapshot:
connector = attach_info['connector']
connector.disconnect_volume(attach_info['conn']['data'],
attach_info['device'])
rpcapi = self.volume_rpcapi
rpcapi.terminate_connection(context, device, properties,
rpcapi.terminate_connection(ctxt, device, properties,
force=force)
rpcapi.remove_export(context, device)
rpcapi.remove_export(ctxt, device)
else:
volume = self.db.volume_get(context, device.volume_id)
host = volume_utils.extract_host(volume['host'], 'backend')
backend = self._get_volume_backend(host=host)
self._get_driver(backend)._detach_snapshot(
context, attach_info, device, properties, force)
rpcapi.terminate_connection_snapshot(ctxt, device,
properties, force=force)
rpcapi.remove_export_snapshot(ctxt, device)

View File

@ -38,7 +38,7 @@ from cinder import test
from cinder.tests import fake_driver
from cinder.tests.unit.backup import fake_service_with_verify as fake_service
from cinder.tests.unit import utils
from cinder.volume import driver
from cinder.volume import rpcapi as volume_rpcapi
CONF = cfg.CONF
@ -257,12 +257,9 @@ class BackupTestCase(BaseBackupTest):
mock_get_admin_context.side_effect = get_admin_context
self.volume = importutils.import_object(CONF.volume_manager)
self.backup_mgr.volume_managers = {'driver': self.volume}
self.backup_mgr.init_host()
mock_setup.assert_called_once_with(self.ctxt)
mock_check.assert_called_once_with()
mock_set_initialized.assert_called_once_with()
self.assertEqual({}, self.backup_mgr.volume_managers)
vol1 = db.volume_get(self.ctxt, vol1_id)
self.assertEqual('available', vol1['status'])
@ -347,13 +344,6 @@ class BackupTestCase(BaseBackupTest):
def test_is_working(self):
self.assertTrue(self.backup_mgr.is_working())
def test_get_volume_backend(self):
backup_mgr = manager.BackupManager()
backup_mgr.volume_managers = {'backend1': 'backend1',
'backend2': 'backend2'}
backend = backup_mgr._get_volume_backend(allow_null_host=True)
self.assertIn(backend, backup_mgr.volume_managers)
def test_cleanup_incomplete_backup_operations_with_exceptions(self):
"""Test cleanup resilience in the face of exceptions."""
@ -710,7 +700,6 @@ class BackupTestCase(BaseBackupTest):
mock_get_conn):
"""Test backup in-use volume using temp snapshot."""
self.override_config('backup_use_same_host', True)
self.backup_mgr._setup_volume_drivers()
vol_size = 1
vol_id = self._create_volume_db_entry(size=vol_size,
previous_status='in-use')
@ -728,29 +717,34 @@ class BackupTestCase(BaseBackupTest):
'device': {'path': '/dev/null'},
'conn': {'data': {}},
'connector': fake.FakeConnector(None)}
mock_detach_snapshot = self.mock_object(driver.BaseVD,
'_detach_snapshot')
mock_attach_snapshot = self.mock_object(driver.BaseVD,
'_attach_snapshot')
mock_attach_snapshot.return_value = attach_info
mock_terminate_connection_snapshot = self.mock_object(
volume_rpcapi.VolumeAPI,
'terminate_connection_snapshot')
mock_initialize_connection_snapshot = self.mock_object(
volume_rpcapi.VolumeAPI,
'initialize_connection_snapshot')
mock_connect_device = self.mock_object(
manager.BackupManager,
'_connect_device')
mock_connect_device.return_value = attach_info
properties = {}
mock_get_conn.return_value = properties
mock_open.return_value = open('/dev/null', 'rb')
self.backup_mgr.create_backup(self.ctxt, backup)
mock_temporary_chown.assert_called_once_with('/dev/null')
mock_attach_snapshot.assert_called_once_with(self.ctxt, snap,
properties)
mock_initialize_connection_snapshot.assert_called_once_with(
self.ctxt, snap, properties)
mock_get_backup_device.assert_called_once_with(self.ctxt, backup, vol)
mock_get_conn.assert_called_once_with()
mock_detach_snapshot.assert_called_once_with(self.ctxt, attach_info,
snap, properties, False)
mock_terminate_connection_snapshot.assert_called_once_with(
self.ctxt, snap, properties, force=False)
vol = objects.Volume.get_by_id(self.ctxt, vol_id)
self.assertEqual('in-use', vol['status'])
self.assertEqual('backing-up', vol['previous_status'])
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
backup = objects.Backup.get_by_id(self.ctxt, backup.id)
self.assertEqual(fields.BackupStatus.AVAILABLE, backup.status)
self.assertEqual(vol_size, backup.size)
@mock.patch.object(fake_driver.FakeLoggingVolumeDriver, 'create_snapshot')
def test_create_temp_snapshot(self, mock_create_snapshot):

View File

@ -592,3 +592,40 @@ class VolumeRPCAPITestCase(test.RPCAPITestCase):
service=service,
log_request='log_request',
version='3.12')
@ddt.data(None, 'mycluster')
def test_initialize_connection_snapshot(self, cluster_name):
self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
self._test_rpc_api('initialize_connection_snapshot',
rpc_method='call',
server=(cluster_name or
self.fake_snapshot.volume.host),
connector='fake_connector',
snapshot=self.fake_snapshot,
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')
@ddt.data(None, 'mycluster')
def test_terminate_connection_snapshot(self, cluster_name):
self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
self._test_rpc_api('terminate_connection_snapshot',
rpc_method='call',
server=(cluster_name or
self.fake_snapshot.volume.host),
snapshot=self.fake_snapshot,
connector='fake_connector',
force=False,
retval=None,
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')
def test_remove_export_snapshot(self):
self._test_rpc_api('remove_export_snapshot',
rpc_method='cast',
server=self.fake_volume_obj.host,
snapshot=self.fake_snapshot,
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')

View File

@ -466,48 +466,26 @@ class BaseVD(object):
raise exception.RemoveExportException(volume=volume['id'],
reason=ex)
def _detach_snapshot(self, context, attach_info, snapshot, properties,
force=False, remote=False):
def _detach_snapshot(self, ctxt, snapshot, properties, force=False):
"""Disconnect the snapshot from the host."""
# Use Brick's code to do attach/detach
connector = attach_info['connector']
connector.disconnect_volume(attach_info['conn']['data'],
attach_info['device'])
try:
self.terminate_connection_snapshot(snapshot, properties,
force=force)
except Exception as err:
err_msg = (_('Unable to terminate snapshot connection: %(err)s')
% {'err': six.text_type(err)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
# NOTE(xyang): This method is introduced for non-disruptive backup.
# Currently backup service has to be on the same node as the volume
# driver. Therefore it is not possible to call a volume driver on a
# remote node. In the future, if backup can be done from a remote
# node, this function can be modified to allow RPC calls. The remote
# flag in the interface is for anticipation that it will be enabled
# in the future.
if remote:
LOG.error("Detaching snapshot from a remote node "
"is not supported.")
raise exception.NotSupportedOperation(
operation=_("detach snapshot from remote node"))
else:
# Call local driver's terminate_connection and remove export.
# NOTE(avishay) This is copied from the manager's code - need to
# clean this up in the future.
try:
self.terminate_connection_snapshot(snapshot, properties,
force=force)
except Exception as err:
err_msg = (_('Unable to terminate volume connection: %(err)s')
% {'err': six.text_type(err)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
try:
LOG.debug("Snapshot %s: removing export.", snapshot.id)
self.remove_export_snapshot(context, snapshot)
except Exception as ex:
LOG.exception("Error detaching snapshot %(snapshot)s, "
"due to remove export failure.",
{"snapshot": snapshot.id})
raise exception.RemoveExportException(volume=snapshot.id,
reason=ex)
try:
LOG.debug("Snapshot %s: removing export.", snapshot.id)
self.remove_export_snapshot(ctxt, snapshot)
except Exception as ex:
LOG.exception("Error detaching snapshot %(snapshot)s, "
"due to remove export failure.",
{"snapshot": snapshot.id})
raise exception.RemoveExportException(volume=snapshot.id,
reason=ex)
def set_initialized(self):
self._initialized = True
@ -1016,63 +994,47 @@ class BaseVD(object):
return (attach_info, volume)
def _attach_snapshot(self, context, snapshot, properties, remote=False):
def _attach_snapshot(self, ctxt, snapshot, properties):
"""Attach the snapshot."""
# NOTE(xyang): This method is introduced for non-disruptive backup.
# Currently backup service has to be on the same node as the volume
# driver. Therefore it is not possible to call a volume driver on a
# remote node. In the future, if backup can be done from a remote
# node, this function can be modified to allow RPC calls. The remote
# flag in the interface is for anticipation that it will be enabled
# in the future.
if remote:
LOG.error("Attaching snapshot from a remote node "
"is not supported.")
raise exception.NotSupportedOperation(
operation=_("attach snapshot from remote node"))
else:
# Call local driver's create_export and initialize_connection.
# NOTE(avishay) This is copied from the manager's code - need to
# clean this up in the future.
model_update = None
try:
LOG.debug("Snapshot %s: creating export.", snapshot.id)
model_update = self.create_export_snapshot(context, snapshot,
properties)
if model_update:
snapshot.provider_location = model_update.get(
'provider_location', None)
snapshot.provider_auth = model_update.get(
'provider_auth', None)
snapshot.save()
except exception.CinderException as ex:
if model_update:
LOG.exception("Failed updating model of snapshot "
"%(snapshot_id)s with driver provided "
"model %(model)s.",
{'snapshot_id': snapshot.id,
'model': model_update})
raise exception.ExportFailure(reason=ex)
model_update = None
try:
LOG.debug("Snapshot %s: creating export.", snapshot.id)
model_update = self.create_export_snapshot(ctxt, snapshot,
properties)
if model_update:
snapshot.provider_location = model_update.get(
'provider_location', None)
snapshot.provider_auth = model_update.get(
'provider_auth', None)
snapshot.save()
except exception.CinderException as ex:
if model_update:
LOG.exception("Failed updating model of snapshot "
"%(snapshot_id)s with driver provided "
"model %(model)s.",
{'snapshot_id': snapshot.id,
'model': model_update})
raise exception.ExportFailure(reason=ex)
try:
conn = self.initialize_connection_snapshot(
snapshot, properties)
except Exception as err:
try:
conn = self.initialize_connection_snapshot(
snapshot, properties)
except Exception as err:
try:
err_msg = (_('Unable to fetch connection information from '
'backend: %(err)s') %
{'err': six.text_type(err)})
LOG.error(err_msg)
LOG.debug("Cleaning up failed connect initialization.")
self.remove_export_snapshot(context, snapshot)
except Exception as ex:
ex_msg = (_('Error encountered during cleanup '
'of a failed attach: %(ex)s') %
{'ex': six.text_type(ex)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=ex_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
return self._connect_device(conn)
err_msg = (_('Unable to fetch connection information from '
'backend: %(err)s') %
{'err': six.text_type(err)})
LOG.error(err_msg)
LOG.debug("Cleaning up failed connect initialization.")
self.remove_export_snapshot(ctxt, snapshot)
except Exception as ex:
ex_msg = (_('Error encountered during cleanup '
'of a failed attach: %(ex)s') %
{'ex': six.text_type(ex)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=ex_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
return conn
def _connect_device(self, conn):
# Use Brick's code to do attach/detach
@ -1129,7 +1091,7 @@ class BaseVD(object):
"""
backup_device = None
is_snapshot = False
if self.backup_use_temp_snapshot() and CONF.backup_use_same_host:
if self.backup_use_temp_snapshot():
(backup_device, is_snapshot) = (
self._get_backup_volume_temp_snapshot(context, backup))
else:

View File

@ -1543,6 +1543,66 @@ class VolumeManager(manager.CleanableManager,
resource=volume)
return conn_info
def initialize_connection_snapshot(self, ctxt, snapshot_id, connector):
utils.require_driver_initialized(self.driver)
snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
try:
self.driver.validate_connector(connector)
except exception.InvalidConnectorException as err:
raise exception.InvalidInput(reason=six.text_type(err))
except Exception as err:
err_msg = (_("Validate snapshot connection failed "
"(error: %(err)s).") % {'err': six.text_type(err)})
LOG.exception(err_msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=err_msg)
model_update = None
try:
LOG.debug("Snapshot %s: creating export.", snapshot.id)
model_update = self.driver.create_export_snapshot(
ctxt.elevated(), snapshot, connector)
if model_update:
snapshot.provider_location = model_update.get(
'provider_location', None)
snapshot.provider_auth = model_update.get(
'provider_auth', None)
snapshot.save()
except exception.CinderException as ex:
msg = _("Create export of snapshot failed (%s)") % ex.msg
LOG.exception(msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=msg)
try:
if model_update:
snapshot.update(model_update)
snapshot.save()
except exception.CinderException as ex:
LOG.exception("Model update failed.", resource=snapshot)
raise exception.ExportFailure(reason=six.text_type(ex))
try:
conn = self.driver.initialize_connection_snapshot(snapshot,
connector)
except Exception as err:
try:
err_msg = (_('Unable to fetch connection information from '
'backend: %(err)s') %
{'err': six.text_type(err)})
LOG.error(err_msg)
LOG.debug("Cleaning up failed connect initialization.")
self.driver.remove_export_snapshot(ctxt.elevated(), snapshot)
except Exception as ex:
ex_msg = (_('Error encountered during cleanup '
'of a failed attach: %(ex)s') %
{'ex': six.text_type(ex)})
LOG.error(ex_msg)
raise exception.VolumeBackendAPIException(data=ex_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
LOG.info("Initialize snapshot connection completed successfully.",
resource=snapshot)
return conn
def terminate_connection(self, context, volume_id, connector, force=False):
"""Cleanup connection from host represented by connector.
@ -1565,6 +1625,22 @@ class VolumeManager(manager.CleanableManager,
LOG.info("Terminate volume connection completed successfully.",
resource=volume_ref)
def terminate_connection_snapshot(self, ctxt, snapshot_id,
connector, force=False):
utils.require_driver_initialized(self.driver)
snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
try:
self.driver.terminate_connection_snapshot(snapshot, connector,
force=force)
except Exception as err:
err_msg = (_('Terminate snapshot connection failed: %(err)s')
% {'err': six.text_type(err)})
LOG.exception(err_msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=err_msg)
LOG.info("Terminate snapshot connection completed successfully.",
resource=snapshot)
def remove_export(self, context, volume_id):
"""Removes an export for a volume."""
utils.require_driver_initialized(self.driver)
@ -1579,6 +1655,20 @@ class VolumeManager(manager.CleanableManager,
LOG.info("Remove volume export completed successfully.",
resource=volume_ref)
def remove_export_snapshot(self, ctxt, snapshot_id):
"""Removes an export for a snapshot."""
utils.require_driver_initialized(self.driver)
snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
try:
self.driver.remove_export_snapshot(ctxt, snapshot)
except Exception:
msg = _("Remove snapshot export failed.")
LOG.exception(msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=msg)
LOG.info("Remove snapshot export completed successfully.",
resource=snapshot)
def accept_transfer(self, context, volume_id, new_user, new_project):
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught

View File

@ -122,15 +122,17 @@ class VolumeAPI(rpc.RPCAPI):
that we were doing in init_host.
3.8 - Make failover_host cluster aware and add failover_completed.
3.9 - Adds new attach/detach methods
3.10 - Returning objects instead of raw dictionaries in
3.10 - Returning objects instead of raw dictionaries in
get_manageable_volumes & get_manageable_snapshots
3.11 - Removes create_consistencygroup, delete_consistencygroup,
create_cgsnapshot, delete_cgsnapshot, update_consistencygroup,
and create_consistencygroup_from_src.
3.12 - Adds set_log_levels and get_log_levels
3.13 - Add initialize_connection_snapshot,
terminate_connection_snapshot, and remove_export_snapshot.
"""
RPC_API_VERSION = '3.12'
RPC_API_VERSION = '3.13'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
@ -399,6 +401,26 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)
@rpc.assert_min_rpc_version('3.13')
def initialize_connection_snapshot(self, ctxt, snapshot, connector):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
return cctxt.call(ctxt, 'initialize_connection_snapshot',
snapshot_id=snapshot.id,
connector=connector)
@rpc.assert_min_rpc_version('3.13')
def terminate_connection_snapshot(self, ctxt, snapshot, connector,
force=False):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
return cctxt.call(ctxt, 'terminate_connection_snapshot',
snapshot_id=snapshot.id,
connector=connector, force=force)
@rpc.assert_min_rpc_version('3.13')
def remove_export_snapshot(self, ctxt, snapshot):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
cctxt.cast(ctxt, 'remove_export_snapshot', snapshot_id=snapshot.id)
@rpc.assert_min_rpc_version('3.9')
def attachment_update(self, ctxt, vref, connector, attachment_id):
version = self._compat_ver('3.9')