diff --git a/os_win/tests/unit/utils/storage/test_diskutils.py b/os_win/tests/unit/utils/storage/test_diskutils.py index 33ae6f36..0c38f131 100644 --- a/os_win/tests/unit/utils/storage/test_diskutils.py +++ b/os_win/tests/unit/utils/storage/test_diskutils.py @@ -28,32 +28,75 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase): def setUp(self): super(DiskUtilsTestCase, self).setUp() self._diskutils = diskutils.DiskUtils() + self._diskutils._conn_cimv2 = mock.MagicMock() self._diskutils._conn_storage = mock.MagicMock() self._diskutils._win32_utils = mock.MagicMock() self._mock_run = self._diskutils._win32_utils.run_and_check_output - def test_get_disk(self): - mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk - mock_disk = mock_msft_disk_cls.return_value[0] + @ddt.data(True, False) + def test_get_disk_by_number(self, msft_disk_cls): + resulted_disk = self._diskutils._get_disk_by_number( + mock.sentinel.disk_number, + msft_disk_cls=msft_disk_cls) - resulted_disk = self._diskutils._get_disk(mock.sentinel.disk_number) + if msft_disk_cls: + disk_cls = self._diskutils._conn_storage.Msft_Disk + disk_cls.assert_called_once_with(Number=mock.sentinel.disk_number) + else: + disk_cls = self._diskutils._conn_cimv2.Win32_DiskDrive + disk_cls.assert_called_once_with(Index=mock.sentinel.disk_number) - mock_msft_disk_cls.assert_called_once_with( - Number=mock.sentinel.disk_number) + mock_disk = disk_cls.return_value[0] self.assertEqual(mock_disk, resulted_disk) - def test_get_unexisting_disk(self): + def test_get_unexisting_disk_by_number(self): mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk mock_msft_disk_cls.return_value = [] self.assertRaises(exceptions.DiskNotFound, - self._diskutils._get_disk, + self._diskutils._get_disk_by_number, mock.sentinel.disk_number) mock_msft_disk_cls.assert_called_once_with( Number=mock.sentinel.disk_number) - @mock.patch.object(diskutils.DiskUtils, '_get_disk') + def test_get_disk_by_unique_id(self): + disk_cls = self._diskutils._conn_storage.Msft_Disk + mock_disks = disk_cls.return_value + + resulted_disks = self._diskutils._get_disks_by_unique_id( + mock.sentinel.unique_id, + mock.sentinel.unique_id_format) + + disk_cls.assert_called_once_with( + UniqueId=mock.sentinel.unique_id, + UniqueIdFormat=mock.sentinel.unique_id_format) + + self.assertEqual(mock_disks, resulted_disks) + + def test_get_unexisting_disk_by_unique_id(self): + mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk + mock_msft_disk_cls.return_value = [] + + self.assertRaises(exceptions.DiskNotFound, + self._diskutils._get_disks_by_unique_id, + mock.sentinel.unique_id, + mock.sentinel.unique_id_format) + + @mock.patch.object(diskutils.DiskUtils, '_get_disks_by_unique_id') + def test_get_disk_number_by_unique_id(self, mock_get_disks): + mock_disks = [mock.Mock(), mock.Mock()] + mock_get_disks.return_value = mock_disks + + exp_disk_numbers = [mock_disk.Number for mock_disk in mock_disks] + returned_disk_numbers = self._diskutils.get_disk_numbers_by_unique_id( + mock.sentinel.unique_id, mock.sentinel.unique_id_format) + + self.assertEqual(exp_disk_numbers, returned_disk_numbers) + mock_get_disks.assert_called_once_with( + mock.sentinel.unique_id, mock.sentinel.unique_id_format) + + @mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number') def test_get_disk_uid_and_uid_type(self, mock_get_disk): mock_disk = mock_get_disk.return_value @@ -77,7 +120,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase): {'disk_path': r'\\?\SCSI#disk&ven_fakeVendor', 'expect_mpio': False}) @ddt.unpack - @mock.patch.object(diskutils.DiskUtils, '_get_disk') + @mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number') def test_is_mpio_disk(self, mock_get_disk, disk_path, expect_mpio): mock_disk = mock_get_disk.return_value mock_disk.Path = disk_path @@ -87,7 +130,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase): mock_get_disk.assert_called_once_with(mock.sentinel.disk_number) - @mock.patch.object(diskutils.DiskUtils, '_get_disk') + @mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number') def test_refresh_disk(self, mock_get_disk): mock_disk = mock_get_disk.return_value @@ -96,6 +139,16 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase): mock_get_disk.assert_called_once_with(mock.sentinel.disk_number) mock_disk.Refresh.assert_called_once_with() + @mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number') + def test_get_device_name_by_device_number(self, mock_get_disk): + dev_name = self._diskutils.get_device_name_by_device_number( + mock.sentinel.disk_number) + + self.assertEqual(mock_get_disk.return_value.Name, dev_name) + + mock_get_disk.assert_called_once_with(mock.sentinel.disk_number, + msft_disk_cls=False) + def test_get_dev_number_from_dev_name(self): fake_physical_device_name = r'\\.\PhysicalDrive15' expected_device_number = '15' diff --git a/os_win/utils/storage/diskutils.py b/os_win/utils/storage/diskutils.py index c4f78cd7..e9a51c02 100644 --- a/os_win/utils/storage/diskutils.py +++ b/os_win/utils/storage/diskutils.py @@ -65,34 +65,65 @@ SCSI_ID_CODE_SET_ASCII = 2 class DiskUtils(baseutils.BaseUtils): - _wmi_namespace = 'root/microsoft/windows/storage' + _wmi_cimv2_namespace = 'root/cimv2' + _wmi_storage_namespace = 'root/microsoft/windows/storage' def __init__(self): - self._conn_storage = self._get_wmi_conn(self._wmi_namespace) + self._conn_cimv2 = self._get_wmi_conn(self._wmi_cimv2_namespace) + self._conn_storage = self._get_wmi_conn(self._wmi_storage_namespace) self._win32_utils = win32utils.Win32Utils() # Physical device names look like \\.\PHYSICALDRIVE1 self._phys_dev_name_regex = re.compile(r'\\\\.*\\[a-zA-Z]*([\d]+)') - def _get_disk(self, disk_number): - disk = self._conn_storage.Msft_Disk(Number=disk_number) + def _get_disk_by_number(self, disk_number, msft_disk_cls=True): + if msft_disk_cls: + disk = self._conn_storage.Msft_Disk(Number=disk_number) + else: + disk = self._conn_cimv2.Win32_DiskDrive(Index=disk_number) + if not disk: err_msg = _("Could not find the disk number %s") raise exceptions.DiskNotFound(err_msg % disk_number) return disk[0] + def _get_disks_by_unique_id(self, unique_id, unique_id_format): + # In some cases, multiple disks having the same unique id may be + # exposed to the OS. This may happen if there are multiple paths + # to the LUN and MPIO is not properly configured. This can be + # valuable information to the caller. + disks = self._conn_storage.Msft_Disk(UniqueId=unique_id, + UniqueIdFormat=unique_id_format) + if not disks: + err_msg = _("Could not find any disk having unique id " + "'%(unique_id)s' and unique id format " + "'%(unique_id_format)s'") + raise exceptions.DiskNotFound(err_msg % dict( + unique_id=unique_id, + unique_id_format=unique_id_format)) + return disks + + def get_disk_numbers_by_unique_id(self, unique_id, unique_id_format): + disks = self._get_disks_by_unique_id(unique_id, unique_id_format) + return [disk.Number for disk in disks] + def get_disk_uid_and_uid_type(self, disk_number): - disk = self._get_disk(disk_number) + disk = self._get_disk_by_number(disk_number) return disk.UniqueId, disk.UniqueIdFormat def is_mpio_disk(self, disk_number): - disk = self._get_disk(disk_number) + disk = self._get_disk_by_number(disk_number) return disk.Path.lower().startswith(r'\\?\mpio') def refresh_disk(self, disk_number): - disk = self._get_disk(disk_number) + disk = self._get_disk_by_number(disk_number) disk.Refresh() + def get_device_name_by_device_number(self, device_number): + disk = self._get_disk_by_number(device_number, + msft_disk_cls=False) + return disk.Name + def get_device_number_from_device_name(self, device_name): matches = self._phys_dev_name_regex.findall(device_name) if matches: