Merge "Add methods for retrieving disks" into stable/pike
This commit is contained in:
commit
111fa2896c
|
@ -28,32 +28,75 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
|||
def setUp(self):
|
||||
super(DiskUtilsTestCase, self).setUp()
|
||||
self._diskutils = diskutils.DiskUtils()
|
||||
self._diskutils._conn_cimv2 = mock.MagicMock()
|
||||
self._diskutils._conn_storage = mock.MagicMock()
|
||||
self._diskutils._win32_utils = mock.MagicMock()
|
||||
self._mock_run = self._diskutils._win32_utils.run_and_check_output
|
||||
|
||||
def test_get_disk(self):
|
||||
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||
mock_disk = mock_msft_disk_cls.return_value[0]
|
||||
@ddt.data(True, False)
|
||||
def test_get_disk_by_number(self, msft_disk_cls):
|
||||
resulted_disk = self._diskutils._get_disk_by_number(
|
||||
mock.sentinel.disk_number,
|
||||
msft_disk_cls=msft_disk_cls)
|
||||
|
||||
resulted_disk = self._diskutils._get_disk(mock.sentinel.disk_number)
|
||||
if msft_disk_cls:
|
||||
disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||
disk_cls.assert_called_once_with(Number=mock.sentinel.disk_number)
|
||||
else:
|
||||
disk_cls = self._diskutils._conn_cimv2.Win32_DiskDrive
|
||||
disk_cls.assert_called_once_with(Index=mock.sentinel.disk_number)
|
||||
|
||||
mock_msft_disk_cls.assert_called_once_with(
|
||||
Number=mock.sentinel.disk_number)
|
||||
mock_disk = disk_cls.return_value[0]
|
||||
self.assertEqual(mock_disk, resulted_disk)
|
||||
|
||||
def test_get_unexisting_disk(self):
|
||||
def test_get_unexisting_disk_by_number(self):
|
||||
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||
mock_msft_disk_cls.return_value = []
|
||||
|
||||
self.assertRaises(exceptions.DiskNotFound,
|
||||
self._diskutils._get_disk,
|
||||
self._diskutils._get_disk_by_number,
|
||||
mock.sentinel.disk_number)
|
||||
|
||||
mock_msft_disk_cls.assert_called_once_with(
|
||||
Number=mock.sentinel.disk_number)
|
||||
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
|
||||
def test_get_disk_by_unique_id(self):
|
||||
disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||
mock_disks = disk_cls.return_value
|
||||
|
||||
resulted_disks = self._diskutils._get_disks_by_unique_id(
|
||||
mock.sentinel.unique_id,
|
||||
mock.sentinel.unique_id_format)
|
||||
|
||||
disk_cls.assert_called_once_with(
|
||||
UniqueId=mock.sentinel.unique_id,
|
||||
UniqueIdFormat=mock.sentinel.unique_id_format)
|
||||
|
||||
self.assertEqual(mock_disks, resulted_disks)
|
||||
|
||||
def test_get_unexisting_disk_by_unique_id(self):
|
||||
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
|
||||
mock_msft_disk_cls.return_value = []
|
||||
|
||||
self.assertRaises(exceptions.DiskNotFound,
|
||||
self._diskutils._get_disks_by_unique_id,
|
||||
mock.sentinel.unique_id,
|
||||
mock.sentinel.unique_id_format)
|
||||
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disks_by_unique_id')
|
||||
def test_get_disk_number_by_unique_id(self, mock_get_disks):
|
||||
mock_disks = [mock.Mock(), mock.Mock()]
|
||||
mock_get_disks.return_value = mock_disks
|
||||
|
||||
exp_disk_numbers = [mock_disk.Number for mock_disk in mock_disks]
|
||||
returned_disk_numbers = self._diskutils.get_disk_numbers_by_unique_id(
|
||||
mock.sentinel.unique_id, mock.sentinel.unique_id_format)
|
||||
|
||||
self.assertEqual(exp_disk_numbers, returned_disk_numbers)
|
||||
mock_get_disks.assert_called_once_with(
|
||||
mock.sentinel.unique_id, mock.sentinel.unique_id_format)
|
||||
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||
def test_get_disk_uid_and_uid_type(self, mock_get_disk):
|
||||
mock_disk = mock_get_disk.return_value
|
||||
|
||||
|
@ -77,7 +120,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
|||
{'disk_path': r'\\?\SCSI#disk&ven_fakeVendor',
|
||||
'expect_mpio': False})
|
||||
@ddt.unpack
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||
def test_is_mpio_disk(self, mock_get_disk, disk_path, expect_mpio):
|
||||
mock_disk = mock_get_disk.return_value
|
||||
mock_disk.Path = disk_path
|
||||
|
@ -87,7 +130,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
|||
|
||||
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
|
||||
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||
def test_refresh_disk(self, mock_get_disk):
|
||||
mock_disk = mock_get_disk.return_value
|
||||
|
||||
|
@ -96,6 +139,16 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
|
|||
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
|
||||
mock_disk.Refresh.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
|
||||
def test_get_device_name_by_device_number(self, mock_get_disk):
|
||||
dev_name = self._diskutils.get_device_name_by_device_number(
|
||||
mock.sentinel.disk_number)
|
||||
|
||||
self.assertEqual(mock_get_disk.return_value.Name, dev_name)
|
||||
|
||||
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number,
|
||||
msft_disk_cls=False)
|
||||
|
||||
def test_get_dev_number_from_dev_name(self):
|
||||
fake_physical_device_name = r'\\.\PhysicalDrive15'
|
||||
expected_device_number = '15'
|
||||
|
|
|
@ -65,34 +65,65 @@ SCSI_ID_CODE_SET_ASCII = 2
|
|||
|
||||
class DiskUtils(baseutils.BaseUtils):
|
||||
|
||||
_wmi_namespace = 'root/microsoft/windows/storage'
|
||||
_wmi_cimv2_namespace = 'root/cimv2'
|
||||
_wmi_storage_namespace = 'root/microsoft/windows/storage'
|
||||
|
||||
def __init__(self):
|
||||
self._conn_storage = self._get_wmi_conn(self._wmi_namespace)
|
||||
self._conn_cimv2 = self._get_wmi_conn(self._wmi_cimv2_namespace)
|
||||
self._conn_storage = self._get_wmi_conn(self._wmi_storage_namespace)
|
||||
self._win32_utils = win32utils.Win32Utils()
|
||||
|
||||
# Physical device names look like \\.\PHYSICALDRIVE1
|
||||
self._phys_dev_name_regex = re.compile(r'\\\\.*\\[a-zA-Z]*([\d]+)')
|
||||
|
||||
def _get_disk(self, disk_number):
|
||||
disk = self._conn_storage.Msft_Disk(Number=disk_number)
|
||||
def _get_disk_by_number(self, disk_number, msft_disk_cls=True):
|
||||
if msft_disk_cls:
|
||||
disk = self._conn_storage.Msft_Disk(Number=disk_number)
|
||||
else:
|
||||
disk = self._conn_cimv2.Win32_DiskDrive(Index=disk_number)
|
||||
|
||||
if not disk:
|
||||
err_msg = _("Could not find the disk number %s")
|
||||
raise exceptions.DiskNotFound(err_msg % disk_number)
|
||||
return disk[0]
|
||||
|
||||
def _get_disks_by_unique_id(self, unique_id, unique_id_format):
|
||||
# In some cases, multiple disks having the same unique id may be
|
||||
# exposed to the OS. This may happen if there are multiple paths
|
||||
# to the LUN and MPIO is not properly configured. This can be
|
||||
# valuable information to the caller.
|
||||
disks = self._conn_storage.Msft_Disk(UniqueId=unique_id,
|
||||
UniqueIdFormat=unique_id_format)
|
||||
if not disks:
|
||||
err_msg = _("Could not find any disk having unique id "
|
||||
"'%(unique_id)s' and unique id format "
|
||||
"'%(unique_id_format)s'")
|
||||
raise exceptions.DiskNotFound(err_msg % dict(
|
||||
unique_id=unique_id,
|
||||
unique_id_format=unique_id_format))
|
||||
return disks
|
||||
|
||||
def get_disk_numbers_by_unique_id(self, unique_id, unique_id_format):
|
||||
disks = self._get_disks_by_unique_id(unique_id, unique_id_format)
|
||||
return [disk.Number for disk in disks]
|
||||
|
||||
def get_disk_uid_and_uid_type(self, disk_number):
|
||||
disk = self._get_disk(disk_number)
|
||||
disk = self._get_disk_by_number(disk_number)
|
||||
return disk.UniqueId, disk.UniqueIdFormat
|
||||
|
||||
def is_mpio_disk(self, disk_number):
|
||||
disk = self._get_disk(disk_number)
|
||||
disk = self._get_disk_by_number(disk_number)
|
||||
return disk.Path.lower().startswith(r'\\?\mpio')
|
||||
|
||||
def refresh_disk(self, disk_number):
|
||||
disk = self._get_disk(disk_number)
|
||||
disk = self._get_disk_by_number(disk_number)
|
||||
disk.Refresh()
|
||||
|
||||
def get_device_name_by_device_number(self, device_number):
|
||||
disk = self._get_disk_by_number(device_number,
|
||||
msft_disk_cls=False)
|
||||
return disk.Name
|
||||
|
||||
def get_device_number_from_device_name(self, device_name):
|
||||
matches = self._phys_dev_name_regex.findall(device_name)
|
||||
if matches:
|
||||
|
|
Loading…
Reference in New Issue