Merge "Add methods for retrieving disks" into stable/pike

This commit is contained in:
Jenkins 2017-08-09 23:08:41 +00:00 committed by Gerrit Code Review
commit 111fa2896c
2 changed files with 102 additions and 18 deletions

View File

@ -28,32 +28,75 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
def setUp(self):
super(DiskUtilsTestCase, self).setUp()
self._diskutils = diskutils.DiskUtils()
self._diskutils._conn_cimv2 = mock.MagicMock()
self._diskutils._conn_storage = mock.MagicMock()
self._diskutils._win32_utils = mock.MagicMock()
self._mock_run = self._diskutils._win32_utils.run_and_check_output
def test_get_disk(self):
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
mock_disk = mock_msft_disk_cls.return_value[0]
@ddt.data(True, False)
def test_get_disk_by_number(self, msft_disk_cls):
resulted_disk = self._diskutils._get_disk_by_number(
mock.sentinel.disk_number,
msft_disk_cls=msft_disk_cls)
resulted_disk = self._diskutils._get_disk(mock.sentinel.disk_number)
if msft_disk_cls:
disk_cls = self._diskutils._conn_storage.Msft_Disk
disk_cls.assert_called_once_with(Number=mock.sentinel.disk_number)
else:
disk_cls = self._diskutils._conn_cimv2.Win32_DiskDrive
disk_cls.assert_called_once_with(Index=mock.sentinel.disk_number)
mock_msft_disk_cls.assert_called_once_with(
Number=mock.sentinel.disk_number)
mock_disk = disk_cls.return_value[0]
self.assertEqual(mock_disk, resulted_disk)
def test_get_unexisting_disk(self):
def test_get_unexisting_disk_by_number(self):
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
mock_msft_disk_cls.return_value = []
self.assertRaises(exceptions.DiskNotFound,
self._diskutils._get_disk,
self._diskutils._get_disk_by_number,
mock.sentinel.disk_number)
mock_msft_disk_cls.assert_called_once_with(
Number=mock.sentinel.disk_number)
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
def test_get_disk_by_unique_id(self):
disk_cls = self._diskutils._conn_storage.Msft_Disk
mock_disks = disk_cls.return_value
resulted_disks = self._diskutils._get_disks_by_unique_id(
mock.sentinel.unique_id,
mock.sentinel.unique_id_format)
disk_cls.assert_called_once_with(
UniqueId=mock.sentinel.unique_id,
UniqueIdFormat=mock.sentinel.unique_id_format)
self.assertEqual(mock_disks, resulted_disks)
def test_get_unexisting_disk_by_unique_id(self):
mock_msft_disk_cls = self._diskutils._conn_storage.Msft_Disk
mock_msft_disk_cls.return_value = []
self.assertRaises(exceptions.DiskNotFound,
self._diskutils._get_disks_by_unique_id,
mock.sentinel.unique_id,
mock.sentinel.unique_id_format)
@mock.patch.object(diskutils.DiskUtils, '_get_disks_by_unique_id')
def test_get_disk_number_by_unique_id(self, mock_get_disks):
mock_disks = [mock.Mock(), mock.Mock()]
mock_get_disks.return_value = mock_disks
exp_disk_numbers = [mock_disk.Number for mock_disk in mock_disks]
returned_disk_numbers = self._diskutils.get_disk_numbers_by_unique_id(
mock.sentinel.unique_id, mock.sentinel.unique_id_format)
self.assertEqual(exp_disk_numbers, returned_disk_numbers)
mock_get_disks.assert_called_once_with(
mock.sentinel.unique_id, mock.sentinel.unique_id_format)
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
def test_get_disk_uid_and_uid_type(self, mock_get_disk):
mock_disk = mock_get_disk.return_value
@ -77,7 +120,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
{'disk_path': r'\\?\SCSI#disk&ven_fakeVendor',
'expect_mpio': False})
@ddt.unpack
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
def test_is_mpio_disk(self, mock_get_disk, disk_path, expect_mpio):
mock_disk = mock_get_disk.return_value
mock_disk.Path = disk_path
@ -87,7 +130,7 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
@mock.patch.object(diskutils.DiskUtils, '_get_disk')
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
def test_refresh_disk(self, mock_get_disk):
mock_disk = mock_get_disk.return_value
@ -96,6 +139,16 @@ class DiskUtilsTestCase(test_base.OsWinBaseTestCase):
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number)
mock_disk.Refresh.assert_called_once_with()
@mock.patch.object(diskutils.DiskUtils, '_get_disk_by_number')
def test_get_device_name_by_device_number(self, mock_get_disk):
dev_name = self._diskutils.get_device_name_by_device_number(
mock.sentinel.disk_number)
self.assertEqual(mock_get_disk.return_value.Name, dev_name)
mock_get_disk.assert_called_once_with(mock.sentinel.disk_number,
msft_disk_cls=False)
def test_get_dev_number_from_dev_name(self):
fake_physical_device_name = r'\\.\PhysicalDrive15'
expected_device_number = '15'

