Add methods for retrieving disks
This change adds two new public methods that will allow retrieving
disks by device number or unique id.
Related-Bug: #1694671
Change-Id: I7c7bbbf422e4f55dde11b90ffc29c1b34901e2e2
(cherry picked from commit 01f804f428
)
This commit is contained in:
parent
40941c9c67
commit
1e3ff66002
|
@ -26,32 +26,75 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(DiskUtilsTestCase, self).setUp()
|
super(DiskUtilsTestCase, self).setUp()
|
||||||
self._diskutils = diskutils.DiskUtils()
|
self._diskutils = diskutils.DiskUtils()
|
||||||
|
self._diskutils._conn_cimv2 = mock.MagicMock()
|
||||||
self._diskutils._conn_storage = mock.MagicMock()
|
self._diskutils._conn_storage = mock.MagicMock()
|
||||||
self._diskutils._win32_utils = mock.MagicMock()
|
self._diskutils._win32_utils = mock.MagicMock()
|
||||||
self._mock_run = self._diskutils._win32_utils.run_and_check_output
|
self._mock_run = self._diskutils._win32_utils.run_and_check_output
|
||||||
|
|
||||||
def test_get_disk(self):
|
@ddt.data(True, False)
|
||||||
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
def test_get_disk_by_number(self, msft_disk_cls):
|
||||||
mock_disk = mock_msft_disk_cls.return_value[0]
|
resulted_disk = self._diskutils._get_disk_by_number(
|
||||||
|
mock.sentinel.disk_number,
|
||||||
|
msft_disk_cls=msft_disk_cls)
|
||||||
|
|
||||||
resulted_disk = self._diskutils._get_disk(mock.sentinel.disk_number)
|
if msft_disk_cls:
|
||||||
|
disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||||
|
disk_cls.assert_called_once_with(Number=mock.sentinel.disk_number)
|
||||||
|
else:
|
||||||
|
disk_cls = self._diskutils._conn_cimv2.Win32_DiskDrive
|
||||||
|
disk_cls.assert_called_once_with(Index=mock.sentinel.disk_number)
|
||||||
|
|
||||||
mock_msft_disk_cls.assert_called_once_with(
|
mock_disk = disk_cls.return_value[0]
|
||||||
Number=mock.sentinel.disk_number)
|
|
||||||
self.assertEqual(mock_disk, resulted_disk)
|
self.assertEqual(mock_disk, resulted_disk)
|
||||||
|
|
||||||
def test_get_unexisting_disk(self):
|
def test_get_unexisting_disk_by_number(self):
|
||||||
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||||
mock_msft_disk_cls.return_value = []
|
mock_msft_disk_cls.return_value = []
|
||||||
|
|
||||||
self.assertRaises(exceptions.DiskNotFound,
|
self.assertRaises(exceptions.DiskNotFound,
|
||||||
self._diskutils._get_disk,
|
self._diskutils._get_disk_by_number,
|
||||||
mock.sentinel.disk_number)
|
mock.sentinel.disk_number)
|
||||||
|
|
||||||
mock_msft_disk_cls.assert_called_once_with(
|
mock_msft_disk_cls.assert_called_once_with(
|
||||||
Number=mock.sentinel.disk_number)
|
Number=mock.sentinel.disk_number)
|
||||||
|
|
||||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
|
def test_get_disk_by_unique_id(self):
|
||||||
|
disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||||
|
mock_disks = disk_cls.return_value
|
||||||
|
|
||||||
|
resulted_disks = self._diskutils._get_disks_by_unique_id(
|
||||||
|
mock.sentinel.unique_id,
|
||||||
|
mock.sentinel.unique_id_format)
|
||||||
|
|
||||||
|
disk_cls.assert_called_once_with(
|
||||||
|
UniqueId=mock.sentinel.unique_id,
|
||||||
|
UniqueIdFormat=mock.sentinel.unique_id_format)
|
||||||
|
|
||||||
|
self.assertEqual(mock_disks, resulted_disks)
|
||||||
|
|
||||||
|
def test_get_unexisting_disk_by_unique_id(self):
|
||||||
|
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||||
|
mock_msft_disk_cls.return_value = []
|
||||||
|
|
||||||
|
self.assertRaises(exceptions.DiskNotFound,
|
||||||
|
self._diskutils._get_disks_by_unique_id,
|
||||||
|
mock.sentinel.unique_id,
|
||||||
|
mock.sentinel.unique_id_format)
|
||||||
|
|
||||||
|
@mock.patch.object(diskutils.DiskUtils, '_get_disks_by_unique_id')
|
||||||
|
def test_get_disk_number_by_unique_id(self, mock_get_disks):
|
||||||
|
mock_disks = [mock.Mock(), mock.Mock()]
|
||||||
|
mock_get_disks.return_value = mock_disks
|
||||||
|
|
||||||
|
exp_disk_numbers = [mock_disk.Number for mock_disk in mock_disks]
|
||||||
|
returned_disk_numbers = self._diskutils.get_disk_numbers_by_unique_id(
|
||||||
|
mock.sentinel.unique_id, mock.sentinel.unique_id_format)
|
||||||
|
|
||||||
|
self.assertEqual(exp_disk_numbers, returned_disk_numbers)
|
||||||
|
mock_get_disks.assert_called_once_with(
|
||||||
|
mock.sentinel.unique_id, mock.sentinel.unique_id_format)
|
||||||
|
|
||||||
|
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||||
def test_get_disk_uid_and_uid_type(self, mock_get_disk):
|
def test_get_disk_uid_and_uid_type(self, mock_get_disk):
|
||||||
mock_disk = mock_get_disk.return_value
|
mock_disk = mock_get_disk.return_value
|
||||||
|
|
||||||
|
@ -75,7 +118,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
||||||
{'disk_path': r'\\?\SCSI#disk&ven_fakeVendor',
|
{'disk_path': r'\\?\SCSI#disk&ven_fakeVendor',
|
||||||
'expect_mpio': False})
|
'expect_mpio': False})
|
||||||
@ddt.unpack
|
@ddt.unpack
|
||||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
|
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||||
def test_is_mpio_disk(self, mock_get_disk, disk_path, expect_mpio):
|
def test_is_mpio_disk(self, mock_get_disk, disk_path, expect_mpio):
|
||||||
mock_disk = mock_get_disk.return_value
|
mock_disk = mock_get_disk.return_value
|
||||||
mock_disk.Path = disk_path
|
mock_disk.Path = disk_path
|
||||||
|
@ -85,7 +128,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
||||||
|
|
||||||
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
|
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
|
||||||
|
|
||||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
|
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||||
def test_refresh_disk(self, mock_get_disk):
|
def test_refresh_disk(self, mock_get_disk):
|
||||||
mock_disk = mock_get_disk.return_value
|
mock_disk = mock_get_disk.return_value
|
||||||
|
|
||||||
|
@ -94,6 +137,16 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
||||||
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
|
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
|
||||||
mock_disk.Refresh.assert_called_once_with()
|
mock_disk.Refresh.assert_called_once_with()
|
||||||
|
|
||||||
|
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||||
|
def test_get_device_name_by_device_number(self, mock_get_disk):
|
||||||
|
dev_name = self._diskutils.get_device_name_by_device_number(
|
||||||
|
mock.sentinel.disk_number)
|
||||||
|
|
||||||
|
self.assertEqual(mock_get_disk.return_value.Name, dev_name)
|
||||||
|
|
||||||
|
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number,
|
||||||
|
msft_disk_cls=False)
|
||||||
|
|
||||||
def test_get_dev_number_from_dev_name(self):
|
def test_get_dev_number_from_dev_name(self):
|
||||||
fake_physical_device_name = r'\\.\PhysicalDrive15'
|
fake_physical_device_name = r'\\.\PhysicalDrive15'
|
||||||
expected_device_number = '15'
|
expected_device_number = '15'
|
||||||
|
|
|
@ -34,34 +34,65 @@ LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
class DiskUtils(baseutils.BaseUtils):
|
class DiskUtils(baseutils.BaseUtils):
|
||||||
|
|
||||||
_wmi_namespace = 'root/microsoft/windows/storage'
|
_wmi_cimv2_namespace = 'root/cimv2'
|
||||||
|
_wmi_storage_namespace = 'root/microsoft/windows/storage'
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._conn_storage = self._get_wmi_conn(self._wmi_namespace)
|
self._conn_cimv2 = self._get_wmi_conn(self._wmi_cimv2_namespace)
|
||||||
|
self._conn_storage = self._get_wmi_conn(self._wmi_storage_namespace)
|
||||||
self._win32_utils = win32utils.Win32Utils()
|
self._win32_utils = win32utils.Win32Utils()
|
||||||
|
|
||||||
# Physical device names look like \\.\PHYSICALDRIVE1
|
# Physical device names look like \\.\PHYSICALDRIVE1
|
||||||
self._phys_dev_name_regex = re.compile(r'\\\\.*\\[a-zA-Z]*([\d]+)')
|
self._phys_dev_name_regex = re.compile(r'\\\\.*\\[a-zA-Z]*([\d]+)')
|
||||||
|
|
||||||
def _get_disk(self, disk_number):
|
def _get_disk_by_number(self, disk_number, msft_disk_cls=True):
|
||||||
disk = self._conn_storage.Msft_Disk(Number=disk_number)
|
if msft_disk_cls:
|
||||||
|
disk = self._conn_storage.Msft_Disk(Number=disk_number)
|
||||||
|
else:
|
||||||
|
disk = self._conn_cimv2.Win32_DiskDrive(Index=disk_number)
|
||||||
|
|
||||||
if not disk:
|
if not disk:
|
||||||
err_msg = _("Could not find the disk number %s")
|
err_msg = _("Could not find the disk number %s")
|
||||||
raise exceptions.DiskNotFound(err_msg % disk_number)
|
raise exceptions.DiskNotFound(err_msg % disk_number)
|
||||||
return disk[0]
|
return disk[0]
|
||||||
|
|
||||||
|
def _get_disks_by_unique_id(self, unique_id, unique_id_format):
|
||||||
|
# In some cases, multiple disks having the same unique id may be
|
||||||
|
# exposed to the OS. This may happen if there are multiple paths
|
||||||
|
# to the LUN and MPIO is not properly configured. This can be
|
||||||
|
# valuable information to the caller.
|
||||||
|
disks = self._conn_storage.Msft_Disk(UniqueId=unique_id,
|
||||||
|
UniqueIdFormat=unique_id_format)
|
||||||
|
if not disks:
|
||||||
|
err_msg = _("Could not find any disk having unique id "
|
||||||
|
"'%(unique_id)s' and unique id format "
|
||||||
|
"'%(unique_id_format)s'")
|
||||||
|
raise exceptions.DiskNotFound(err_msg % dict(
|
||||||
|
unique_id=unique_id,
|
||||||
|
unique_id_format=unique_id_format))
|
||||||
|
return disks
|
||||||
|
|
||||||
|
def get_disk_numbers_by_unique_id(self, unique_id, unique_id_format):
|
||||||
|
disks = self._get_disks_by_unique_id(unique_id, unique_id_format)
|
||||||
|
return [disk.Number for disk in disks]
|
||||||
|
|
||||||
def get_disk_uid_and_uid_type(self, disk_number):
|
def get_disk_uid_and_uid_type(self, disk_number):
|
||||||
disk = self._get_disk(disk_number)
|
disk = self._get_disk_by_number(disk_number)
|
||||||
return disk.UniqueId, disk.UniqueIdFormat
|
return disk.UniqueId, disk.UniqueIdFormat
|
||||||
|
|
||||||
def is_mpio_disk(self, disk_number):
|
def is_mpio_disk(self, disk_number):
|
||||||
disk = self._get_disk(disk_number)
|
disk = self._get_disk_by_number(disk_number)
|
||||||
return disk.Path.lower().startswith(r'\\?\mpio')
|
return disk.Path.lower().startswith(r'\\?\mpio')
|
||||||
|
|
||||||
def refresh_disk(self, disk_number):
|
def refresh_disk(self, disk_number):
|
||||||
disk = self._get_disk(disk_number)
|
disk = self._get_disk_by_number(disk_number)
|
||||||
disk.Refresh()
|
disk.Refresh()
|
||||||
|
|
||||||
|
def get_device_name_by_device_number(self, device_number):
|
||||||
|
disk = self._get_disk_by_number(device_number,
|
||||||
|
msft_disk_cls=False)
|
||||||
|
return disk.Name
|
||||||
|
|
||||||
def get_device_number_from_device_name(self, device_name):
|
def get_device_number_from_device_name(self, device_name):
|
||||||
matches = self._phys_dev_name_regex.findall(device_name)
|
matches = self._phys_dev_name_regex.findall(device_name)
|
||||||
if matches:
|
if matches:
|
||||||
|
|
Loading…
Reference in New Issue