From 1ee42cc3ff585abd370c2e7974951c93f2816394 Mon Sep 17 00:00:00 2001 From: Zhenguo Niu Date: Mon, 25 Jul 2016 19:07:27 +0800 Subject: [PATCH] Parallel erase disk devices Currently we erase the disks one by one, which takes a long time to finish, this patch adds support to the IPA so that it can erase disks in parallel if told so. Story: 1546949 Task: 11548 Co-Authored-By: yuan liang Co-Authored-By: Kaifeng Wang Change-Id: If5cfb6ec000a654d07103c4b378d4c135249e238 --- ironic_python_agent/hardware.py | 25 ++++++-- .../tests/unit/test_hardware.py | 64 ++++++++++++++++++- ...l-erase-disk-devices-09ea33d5443aead0.yaml | 11 ++++ 3 files changed, 95 insertions(+), 5 deletions(-) create mode 100644 releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml diff --git a/ironic_python_agent/hardware.py b/ironic_python_agent/hardware.py index 45414c6a1..f758c4c1a 100644 --- a/ironic_python_agent/hardware.py +++ b/ironic_python_agent/hardware.py @@ -16,6 +16,7 @@ import abc import binascii import functools import json +from multiprocessing.pool import ThreadPool import os import shlex import time @@ -355,7 +356,7 @@ class HardwareManager(object): """Attempt to erase a block device. Implementations should detect the type of device and erase it in the - most appropriate way possible. Generic implementations should support + most appropriate way possible. Generic implementations should support common erase mechanisms such as ATA secure erase, or multi-pass random writes. Operators with more specific needs should override this method in order to detect and handle "interesting" cases, or delegate to the @@ -367,6 +368,9 @@ class HardwareManager(object): parent class. Upstream submissions of common functionality are encouraged. + This interface could be called concurrently to speed up erasure, as + such, it should be implemented in a thread-safe way. + :param node: Ironic node object :param block_device: a BlockDevice indicating a device to be erased. :raises IncompatibleHardwareMethodError: when there is no known way to @@ -390,10 +394,23 @@ class HardwareManager(object): """ erase_results = {} block_devices = self.list_block_devices() + if not len(block_devices): + return {} + + info = node.get('driver_internal_info', {}) + max_pool_size = info.get('disk_erasure_concurrency', 1) + + thread_pool = ThreadPool(min(max_pool_size, len(block_devices))) for block_device in block_devices: - result = dispatch_to_managers( - 'erase_block_device', node=node, block_device=block_device) - erase_results[block_device.name] = result + params = {'node': node, 'block_device': block_device} + erase_results[block_device.name] = thread_pool.apply_async( + dispatch_to_managers, ('erase_block_device',), params) + thread_pool.close() + thread_pool.join() + + for device_name, result in erase_results.items(): + erase_results[device_name] = result.get() + return erase_results def wait_for_disks(self): diff --git a/ironic_python_agent/tests/unit/test_hardware.py b/ironic_python_agent/tests/unit/test_hardware.py index 3e914783f..316493d5e 100644 --- a/ironic_python_agent/tests/unit/test_hardware.py +++ b/ironic_python_agent/tests/unit/test_hardware.py @@ -13,6 +13,7 @@ # limitations under the License. import binascii +import multiprocessing import os import time @@ -23,6 +24,7 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_utils import units import pyudev +import six from stevedore import extension from ironic_python_agent import errors @@ -1275,7 +1277,7 @@ class TestGenericHardwareManager(base.IronicAgentTest): mocked_listdir.assert_has_calls(expected_calls) @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True) - def test_erase_devices(self, mocked_dispatch): + def test_erase_devices_no_parallel_by_default(self, mocked_dispatch): mocked_dispatch.return_value = 'erased device' self.hardware.list_block_devices = mock.Mock() @@ -1290,6 +1292,66 @@ class TestGenericHardwareManager(base.IronicAgentTest): self.assertEqual(expected, result) + @mock.patch('multiprocessing.pool.ThreadPool.apply_async', autospec=True) + @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True) + def test_erase_devices_concurrency(self, mocked_dispatch, mocked_async): + internal_info = self.node['driver_internal_info'] + internal_info['disk_erasure_concurrency'] = 10 + mocked_dispatch.return_value = 'erased device' + + if six.PY3: + apply_result = multiprocessing.pool.ApplyResult({}, None, None) + else: + apply_result = multiprocessing.pool.ApplyResult({}, None) + apply_result._success = True + apply_result._ready = True + apply_result.get = lambda: 'erased device' + mocked_async.return_value = apply_result + + self.hardware.list_block_devices = mock.Mock() + self.hardware.list_block_devices.return_value = [ + hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True), + hardware.BlockDevice('/dev/hdaa', 'small', 65535, False), + ] + + expected = {'/dev/hdaa': 'erased device', '/dev/sdj': 'erased device'} + + result = self.hardware.erase_devices(self.node, []) + + self.assertTrue(mocked_async.called) + self.assertEqual(expected, result) + + @mock.patch.object(hardware, 'ThreadPool', autospec=True) + def test_erase_devices_concurrency_pool_size(self, mocked_pool): + self.hardware.list_block_devices = mock.Mock() + self.hardware.list_block_devices.return_value = [ + hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True), + hardware.BlockDevice('/dev/hdaa', 'small', 65535, False), + ] + + # Test pool size 10 with 2 disks + internal_info = self.node['driver_internal_info'] + internal_info['disk_erasure_concurrency'] = 10 + + self.hardware.erase_devices(self.node, []) + mocked_pool.assert_called_with(2) + + # Test default pool size with 2 disks + internal_info = self.node['driver_internal_info'] + del internal_info['disk_erasure_concurrency'] + + self.hardware.erase_devices(self.node, []) + mocked_pool.assert_called_with(1) + + @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True) + def test_erase_devices_without_disk(self, mocked_dispatch): + self.hardware.list_block_devices = mock.Mock() + self.hardware.list_block_devices.return_value = [] + + expected = {} + result = self.hardware.erase_devices({}, []) + self.assertEqual(expected, result) + @mock.patch.object(utils, 'execute', autospec=True) def test_erase_block_device_ata_success(self, mocked_execute): mocked_execute.side_effect = [ diff --git a/releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml b/releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml new file mode 100644 index 000000000..0f04ecb04 --- /dev/null +++ b/releasenotes/notes/parallel-erase-disk-devices-09ea33d5443aead0.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Support parallel disk device erasure. This is controlled by the + ``driver_internal_info['agent_enable_parallel_erasure']`` passed + by ironic. +other: + - | + The ``HardwareManager.erase_block_device`` interface could be called + concurrently to support the feature of parallel disk device erasure, + it should be implemented in a thread-safe way. \ No newline at end of file