View File

@ -65,34 +65,65 @@ SCSI_ID_CODE_SET_ASCII = 2
class DiskUtils(baseutils.BaseUtils):
_wmi_namespace = 'root/microsoft/windows/storage'
_wmi_cimv2_namespace = 'root/cimv2'
_wmi_storage_namespace = 'root/microsoft/windows/storage'
def __init__(self):
self._conn_storage = self._get_wmi_conn(self._wmi_namespace)
self._conn_cimv2 = self._get_wmi_conn(self._wmi_cimv2_namespace)
self._conn_storage = self._get_wmi_conn(self._wmi_storage_namespace)
self._win32_utils = win32utils.Win32Utils()
# Physical device names look like \\.\PHYSICALDRIVE1
self._phys_dev_name_regex = re.compile(r'\\\\.*\\[a-zA-Z]*([\d]+)')
def _get_disk(self, disk_number):
disk = self._conn_storage.Msft_Disk(Number=disk_number)
def _get_disk_by_number(self, disk_number, msft_disk_cls=True):
if msft_disk_cls:
disk = self._conn_storage.Msft_Disk(Number=disk_number)
else:
disk = self._conn_cimv2.Win32_DiskDrive(Index=disk_number)
if not disk:
err_msg = _("Could not find the disk number %s")
raise exceptions.DiskNotFound(err_msg % disk_number)
return disk[0]
def _get_disks_by_unique_id(self, unique_id, unique_id_format):
# In some cases, multiple disks having the same unique id may be
# exposed to the OS. This may happen if there are multiple paths
# to the LUN and MPIO is not properly configured. This can be
# valuable information to the caller.
disks = self._conn_storage.Msft_Disk(UniqueId=unique_id,
UniqueIdFormat=unique_id_format)
if not disks:
err_msg = _("Could not find any disk having unique id "
"'%(unique_id)s' and unique id format "
"'%(unique_id_format)s'")
raise exceptions.DiskNotFound(err_msg % dict(
unique_id=unique_id,
unique_id_format=unique_id_format))
return disks
def get_disk_numbers_by_unique_id(self, unique_id, unique_id_format):
disks = self._get_disks_by_unique_id(unique_id, unique_id_format)
return [disk.Number for disk in disks]
def get_disk_uid_and_uid_type(self, disk_number):
disk = self._get_disk(disk_number)
disk = self._get_disk_by_number(disk_number)
return disk.UniqueId, disk.UniqueIdFormat
def is_mpio_disk(self, disk_number):
disk = self._get_disk(disk_number)
disk = self._get_disk_by_number(disk_number)
return disk.Path.lower().startswith(r'\\?\mpio')
def refresh_disk(self, disk_number):
disk = self._get_disk(disk_number)
disk = self._get_disk_by_number(disk_number)
disk.Refresh()
def get_device_name_by_device_number(self, device_number):
disk = self._get_disk_by_number(device_number,
msft_disk_cls=False)
return disk.Name
def get_device_number_from_device_name(self, device_name):
matches = self._phys_dev_name_regex.findall(device_name)
if matches: