From 483bfbcdb6bbed8558fc5fc4a836553c2eceaf11 Mon Sep 17 00:00:00 2001 From: cid Date: Fri, 26 Apr 2024 11:46:14 +0100 Subject: [PATCH] Flexible IPMI credential persistence method configuration Instead of only file-based persistence which is reported to potentially expose IPMI credential by persisting the temp file(s) for a little too long. User can now pass ``True`` to the ``ipmi_store_cred_in_env`` parameter which instead stores IPMI password as an environment variable, limiting exposure to the user session of Ironic. Defaults to ``False``. Closes-Bug: #2058749 Change-Id: Icd91e969e5c58bf42fc50958c3cd1acabd36ccdf --- doc/source/admin/drivers/ipmitool.rst | 28 + ironic/drivers/modules/ipmitool.py | 119 ++- ironic/tests/unit/conductor/test_manager.py | 2 +- .../unit/drivers/modules/test_ipmitool.py | 806 ++++++++++++++++++ ...method_configuration-e5ed052576576d71.yaml | 6 + 5 files changed, 917 insertions(+), 44 deletions(-) create mode 100644 releasenotes/notes/flexible_ipmi_credential_persistence_method_configuration-e5ed052576576d71.yaml diff --git a/doc/source/admin/drivers/ipmitool.rst b/doc/source/admin/drivers/ipmitool.rst index 6f9f0440f5..c754653474 100644 --- a/doc/source/admin/drivers/ipmitool.rst +++ b/doc/source/admin/drivers/ipmitool.rst @@ -82,6 +82,34 @@ with an IPMItool-based driver. For example:: --driver-info ipmi_username= \ --driver-info ipmi_password= + +Changing The Default IPMI Credential Persistence Method +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +- ``ipmi_store_cred_in_env``: IPMI password persistence method; + defaults to *False*. + +The `ipmi_store_cred_in_env` configuration option allow users to switch +between file-based and environment variable persistence methods for +IPMI password. + +For example, to switch to environment variable: + + baremetal node create --driver ipmi \ + --driver-info ipmi_address=
\ + --driver-info ipmi_username= \ + --driver-info ipmi_password= \ + --driver-info ipmi_store_cred_in_env=True + +And to swtich back to file-based temporary persistence: + + baremetal node create --driver ipmi \ + --driver-info ipmi_address=
\ + --driver-info ipmi_username= \ + --driver-info ipmi_password= \ + --driver-info ipmi_store_cred_in_env=False + + Advanced configuration ====================== diff --git a/ironic/drivers/modules/ipmitool.py b/ironic/drivers/modules/ipmitool.py index f2553b88a6..9c9b119a0e 100644 --- a/ironic/drivers/modules/ipmitool.py +++ b/ironic/drivers/modules/ipmitool.py @@ -71,9 +71,13 @@ REQUIRED_PROPERTIES = { } OPTIONAL_PROPERTIES = { 'ipmi_password': _("password. Optional."), - 'ipmi_hex_kg_key': _('Kg key for IPMIv2 authentication. ' - 'The key is expected in hexadecimal format. ' - 'Optional.'), + 'ipmi_store_cred_in_env': _("Determines how IPMI credentials are stored " + "during execution; either as a temporary file " + "(the default) or as an environment variable. " + "Default is False. Optional."), + 'ipmi_hex_kg_key': _("Kg key for IPMIv2 authentication. " + "The key is expected in hexadecimal format. " + "Optional."), 'ipmi_port': _("remote IPMI RMCP port. Optional."), 'ipmi_priv_level': _("privilege level; default is ADMINISTRATOR. One of " "%s. Optional.") % ', '.join(VALID_PRIV_LEVELS), @@ -342,6 +346,7 @@ def _parse_driver_info(node): address = info.get('ipmi_address') username = info.get('ipmi_username') password = str(info.get('ipmi_password', '')) + store_cred_in_env = info.get('ipmi_store_cred_in_env', False) hex_kg_key = info.get('ipmi_hex_kg_key') dest_port = info.get('ipmi_port') port = (info.get('ipmi_terminal_port') @@ -443,6 +448,7 @@ def _parse_driver_info(node): 'dest_port': dest_port, 'username': username, 'password': password, + 'store_cred_in_env': store_cred_in_env, 'hex_kg_key': hex_kg_key, 'port': port, 'uuid': node.uuid, @@ -495,6 +501,25 @@ def _get_ipmitool_args(driver_info, pw_file=None): return args +def _persist_ipmi_password(driver_info): + """Persists IPMI password for passing to the ipmitool + + :param driver_info: driver info with the ipmitool parameters + """ + store_cred_in_env = driver_info['store_cred_in_env'] + if store_cred_in_env: + password = driver_info.get('password') + if password: + os.environ['PASSWORD'] = password + return '-E', None + + path = _console_pwfile_path(driver_info['uuid']) + pw_file = console_utils.make_persistent_password_file( + path, driver_info['password'] or '\0') + + return '-f', pw_file + + def _ipmitool_timing_args(): if not _is_option_supported('timing'): return [] @@ -644,43 +669,52 @@ def _exec_ipmitool(driver_info, command, check_exit_code=None, # 'ipmitool' command will prompt password if there is no '-f' # option, we set it to '\0' to write a password file to support # empty password - with _make_password_file(driver_info['password'] or '\0') as pw_file: - cmd_args.append('-f') - cmd_args.append(pw_file) + + flag, _ = _persist_ipmi_password(driver_info) + if flag == '-E': + cmd_args.append('-E') + cmd_args.append('PASSWORD') cmd_args.extend(command.split(" ")) - try: - out, err = utils.execute(*cmd_args, **extra_args) - return out, err - except processutils.ProcessExecutionError as e: - if change_cs and check_cipher_suite_errors(e.stderr): - actual_cs = update_cipher_suite_cmd(actual_cs, args) + else: + with _make_password_file(driver_info['password'] + or '\0') as pw_file: + cmd_args.append('-f') + cmd_args.append(pw_file) + cmd_args.extend(command.split(" ")) + + try: + out, err = utils.execute(*cmd_args, **extra_args) + return out, err + except processutils.ProcessExecutionError as e: + if change_cs and check_cipher_suite_errors(e.stderr): + actual_cs = update_cipher_suite_cmd(actual_cs, args) + else: + change_cs = False + with excutils.save_and_reraise_exception() as ctxt: + err_list = [ + x for x in ( + IPMITOOL_RETRYABLE_FAILURES + + CONF.ipmi.additional_retryable_ipmi_errors) + if x in str(e)] + # If Ironic is doing retries then retry all errors + retry_failures = (err_list + or not CONF.ipmi.use_ipmitool_retries) + if ((time.time() > end_time) + or (num_tries == 0) + or not retry_failures): + LOG.error('IPMI Error while attempting "%(cmd)s" ' + 'for node %(node)s. Error: %(error)s', + {'node': driver_info['uuid'], + 'cmd': e.cmd, 'error': e}) else: - change_cs = False - with excutils.save_and_reraise_exception() as ctxt: - err_list = [ - x for x in ( - IPMITOOL_RETRYABLE_FAILURES - + CONF.ipmi.additional_retryable_ipmi_errors) - if x in str(e)] - # If Ironic is doing retries then retry all errors - retry_failures = (err_list - or not CONF.ipmi.use_ipmitool_retries) - if ((time.time() > end_time) - or (num_tries == 0) - or not retry_failures): - LOG.error('IPMI Error while attempting "%(cmd)s" ' - 'for node %(node)s. Error: %(error)s', - {'node': driver_info['uuid'], - 'cmd': e.cmd, 'error': e}) - else: - ctxt.reraise = False - LOG.warning('IPMI Error encountered, retrying ' - '"%(cmd)s" for node %(node)s. ' - 'Error: %(error)s', - {'node': driver_info['uuid'], - 'cmd': e.cmd, 'error': e}) - finally: - LAST_CMD_TIME[driver_info['address']] = time.time() + ctxt.reraise = False + LOG.warning('IPMI Error encountered, retrying ' + '"%(cmd)s" for node %(node)s. ' + 'Error: %(error)s', + {'node': driver_info['uuid'], + 'cmd': e.cmd, 'error': e}) + finally: + LAST_CMD_TIME[driver_info['address']] = time.time() def _set_and_wait(task, power_action, driver_info, timeout=None): @@ -1543,17 +1577,16 @@ class IPMIConsole(base.ConsoleInterface): created :raises: ConsoleSubprocessFailed when invoking the subprocess failed """ - path = _console_pwfile_path(driver_info['uuid']) - pw_file = console_utils.make_persistent_password_file( - path, driver_info['password'] or '\0') - ipmi_cmd = self._get_ipmi_cmd(driver_info, pw_file) + _, path = _persist_ipmi_password(driver_info) + ipmi_cmd = self._get_ipmi_cmd(driver_info, pw_file=path) ipmi_cmd += ' sol activate' try: start_method(driver_info['uuid'], driver_info['port'], ipmi_cmd) except (exception.ConsoleError, exception.ConsoleSubprocessFailed): with excutils.save_and_reraise_exception(): - ironic_utils.unlink_without_raise(path) + if path: + ironic_utils.unlink_without_raise(path) class IPMIShellinaboxConsole(IPMIConsole): diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index 2917ef8e0d..420a75af12 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -6307,7 +6307,7 @@ class ManagerTestProperties(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase): 'ipmi_target_address', 'ipmi_local_address', 'ipmi_protocol_version', 'ipmi_force_boot_device', 'ipmi_disable_boot_timeout', 'ipmi_hex_kg_key', - 'ipmi_cipher_suite'] + 'ipmi_cipher_suite', 'ipmi_store_cred_in_env'] self._check_driver_properties("ipmi", expected) def test_driver_properties_snmp(self): diff --git a/ironic/tests/unit/drivers/modules/test_ipmitool.py b/ironic/tests/unit/drivers/modules/test_ipmitool.py index aa5da4697b..b27bc82415 100644 --- a/ironic/tests/unit/drivers/modules/test_ipmitool.py +++ b/ironic/tests/unit/drivers/modules/test_ipmitool.py @@ -1922,6 +1922,812 @@ class IPMIToolPrivateMethodTestCase( self.assertEqual(['-R', '7', '-N', '1'], ipmi._ipmitool_timing_args()) +class IPMIToolPrivateMethodOnEnvironmentPersistenceTestCase( + Base, + metaclass=IPMIToolPrivateMethodTestCaseMeta): + + def setUp(self): + + INFO_DICT['ipmi_store_cred_in_env'] = True + BRIDGE_INFO_DICT['ipmi_store_cred_in_env'] = True + + super(IPMIToolPrivateMethodOnEnvironmentPersistenceTestCase, + self).setUp() + + self._mock_system_random_distribution() + + mock_sleep_fixture = self.useFixture( + fixtures.MockPatchObject(time, 'sleep', autospec=True)) + self.mock_sleep = mock_sleep_fixture.mock + + def _mock_system_random_distribution(self): + m = mock.patch.object(random.SystemRandom, 'gauss', return_value=0.5, + autospec=True) + m.start() + self.addCleanup(m.stop) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_first_call_to_address(self, mock_exec, + mock_support): + print(self.info) + ipmi.LAST_CMD_TIME = {} + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + self.assertFalse(self.mock_sleep.called) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_second_call_to_address_sleep( + self, mock_exec, mock_support): + ipmi.LAST_CMD_TIME = {} + args = [[ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ], [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'D', 'E', 'F', + ]] + + expected = [mock.call('timing'), + mock.call('timing')] + mock_support.return_value = False + mock_exec.side_effect = [(None, None), (None, None)] + + ipmi._exec_ipmitool(self.info, 'A B C') + mock_exec.assert_called_with(*args[0]) + + ipmi._exec_ipmitool(self.info, 'D E F') + self.assertTrue(self.mock_sleep.called) + self.assertEqual(expected, mock_support.call_args_list) + mock_exec.assert_called_with(*args[1]) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_second_call_to_address_no_sleep( + self, mock_exec, mock_support): + ipmi.LAST_CMD_TIME = {} + args = [[ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ], [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'D', 'E', 'F', + ]] + + expected = [mock.call('timing'), + mock.call('timing')] + mock_support.return_value = False + mock_exec.side_effect = [(None, None), (None, None)] + + ipmi._exec_ipmitool(self.info, 'A B C') + mock_exec.assert_called_with(*args[0]) + # act like enough time has passed + ipmi.LAST_CMD_TIME[self.info['address']] = ( + time.time() - CONF.ipmi.min_command_interval) + ipmi._exec_ipmitool(self.info, 'D E F') + self.assertFalse(self.mock_sleep.called) + self.assertEqual(expected, mock_support.call_args_list) + mock_exec.assert_called_with(*args[1]) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_two_calls_to_diff_address( + self, mock_exec, mock_support): + ipmi.LAST_CMD_TIME = {} + args = [[ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ], [ + 'ipmitool', + '-I', 'lanplus', + '-H', '127.127.127.127', + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'D', 'E', 'F', + ]] + + expected = [mock.call('timing'), + mock.call('timing')] + mock_support.return_value = False + mock_exec.side_effect = [(None, None), (None, None)] + + ipmi._exec_ipmitool(self.info, 'A B C') + mock_exec.assert_called_with(*args[0]) + self.info['address'] = '127.127.127.127' + ipmi._exec_ipmitool(self.info, 'D E F') + self.assertFalse(self.mock_sleep.called) + self.assertEqual(expected, mock_support.call_args_list) + mock_exec.assert_called_with(*args[1]) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_without_timing( + self, mock_exec, mock_support): + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_timing( + self, mock_exec, mock_support): + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-R', '7', + '-N', '5', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = True + mock_exec.return_value = (None, None) + + self.config(use_ipmitool_retries=True, group='ipmi') + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_ironic_retries( + self, mock_exec, mock_support): + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-R', '1', + '-N', '5', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = True + mock_exec.return_value = (None, None) + + self.config(use_ipmitool_retries=False, group='ipmi') + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_timeout( + self, mock_exec, mock_support): + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-R', '7', + '-N', '5', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = True + mock_exec.return_value = (None, None) + + self.config(use_ipmitool_retries=True, group='ipmi') + ipmi._exec_ipmitool(self.info, 'A B C', kill_on_timeout=True) + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args, timeout=60) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_ironic_retries_multiple( + self, mock_exec, mock_support): + + mock_exec.side_effect = [ + processutils.ProcessExecutionError( + stderr="Unknown" + ), + processutils.ProcessExecutionError( + stderr="Unknown" + ), + processutils.ProcessExecutionError( + stderr="Unknown" + ), + ] + + self.config(min_command_interval=1, group='ipmi') + self.config(command_retry_timeout=3, group='ipmi') + self.config(use_ipmitool_retries=False, group='ipmi') + + self.assertRaises(processutils.ProcessExecutionError, + ipmi._exec_ipmitool, + self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + self.assertEqual(3, mock_exec.call_count) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_without_username( + self, mock_exec, mock_support): + # An undefined username is treated the same as an empty username and + # will cause no user (-U) to be specified. + self.info['username'] = None + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_empty_username( + self, mock_exec, mock_support): + # An empty username is treated the same as an undefined username and + # will cause no user (-U) to be specified. + self.info['username'] = "" + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_without_password(self, mock_exec, + mock_support): + # An undefined password is treated the same as an empty password and + # will cause a NULL (\0) password to be used""" + self.info['password'] = None + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_empty_password(self, mock_exec, + mock_support): + # An empty password is treated the same as an undefined password and + # will cause a NULL (\0) password to be used""" + self.info['password'] = "" + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_dual_bridging(self, + mock_exec, + mock_support): + + node = obj_utils.get_test_node(self.context, + driver_info=BRIDGE_INFO_DICT) + # when support for dual bridge command is called returns True + mock_support.return_value = True + info = ipmi._parse_driver_info(node) + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', info['address'], + '-L', info['priv_level'], + '-U', info['username'], + '-m', info['local_address'], + '-B', info['transit_channel'], + '-T', info['transit_address'], + '-b', info['target_channel'], + '-t', info['target_address'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + expected = [mock.call('dual_bridge'), + mock.call('timing')] + # When support for timing command is called returns False + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(info, 'A B C') + self.assertEqual(expected, mock_support.call_args_list) + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_single_bridging(self, + mock_exec, + mock_support): + single_bridge_info = dict(BRIDGE_INFO_DICT) + single_bridge_info['ipmi_bridging'] = 'single' + print(BRIDGE_INFO_DICT) + node = obj_utils.get_test_node(self.context, + driver_info=single_bridge_info) + # when support for single bridge command is called returns True + mock_support.return_value = True + info = ipmi._parse_driver_info(node) + info['transit_channel'] = info['transit_address'] = None + + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', info['address'], + '-L', info['priv_level'], + '-U', info['username'], + '-m', info['local_address'], + '-b', info['target_channel'], + '-t', info['target_address'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + expected = [mock.call('single_bridge'), + mock.call('timing')] + # When support for timing command is called returns False + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(info, 'A B C') + self.assertEqual(expected, mock_support.call_args_list) + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_exception(self, mock_exec, mock_support): + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + self.config(use_ipmitool_retries=True, group='ipmi') + + mock_support.return_value = False + mock_exec.side_effect = processutils.ProcessExecutionError("x") + self.assertRaises(processutils.ProcessExecutionError, + ipmi._exec_ipmitool, + self.info, 'A B C') + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + self.assertEqual(1, mock_exec.call_count) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_IPMI_version_1_5( + self, mock_exec, mock_support): + self.info['protocol_version'] = '1.5' + # Assert it uses "-I lan" (1.5) instead of "-I lanplus" (2.0) + args = [ + 'ipmitool', + '-I', 'lan', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C') + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_port(self, mock_exec, mock_support): + self.info['dest_port'] = '1623' + ipmi.LAST_CMD_TIME = {} + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-p', '1623', + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + self.assertFalse(self.mock_sleep.called) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_cipher_suite(self, mock_exec, mock_support): + self.info['cipher_suite'] = '3' + ipmi.LAST_CMD_TIME = {} + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-C', '3', + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.return_value = (None, None) + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args) + self.assertFalse(self.mock_sleep.called) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_cipher_suite_error_noconfig( + self, mock_exec, mock_check_cs, mock_support): + no_matching_error = 'Error in open session response message : ' \ + 'no matching cipher suite\n\nError: ' \ + 'Unable to establish IPMI v2 / RMCP+ session\n' + self.config(min_command_interval=1, group='ipmi') + self.config(command_retry_timeout=2, group='ipmi') + self.config(use_ipmitool_retries=False, group='ipmi') + self.config(cipher_suite_versions=[], group='ipmi') + ipmi.LAST_CMD_TIME = {} + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.side_effect = [ + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + ] + mock_check_cs.return_value = False + + self.assertRaises(processutils.ProcessExecutionError, + ipmi._exec_ipmitool, + self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + + calls = [mock.call(*args), mock.call(*args)] + mock_exec.assert_has_calls(calls) + self.assertEqual(2, mock_exec.call_count) + self.assertEqual(0, mock_check_cs.call_count) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_cipher_suite_set_with_error_noconfig( + self, mock_exec, mock_check_cs, mock_support): + no_matching_error = 'Error in open session response message : ' \ + 'no matching cipher suite\n\nError: ' \ + 'Unable to establish IPMI v2 / RMCP+ session\n' + self.config(min_command_interval=1, group='ipmi') + self.config(command_retry_timeout=2, group='ipmi') + self.config(use_ipmitool_retries=False, group='ipmi') + self.config(cipher_suite_versions=[], group='ipmi') + ipmi.LAST_CMD_TIME = {} + self.info['cipher_suite'] = '17' + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-C', '17', + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.side_effect = [ + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + ] + mock_check_cs.return_value = False + + self.assertRaises(processutils.ProcessExecutionError, + ipmi._exec_ipmitool, + self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + + calls = [mock.call(*args), mock.call(*args)] + mock_exec.assert_has_calls(calls) + self.assertEqual(2, mock_exec.call_count) + self.assertEqual(0, mock_check_cs.call_count) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_cipher_suite_set_with_error_config( + self, mock_exec, mock_check_cs, mock_support): + no_matching_error = 'Error in open session response message : ' \ + 'no matching cipher suite\n\nError: ' \ + 'Unable to establish IPMI v2 / RMCP+ session\n' + self.config(min_command_interval=1, group='ipmi') + self.config(command_retry_timeout=2, group='ipmi') + self.config(use_ipmitool_retries=False, group='ipmi') + self.config(cipher_suite_versions=[0, 1, 2, 3], group='ipmi') + ipmi.LAST_CMD_TIME = {} + self.info['cipher_suite'] = '17' + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-C', '17', + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.side_effect = [ + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + ] + mock_check_cs.return_value = False + + self.assertRaises(processutils.ProcessExecutionError, + ipmi._exec_ipmitool, + self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + + calls = [mock.call(*args), mock.call(*args)] + mock_exec.assert_has_calls(calls) + self.assertEqual(2, mock_exec.call_count) + self.assertEqual(0, mock_check_cs.call_count) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_try_different_cipher_suite( + self, mock_exec, mock_check_cs, mock_support): + + self.config(min_command_interval=1, group='ipmi') + self.config(command_retry_timeout=4, group='ipmi') + self.config(use_ipmitool_retries=False, group='ipmi') + cs_list = ['0', '1', '2', '3', '17'] + self.config(cipher_suite_versions=cs_list, group='ipmi') + no_matching_error = 'Error in open session response message : ' \ + 'no matching cipher suite\n\nError: ' \ + 'Unable to establish IPMI v2 / RMCP+ session\n' + unsupported_error = 'Unsupported cipher suite ID : 17\n\n' \ + 'Error: Unable to establish IPMI v2 / RMCP+ session\n' + ipmi.LAST_CMD_TIME = {} + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + args_cs17 = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-C', '17', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + args_cs3 = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-C', '3', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + + mock_support.return_value = False + mock_exec.side_effect = [ + processutils.ProcessExecutionError( + stdout='', + stderr=no_matching_error), + processutils.ProcessExecutionError( + stdout='', + stderr=unsupported_error), + processutils.ProcessExecutionError(stderr="Unknown"), + ('', ''), + ] + mock_check_cs.side_effect = [True, True, False] + + ipmi._exec_ipmitool(self.info, 'A B C') + + mock_support.assert_called_once_with('timing') + + execute_calls = [mock.call(*args), mock.call(*args_cs17), + mock.call(*args_cs3), mock.call(*args_cs3)] + mock_exec.assert_has_calls(execute_calls) + self.assertEqual(4, mock_exec.call_count) + check_calls = [mock.call(no_matching_error), + mock.call(unsupported_error), mock.call('Unknown')] + mock_check_cs.assert_has_calls(check_calls) + self.assertEqual(3, mock_check_cs.call_count) + + @mock.patch.object(ipmi, '_is_option_supported', autospec=True) + @mock.patch.object(utils, 'execute', autospec=True) + def test__exec_ipmitool_with_check_exit_code(self, mock_exec, + mock_support): + args = [ + 'ipmitool', + '-I', 'lanplus', + '-H', self.info['address'], + '-L', self.info['priv_level'], + '-U', self.info['username'], + '-v', + '-E', 'PASSWORD', + 'A', 'B', 'C', + ] + mock_support.return_value = False + mock_exec.return_value = (None, None) + ipmi._exec_ipmitool(self.info, 'A B C', check_exit_code=[0, 1]) + mock_support.assert_called_once_with('timing') + mock_exec.assert_called_once_with(*args, check_exit_code=[0, 1]) + + class IPMIToolDriverTestCase(Base): @mock.patch.object(ipmi, "_parse_driver_info", autospec=True) diff --git a/releasenotes/notes/flexible_ipmi_credential_persistence_method_configuration-e5ed052576576d71.yaml b/releasenotes/notes/flexible_ipmi_credential_persistence_method_configuration-e5ed052576576d71.yaml new file mode 100644 index 0000000000..7c0524877a --- /dev/null +++ b/releasenotes/notes/flexible_ipmi_credential_persistence_method_configuration-e5ed052576576d71.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Adds a new configuration option ``ipmi_store_cred_in_env`` to allow + switching between file-based and environment variable persistence for + IPMI credentials. Defaults to ``False``.