mypy: initiator

Change-Id: I96d0a1b1276b3c666e8deb251e5bb71c68fc9a30
This commit is contained in:
Eric Harney 2021-04-15 12:04:21 -04:00
parent d5820f5319
commit a519dd8d07
14 changed files with 248 additions and 119 deletions

View File

@ -2,11 +2,19 @@ os_brick/exception.py
os_brick/executor.py
os_brick/i18n.py
os_brick/utils.py
os_brick/initiator/__init__.py
os_brick/initiator/linuxscsi.py
os_brick/initiator/connectors/base.py
os_brick/initiator/connectors/base_iscsi.py
os_brick/initiator/connectors/base_rbd.py
os_brick/initiator/connectors/fibre_channel.py
os_brick/initiator/connectors/iscsi.py
os_brick/initiator/connectors/nvmeof.py
os_brick/initiator/connectors/local.py
os_brick/initiator/connectors/remotefs.py
os_brick/initiator/host_driver.py
os_brick/initiator/linuxfc.py
os_brick/initiator/utils.py
os_brick/remotefs/remotefs.py
os_brick/initiator/linuxrbd.py
os_brick/encryptors/luks.py

View File

@ -19,7 +19,7 @@
"""
import threading
from typing import Tuple
from typing import Callable, Tuple # noqa: H301
from oslo_concurrency import processutils as putils
from oslo_context import context as context_utils
@ -37,11 +37,11 @@ class Executor(object):
self.set_root_helper(root_helper)
@staticmethod
def safe_decode(string):
def safe_decode(string) -> str:
return string and encodeutils.safe_decode(string, errors='ignore')
@classmethod
def make_putils_error_safe(cls, exc):
def make_putils_error_safe(cls, exc: putils.ProcessExecutionError) -> None:
"""Converts ProcessExecutionError string attributes to unicode."""
for field in ('stderr', 'stdout', 'cmd', 'description'):
value = getattr(exc, field, None)
@ -59,10 +59,10 @@ class Executor(object):
self.make_putils_error_safe(e)
raise
def set_execute(self, execute):
def set_execute(self, execute: Callable) -> None:
self.__execute = execute
def set_root_helper(self, helper):
def set_root_helper(self, helper: str) -> None:
self._root_helper = helper
@ -78,7 +78,7 @@ class Thread(threading.Thread):
self.__context__ = context_utils.get_current()
super(Thread, self).__init__(*args, **kwargs)
def run(self):
def run(self) -> None:
# Store the context in the current thread's request store
if self.__context__:
self.__context__.update_store()

View File

@ -16,6 +16,8 @@
import functools
import glob
import os
import typing
from typing import Optional, Tuple # noqa: H301
from oslo_concurrency import lockutils
from oslo_concurrency import processutils as putils
@ -136,7 +138,9 @@ class BaseLinuxConnector(initiator_connector.InitiatorConnector):
return False
return True
def get_all_available_volumes(self, connection_properties=None):
def get_all_available_volumes(
self,
connection_properties: Optional[dict] = None) -> list:
volumes = []
path = self.get_search_path()
if path:
@ -151,7 +155,7 @@ class BaseLinuxConnector(initiator_connector.InitiatorConnector):
def _discover_mpath_device(self,
device_wwn: str,
connection_properties: dict,
device_name: str) -> tuple:
device_name: str) -> Tuple[str, str]:
"""This method discovers a multipath device.
Discover a multipath device based on a defined connection_property
@ -189,4 +193,7 @@ class BaseLinuxConnector(initiator_connector.InitiatorConnector):
except exception.BlockDeviceReadOnly:
LOG.warning('Block device %s is still read-only. '
'Continuing anyway.', device_path)
device_path = typing.cast(str, device_path)
multipath_id = typing.cast(str, multipath_id)
return device_path, multipath_id

View File

@ -12,14 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import copy
from typing import Any, Dict, Generator # noqa: H301
from os_brick.initiator import initiator_connector
class BaseISCSIConnector(initiator_connector.InitiatorConnector):
def _iterate_all_targets(self, connection_properties):
def _iterate_all_targets(
self,
connection_properties: dict) -> Generator[Dict[str, Any],
None, None]:
for portal, iqn, lun in self._get_all_targets(connection_properties):
props = copy.deepcopy(connection_properties)
props['target_portal'] = portal
@ -30,12 +35,14 @@ class BaseISCSIConnector(initiator_connector.InitiatorConnector):
yield props
@staticmethod
def _get_luns(con_props, iqns=None):
def _get_luns(con_props: dict, iqns=None) -> list:
luns = con_props.get('target_luns')
num_luns = len(con_props['target_iqns']) if iqns is None else len(iqns)
return luns or [con_props['target_lun']] * num_luns
def _get_all_targets(self, connection_properties):
def _get_all_targets(
self,
connection_properties: dict) -> list[tuple[str, str, list]]:
if all(key in connection_properties for key in ('target_portals',
'target_iqns')):
return list(zip(connection_properties['target_portals'],

View File

@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any
from oslo_log import log as logging
from oslo_utils import netutils
@ -23,20 +27,22 @@ class RBDConnectorMixin(object):
"""Mixin covering cross platform RBD connector functionality"""
@staticmethod
def _sanitize_mon_hosts(hosts):
def _sanitize_host(host):
def _sanitize_mon_hosts(hosts: list[str]) -> list[str]:
def _sanitize_host(host: str) -> str:
if netutils.is_valid_ipv6(host):
host = '[%s]' % host
return host
return list(map(_sanitize_host, hosts))
@classmethod
def _get_rbd_args(cls, connection_properties, conf=None):
def _get_rbd_args(cls,
connection_properties: dict[str, Any],
conf: str = None) -> list[str]:
user = connection_properties.get('auth_username')
monitor_ips = connection_properties.get('hosts')
monitor_ports = connection_properties.get('ports')
args = []
args: list[str] = []
if user:
args = ['--id', user]
if monitor_ips and monitor_ports:

View File

@ -12,7 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import typing
from typing import Any, Optional # noqa: H301
from oslo_log import log as logging
from oslo_service import loopingcall
@ -30,25 +34,33 @@ LOG = logging.getLogger(__name__)
class FibreChannelConnector(base.BaseLinuxConnector):
"""Connector class to attach/detach Fibre Channel volumes."""
def __init__(self, root_helper, driver=None,
execute=None, use_multipath=False,
device_scan_attempts=initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
def __init__(
self,
root_helper: str,
driver=None,
execute: Optional[str] = None,
use_multipath: bool = False,
device_scan_attempts: int = initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
self._linuxfc = linuxfc.LinuxFibreChannel(root_helper, execute)
super(FibreChannelConnector, self).__init__(
root_helper, driver=driver,
execute=execute,
device_scan_attempts=device_scan_attempts,
*args, **kwargs)
*args, **kwargs) # type: ignore
self.use_multipath = use_multipath
self.device_name: Optional[str]
self.host_device: Optional[str]
self.tries: int
def set_execute(self, execute):
def set_execute(self, execute) -> None:
super(FibreChannelConnector, self).set_execute(execute)
self._linuxscsi.set_execute(execute)
self._linuxfc.set_execute(execute)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
def get_connector_properties(
root_helper: str, *args, **kwargs) -> dict[str, Any]:
"""The Fibre Channel connector properties."""
props = {}
fc = linuxfc.LinuxFibreChannel(root_helper,
@ -63,11 +75,12 @@ class FibreChannelConnector(base.BaseLinuxConnector):
return props
def get_search_path(self):
def get_search_path(self) -> str:
"""Where do we look for FC based volumes."""
return '/dev/disk/by-path'
def _add_targets_to_connection_properties(self, connection_properties):
def _add_targets_to_connection_properties(
self, connection_properties: dict) -> dict:
LOG.debug('Adding targets to connection properties receives: %s',
connection_properties)
target_wwn = connection_properties.get('target_wwn')
@ -144,13 +157,15 @@ class FibreChannelConnector(base.BaseLinuxConnector):
connection_properties)
return connection_properties
def _get_possible_volume_paths(self, connection_properties, hbas):
def _get_possible_volume_paths(
self,
connection_properties: dict, hbas) -> list[str]:
targets = connection_properties['targets']
possible_devs = self._get_possible_devices(hbas, targets)
host_paths = self._get_host_devices(possible_devs)
return host_paths
def get_volume_paths(self, connection_properties):
def get_volume_paths(self, connection_properties: dict) -> list:
volume_paths = []
# first fetch all of the potential paths that might exist
# how the FC fabric is zoned may alter the actual list
@ -167,7 +182,7 @@ class FibreChannelConnector(base.BaseLinuxConnector):
@utils.trace
@base.synchronized('extend_volume', external=True)
@utils.connect_volume_undo_prepare_result
def extend_volume(self, connection_properties):
def extend_volume(self, connection_properties: dict) -> Optional[int]:
"""Update the local kernel's size information.
Try and update the local kernel's size information
@ -189,7 +204,7 @@ class FibreChannelConnector(base.BaseLinuxConnector):
@utils.trace
@utils.connect_volume_prepare_result
@base.synchronized('connect_volume', external=True)
def connect_volume(self, connection_properties):
def connect_volume(self, connection_properties: dict) -> dict:
"""Attach the volume to instance_name.
:param connection_properties: The dictionary that describes all
@ -217,7 +232,7 @@ class FibreChannelConnector(base.BaseLinuxConnector):
# The /dev/disk/by-path/... node is not always present immediately
# We only need to find the first device. Once we see the first device
# multipath will have any others.
def _wait_for_device_discovery(host_devices):
def _wait_for_device_discovery(host_devices: list[str]) -> None:
for device in host_devices:
LOG.debug("Looking for Fibre Channel dev %(device)s",
{'device': device})
@ -246,6 +261,8 @@ class FibreChannelConnector(base.BaseLinuxConnector):
_wait_for_device_discovery, host_devices)
timer.start(interval=2).wait()
self.host_device = typing.cast(str, self.host_device)
LOG.debug("Found Fibre Channel volume %(name)s "
"(after %(tries)s rescans.)",
{'name': self.device_name, 'tries': self.tries})
@ -261,6 +278,7 @@ class FibreChannelConnector(base.BaseLinuxConnector):
# Pass a symlink, not a real path, otherwise we'll get a real path
# back if we don't find a multipath and we'll return that to the
# caller, breaking Nova's encryption which requires a symlink.
assert self.host_device is not None
(device_path, multipath_id) = self._discover_mpath_device(
device_wwn, connection_properties, self.host_device)
if multipath_id:
@ -270,10 +288,12 @@ class FibreChannelConnector(base.BaseLinuxConnector):
else:
device_path = self.host_device
device_path = typing.cast(str, device_path)
device_info['path'] = device_path
return device_info
def _get_host_devices(self, possible_devs):
def _get_host_devices(self, possible_devs: list) -> list:
"""Compute the device paths on the system with an id, wwn, and lun
:param possible_devs: list of (platform, pci_id, wwn, lun) tuples
@ -289,7 +309,7 @@ class FibreChannelConnector(base.BaseLinuxConnector):
host_devices.append(host_device)
return host_devices
def _get_possible_devices(self, hbas, targets):
def _get_possible_devices(self, hbas: list, targets: list) -> list:
"""Compute the possible fibre channel device options.
:param hbas: available hba devices.
@ -315,8 +335,11 @@ class FibreChannelConnector(base.BaseLinuxConnector):
@utils.trace
@base.synchronized('connect_volume', external=True)
@utils.connect_volume_undo_prepare_result(unlink_after=True)
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
def disconnect_volume(self,
connection_properties: dict,
device_info: dict,
force: bool = False,
ignore_errors: bool = False) -> None:
"""Detach the volume from instance_name.
:param connection_properties: The dictionary that describes all
@ -346,13 +369,17 @@ class FibreChannelConnector(base.BaseLinuxConnector):
mpath_path = self._linuxscsi.find_multipath_device_path(wwn)
if mpath_path:
self._linuxscsi.flush_multipath_device(mpath_path)
real_path = typing.cast(str, real_path)
dev_info = self._linuxscsi.get_device_info(real_path)
devices.append(dev_info)
LOG.debug("devices to remove = %s", devices)
self._remove_devices(connection_properties, devices, device_info)
def _remove_devices(self, connection_properties, devices, device_info):
def _remove_devices(self,
connection_properties: dict,
devices: list,
device_info: dict) -> None:
# There may have been more than 1 device mounted
# by the kernel for this volume. We have to remove
# all of them
@ -373,7 +400,7 @@ class FibreChannelConnector(base.BaseLinuxConnector):
was_multipath)
self._linuxscsi.remove_scsi_device(device_path, flush=flush)
def _get_pci_num(self, hba):
def _get_pci_num(self, hba: Optional[dict]) -> tuple:
# NOTE(walter-boring)
# device path is in format of (FC and FCoE) :
# /sys/devices/pci0000:00/0000:00:03.0/0000:05:00.3/host2/fc_host/host2

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from collections import defaultdict
import copy
@ -19,7 +20,7 @@ import glob
import os
import re
import time
from typing import List, Tuple # noqa: H301
from typing import Any, Iterable, Optional, Union # noqa: H301
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
@ -49,7 +50,8 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
self, root_helper: str, driver=None,
execute=None, use_multipath: bool = False,
device_scan_attempts: int = initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT,
transport='default', *args, **kwargs):
transport: str = 'default',
*args, **kwargs):
super(ISCSIConnector, self).__init__(
root_helper, driver=driver,
execute=execute,
@ -105,7 +107,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return volume_paths
def _get_iscsi_sessions_full(self) -> List[tuple]:
def _get_iscsi_sessions_full(self) -> list[tuple[str, str, str, str, str]]:
"""Get iSCSI session information as a list of tuples.
Uses iscsiadm -m session and from a command output like
@ -123,7 +125,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
# Parse and clean the output from iscsiadm, which is in the form of:
# transport_name: [session_id] ip_address:port,tpgt iqn node_type
lines: List[tuple] = []
lines: list[tuple[str, str, str, str, str]] = []
for line in out.splitlines():
if line:
info = line.split()
@ -132,7 +134,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
lines.append((info[0], sid, portal, tpgt, info[3]))
return lines
def _get_iscsi_nodes(self) -> List[tuple]:
def _get_iscsi_nodes(self) -> list[tuple]:
"""Get iSCSI node information (portal, iqn) as a list of tuples.
Uses iscsiadm -m node and from a command output like
@ -151,7 +153,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
# Parse and clean the output from iscsiadm which is in the form of:
# ip_address:port,tpgt iqn
lines: List[tuple] = []
lines: list[tuple] = []
for line in out.splitlines():
if line:
info = line.split()
@ -166,10 +168,11 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
# entry: [tcp, [1], 192.168.121.250:3260,1 ...]
return [entry[2] for entry in self._get_iscsi_sessions_full()]
def _get_ips_iqns_luns(self,
connection_properties: dict,
discover: bool = True,
is_disconnect_call: bool = False):
def _get_ips_iqns_luns(
self,
connection_properties: dict,
discover: bool = True,
is_disconnect_call: bool = False) -> list[tuple[Any, Any, Any]]:
"""Build a list of ips, iqns, and luns.
Used when doing singlepath and multipath, and we have 4 cases:
@ -238,7 +241,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return ips_iqns_luns
def _get_potential_volume_paths(self,
connection_properties: dict) -> List[str]:
connection_properties: dict) -> list[str]:
"""Build a list of potential volume paths that exist.
Given a list of target_portals in the connection_properties,
@ -265,19 +268,19 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
LOG.info("Multipath discovery for iSCSI not enabled.")
iscsi_sessions = self._get_iscsi_sessions()
host_devices = set()
host_devices_set: set = set()
for props in self._iterate_all_targets(connection_properties):
# If we aren't trying to connect to the portal, we
# want to find ALL possible paths from all of the
# alternate portals
if props['target_portal'] in iscsi_sessions:
paths = self._get_device_path(props)
host_devices.update(paths)
host_devices = list(host_devices)
host_devices_set.update(paths)
host_devices = list(host_devices_set)
return host_devices
def set_execute(self, execute):
def set_execute(self, execute) -> None:
super(ISCSIConnector, self).set_execute(execute)
self._linuxscsi.set_execute(execute)
@ -324,7 +327,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return self.transport
def _get_discoverydb_portals(self,
connection_properties: dict) -> List[tuple]:
connection_properties: dict) -> list[tuple]:
"""Retrieve iscsi portals information from the discoverydb.
Example of discoverydb command output:
@ -452,8 +455,10 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
luns = self._get_luns(connection_properties, iqns)
return list(zip(ips, iqns, luns))
def _run_iscsiadm_update_discoverydb(self, connection_properties,
iscsi_transport='default'):
def _run_iscsiadm_update_discoverydb(
self,
connection_properties: dict,
iscsi_transport: str = 'default') -> tuple[str, str]:
return self._execute(
'iscsiadm',
'-m', 'discoverydb',
@ -473,7 +478,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
@utils.trace
@base.synchronized('extend_volume', external=True)
@utils.connect_volume_undo_prepare_result
def extend_volume(self, connection_properties: dict):
def extend_volume(self, connection_properties: dict) -> Optional[int]:
"""Update the local kernel's size information.
Try and update the local kernel's size information
@ -497,7 +502,9 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
@utils.trace
@utils.connect_volume_prepare_result
@base.synchronized('connect_volume', external=True)
def connect_volume(self, connection_properties: dict):
def connect_volume(
self,
connection_properties: dict) -> Optional[dict[str, str]]:
"""Attach the volume to instance_name.
:param connection_properties: The valid dictionary that describes all
@ -522,7 +529,13 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
with excutils.save_and_reraise_exception():
self._cleanup_connection(connection_properties, force=True)
def _get_connect_result(self, con_props, wwn, devices_names, mpath=None):
return None
def _get_connect_result(self,
con_props: dict,
wwn: str,
devices_names: list[str],
mpath: Optional[str] = None) -> dict[str, str]:
device = '/dev/' + (mpath or devices_names[0])
result = {'type': 'block', 'scsi_wwn': wwn, 'path': device}
if mpath:
@ -530,11 +543,14 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return result
@utils.retry((exception.VolumeDeviceNotFound))
def _connect_single_volume(self, connection_properties):
def _connect_single_volume(
self,
connection_properties: dict) -> Optional[dict[str, str]]:
"""Connect to a volume using a single path."""
data = {'stop_connecting': False, 'num_logins': 0, 'failed_logins': 0,
'stopped_threads': 0, 'found_devices': [],
'just_added_devices': []}
data: dict[str, Any] = {'stop_connecting': False,
'num_logins': 0, 'failed_logins': 0,
'stopped_threads': 0, 'found_devices': [],
'just_added_devices': []}
for props in self._iterate_all_targets(connection_properties):
self._connect_vol(self.device_scan_attempts, props, data)
@ -547,14 +563,14 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
time.sleep(1)
else:
LOG.debug('Could not find the WWN for %s.',
found_devs[0]) # type: ignore
found_devs[0])
return self._get_connect_result(connection_properties,
wwn, found_devs)
# If we failed we must cleanup the connection, as we could be
# leaving the node entry if it's not being used by another device.
ips_iqns_luns = ((props['target_portal'], props['target_iqn'],
props['target_lun']), )
ips_iqns_luns = [(props['target_portal'], props['target_iqn'],
props['target_lun']), ]
self._cleanup_connection(props, ips_iqns_luns, force=True,
ignore_errors=True)
# Reset connection result values for next try
@ -562,7 +578,8 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
raise exception.VolumeDeviceNotFound(device='')
def _connect_vol(self, rescans, props, data):
def _connect_vol(self,
rescans: int, props: dict, data: dict[str, Any]) -> None:
"""Make a connection to a volume, send scans and wait for the device.
This method is specifically designed to support multithreading and
@ -657,7 +674,9 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
data['stopped_threads'] += 1
@utils.retry((exception.VolumeDeviceNotFound))
def _connect_multipath_volume(self, connection_properties):
def _connect_multipath_volume(
self,
connection_properties: dict) -> Optional[dict[str, str]]:
"""Connect to a multipathed volume launching parallel login requests.
We will be doing parallel login requests, which will considerably speed
@ -673,7 +692,8 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
it's already known by multipathd it would have been discarded if it
was the first time this volume was seen here.
"""
wwn = mpath = None
wwn: Optional[str] = None
mpath = None
wwn_added = False
last_try_on = 0.0
found: list = []
@ -758,11 +778,16 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
'bad and will perform poorly.')
elif not wwn:
wwn = self._linuxscsi.get_sysfs_wwn(found, mpath)
assert wwn is not None
return self._get_connect_result(connection_properties, wwn, found,
mpath)
def _get_connection_devices(self, connection_properties,
ips_iqns_luns=None, is_disconnect_call=False):
def _get_connection_devices(
self,
connection_properties: dict,
ips_iqns_luns: Optional[list[tuple[str, str, str]]] = None,
is_disconnect_call: bool = False) -> dict[set, set]:
"""Get map of devices by sessions from our connection.
For each of the TCP sessions that correspond to our connection
@ -830,8 +855,11 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
@utils.trace
@base.synchronized('connect_volume', external=True)
@utils.connect_volume_undo_prepare_result(unlink_after=True)
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
def disconnect_volume(self,
connection_properties: dict,
device_info: dict,
force: bool = False,
ignore_errors: bool = False) -> None:
"""Detach the volume from instance_name.
:param connection_properties: The dictionary that describes all
@ -854,9 +882,14 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
device_info=device_info,
is_disconnect_call=True)
def _cleanup_connection(self, connection_properties, ips_iqns_luns=None,
force=False, ignore_errors=False,
device_info=None, is_disconnect_call=False):
def _cleanup_connection(
self,
connection_properties: dict,
ips_iqns_luns: Optional[list[tuple[Any, Any, Any]]] = None,
force: bool = False,
ignore_errors: bool = False,
device_info: Optional[dict] = None,
is_disconnect_call: bool = False) -> None:
"""Cleans up connection flushing and removing devices and multipath.
:param connection_properties: The dictionary that describes all
@ -924,7 +957,9 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
if not ignore_errors:
raise exc # type: ignore
def _munge_portal(self, target):
def _munge_portal(
self,
target: tuple[str, str, Union[list, str]]) -> tuple[str, str, str]:
"""Remove brackets from portal.
In case IPv6 address was used the udev path should not contain any
@ -934,7 +969,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return (portal.replace('[', '').replace(']', ''), iqn,
self._linuxscsi.process_lun_id(lun))
def _get_device_path(self, connection_properties):
def _get_device_path(self, connection_properties: dict) -> list:
if self._get_transport() == "default":
return ["/dev/disk/by-path/ip-%s-iscsi-%s-lun-%s" %
self._munge_portal(x) for x in
@ -952,7 +987,7 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
device_list.extend(look_for_device)
return device_list
def get_initiator(self):
def get_initiator(self) -> Optional[str]:
"""Secure helper to read file as root."""
file_path = '/etc/iscsi/initiatorname.iscsi'
try:
@ -967,7 +1002,12 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
file_path)
return None
def _run_iscsiadm(self, connection_properties, iscsi_command, **kwargs):
return None
def _run_iscsiadm(self,
connection_properties: dict,
iscsi_command: tuple[str, ...],
**kwargs) -> tuple[str, str]:
check_exit_code = kwargs.pop('check_exit_code', 0)
attempts = kwargs.pop('attempts', 1)
delay_on_retry = kwargs.pop('delay_on_retry', True)
@ -987,14 +1027,15 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return (out, err)
def _iscsiadm_update(self, connection_properties, property_key,
property_value, **kwargs):
def _iscsiadm_update(self, connection_properties: dict, property_key: str,
property_value, **kwargs) -> tuple[str, str]:
iscsi_command = ('--op', 'update', '-n', property_key,
'-v', property_value)
return self._run_iscsiadm(connection_properties, iscsi_command,
**kwargs)
def _get_target_portals_from_iscsiadm_output(self, output):
def _get_target_portals_from_iscsiadm_output(
self, output: str) -> tuple[list[str], list[str]]:
# return both portals and iqns as 2 lists
#
# as we are parsing a command line utility, allow for the
@ -1008,7 +1049,10 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
iqns.append(data[1])
return ips, iqns
def _connect_to_iscsi_portal(self, connection_properties):
def _connect_to_iscsi_portal(
self,
connection_properties: dict) -> tuple[Optional[str],
Optional[bool]]:
"""Safely connect to iSCSI portal-target and return the session id."""
portal = connection_properties['target_portal'].split(",")[0]
target_iqn = connection_properties['target_iqn']
@ -1019,7 +1063,10 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return method(connection_properties)
@utils.retry((exception.BrickException))
def _connect_to_iscsi_portal_unsafe(self, connection_properties):
def _connect_to_iscsi_portal_unsafe(
self,
connection_properties: dict) -> tuple[Optional[str],
Optional[bool]]:
"""Connect to an iSCSI portal-target an return the session id."""
portal = connection_properties['target_portal'].split(",")[0]
target_iqn = connection_properties['target_iqn']
@ -1077,24 +1124,25 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
# Found our session, return session_id
if (s[0] in self.VALID_SESSIONS_PREFIX and
portal.lower() == s[2].lower() and s[4] == target_iqn):
return s[1], manual_scan
return str(s[1]), manual_scan
try:
# exit_code=15 means the session already exists, so it should
# be regarded as successful login.
self._run_iscsiadm(connection_properties, ("--login",),
check_exit_code=(0, 15, 255))
except putils.ProcessExecutionError as err:
except putils.ProcessExecutionError as p_err:
LOG.warning('Failed to login iSCSI target %(iqn)s on portal '
'%(portal)s (exit code %(err)s).',
{'iqn': target_iqn, 'portal': portal,
'err': err.exit_code})
'err': p_err.exit_code})
return None, None
self._iscsiadm_update(connection_properties,
"node.startup",
"automatic")
def _disconnect_from_iscsi_portal(self, connection_properties):
def _disconnect_from_iscsi_portal(self,
connection_properties: dict) -> None:
self._iscsiadm_update(connection_properties, "node.startup", "manual",
check_exit_code=[0, 21, 255])
self._run_iscsiadm(connection_properties, ("--logout",),
@ -1104,8 +1152,11 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
attempts=5,
delay_on_retry=True)
def _disconnect_connection(self, connection_properties, connections, force,
exc):
def _disconnect_connection(self,
connection_properties: dict,
connections: Iterable,
force: bool,
exc) -> None:
LOG.debug('Disconnecting from: %s', connections)
props = connection_properties.copy()
for ip, iqn in connections:
@ -1114,14 +1165,14 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
with exc.context(force, 'Disconnect from %s %s failed', ip, iqn):
self._disconnect_from_iscsi_portal(props)
def _run_iscsi_session(self):
def _run_iscsi_session(self) -> tuple[str, str]:
(out, err) = self._run_iscsiadm_bare(('-m', 'session'),
check_exit_code=[0, 21, 255])
LOG.debug("iscsi session list stdout=%(out)s stderr=%(err)s",
{'out': out, 'err': err})
return (out, err)
def _run_iscsiadm_bare(self, iscsi_command, **kwargs) -> Tuple[str, str]:
def _run_iscsiadm_bare(self, iscsi_command, **kwargs) -> tuple[str, str]:
check_exit_code = kwargs.pop('check_exit_code', 0)
(out, err) = self._execute('iscsiadm',
*iscsi_command,

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from os_brick.i18n import _
from os_brick.initiator.connectors import base
from os_brick import utils
@ -23,14 +25,14 @@ class LocalConnector(base.BaseLinuxConnector):
def __init__(self, root_helper, driver=None,
*args, **kwargs):
super(LocalConnector, self).__init__(root_helper, driver=driver,
*args, **kwargs)
*args, **kwargs) # type: ignore
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
def get_connector_properties(root_helper: str, *args, **kwargs) -> dict:
"""The Local connector properties."""
return {}
def get_volume_paths(self, connection_properties):
def get_volume_paths(self, connection_properties: dict) -> list[str]:
path = connection_properties['device_path']
return [path]
@ -42,7 +44,7 @@ class LocalConnector(base.BaseLinuxConnector):
return []
@utils.trace
def connect_volume(self, connection_properties):
def connect_volume(self, connection_properties: dict) -> dict:
"""Connect to a volume.
:param connection_properties: The dictionary that describes all of the

View File

@ -12,6 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Callable # noqa: H301
from oslo_log import log as logging
from os_brick import initiator
@ -43,6 +47,7 @@ class RemoteFsConnector(base.BaseLinuxConnector):
LOG.warning("Connection details not present."
" RemoteFsClient may not initialize properly.")
cls: Any
if mount_type_lower == 'scality':
cls = remotefs.ScalityRemoteFsClient
elif mount_type_lower == 'vzstorage':
@ -56,21 +61,21 @@ class RemoteFsConnector(base.BaseLinuxConnector):
root_helper, driver=driver,
execute=execute,
device_scan_attempts=device_scan_attempts,
*args, **kwargs)
*args, **kwargs) # type: ignore
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The RemoteFS connector properties."""
return {}
def set_execute(self, execute):
def set_execute(self, execute: Callable) -> None:
super(RemoteFsConnector, self).set_execute(execute)
self._remotefsclient.set_execute(execute)
def get_search_path(self):
def get_search_path(self) -> str:
return self._remotefsclient.get_mount_base()
def _get_volume_path(self, connection_properties):
def _get_volume_path(self, connection_properties: dict[str, Any]) -> str:
mnt_flags = []
if connection_properties.get('options'):
mnt_flags = connection_properties['options'].split()
@ -81,12 +86,15 @@ class RemoteFsConnector(base.BaseLinuxConnector):
path = mount_point + '/' + connection_properties['name']
return path
def get_volume_paths(self, connection_properties):
def get_volume_paths(self,
connection_properties: dict[str, Any]) -> list[str]:
path = self._get_volume_path(connection_properties)
return [path]
@utils.trace
def connect_volume(self, connection_properties):
def connect_volume(
self,
connection_properties: dict[str, Any]) -> dict[str, Any]:
"""Ensure that the filesystem containing the volume is mounted.
:param connection_properties: The dictionary that describes all
@ -105,8 +113,11 @@ class RemoteFsConnector(base.BaseLinuxConnector):
return {'path': path}
@utils.trace
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
def disconnect_volume(self,
connection_properties: dict[str, Any],
device_info: dict,
force: bool = False,
ignore_errors: bool = False) -> None:
"""No need to do anything to disconnect a volume in a filesystem.
:param connection_properties: The dictionary that describes all
@ -116,6 +127,6 @@ class RemoteFsConnector(base.BaseLinuxConnector):
:type device_info: dict
"""
def extend_volume(self, connection_properties):
def extend_volume(self, connection_properties: dict[str, Any]):
# TODO(walter-boring): is this possible?
raise NotImplementedError

