From efd476c74e476fd463a7db4bc63923239489edeb Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Wed, 27 Jan 2021 10:36:05 -0500 Subject: [PATCH] mypy: RBD driver Change-Id: I4f8e4c13e678912ce5a3105dc21ea81d1b53bf9a --- cinder/volume/drivers/rbd.py | 403 ++++++++++++++++++++++++----------- mypy-files.txt | 1 + 2 files changed, 284 insertions(+), 120 deletions(-) diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py index a704c416195..698a963ed04 100644 --- a/cinder/volume/drivers/rbd.py +++ b/cinder/volume/drivers/rbd.py @@ -19,6 +19,8 @@ import json import math import os import tempfile +import typing +from typing import Any, Dict, List, Optional, Tuple, Union # noqa: H301 from castellan import key_manager from eventlet import tpool @@ -36,15 +38,19 @@ try: except ImportError: rados = None rbd = None -import six from six.moves import urllib +from cinder import context from cinder import exception from cinder.i18n import _ from cinder.image import image_utils from cinder import interface from cinder import objects +from cinder.objects.backup import Backup from cinder.objects import fields +from cinder.objects.snapshot import Snapshot +from cinder.objects.volume import Volume +from cinder.objects.volume_type import VolumeType from cinder import utils from cinder.volume import configuration from cinder.volume import driver @@ -150,9 +156,16 @@ class RBDVolumeProxy(object): The underlying librados client and ioctx can be accessed as the attributes 'client' and 'ioctx'. """ - def __init__(self, driver, name, pool=None, snapshot=None, - read_only=False, remote=None, timeout=None, - client=None, ioctx=None): + def __init__(self, + driver: 'RBDDriver', + name: str, + pool: str = None, + snapshot: str = None, + read_only: bool = False, + remote: Optional[Dict[str, str]] = None, + timeout: int = None, + client: 'rados.Rados' = None, + ioctx: 'rados.Ioctx' = None): self._close_conn = not (client and ioctx) rados_client, rados_ioctx = driver._connect_to_rados( pool, remote, timeout) if self._close_conn else (client, ioctx) @@ -173,34 +186,37 @@ class RBDVolumeProxy(object): self.client = rados_client self.ioctx = rados_ioctx - def __enter__(self): + def __enter__(self) -> 'RBDVolumeProxy': return self - def __exit__(self, type_, value, traceback): + def __exit__(self, + type_: Optional[Any], + value: Optional[Any], + traceback: Optional[Any]) -> None: try: self.volume.close() finally: if self._close_conn: self.driver._disconnect_from_rados(self.client, self.ioctx) - def __getattr__(self, attrib): + def __getattr__(self, attrib: str): return getattr(self.volume, attrib) class RADOSClient(object): """Context manager to simplify error handling for connecting to ceph.""" - def __init__(self, driver, pool=None): + def __init__(self, driver: 'RBDDriver', pool: str = None) -> None: self.driver = driver self.cluster, self.ioctx = driver._connect_to_rados(pool) - def __enter__(self): + def __enter__(self) -> 'RADOSClient': return self - def __exit__(self, type_, value, traceback): + def __exit__(self, type_, value, traceback) -> None: self.driver._disconnect_from_rados(self.cluster, self.ioctx) @property - def features(self): + def features(self) -> int: features = self.cluster.conf_get('rbd_default_features') if ((features is None) or (int(features) == 0)): features = self.driver.RBD_FEATURE_LAYERING @@ -229,10 +245,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, RBD_FEATURE_JOURNALING = 64 STORAGE_PROTOCOL = 'ceph' - def __init__(self, active_backend_id=None, *args, **kwargs): + def __init__(self, + active_backend_id: str = None, + *args, + **kwargs) -> None: super(RBDDriver, self).__init__(*args, **kwargs) self.configuration.append_config_values(RBD_OPTS) - self._stats = {} + self._stats: Dict[str, Union[str, bool]] = {} # allow overrides for testing self.rados = kwargs.get('rados', rados) self.rbd = kwargs.get('rbd', rbd) @@ -247,12 +266,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._backend_name = (self.configuration.volume_backend_name or self.__class__.__name__) - self._active_backend_id = active_backend_id - self._active_config = {} + self._active_backend_id: Optional[str] = active_backend_id + self._active_config: Dict[str, str] = {} self._is_replication_enabled = False - self._replication_targets = [] - self._target_names = [] - self._clone_v2_api_checked = False + self._replication_targets: list = [] + self._target_names: List[str] = [] + self._clone_v2_api_checked: bool = False if self.rbd is not None: self.RBD_FEATURE_LAYERING = self.rbd.RBD_FEATURE_LAYERING @@ -268,9 +287,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self.RBD_FEATURE_OBJECT_MAP | self.RBD_FEATURE_EXCLUSIVE_LOCK) + self.keyring_data: Optional[str] = None self._set_keyring_attributes() - def _set_keyring_attributes(self): + def _set_keyring_attributes(self) -> None: # The rbd_keyring_conf option is not available for OpenStack usage # for security reasons (OSSN-0085) and in OpenStack we use # rbd_secret_uuid or make sure that the keyring files are present on @@ -281,8 +301,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, # file even if it's there (because we have removed the conf option # definition), but cinderlib will find it because it sets the option # directly as an attribute. - self.keyring_file = getattr(self.configuration, 'rbd_keyring_conf', - None) + self.keyring_file: Optional[str] = getattr(self.configuration, + 'rbd_keyring_conf', + None) self.keyring_data = None try: @@ -293,13 +314,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.debug('Cannot read RBD keyring file: %s.', self.keyring_file) @classmethod - def get_driver_options(cls): + def get_driver_options(cls) -> list: additional_opts = cls._get_oslo_driver_opts( 'replication_device', 'reserved_percentage', 'max_over_subscription_ratio', 'volume_dd_blocksize') return RBD_OPTS + additional_opts - def _show_msg_check_clone_v2_api(self, volume_name): + def _show_msg_check_clone_v2_api(self, volume_name: str) -> None: if not self._clone_v2_api_checked: self._clone_v2_api_checked = True with RBDVolumeProxy(self, volume_name, read_only=True) as volume: @@ -315,7 +336,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, ' compat version to mimic for better' ' performance, fewer deletion issues') - def _get_target_config(self, target_id): + def _get_target_config(self, + target_id: Optional[str]) -> Dict[str, + str]: """Get a replication target from known replication targets.""" for target in self._replication_targets: if target['name'] == target_id: @@ -330,12 +353,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, raise exception.InvalidReplicationTarget( reason=_('RBD: Unknown failover target host %s.') % target_id) - def do_setup(self, context): + def do_setup(self, context: context.RequestContext) -> None: """Performs initialization steps that could raise exceptions.""" self._do_setup_replication() self._active_config = self._get_target_config(self._active_backend_id) - def _do_setup_replication(self): + def _do_setup_replication(self) -> None: replication_devices = self.configuration.safe_get( 'replication_device') if replication_devices: @@ -343,7 +366,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._is_replication_enabled = True self._target_names.append('default') - def _parse_replication_configs(self, replication_devices): + def _parse_replication_configs(self, + replication_devices: List[dict]) -> None: for replication_device in replication_devices: if 'backend_id' not in replication_device: msg = _('Missing backend_id in replication_device ' @@ -366,13 +390,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._replication_targets.append(replication_target) self._target_names.append(name) - def _get_config_tuple(self, remote=None): + def _get_config_tuple( + self, + remote: Optional[Dict[str, str]] = None) \ + -> Tuple[Optional[str], Optional[str], + Optional[str], Optional[str]]: if not remote: remote = self._active_config return (remote.get('name'), remote.get('conf'), remote.get('user'), remote.get('secret_uuid', None)) - def _trash_purge(self): + def _trash_purge(self) -> None: LOG.info("Purging trash for backend '%s'", self._backend_name) def _err(vol_name: str, backend_name: str) -> None: @@ -405,7 +433,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, vol.get('name'), self._backend_name) - def _start_periodic_tasks(self): + def _start_periodic_tasks(self) -> None: if self.configuration.enable_deferred_deletion: LOG.info("Starting periodic trash purge for backend '%s'", self._backend_name) @@ -414,7 +442,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, deferred_deletion_ptask.start( interval=self.configuration.deferred_deletion_purge_interval) - def check_for_setup_error(self): + def check_for_setup_error(self) -> None: """Returns an error if prerequisites aren't met.""" if rados is None: msg = _('rados and rbd python libraries not found') @@ -451,10 +479,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._start_periodic_tasks() - def RBDProxy(self): + def RBDProxy(self) -> tpool.Proxy: return tpool.Proxy(self.rbd.RBD()) - def _ceph_args(self): + def _ceph_args(self) -> List[str]: args = [] name, conf, user, secret_uuid = self._get_config_tuple() @@ -468,11 +496,18 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return args - def _connect_to_rados(self, pool=None, remote=None, timeout=None): + def _connect_to_rados(self, + pool: Optional[str] = None, + remote: Optional[dict] = None, + timeout: Optional[int] = None) -> \ + Tuple['rados.Rados', 'rados.Ioctx']: @utils.retry(exception.VolumeBackendAPIException, self.configuration.rados_connection_interval, self.configuration.rados_connection_retries) - def _do_conn(pool, remote, timeout): + def _do_conn(pool: Optional[str], + remote: Optional[dict], + timeout: Optional[int]) -> Tuple['rados.Rados', + 'rados.Ioctx']: name, conf, user, secret_uuid = self._get_config_tuple(remote) if pool is not None: @@ -494,10 +529,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, try: if timeout >= 0: - timeout = six.text_type(timeout) - client.conf_set('rados_osd_op_timeout', timeout) - client.conf_set('rados_mon_op_timeout', timeout) - client.conf_set('client_mount_timeout', timeout) + t = str(timeout) + client.conf_set('rados_osd_op_timeout', t) + client.conf_set('rados_mon_op_timeout', t) + client.conf_set('client_mount_timeout', t) client.connect() ioctx = client.open_ioctx(pool) @@ -510,12 +545,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return _do_conn(pool, remote, timeout) - def _disconnect_from_rados(self, client, ioctx): + def _disconnect_from_rados(self, + client: 'rados.Rados', + ioctx: 'rados.Ioctx') -> None: # closing an ioctx cannot raise an exception ioctx.close() client.shutdown() - def _get_backup_snaps(self, rbd_image): + def _get_backup_snaps(self, rbd_image) -> List: """Get list of any backup snapshots that exist on this volume. There should only ever be one but accept all since they need to be @@ -528,7 +565,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, from cinder.backup.drivers import ceph return ceph.CephBackupDriver.get_backup_snaps(rbd_image) - def _get_mon_addrs(self): + def _get_mon_addrs(self) -> Tuple[List[str], List[str]]: args = ['ceph', 'mon', 'dump', '--format=json'] args.extend(self._ceph_args()) out, _ = self._execute(*args) @@ -536,7 +573,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, if lines[0].startswith('dumped monmap epoch'): lines = lines[1:] monmap = json.loads('\n'.join(lines)) - addrs = [mon['addr'] for mon in monmap['mons']] + addrs: List[str] = [mon['addr'] for mon in monmap['mons']] hosts = [] ports = [] for addr in addrs: @@ -546,7 +583,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, ports.append(port) return hosts, ports - def _get_usage_info(self): + def _get_usage_info(self) -> int: """Calculate provisioned volume space in GiB. Stats report should send provisioned size of volumes (snapshot must not @@ -572,7 +609,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, total_provisioned = math.ceil(float(total_provisioned) / units.Gi) return total_provisioned - def _get_pool_stats(self): + def _get_pool_stats(self) -> Union[Tuple[str, str], + Tuple[float, float]]: """Gets pool free and total capacity in GiB. Calculate free and total capacity of the pool based on the pool's @@ -602,6 +640,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, pool_stats = [pool for pool in df_data['pools'] if pool['name'] == pool_name][0]['stats'] + total_capacity: float + free_capacity: float quota_outbuf = encodeutils.safe_decode(quota_outbuf) bytes_quota = json.loads(quota_outbuf)['quota_max_bytes'] # With quota the total is the quota limit and free is quota - used @@ -624,7 +664,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return free_capacity, total_capacity - def _update_volume_stats(self): + def _update_volume_stats(self) -> None: location_info = '%s:%s:%s:%s:%s' % ( self.configuration.rbd_cluster_name, self.configuration.rbd_ceph_conf, @@ -673,7 +713,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.exception('error refreshing volume stats') self._stats = stats - def _get_clone_depth(self, client, volume_name, depth=0): + def _get_clone_depth(self, + client: 'rados.Rados', + volume_name: str, + depth: int = 0) -> int: """Returns the number of ancestral clones of the given volume.""" parent_volume = self.rbd.Image(client.ioctx, volume_name, @@ -689,7 +732,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return self._get_clone_depth(client, parent, depth + 1) - def _extend_if_required(self, volume, src_vref): + def _extend_if_required(self, volume: Volume, src_vref: Volume) -> None: """Extends a volume if required In case src_vref size is smaller than the size if the requested @@ -702,7 +745,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, 'dst_size': volume.size}) self._resize(volume) - def create_cloned_volume(self, volume, src_vref): + def create_cloned_volume( + self, + volume: Volume, + src_vref: Volume) -> Optional[Dict[str, Optional[str]]]: """Create a cloned volume from another volume. Since we are cloning from a volume and not a snapshot, we must first @@ -722,7 +768,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, with RBDVolumeProxy(self, src_name, read_only=True) as vol: vol.copy(vol.ioctx, dest_name) self._extend_if_required(volume, src_vref) - return + return None # Otherwise do COW clone. with RADOSClient(self) as client: @@ -808,7 +854,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.debug("clone created successfully") return volume_update - def _enable_replication(self, volume): + def _enable_replication(self, volume: Volume) -> Dict[str, str]: """Enable replication for a volume. Returns required volume update. @@ -832,7 +878,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'replication_status': fields.ReplicationStatus.ENABLED, 'replication_driver_data': driver_data} - def _enable_multiattach(self, volume): + def _enable_multiattach(self, volume: Volume) -> Dict[str, str]: vol_name = utils.convert_str(volume.name) with RBDVolumeProxy(self, vol_name) as image: image_features = image.features() @@ -842,7 +888,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'provider_location': self._dumps({'saved_features': image_features})} - def _disable_multiattach(self, volume): + def _disable_multiattach(self, volume: Volume) -> Dict[str, None]: vol_name = utils.convert_str(volume.name) with RBDVolumeProxy(self, vol_name) as image: try: @@ -859,7 +905,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'provider_location': None} - def _is_replicated_type(self, volume_type): + def _is_replicated_type(self, volume_type: VolumeType) -> bool: try: extra_specs = volume_type.extra_specs LOG.debug('extra_specs: %s', extra_specs) @@ -868,7 +914,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.debug('Unable to retrieve extra specs info') return False - def _is_multiattach_type(self, volume_type): + def _is_multiattach_type(self, volume_type: VolumeType) -> bool: try: extra_specs = volume_type.extra_specs LOG.debug('extra_specs: %s', extra_specs) @@ -877,7 +923,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.debug('Unable to retrieve extra specs info') return False - def _setup_volume(self, volume, volume_type=None): + def _setup_volume( + self, + volume: Volume, + volume_type: Optional[VolumeType] = None) -> Dict[str, + Optional[str]]: if volume_type: had_replication = self._is_replicated_type(volume.volume_type) @@ -894,7 +944,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, msg = _('Replication and Multiattach are mutually exclusive.') raise RBDDriverException(reason=msg) - volume_update = dict() + volume_update: dict = dict() if want_replication: if had_multiattach: @@ -924,7 +974,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return volume_update - def _create_encrypted_volume(self, volume, context): + def _create_encrypted_volume(self, + volume: Volume, + context: context.RequestContext) -> None: """Create an encrypted volume. This works by creating an encrypted image locally, @@ -972,11 +1024,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, cmd.extend(self._ceph_args()) self._execute(*cmd) - def create_volume(self, volume): + def create_volume(self, volume: Volume) -> Dict[str, Any]: """Creates a logical volume.""" if volume.encryption_key_id: - return self._create_encrypted_volume(volume, volume.obj_context) + self._create_encrypted_volume(volume, volume.obj_context) + return {} size = int(volume.size) * units.Gi @@ -1004,13 +1057,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return volume_update - def _flatten(self, pool, volume_name): + def _flatten(self, pool: str, volume_name: str) -> None: LOG.debug('flattening %(pool)s/%(img)s', dict(pool=pool, img=volume_name)) with RBDVolumeProxy(self, volume_name, pool) as vol: vol.flatten() - def _get_stripe_unit(self, ioctx, volume_name): + def _get_stripe_unit(self, ioctx: 'rados.Ioctx', volume_name: str) -> int: """Return the correct stripe unit for a cloned volume. A cloned volume must be created with a stripe unit at least as large @@ -1029,7 +1082,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return max(image_stripe_unit, default_stripe_unit) - def _clone(self, volume, src_pool, src_image, src_snap): + def _clone(self, + volume: Volume, + src_pool: str, + src_image: str, + src_snap: str) -> Dict[str, Optional[str]]: LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s', dict(pool=src_pool, img=src_image, snap=src_snap, dst=volume.name)) @@ -1056,7 +1113,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, volume_id=volume.id) return volume_update or {} - def _resize(self, volume, **kwargs): + def _resize(self, volume: Volume, **kwargs: Any) -> None: size = kwargs.get('size', None) if not size: size = int(volume.size) * units.Gi @@ -1064,14 +1121,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, with RBDVolumeProxy(self, volume.name) as vol: vol.resize(size) - def _calculate_new_size(self, size_diff, volume_name): + def _calculate_new_size(self, size_diff: int, volume_name: str) -> int: with RBDVolumeProxy(self, volume_name) as vol: current_size_bytes = vol.volume.size() size_diff_bytes = size_diff * units.Gi new_size_bytes = current_size_bytes + size_diff_bytes return new_size_bytes - def create_volume_from_snapshot(self, volume, snapshot): + def create_volume_from_snapshot( + self, + volume: Volume, + snapshot: Snapshot) -> dict: """Creates a volume from a snapshot.""" volume_update = self._clone(volume, self.configuration.rbd_pool, snapshot.volume_name, snapshot.name) @@ -1098,7 +1158,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._show_msg_check_clone_v2_api(snapshot.volume_name) return volume_update - def _delete_backup_snaps(self, rbd_image): + def _delete_backup_snaps(self, rbd_image: 'rbd.Image') -> None: backup_snaps = self._get_backup_snaps(rbd_image) if backup_snaps: for snap in backup_snaps: @@ -1106,7 +1166,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, else: LOG.debug("volume has no backup snaps") - def _get_clone_info(self, volume, volume_name, snap=None): + def _get_clone_info( + self, + volume: 'rbd.Image', + volume_name: str, + snap: Optional[str] = None) -> Union[Tuple[str, str, str], + Tuple[None, None, None]]: """If volume is a clone, return its parent info. Returns a tuple of (pool, parent, snap). A snapshot may optionally be @@ -1132,7 +1197,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return (None, None, None) - def _get_children_info(self, volume, snap): + def _get_children_info(self, + volume: 'rbd.Image', + snap: Optional[str]) -> List[tuple]: """List children for the given snapshot of a volume(image). Returns a list of (pool, image). @@ -1147,7 +1214,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return children_list - def _delete_clone_parent_refs(self, client, parent_name, parent_snap): + def _delete_clone_parent_refs(self, + client: RADOSClient, + parent_name: str, + parent_snap: str) -> None: """Walk back up the clone chain and delete references. Deletes references i.e. deleted parent volumes and snapshots. @@ -1183,9 +1253,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, # Now move up to grandparent if there is one if g_parent: + g_parent_snap = typing.cast(str, g_parent_snap) self._delete_clone_parent_refs(client, g_parent, g_parent_snap) - def delete_volume(self, volume): + def delete_volume(self, volume: Volume) -> None: """Deletes a logical volume.""" # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are # utf-8 otherwise librbd will barf. @@ -1226,7 +1297,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, @utils.retry(self.rbd.ImageBusy, self.configuration.rados_connection_interval, self.configuration.rados_connection_retries) - def _try_remove_volume(client, volume_name): + def _try_remove_volume(client: Any, volume_name: str) -> None: if self.configuration.enable_deferred_deletion: delay = self.configuration.deferred_deletion_delay else: @@ -1269,6 +1340,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, # references. if parent: LOG.debug("volume is a clone so cleaning references") + parent_snap = typing.cast(str, parent_snap) self._delete_clone_parent_refs(client, parent, parent_snap) else: # If the volume has copy-on-write clones we will not be able to @@ -1277,14 +1349,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, new_name = "%s.deleted" % (volume_name) self.RBDProxy().rename(client.ioctx, volume_name, new_name) - def create_snapshot(self, snapshot): + def create_snapshot(self, snapshot: Snapshot) -> None: """Creates an rbd snapshot.""" with RBDVolumeProxy(self, snapshot.volume_name) as volume: snap = utils.convert_str(snapshot.name) volume.create_snap(snap) volume.protect_snap(snap) - def delete_snapshot(self, snapshot): + def delete_snapshot(self, snapshot: Snapshot) -> None: """Deletes an rbd snapshot.""" # NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are # utf-8 otherwise librbd will barf. @@ -1320,11 +1392,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.info("Snapshot %s does not exist in backend.", snap_name) - def snapshot_revert_use_temp_snapshot(self): + def snapshot_revert_use_temp_snapshot(self) -> bool: """Disable the use of a temporary snapshot on revert.""" return False - def revert_to_snapshot(self, context, volume, snapshot): + def revert_to_snapshot(self, + context: context.RequestContext, + volume: Volume, + snapshot: Snapshot) -> None: """Revert a volume to a given snapshot.""" # NOTE(rosmaita): The Ceph documentation notes that this operation is # inefficient on the backend for large volumes, and that the preferred @@ -1346,7 +1421,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, with RBDVolumeProxy(self, volume.name) as image: image.rollback_to_snap(snapshot.name) - def _disable_replication(self, volume): + def _disable_replication(self, volume: Volume) -> Dict[str, Optional[str]]: """Disable replication on the given volume.""" vol_name = utils.convert_str(volume.name) with RBDVolumeProxy(self, vol_name) as image: @@ -1363,14 +1438,24 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'replication_status': fields.ReplicationStatus.DISABLED, 'replication_driver_data': None} - def retype(self, context, volume, new_type, diff, host): + def retype(self, + context: context.RequestContext, + volume: Volume, + new_type: VolumeType, + diff: Union[Dict[str, Dict[str, str]], Dict[str, Dict], None], + host: Optional[Dict[str, str]]) -> Tuple[bool, dict]: """Retype from one volume type to another on the same backend.""" return True, self._setup_volume(volume, new_type) - def _dumps(self, obj): + def _dumps(self, obj: Dict[str, Union[bool, int]]) -> str: return json.dumps(obj, separators=(',', ':'), sort_keys=True) - def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs): + def _exec_on_volume(self, + volume_name: str, + remote: Dict[str, str], + operation: str, + *args: Any, + **kwargs: Any): @utils.retry(rbd.ImageBusy, self.configuration.rados_connection_interval, self.configuration.rados_connection_retries) @@ -1381,7 +1466,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return getattr(rbd_image, operation)(*args, **kwargs) return _do_exec() - def _failover_volume(self, volume, remote, is_demoted, replication_status): + def _failover_volume(self, + volume: Volume, + remote: Dict[str, str], + is_demoted: bool, + replication_status: str) -> Dict[str, Any]: """Process failover for a volume. There are 2 different cases that will return different update values @@ -1420,7 +1509,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return error_result - def _demote_volumes(self, volumes, until_failure=True): + def _demote_volumes(self, + volumes: List[Volume], + until_failure: bool = True) -> List[bool]: """Try to demote volumes on the current primary cluster.""" result = [] try_demoting = True @@ -1440,7 +1531,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, result.append(demoted) return result - def _get_failover_target_config(self, secondary_id=None): + def _get_failover_target_config(self, + secondary_id: str = None) -> Tuple[str, + dict]: if not secondary_id: # In auto mode exclude failback and active candidates = set(self._target_names).difference( @@ -1451,7 +1544,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, secondary_id = candidates.pop() return secondary_id, self._get_target_config(secondary_id) - def failover(self, context, volumes, secondary_id=None, groups=None): + def failover(self, + context: context.RequestContext, + volumes: list, + secondary_id: Optional[str] = None, + groups=None) -> Tuple[str, list, list]: """Failover replicated volumes.""" LOG.info('RBD driver failover started.') if not self._is_replication_enabled: @@ -1476,7 +1573,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.info('RBD driver failover completed.') return secondary_id, updates, [] - def failover_completed(self, context, secondary_id=None): + def failover_completed(self, + context: context.RequestContext, + secondary_id: Optional[str] = None) -> None: """Failover to replication target.""" LOG.info('RBD driver failover completion started.') secondary_id, remote = self._get_failover_target_config(secondary_id) @@ -1485,7 +1584,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self._active_config = remote LOG.info('RBD driver failover completion completed.') - def failover_host(self, context, volumes, secondary_id=None, groups=None): + def failover_host(self, + context: context.RequestContext, + volumes: List[Volume], + secondary_id: str = None, + groups: Optional[List] = None) -> Tuple[str, + List[Volume], + List]: """Failover to replication target. This function combines calls to failover() and failover_completed() to @@ -1496,19 +1601,28 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, self.failover_completed(context, secondary_id) return active_backend_id, volume_update_list, group_update_list - def ensure_export(self, context, volume): + def ensure_export(self, + context: context.RequestContext, + volume: Volume): """Synchronously recreates an export for a logical volume.""" pass - def create_export(self, context, volume, connector): + def create_export(self, + context: context.RequestContext, + volume: Volume, + connector: dict): """Exports the volume.""" pass - def remove_export(self, context, volume): + def remove_export(self, + context: context.RequestContext, + volume: Volume): """Removes an export for a logical volume.""" pass - def initialize_connection(self, volume, connector): + def initialize_connection(self, + volume: Volume, + connector: dict) -> Dict[str, Any]: hosts, ports = self._get_mon_addrs() name, conf, user, secret_uuid = self._get_config_tuple() data = { @@ -1528,14 +1642,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, } } if self.keyring_data: - data['data']['keyring'] = self.keyring_data + data['data']['keyring'] = self.keyring_data # type: ignore LOG.debug('connection data: %s', data) return data - def terminate_connection(self, volume, connector, **kwargs): + def terminate_connection(self, + volume: Volume, + connector: dict, + **kwargs) -> None: pass - def _parse_location(self, location): + def _parse_location(self, location: str) -> List[str]: prefix = 'rbd://' if not location.startswith(prefix): reason = _('Not stored in rbd') @@ -1550,7 +1667,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, raise exception.ImageUnacceptable(image_id=location, reason=reason) return pieces - def _get_fsid(self): + def _get_fsid(self) -> str: with RADOSClient(self) as client: # Librados's get_fsid is represented as binary # in py3 instead of str as it is in py2. @@ -1567,7 +1684,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, # https://tracker.ceph.com/issues/38381 return encodeutils.safe_decode(client.cluster.get_fsid()) - def _is_cloneable(self, image_location, image_meta): + def _is_cloneable(self, image_location: str, image_meta: dict) -> bool: try: fsid, pool, image, snapshot = self._parse_location(image_location) except exception.ImageUnacceptable as e: @@ -1597,9 +1714,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, dict(loc=image_location, err=e)) return False - def clone_image(self, context, volume, - image_location, image_meta, - image_service): + def clone_image(self, + context: context.RequestContext, + volume: Volume, + image_location: Optional[list], + image_meta: dict, + image_service) -> Tuple[dict, bool]: if image_location: # Note: image_location[0] is glance image direct_url. # image_location[1] contains the list of all locations (including @@ -1623,16 +1743,29 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return volume_update, True return ({}, False) - def copy_image_to_encrypted_volume(self, context, volume, image_service, - image_id): + def copy_image_to_encrypted_volume(self, + context: context.RequestContext, + volume: Volume, + image_service, + image_id: str) -> None: self._copy_image_to_volume(context, volume, image_service, image_id, encrypted=True) - def copy_image_to_volume(self, context, volume, image_service, image_id): + def copy_image_to_volume(self, + context: context.RequestContext, + volume: Volume, + image_service, + image_id: str) -> None: self._copy_image_to_volume(context, volume, image_service, image_id) - def _encrypt_image(self, context, volume, tmp_dir, src_image_path): - encryption = volume_utils.check_encryption_provider(volume, context) + def _encrypt_image(self, + context: context.RequestContext, + volume: Volume, + tmp_dir: str, + src_image_path: Any) -> None: + encryption = volume_utils.check_encryption_provider( + volume, + context) # Fetch the key associated with the volume and decode the passphrase keymgr = key_manager.API(CONF) @@ -1663,8 +1796,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, finally: fileutils.delete_if_exists(dest_image_path) - def _copy_image_to_volume(self, context, volume, image_service, image_id, - encrypted=False): + def _copy_image_to_volume(self, + context: context.RequestContext, + volume: Volume, + image_service: Any, + image_id: str, + encrypted: bool = False) -> None: tmp_dir = volume_utils.image_conversion_dir() @@ -1680,7 +1817,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, @utils.retry(exception.VolumeIsBusy, self.configuration.rados_connection_interval, self.configuration.rados_connection_retries) - def _delete_volume(volume): + def _delete_volume(volume: Volume) -> None: self.delete_volume(volume) _delete_volume(volume) @@ -1721,7 +1858,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, volume) os.unlink(tmp_file) - def extend_volume(self, volume, new_size): + def extend_volume(self, volume: Volume, new_size: str) -> None: """Extend an existing volume.""" old_size = volume.size @@ -1737,7 +1874,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.", {'old_size': old_size, 'new_size': new_size}) - def manage_existing(self, volume, existing_ref): + def manage_existing(self, + volume: Volume, existing_ref: Dict[str, str]) -> None: """Manages an existing image. Renames the image name to match the expected name for the volume. @@ -1756,7 +1894,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, utils.convert_str(rbd_name), utils.convert_str(volume.name)) - def manage_existing_get_size(self, volume, existing_ref): + def manage_existing_get_size(self, + volume: Volume, + existing_ref: Dict[str, str]) -> int: """Return size of an existing image for manage_existing. :param volume: @@ -1812,8 +1952,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, out, _ = self._execute(*args) return json.loads(out) - def get_manageable_volumes(self, cinder_volumes, marker, limit, offset, - sort_keys, sort_dirs): + def get_manageable_volumes(self, + cinder_volumes: List[Dict[str, str]], + marker: Optional[Any], + limit: int, + offset: int, + sort_keys: List[str], + sort_dirs: List[str]) -> List[Dict[str, Any]]: manageable_volumes = [] cinder_ids = [resource['id'] for resource in cinder_volumes] @@ -1860,8 +2005,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, def unmanage(self, volume): pass - def update_migrated_volume(self, ctxt, volume, new_volume, - original_volume_status): + def update_migrated_volume(self, + ctxt: Dict, + volume: Volume, + new_volume: Volume, + original_volume_status: str) -> \ + Union[Dict[str, None], Dict[str, Optional[str]]]: """Return model update from RBD for migrated volume. This method should rename the back-end volume name(id) on the @@ -1902,7 +2051,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return {'_name_id': name_id, 'provider_location': provider_location} - def migrate_volume(self, context, volume, host): + def migrate_volume(self, + context: context.RequestContext, + volume: Volume, + host: Dict[str, Dict[str, str]]) -> Tuple[bool, None]: refuse_to_migrate = (False, None) @@ -1982,7 +2134,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return (True, None) - def manage_existing_snapshot_get_size(self, snapshot, existing_ref): + def manage_existing_snapshot_get_size(self, + snapshot: Snapshot, + existing_ref: Dict[str, Any]) -> int: """Return size of an existing image for manage_existing. :param snapshot: @@ -2031,7 +2185,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, raise exception.VolumeBackendAPIException( data=exception_message) - def manage_existing_snapshot(self, snapshot, existing_ref): + def manage_existing_snapshot(self, + snapshot: Snapshot, + existing_ref: Dict[str, Any]) -> None: """Manages an existing snapshot. Renames the snapshot name to match the expected name for the snapshot. @@ -2053,8 +2209,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, if not volume.is_protected_snap(snapshot.name): volume.protect_snap(snapshot.name) - def get_manageable_snapshots(self, cinder_snapshots, marker, limit, offset, - sort_keys, sort_dirs): + def get_manageable_snapshots(self, + cinder_snapshots: List[Dict[str, str]], + marker: Optional[Any], + limit: int, + offset: int, + sort_keys: List[str], + sort_dirs: List[str]) -> List[Dict[str, Any]]: """List manageable snapshots on RBD backend.""" manageable_snapshots = [] cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots] @@ -2104,7 +2265,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, return volume_utils.paginate_entries_list( manageable_snapshots, marker, limit, offset, sort_keys, sort_dirs) - def unmanage_snapshot(self, snapshot): + def unmanage_snapshot(self, snapshot: Snapshot) -> None: """Removes the specified snapshot from Cinder management.""" with RBDVolumeProxy(self, snapshot.volume_name) as volume: volume.set_snap(snapshot.name) @@ -2113,7 +2274,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD, if not children and volume.is_protected_snap(snapshot.name): volume.unprotect_snap(snapshot.name) - def get_backup_device(self, context, backup): + def get_backup_device(self, + context: context.RequestContext, + backup: Backup) -> Tuple[Volume, bool]: """Get a backup device from an existing volume. To support incremental backups on Ceph to Ceph we don't clone diff --git a/mypy-files.txt b/mypy-files.txt index 1e6d60cdef5..5ec1c0d76ef 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -28,6 +28,7 @@ cinder/scheduler/weights/volume_number.py cinder/utils.py cinder/volume/__init__.py cinder/volume/api.py +cinder/volume/drivers/rbd.py cinder/volume/flows/api/create_volume.py cinder/volume/flows/manager/create_volume.py cinder/volume/manager.py