Split out a common helper for blkid
Add a new helper get_device_information that can call blkid with various arguments. Also add get_partition_table_type using the PTTYPE blkid field. Switch a few calls to using the new helper. Change-Id: I005613d9f7de8106f31c30abd9c87e0bcd6b9272
This commit is contained in:
parent
7fa890fb01
commit
97affbf596
|
@ -20,7 +20,6 @@ import logging
|
|||
import math
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
import stat
|
||||
import tempfile
|
||||
|
@ -178,6 +177,41 @@ def get_disk_identifier(dev):
|
|||
return disk_identifier[0]
|
||||
|
||||
|
||||
def get_partition_table_type(device):
|
||||
"""Get partition table type, msdos or gpt.
|
||||
|
||||
:param device: the name of the device
|
||||
:return: dos, gpt or None
|
||||
"""
|
||||
return get_device_information(device, probe=True).get('PTTYPE')
|
||||
|
||||
|
||||
def get_device_information(device, probe=False, fields=None):
|
||||
"""Get information about a device using blkid.
|
||||
|
||||
Can be applied to all block devices: disks, RAID, partitions.
|
||||
|
||||
:param device: Device name.
|
||||
:param probe: Switch to low-level probing mode.
|
||||
:param fields: A list of fields to request (all by default).
|
||||
:return: A dictionary with requested fields as keys.
|
||||
:raises: ProcessExecutionError
|
||||
"""
|
||||
args = []
|
||||
if probe:
|
||||
args.append('--probe')
|
||||
if fields:
|
||||
args += sum((['--match-tag', field] for field in fields), [])
|
||||
|
||||
output, err = utils.execute('blkid', device, *args,
|
||||
use_standard_locale=True, run_as_root=True)
|
||||
if output.strip():
|
||||
output = output.split(':', 1)[1]
|
||||
return next(utils.parse_device_tags(output))
|
||||
else:
|
||||
return {}
|
||||
|
||||
|
||||
def get_uefi_disk_identifier(dev):
|
||||
"""Get the uuid from the disk being exposed by the ramdisk.
|
||||
|
||||
|
@ -435,15 +469,14 @@ def block_uuid(dev):
|
|||
|
||||
Try to fetch the UUID, if that fails, try to fetch the PARTUUID.
|
||||
"""
|
||||
out, _err = utils.execute('blkid', '-s', 'UUID', '-o', 'value', dev,
|
||||
run_as_root=True, check_exit_code=[0])
|
||||
if not out:
|
||||
info = get_device_information(dev, fields=['UUID', 'PARTUUID'])
|
||||
if info.get('UUID'):
|
||||
return info['UUID']
|
||||
else:
|
||||
LOG.debug('Falling back to partition UUID as the block device UUID '
|
||||
'was not found while examining %(device)s',
|
||||
{'device': dev})
|
||||
out, _err = utils.execute('blkid', '-s', 'PARTUUID', '-o', 'value',
|
||||
dev, run_as_root=True, check_exit_code=[0])
|
||||
return out.strip()
|
||||
return info.get('PARTUUID', '')
|
||||
|
||||
|
||||
def get_image_mb(image_path, virtual_size=True):
|
||||
|
@ -820,11 +853,7 @@ def _get_labelled_partition(device_path, label, node_uuid):
|
|||
|
||||
found_part = None
|
||||
if output:
|
||||
for device in output.split('\n'):
|
||||
dev = {key: value for key, value in (v.split('=', 1)
|
||||
for v in shlex.split(device))}
|
||||
if not dev:
|
||||
continue
|
||||
for dev in utils.parse_device_tags(output):
|
||||
if dev['LABEL'].upper() == label.upper():
|
||||
if found_part:
|
||||
found_2 = '/dev/%(part)s' % {'part': dev['NAME'].strip()}
|
||||
|
@ -840,31 +869,6 @@ def _get_labelled_partition(device_path, label, node_uuid):
|
|||
return found_part
|
||||
|
||||
|
||||
def _is_disk_gpt_partitioned(device, node_uuid):
|
||||
"""Checks if the disk is GPT partitioned
|
||||
|
||||
:param device: The device path.
|
||||
:param node_uuid: UUID of the Node. Used for logging.
|
||||
:raises: InstanceDeployFailure, if any disk partitioning related
|
||||
commands fail.
|
||||
:param node_uuid: UUID of the Node
|
||||
:returns: Boolean. Returns True if disk is GPT partitioned
|
||||
"""
|
||||
try:
|
||||
stdout, _stderr = utils.execute(
|
||||
'blkid', '-p', '-o', 'value', '-s', 'PTTYPE', device,
|
||||
use_standard_locale=True, run_as_root=True)
|
||||
except (processutils.UnknownArgumentError,
|
||||
processutils.ProcessExecutionError, OSError) as e:
|
||||
msg = (_('Failed to retrieve partition table type for disk %(disk)s '
|
||||
'for node %(node)s. Error: %(error)s') %
|
||||
{'disk': device, 'node': node_uuid, 'error': e})
|
||||
LOG.error(msg)
|
||||
raise exception.InstanceDeployFailure(msg)
|
||||
|
||||
return (stdout.lower().strip() == 'gpt')
|
||||
|
||||
|
||||
def _fix_gpt_structs(device, node_uuid):
|
||||
"""Checks backup GPT data structures and moves them to end of the device
|
||||
|
||||
|
@ -899,7 +903,7 @@ def fix_gpt_partition(device, node_uuid):
|
|||
:raises: InstanceDeployFailure if exception is caught.
|
||||
"""
|
||||
try:
|
||||
disk_is_gpt_partitioned = _is_disk_gpt_partitioned(device, node_uuid)
|
||||
disk_is_gpt_partitioned = (get_partition_table_type(device) == 'gpt')
|
||||
if disk_is_gpt_partitioned:
|
||||
_fix_gpt_structs(device, node_uuid)
|
||||
except Exception as e:
|
||||
|
@ -1032,7 +1036,7 @@ def create_config_drive_partition(node_uuid, device, configdrive):
|
|||
else:
|
||||
cur_parts = set(part['number'] for part in list_partitions(device))
|
||||
|
||||
if _is_disk_gpt_partitioned(device, node_uuid):
|
||||
if get_partition_table_type(device) == 'gpt':
|
||||
create_option = '0:-%dMB:0' % MAX_CONFIG_DRIVE_SIZE_MB
|
||||
utils.execute('sgdisk', '-n', create_option, device,
|
||||
run_as_root=True)
|
||||
|
|
|
@ -1252,21 +1252,20 @@ class OtherFunctionTestCase(base.IronicLibTestCase):
|
|||
'/dev/fake', run_as_root=True,
|
||||
use_standard_locale=True)
|
||||
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test_block_uuid_fallback_to_uuid(self, mock_execute):
|
||||
mock_execute.side_effect = [('', ''),
|
||||
('value', '')]
|
||||
self.assertEqual('value',
|
||||
disk_utils.block_uuid('/dev/fake'))
|
||||
execute_calls = [
|
||||
mock.call('blkid', '-s', 'UUID', '-o', 'value',
|
||||
'/dev/fake', check_exit_code=[0],
|
||||
run_as_root=True),
|
||||
mock.call('blkid', '-s', 'PARTUUID', '-o', 'value',
|
||||
'/dev/fake', check_exit_code=[0],
|
||||
run_as_root=True)
|
||||
]
|
||||
mock_execute.assert_has_calls(execute_calls)
|
||||
@mock.patch.object(disk_utils, 'get_device_information', autospec=True)
|
||||
def test_block_uuid(self, mock_get_device_info):
|
||||
mock_get_device_info.return_value = {'UUID': '123',
|
||||
'PARTUUID': '123456'}
|
||||
self.assertEqual('123', disk_utils.block_uuid('/dev/fake'))
|
||||
mock_get_device_info.assert_called_once_with(
|
||||
'/dev/fake', fields=['UUID', 'PARTUUID'])
|
||||
|
||||
@mock.patch.object(disk_utils, 'get_device_information', autospec=True)
|
||||
def test_block_uuid_fallback_to_uuid(self, mock_get_device_info):
|
||||
mock_get_device_info.return_value = {'PARTUUID': '123456'}
|
||||
self.assertEqual('123456', disk_utils.block_uuid('/dev/fake'))
|
||||
mock_get_device_info.assert_called_once_with(
|
||||
'/dev/fake', fields=['UUID', 'PARTUUID'])
|
||||
|
||||
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
|
@ -1392,41 +1391,6 @@ class WholeDiskPartitionTestCases(base.IronicLibTestCase):
|
|||
use_standard_locale=True)
|
||||
self.assertEqual(1, mock_log.call_count)
|
||||
|
||||
def test__is_disk_gpt_partitioned_true(self, mock_execute):
|
||||
blkid_output = 'gpt\n'
|
||||
mock_execute.return_value = (blkid_output, '')
|
||||
result = disk_utils._is_disk_gpt_partitioned('/dev/fake',
|
||||
self.node_uuid)
|
||||
self.assertTrue(result)
|
||||
mock_execute.assert_called_once_with('blkid', '-p', '-o', 'value',
|
||||
'-s', 'PTTYPE', '/dev/fake',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
|
||||
def test_is_disk_gpt_partitioned_false(self, mock_execute):
|
||||
blkid_output = 'dos\n'
|
||||
mock_execute.return_value = (blkid_output, '')
|
||||
result = disk_utils._is_disk_gpt_partitioned('/dev/fake',
|
||||
self.node_uuid)
|
||||
self.assertFalse(result)
|
||||
mock_execute.assert_called_once_with('blkid', '-p', '-o', 'value',
|
||||
'-s', 'PTTYPE', '/dev/fake',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
|
||||
@mock.patch.object(disk_utils.LOG, 'error', autospec=True)
|
||||
def test_is_disk_gpt_partitioned_exc(self, mock_log, mock_execute):
|
||||
mock_execute.side_effect = processutils.ProcessExecutionError
|
||||
self.assertRaisesRegex(exception.InstanceDeployFailure,
|
||||
'Failed to retrieve partition table type',
|
||||
disk_utils._is_disk_gpt_partitioned,
|
||||
self.dev, self.node_uuid)
|
||||
mock_execute.assert_called_once_with('blkid', '-p', '-o', 'value',
|
||||
'-s', 'PTTYPE', '/dev/fake',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
self.assertEqual(1, mock_log.call_count)
|
||||
|
||||
def test_fix_gpt_structs_fix_required(self, mock_execute):
|
||||
sgdisk_v_output = """
|
||||
Problem: The secondary header's self-pointer indicates that it doesn't reside
|
||||
|
@ -1480,7 +1444,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_fix_gpt_structs',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'list_partitions',
|
||||
autospec=True)
|
||||
|
@ -1490,7 +1454,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
def test_create_partition_exists(self, mock_get_configdrive,
|
||||
mock_get_labelled_partition,
|
||||
mock_list_partitions, mock_is_disk_gpt,
|
||||
mock_list_partitions, mock_table_type,
|
||||
mock_fix_gpt, mock_fix_gpt_partition,
|
||||
mock_dd, mock_unlink, mock_execute):
|
||||
config_url = 'http://1.2.3.4/cd'
|
||||
|
@ -1509,7 +1473,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
self.node_uuid)
|
||||
self.assertFalse(mock_list_partitions.called)
|
||||
self.assertFalse(mock_execute.called)
|
||||
self.assertFalse(mock_is_disk_gpt.called)
|
||||
self.assertFalse(mock_table_type.called)
|
||||
self.assertFalse(mock_fix_gpt.called)
|
||||
mock_dd.assert_called_with(configdrive_file, configdrive_part)
|
||||
mock_unlink.assert_called_with(configdrive_file)
|
||||
|
@ -1523,7 +1487,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_fix_gpt_structs',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'list_partitions',
|
||||
autospec=True)
|
||||
|
@ -1533,7 +1497,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
def test_create_partition_gpt(self, mock_get_configdrive,
|
||||
mock_get_labelled_partition,
|
||||
mock_list_partitions, mock_is_disk_gpt,
|
||||
mock_list_partitions, mock_table_type,
|
||||
mock_fix_gpt, mock_fix_gpt_partition,
|
||||
mock_dd, mock_unlink, mock_execute):
|
||||
config_url = 'http://1.2.3.4/cd'
|
||||
|
@ -1560,7 +1524,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
mock_get_configdrive.return_value = (configdrive_mb, configdrive_file)
|
||||
mock_get_labelled_partition.return_value = None
|
||||
|
||||
mock_is_disk_gpt.return_value = True
|
||||
mock_table_type.return_value = 'gpt'
|
||||
mock_list_partitions.side_effect = [initial_partitions,
|
||||
updated_partitions]
|
||||
expected_part = '/dev/fake4'
|
||||
|
@ -1580,7 +1544,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
])
|
||||
|
||||
self.assertEqual(2, mock_list_partitions.call_count)
|
||||
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_table_type.assert_called_with(self.dev)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
self.assertFalse(mock_fix_gpt.called)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
|
@ -1600,7 +1564,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_fix_gpt_structs',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'list_partitions',
|
||||
autospec=True)
|
||||
|
@ -1611,7 +1575,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
def _test_create_partition_mbr(self, mock_get_configdrive,
|
||||
mock_get_labelled_partition,
|
||||
mock_list_partitions,
|
||||
mock_is_disk_gpt, mock_fix_gpt,
|
||||
mock_table_type, mock_fix_gpt,
|
||||
mock_fix_gpt_partition,
|
||||
mock_disk_exceeds, mock_dd,
|
||||
mock_unlink, mock_log, mock_execute,
|
||||
|
@ -1645,7 +1609,6 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
mock_count.return_value = (2, 0)
|
||||
mock_get_configdrive.return_value = (configdrive_mb, configdrive_file)
|
||||
mock_get_labelled_partition.return_value = None
|
||||
mock_is_disk_gpt.return_value = False
|
||||
|
||||
self.node_uuid = "12345678-1234-1234-1234-1234567890abcxyz"
|
||||
if is_iscsi_device:
|
||||
|
@ -1684,7 +1647,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
check_exit_code=[0], delay_on_retry=True)
|
||||
])
|
||||
self.assertEqual(2, mock_list_partitions.call_count)
|
||||
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_table_type.assert_called_with(self.dev)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_disk_exceeds.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_dd.assert_called_with(configdrive_file, expected_part)
|
||||
|
@ -1725,7 +1688,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_fix_gpt_structs',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'list_partitions',
|
||||
autospec=True)
|
||||
|
@ -1736,7 +1699,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
def test_create_partition_part_create_fail(self, mock_get_configdrive,
|
||||
mock_get_labelled_partition,
|
||||
mock_list_partitions,
|
||||
mock_is_disk_gpt, mock_fix_gpt,
|
||||
mock_table_type, mock_fix_gpt,
|
||||
mock_fix_gpt_partition,
|
||||
mock_disk_exceeds, mock_dd,
|
||||
mock_unlink, mock_execute,
|
||||
|
@ -1761,7 +1724,6 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
'flags': '', 'filesystem': '', 'size': 2046}]
|
||||
mock_get_configdrive.return_value = (configdrive_mb, configdrive_file)
|
||||
mock_get_labelled_partition.return_value = None
|
||||
mock_is_disk_gpt.return_value = False
|
||||
mock_disk_exceeds.return_value = False
|
||||
mock_list_partitions.side_effect = [initial_partitions,
|
||||
initial_partitions,
|
||||
|
@ -1788,7 +1750,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
|
||||
self.assertEqual(2, mock_list_partitions.call_count)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_table_type.assert_called_with(self.dev)
|
||||
mock_disk_exceeds.assert_called_with(self.dev, self.node_uuid)
|
||||
self.assertFalse(mock_fix_gpt.called)
|
||||
self.assertFalse(mock_dd.called)
|
||||
|
@ -1807,7 +1769,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_fix_gpt_structs',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'list_partitions',
|
||||
autospec=True)
|
||||
|
@ -1818,7 +1780,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
def test_create_partition_part_create_exc(self, mock_get_configdrive,
|
||||
mock_get_labelled_partition,
|
||||
mock_list_partitions,
|
||||
mock_is_disk_gpt, mock_fix_gpt,
|
||||
mock_table_type, mock_fix_gpt,
|
||||
mock_fix_gpt_partition,
|
||||
mock_disk_exceeds, mock_dd,
|
||||
mock_unlink, mock_execute,
|
||||
|
@ -1836,7 +1798,6 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
'flags': '', 'filesystem': '', 'size': 2046}]
|
||||
mock_get_configdrive.return_value = (configdrive_mb, configdrive_file)
|
||||
mock_get_labelled_partition.return_value = None
|
||||
mock_is_disk_gpt.return_value = False
|
||||
mock_disk_exceeds.return_value = False
|
||||
mock_list_partitions.side_effect = [initial_partitions,
|
||||
initial_partitions]
|
||||
|
@ -1857,7 +1818,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
run_as_root=True)
|
||||
self.assertEqual(1, mock_list_partitions.call_count)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_table_type.assert_called_with(self.dev)
|
||||
mock_disk_exceeds.assert_called_with(self.dev, self.node_uuid)
|
||||
self.assertFalse(mock_fix_gpt.called)
|
||||
self.assertFalse(mock_dd.called)
|
||||
|
@ -1873,7 +1834,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_fix_gpt_structs',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'list_partitions',
|
||||
autospec=True)
|
||||
|
@ -1884,7 +1845,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
def test_create_partition_num_parts_exceed(self, mock_get_configdrive,
|
||||
mock_get_labelled_partition,
|
||||
mock_list_partitions,
|
||||
mock_is_disk_gpt, mock_fix_gpt,
|
||||
mock_table_type, mock_fix_gpt,
|
||||
mock_fix_gpt_partition,
|
||||
mock_dd, mock_unlink,
|
||||
mock_count):
|
||||
|
@ -1903,7 +1864,6 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
'flags': '', 'filesystem': '', 'size': 2046}]
|
||||
mock_get_configdrive.return_value = (configdrive_mb, configdrive_file)
|
||||
mock_get_labelled_partition.return_value = None
|
||||
mock_is_disk_gpt.return_value = False
|
||||
mock_list_partitions.side_effect = [partitions, partitions]
|
||||
# 4 primary partitions, 0 logical partitions
|
||||
mock_count.return_value = (4, 0)
|
||||
|
@ -1916,7 +1876,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
mock_get_configdrive.assert_called_with(config_url, self.node_uuid)
|
||||
self.assertEqual(1, mock_list_partitions.call_count)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_table_type.assert_called_with(self.dev)
|
||||
self.assertFalse(mock_fix_gpt.called)
|
||||
self.assertFalse(mock_dd.called)
|
||||
mock_unlink.assert_called_with(configdrive_file)
|
||||
|
@ -1953,7 +1913,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
@mock.patch.object(disk_utils, 'fix_gpt_partition',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_is_disk_gpt_partitioned',
|
||||
@mock.patch.object(disk_utils, 'get_partition_table_type',
|
||||
autospec=True)
|
||||
@mock.patch.object(disk_utils, '_get_labelled_partition',
|
||||
autospec=True)
|
||||
|
@ -1961,7 +1921,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
autospec=True)
|
||||
def test_create_partition_conf_drive_error_counting(
|
||||
self, mock_get_configdrive, mock_get_labelled_partition,
|
||||
mock_is_disk_gpt, mock_fix_gpt_partition,
|
||||
mock_table_type, mock_fix_gpt_partition,
|
||||
mock_unlink, mock_execute, mock_count):
|
||||
config_url = 'http://1.2.3.4/cd'
|
||||
configdrive_file = '/tmp/xyz'
|
||||
|
@ -1969,7 +1929,6 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
|
||||
mock_get_configdrive.return_value = (configdrive_mb, configdrive_file)
|
||||
mock_get_labelled_partition.return_value = None
|
||||
mock_is_disk_gpt.return_value = False
|
||||
mock_count.side_effect = ValueError('Booooom')
|
||||
|
||||
self.assertRaisesRegex(exception.InstanceDeployFailure,
|
||||
|
@ -1980,7 +1939,7 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
|
|||
mock_get_configdrive.assert_called_with(config_url, self.node_uuid)
|
||||
mock_unlink.assert_called_with(configdrive_file)
|
||||
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
|
||||
mock_table_type.assert_called_with(self.dev)
|
||||
mock_count.assert_called_once_with(self.dev)
|
||||
|
||||
|
||||
|
@ -2016,3 +1975,75 @@ class TriggerDeviceRescanTestCase(base.IronicLibTestCase):
|
|||
mock.call('partprobe', '/dev/fake', run_as_root=True, attempts=10),
|
||||
mock.call('sgdisk', '-v', '/dev/fake', run_as_root=True),
|
||||
])
|
||||
|
||||
|
||||
BLKID_PROBE = """
|
||||
/dev/nvme0n1: PTUUID="123456" PTTYPE="gpt"
|
||||
"""
|
||||
|
||||
BLKID_NORMAL = (
|
||||
'dev/nvme0n1p1: UUID="123" BLOCK_SIZE="512" TYPE="vfat" '
|
||||
'PARTLABEL="EFI System Partition" PARTUUID="123456"'
|
||||
)
|
||||
|
||||
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
class GetDeviceInformationTestCase(base.IronicLibTestCase):
|
||||
|
||||
def test_normal(self, mock_execute):
|
||||
mock_execute.return_value = BLKID_NORMAL, ""
|
||||
result = disk_utils.get_device_information('/dev/fake')
|
||||
self.assertEqual(
|
||||
{'UUID': '123', 'BLOCK_SIZE': '512', 'TYPE': 'vfat',
|
||||
'PARTLABEL': 'EFI System Partition', 'PARTUUID': '123456'},
|
||||
result
|
||||
)
|
||||
mock_execute.assert_called_once_with('blkid', '/dev/fake',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
|
||||
def test_probe(self, mock_execute):
|
||||
mock_execute.return_value = BLKID_PROBE, ""
|
||||
result = disk_utils.get_device_information('/dev/fake', probe=True)
|
||||
self.assertEqual({'PTUUID': '123456', 'PTTYPE': 'gpt'}, result)
|
||||
mock_execute.assert_called_once_with('blkid', '/dev/fake', '--probe',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
|
||||
def test_fields(self, mock_execute):
|
||||
mock_execute.return_value = BLKID_NORMAL, ""
|
||||
result = disk_utils.get_device_information('/dev/fake',
|
||||
fields=['UUID', 'LABEL'])
|
||||
# No filtering on our side, so returning all fake fields
|
||||
self.assertEqual(
|
||||
{'UUID': '123', 'BLOCK_SIZE': '512', 'TYPE': 'vfat',
|
||||
'PARTLABEL': 'EFI System Partition', 'PARTUUID': '123456'},
|
||||
result
|
||||
)
|
||||
mock_execute.assert_called_once_with('blkid', '/dev/fake',
|
||||
'--match-tag', 'UUID',
|
||||
'--match-tag', 'LABEL',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
|
||||
def test_empty(self, mock_execute):
|
||||
mock_execute.return_value = "\n", ""
|
||||
result = disk_utils.get_device_information('/dev/fake')
|
||||
self.assertEqual({}, result)
|
||||
mock_execute.assert_called_once_with('blkid', '/dev/fake',
|
||||
use_standard_locale=True,
|
||||
run_as_root=True)
|
||||
|
||||
|
||||
@mock.patch.object(disk_utils, 'get_device_information', autospec=True)
|
||||
class GetPartitionTableTypeTestCase(base.IronicLibTestCase):
|
||||
def test_ok(self, mock_get_device_info):
|
||||
mock_get_device_info.return_value = {'PTTYPE': 'gpt'}
|
||||
self.assertEqual('gpt',
|
||||
disk_utils.get_partition_table_type('/dev/fake'))
|
||||
mock_get_device_info.assert_called_once_with('/dev/fake', probe=True)
|
||||
|
||||
def test_missing(self, mock_get_device_info):
|
||||
mock_get_device_info.return_value = {}
|
||||
self.assertIsNone(disk_utils.get_partition_table_type('/dev/fake'))
|
||||
mock_get_device_info.assert_called_once_with('/dev/fake', probe=True)
|
||||
|
|
|
@ -733,3 +733,22 @@ class MountedTestCase(base.IronicLibTestCase):
|
|||
mock.call("umount", '/mnt/fake', run_as_root=True),
|
||||
])
|
||||
self.assertFalse(mock_rmtree.called)
|
||||
|
||||
|
||||
class ParseDeviceTagsTestCase(base.IronicLibTestCase):
|
||||
|
||||
def test_empty(self):
|
||||
result = utils.parse_device_tags("\n\n")
|
||||
self.assertEqual([], list(result))
|
||||
|
||||
def test_parse(self):
|
||||
tags = """
|
||||
PTUUID="00016a50" PTTYPE="dos" LABEL=""
|
||||
TYPE="vfat" PART_ENTRY_SCHEME="gpt" PART_ENTRY_NAME="EFI System Partition"
|
||||
"""
|
||||
result = list(utils.parse_device_tags(tags))
|
||||
self.assertEqual([
|
||||
{'PTUUID': '00016a50', 'PTTYPE': 'dos', 'LABEL': ''},
|
||||
{'TYPE': 'vfat', 'PART_ENTRY_SCHEME': 'gpt',
|
||||
'PART_ENTRY_NAME': 'EFI System Partition'}
|
||||
], result)
|
||||
|
|
|
@ -25,6 +25,7 @@ import ipaddress
|
|||
import logging
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
import tempfile
|
||||
from urllib import parse as urlparse
|
||||
|
@ -638,3 +639,16 @@ def mounted(source, dest=None, opts=None, fs_type=None):
|
|||
LOG.warning(
|
||||
'Unable to remove temporary location %(dest)s: %(err)s',
|
||||
{'dest': dest, 'err': exc})
|
||||
|
||||
|
||||
def parse_device_tags(output):
|
||||
"""Parse tags from the lsblk/blkid output.
|
||||
|
||||
Parses format KEY="VALUE" KEY2="VALUE2".
|
||||
|
||||
:return: a generator yielding dicts with information from each line.
|
||||
"""
|
||||
for line in output.strip().split('\n'):
|
||||
if line.strip():
|
||||
yield {key: value for key, value in (v.split('=', 1)
|
||||
for v in shlex.split(line))}
|
||||
|
|
Loading…
Reference in New Issue