View File

@ -15,11 +15,12 @@
import errno
import os
from typing import List
class HostDriver(object):
def get_all_block_devices(self):
def get_all_block_devices(self) -> List[str]:
"""Get the list of all block devices seen in /dev/disk/by-path/."""
dir = "/dev/disk/by-path/"
try:

View File

@ -14,8 +14,11 @@
"""Generic linux Fibre Channel utilities."""
from __future__ import annotations
import glob
import os
from typing import Dict, Iterable, List # noqa: H301
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
@ -81,7 +84,9 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
luns_not_found.add(lun)
return ctls, luns_not_found
def rescan_hosts(self, hbas, connection_properties):
def rescan_hosts(self,
hbas: Iterable,
connection_properties: dict) -> None:
LOG.debug('Rescaning HBAs %(hbas)s with connection properties '
'%(conn_props)s', {'hbas': hbas,
'conn_props': connection_properties})
@ -145,7 +150,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
'l': target_lun})
@classmethod
def get_fc_hbas(cls):
def get_fc_hbas(cls) -> list[dict[str, str]]:
"""Get the Fibre Channel HBA information from sysfs."""
hbas = []
for hostpath in glob.glob(f'{cls.FC_HOST_SYSFS_PATH}/*'):
@ -161,7 +166,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
{'hp': hostpath, 'exc': exc})
return hbas
def get_fc_hbas_info(self):
def get_fc_hbas_info(self) -> List[Dict[str, str]]:
"""Get Fibre Channel WWNs and device paths from the system, if any."""
hbas = self.get_fc_hbas()
@ -177,7 +182,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
'device_path': device_path})
return hbas_info
def get_fc_wwpns(self):
def get_fc_wwpns(self) -> List[str]:
"""Get Fibre Channel WWPNs from the system, if any."""
hbas = self.get_fc_hbas()
@ -189,7 +194,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
return wwpns
def get_fc_wwnns(self):
def get_fc_wwnns(self) -> List[str]:
"""Get Fibre Channel WWNNs from the system, if any."""
hbas = self.get_fc_hbas()

View File

@ -132,7 +132,7 @@ class LinuxSCSI(executor.Executor):
LOG.debug('dev_info=%s', str(dev_info))
return dev_info
def get_sysfs_wwn(self, device_names, mpath=None) -> str:
def get_sysfs_wwn(self, device_names: List[str], mpath=None) -> str:
"""Return the wwid from sysfs in any of devices in udev format."""
# If we have a multipath DM we know that it has found the WWN
if mpath:
@ -154,7 +154,7 @@ class LinuxSCSI(executor.Executor):
return wwid
# If we have multiple designators use symlinks to find out the wwn
device_names = set(device_names)
device_names_set = set(device_names)
for wwn_path in wwn_paths:
try:
if os.path.islink(wwn_path) and os.stat(wwn_path):
@ -170,11 +170,11 @@ class LinuxSCSI(executor.Executor):
dm_devs = os.listdir(slaves_path)
# This is the right wwn_path if the devices we have
# attached belong to the dm we followed
if device_names.intersection(dm_devs):
if device_names_set.intersection(dm_devs):
break
# This is the right wwn_path if devices we have
elif name in device_names:
elif name in device_names_set:
break
except OSError:
continue
@ -197,7 +197,7 @@ class LinuxSCSI(executor.Executor):
return udev_wwid
return ''
def get_scsi_wwn(self, path):
def get_scsi_wwn(self, path: str) -> str:
"""Read the WWN from page 0x83 value for a SCSI device."""
(out, _err) = self._execute('/lib/udev/scsi_id', '--page', '0x83',
@ -601,7 +601,9 @@ class LinuxSCSI(executor.Executor):
ctx.reraise = False
time.sleep(1)
def extend_volume(self, volume_paths, use_multipath=False):
def extend_volume(self,
volume_paths: list,
use_multipath: bool = False) -> Optional[int]:
"""Signal the SCSI subsystem to test for volume resize.
This function tries to signal the local system's kernel

View File

@ -13,14 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import os
from typing import Generator
from oslo_concurrency import lockutils
from oslo_concurrency import processutils as putils
def check_manual_scan():
def check_manual_scan() -> bool:
if os.name == 'nt':
return False
@ -35,7 +37,7 @@ ISCSI_SUPPORTS_MANUAL_SCAN = check_manual_scan()
@contextlib.contextmanager
def guard_connection(device):
def guard_connection(device: dict) -> Generator:
"""Context Manager handling locks for attach/detach operations.
In Cinder microversion 3.69 the shared_targets field for volumes are

View File

@ -1172,7 +1172,7 @@ Setting up iSCSI targets: unused
cleanup_mock.assert_called_once_with(
{'target_lun': 4, 'volume_id': 'vol_id',
'target_portal': 'ip1:port1', 'target_iqn': 'tgt1'},
(('ip1:port1', 'tgt1', 4),),
[('ip1:port1', 'tgt1', 4), ],
force=True, ignore_errors=True)
@mock.patch.object(linuxscsi.LinuxSCSI, 'get_sysfs_wwn', return_value='')
@ -1223,7 +1223,7 @@ Setting up iSCSI targets: unused
calls_per_try = [
mock.call({'target_portal': prop[0], 'target_iqn': prop[1],
'target_lun': prop[2], 'volume_id': 'vol_id'},
(prop,), force=True, ignore_errors=True)
[prop, ], force=True, ignore_errors=True)
for prop in props
]
cleanup_mock.assert_has_calls(calls_per_try * 3)