Flexible IPMI credential persistence method configuration

Instead of only file-based persistence which is reported to
potentially expose IPMI credential by persisting the temp file(s) for a
little too long.

User can now pass ``True`` to the ``ipmi_store_cred_in_env`` parameter
which instead stores IPMI password as an environment variable,
limiting exposure to the user session of Ironic.

Defaults to ``False``.

Closes-Bug: #2058749

Change-Id: Icd91e969e5c58bf42fc50958c3cd1acabd36ccdf
This commit is contained in:
cid 2024-04-26 11:46:14 +01:00
parent c1f3daf7b0
commit 03b4a75e7b
5 changed files with 922 additions and 46 deletions

View File

@ -82,6 +82,34 @@ with an IPMItool-based driver. For example::
--driver-info ipmi_username=<username> \
--driver-info ipmi_password=<password>
Changing The Default IPMI Credential Persistence Method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- ``ipmi_store_cred_in_env``: IPMI password persistence method;
defaults to *False*.
The `ipmi_store_cred_in_env` configuration option allow users to switch
between file-based and environment variable persistence methods for
IPMI password.
For example, to switch to environment variable:
baremetal node create --driver ipmi \
--driver-info ipmi_address=<address> \
--driver-info ipmi_username=<username> \
--driver-info ipmi_password=<password> \
--driver-info ipmi_store_cred_in_env=True
And to swtich back to file-based temporary persistence:
baremetal node create --driver ipmi \
--driver-info ipmi_address=<address> \
--driver-info ipmi_username=<username> \
--driver-info ipmi_password=<password> \
--driver-info ipmi_store_cred_in_env=False
Advanced configuration
======================

View File

@ -71,9 +71,13 @@ REQUIRED_PROPERTIES = {
}
OPTIONAL_PROPERTIES = {
'ipmi_password': _("password. Optional."),
'ipmi_hex_kg_key': _('Kg key for IPMIv2 authentication. '
'The key is expected in hexadecimal format. '
'Optional.'),
'ipmi_store_cred_in_env': _("Determines how IPMI credentials are stored "
"during execution; either as a temporary file "
"(the default) or as an environment variable. "
"Default is False. Optional."),
'ipmi_hex_kg_key': _("Kg key for IPMIv2 authentication. "
"The key is expected in hexadecimal format. "
"Optional."),
'ipmi_port': _("remote IPMI RMCP port. Optional."),
'ipmi_priv_level': _("privilege level; default is ADMINISTRATOR. One of "
"%s. Optional.") % ', '.join(VALID_PRIV_LEVELS),
@ -342,6 +346,7 @@ def _parse_driver_info(node):
address = info.get('ipmi_address')
username = info.get('ipmi_username')
password = str(info.get('ipmi_password', ''))
store_cred_in_env = info.get('ipmi_store_cred_in_env', False)
hex_kg_key = info.get('ipmi_hex_kg_key')
dest_port = info.get('ipmi_port')
port = (info.get('ipmi_terminal_port')
@ -443,6 +448,7 @@ def _parse_driver_info(node):
'dest_port': dest_port,
'username': username,
'password': password,
'store_cred_in_env': store_cred_in_env,
'hex_kg_key': hex_kg_key,
'port': port,
'uuid': node.uuid,
@ -495,6 +501,25 @@ def _get_ipmitool_args(driver_info, pw_file=None):
return args
def _persist_ipmi_password(driver_info):
"""Persists IPMI password for passing to the ipmitool
:param driver_info: driver info with the ipmitool parameters
"""
store_cred_in_env = driver_info['store_cred_in_env']
if store_cred_in_env:
password = driver_info.get('password')
if password:
os.environ['PASSWORD'] = password
return '-E', None
path = _console_pwfile_path(driver_info['uuid'])
pw_file = console_utils.make_persistent_password_file(
path, driver_info['password'] or '\0')
return '-f', pw_file
def _ipmitool_timing_args():
if not _is_option_supported('timing'):
return []
@ -644,43 +669,52 @@ def _exec_ipmitool(driver_info, command, check_exit_code=None,
# 'ipmitool' command will prompt password if there is no '-f'
# option, we set it to '\0' to write a password file to support
# empty password
with _make_password_file(driver_info['password'] or '\0') as pw_file:
cmd_args.append('-f')
cmd_args.append(pw_file)
flag, _ = _persist_ipmi_password(driver_info)
if flag == '-E':
cmd_args.append('-E')
cmd_args.append('PASSWORD')
cmd_args.extend(command.split(" "))
try:
out, err = utils.execute(*cmd_args, **extra_args)
return out, err
except processutils.ProcessExecutionError as e:
if change_cs and check_cipher_suite_errors(e.stderr):
actual_cs = update_cipher_suite_cmd(actual_cs, args)
else:
with _make_password_file(driver_info['password']
or '\0') as pw_file:
cmd_args.append('-f')
cmd_args.append(pw_file)
cmd_args.extend(command.split(" "))
try:
out, err = utils.execute(*cmd_args, **extra_args)
return out, err
except processutils.ProcessExecutionError as e:
if change_cs and check_cipher_suite_errors(e.stderr):
actual_cs = update_cipher_suite_cmd(actual_cs, args)
else:
change_cs = False
with excutils.save_and_reraise_exception() as ctxt:
err_list = [
x for x in (
IPMITOOL_RETRYABLE_FAILURES
+ CONF.ipmi.additional_retryable_ipmi_errors)
if x in str(e)]
# If Ironic is doing retries then retry all errors
retry_failures = (err_list
or not CONF.ipmi.use_ipmitool_retries)
if ((time.time() > end_time)
or (num_tries == 0)
or not retry_failures):
LOG.error('IPMI Error while attempting "%(cmd)s" '
'for node %(node)s. Error: %(error)s',
{'node': driver_info['uuid'],
'cmd': e.cmd, 'error': e})
else:
change_cs = False
with excutils.save_and_reraise_exception() as ctxt:
err_list = [
x for x in (
IPMITOOL_RETRYABLE_FAILURES
+ CONF.ipmi.additional_retryable_ipmi_errors)
if x in str(e)]
# If Ironic is doing retries then retry all errors
retry_failures = (err_list
or not CONF.ipmi.use_ipmitool_retries)
if ((time.time() > end_time)
or (num_tries == 0)
or not retry_failures):
LOG.error('IPMI Error while attempting "%(cmd)s" '
'for node %(node)s. Error: %(error)s',
{'node': driver_info['uuid'],
'cmd': e.cmd, 'error': e})
else:
ctxt.reraise = False
LOG.warning('IPMI Error encountered, retrying '
'"%(cmd)s" for node %(node)s. '
'Error: %(error)s',
{'node': driver_info['uuid'],
'cmd': e.cmd, 'error': e})
finally:
LAST_CMD_TIME[driver_info['address']] = time.time()
ctxt.reraise = False
LOG.warning('IPMI Error encountered, retrying '
'"%(cmd)s" for node %(node)s. '
'Error: %(error)s',
{'node': driver_info['uuid'],
'cmd': e.cmd, 'error': e})
finally:
LAST_CMD_TIME[driver_info['address']] = time.time()
def _set_and_wait(task, power_action, driver_info, timeout=None):
@ -1543,17 +1577,16 @@ class IPMIConsole(base.ConsoleInterface):
created
:raises: ConsoleSubprocessFailed when invoking the subprocess failed
"""
path = _console_pwfile_path(driver_info['uuid'])
pw_file = console_utils.make_persistent_password_file(
path, driver_info['password'] or '\0')
ipmi_cmd = self._get_ipmi_cmd(driver_info, pw_file)
_, path = _persist_ipmi_password(driver_info)
ipmi_cmd = self._get_ipmi_cmd(driver_info, pw_file=path)
ipmi_cmd += ' sol activate'
try:
start_method(driver_info['uuid'], driver_info['port'], ipmi_cmd)
except (exception.ConsoleError, exception.ConsoleSubprocessFailed):
with excutils.save_and_reraise_exception():
ironic_utils.unlink_without_raise(path)
if path:
ironic_utils.unlink_without_raise(path)
class IPMIShellinaboxConsole(IPMIConsole):

View File

@ -6307,7 +6307,7 @@ class ManagerTestProperties(mgr_utils.ServiceSetUpMixin, db_base.DbTestCase):
'ipmi_target_address', 'ipmi_local_address',
'ipmi_protocol_version', 'ipmi_force_boot_device',
'ipmi_disable_boot_timeout', 'ipmi_hex_kg_key',
'ipmi_cipher_suite']
'ipmi_cipher_suite', 'ipmi_store_cred_in_env']
self._check_driver_properties("ipmi", expected)
def test_driver_properties_snmp(self):

View File

@ -475,7 +475,7 @@ class IPMIToolPrivateMethodTestCaseMeta(type):
class Base(db_base.DbTestCase):
def setUp(self):
def setUp(self, store_cred_in_env=False):
super(Base, self).setUp()
self.config(enabled_power_interfaces=['fake', 'ipmitool'],
enabled_management_interfaces=['fake', 'ipmitool'],
@ -485,13 +485,18 @@ class Base(db_base.DbTestCase):
'ipmitool-shellinabox',
'no-console'])
self.config(debug=True, group="ipmi")
driver_info = dict(INFO_DICT)
if store_cred_in_env:
driver_info['ipmi_store_cred_in_env'] = True
self.node = obj_utils.create_test_node(
self.context,
console_interface='ipmitool-socat',
management_interface='ipmitool',
power_interface='ipmitool',
vendor_interface='ipmitool',
driver_info=INFO_DICT)
driver_info=driver_info)
self.info = ipmi._parse_driver_info(self.node)
self.console = ipmi.IPMISocatConsole()
self.management = ipmi.IPMIManagement()
@ -1922,6 +1927,810 @@ class IPMIToolPrivateMethodTestCase(
self.assertEqual(['-R', '7', '-N', '1'], ipmi._ipmitool_timing_args())
class IPMIToolPrivateMethodOnEnvironmentPersistenceTestCase(
Base,
metaclass=IPMIToolPrivateMethodTestCaseMeta):
def setUp(self):
super(IPMIToolPrivateMethodOnEnvironmentPersistenceTestCase,
self).setUp(store_cred_in_env=True)
self._mock_system_random_distribution()
mock_sleep_fixture = self.useFixture(
fixtures.MockPatchObject(time, 'sleep', autospec=True))
self.mock_sleep = mock_sleep_fixture.mock
def _mock_system_random_distribution(self):
m = mock.patch.object(random.SystemRandom, 'gauss', return_value=0.5,
autospec=True)
m.start()
self.addCleanup(m.stop)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_first_call_to_address(self, mock_exec,
mock_support):
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
self.assertFalse(self.mock_sleep.called)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_second_call_to_address_sleep(
self, mock_exec, mock_support):
ipmi.LAST_CMD_TIME = {}
args = [[
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
], [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'D', 'E', 'F',
]]
expected = [mock.call('timing'),
mock.call('timing')]
mock_support.return_value = False
mock_exec.side_effect = [(None, None), (None, None)]
ipmi._exec_ipmitool(self.info, 'A B C')
mock_exec.assert_called_with(*args[0])
ipmi._exec_ipmitool(self.info, 'D E F')
self.assertTrue(self.mock_sleep.called)
self.assertEqual(expected, mock_support.call_args_list)
mock_exec.assert_called_with(*args[1])
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_second_call_to_address_no_sleep(
self, mock_exec, mock_support):
ipmi.LAST_CMD_TIME = {}
args = [[
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
], [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'D', 'E', 'F',
]]
expected = [mock.call('timing'),
mock.call('timing')]
mock_support.return_value = False
mock_exec.side_effect = [(None, None), (None, None)]
ipmi._exec_ipmitool(self.info, 'A B C')
mock_exec.assert_called_with(*args[0])
# act like enough time has passed
ipmi.LAST_CMD_TIME[self.info['address']] = (
time.time() - CONF.ipmi.min_command_interval)
ipmi._exec_ipmitool(self.info, 'D E F')
self.assertFalse(self.mock_sleep.called)
self.assertEqual(expected, mock_support.call_args_list)
mock_exec.assert_called_with(*args[1])
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_two_calls_to_diff_address(
self, mock_exec, mock_support):
ipmi.LAST_CMD_TIME = {}
args = [[
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
], [
'ipmitool',
'-I', 'lanplus',
'-H', '127.127.127.127',
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'D', 'E', 'F',
]]
expected = [mock.call('timing'),
mock.call('timing')]
mock_support.return_value = False
mock_exec.side_effect = [(None, None), (None, None)]
ipmi._exec_ipmitool(self.info, 'A B C')
mock_exec.assert_called_with(*args[0])
self.info['address'] = '127.127.127.127'
ipmi._exec_ipmitool(self.info, 'D E F')
self.assertFalse(self.mock_sleep.called)
self.assertEqual(expected, mock_support.call_args_list)
mock_exec.assert_called_with(*args[1])
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_without_timing(
self, mock_exec, mock_support):
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_timing(
self, mock_exec, mock_support):
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-R', '7',
'-N', '5',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = True
mock_exec.return_value = (None, None)
self.config(use_ipmitool_retries=True, group='ipmi')
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_ironic_retries(
self, mock_exec, mock_support):
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-R', '1',
'-N', '5',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = True
mock_exec.return_value = (None, None)
self.config(use_ipmitool_retries=False, group='ipmi')
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_timeout(
self, mock_exec, mock_support):
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-R', '7',
'-N', '5',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = True
mock_exec.return_value = (None, None)
self.config(use_ipmitool_retries=True, group='ipmi')
ipmi._exec_ipmitool(self.info, 'A B C', kill_on_timeout=True)
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args, timeout=60)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_ironic_retries_multiple(
self, mock_exec, mock_support):
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stderr="Unknown"
),
processutils.ProcessExecutionError(
stderr="Unknown"
),
processutils.ProcessExecutionError(
stderr="Unknown"
),
]
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=3, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
self.assertEqual(3, mock_exec.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_without_username(
self, mock_exec, mock_support):
# An undefined username is treated the same as an empty username and
# will cause no user (-U) to be specified.
self.info['username'] = None
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_empty_username(
self, mock_exec, mock_support):
# An empty username is treated the same as an undefined username and
# will cause no user (-U) to be specified.
self.info['username'] = ""
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_without_password(self, mock_exec,
mock_support):
# An undefined password is treated the same as an empty password and
# will cause a NULL (\0) password to be used"""
self.info['password'] = None
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_empty_password(self, mock_exec,
mock_support):
# An empty password is treated the same as an undefined password and
# will cause a NULL (\0) password to be used"""
self.info['password'] = ""
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_dual_bridging(self,
mock_exec,
mock_support):
driver_info = dict(BRIDGE_INFO_DICT)
driver_info['ipmi_store_cred_in_env'] = True
node = obj_utils.get_test_node(self.context,
driver_info=driver_info)
# when support for dual bridge command is called returns True
mock_support.return_value = True
info = ipmi._parse_driver_info(node)
args = [
'ipmitool',
'-I', 'lanplus',
'-H', info['address'],
'-L', info['priv_level'],
'-U', info['username'],
'-m', info['local_address'],
'-B', info['transit_channel'],
'-T', info['transit_address'],
'-b', info['target_channel'],
'-t', info['target_address'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
expected = [mock.call('dual_bridge'),
mock.call('timing')]
# When support for timing command is called returns False
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(info, 'A B C')
self.assertEqual(expected, mock_support.call_args_list)
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_single_bridging(self,
mock_exec,
mock_support):
single_bridge_info = dict(BRIDGE_INFO_DICT)
single_bridge_info['ipmi_bridging'] = 'single'
single_bridge_info['ipmi_store_cred_in_env'] = True
node = obj_utils.get_test_node(self.context,
driver_info=single_bridge_info)
# when support for single bridge command is called returns True
mock_support.return_value = True
info = ipmi._parse_driver_info(node)
info['transit_channel'] = info['transit_address'] = None
args = [
'ipmitool',
'-I', 'lanplus',
'-H', info['address'],
'-L', info['priv_level'],
'-U', info['username'],
'-m', info['local_address'],
'-b', info['target_channel'],
'-t', info['target_address'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
expected = [mock.call('single_bridge'),
mock.call('timing')]
# When support for timing command is called returns False
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(info, 'A B C')
self.assertEqual(expected, mock_support.call_args_list)
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_exception(self, mock_exec, mock_support):
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
self.config(use_ipmitool_retries=True, group='ipmi')
mock_support.return_value = False
mock_exec.side_effect = processutils.ProcessExecutionError("x")
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
self.assertEqual(1, mock_exec.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_IPMI_version_1_5(
self, mock_exec, mock_support):
self.info['protocol_version'] = '1.5'
# Assert it uses "-I lan" (1.5) instead of "-I lanplus" (2.0)
args = [
'ipmitool',
'-I', 'lan',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_port(self, mock_exec, mock_support):
self.info['dest_port'] = '1623'
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-p', '1623',
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
self.assertFalse(self.mock_sleep.called)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite(self, mock_exec, mock_support):
self.info['cipher_suite'] = '3'
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-C', '3',
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args)
self.assertFalse(self.mock_sleep.called)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite_error_noconfig(
self, mock_exec, mock_check_cs, mock_support):
no_matching_error = 'Error in open session response message : ' \
'no matching cipher suite\n\nError: ' \
'Unable to establish IPMI v2 / RMCP+ session\n'
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=2, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.config(cipher_suite_versions=[], group='ipmi')
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
]
mock_check_cs.return_value = False
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
calls = [mock.call(*args), mock.call(*args)]
mock_exec.assert_has_calls(calls)
self.assertEqual(2, mock_exec.call_count)
self.assertEqual(0, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite_set_with_error_noconfig(
self, mock_exec, mock_check_cs, mock_support):
no_matching_error = 'Error in open session response message : ' \
'no matching cipher suite\n\nError: ' \
'Unable to establish IPMI v2 / RMCP+ session\n'
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=2, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.config(cipher_suite_versions=[], group='ipmi')
ipmi.LAST_CMD_TIME = {}
self.info['cipher_suite'] = '17'
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-C', '17',
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
]
mock_check_cs.return_value = False
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
calls = [mock.call(*args), mock.call(*args)]
mock_exec.assert_has_calls(calls)
self.assertEqual(2, mock_exec.call_count)
self.assertEqual(0, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_cipher_suite_set_with_error_config(
self, mock_exec, mock_check_cs, mock_support):
no_matching_error = 'Error in open session response message : ' \
'no matching cipher suite\n\nError: ' \
'Unable to establish IPMI v2 / RMCP+ session\n'
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=2, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
self.config(cipher_suite_versions=[0, 1, 2, 3], group='ipmi')
ipmi.LAST_CMD_TIME = {}
self.info['cipher_suite'] = '17'
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-C', '17',
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
]
mock_check_cs.return_value = False
self.assertRaises(processutils.ProcessExecutionError,
ipmi._exec_ipmitool,
self.info, 'A B C')
mock_support.assert_called_once_with('timing')
calls = [mock.call(*args), mock.call(*args)]
mock_exec.assert_has_calls(calls)
self.assertEqual(2, mock_exec.call_count)
self.assertEqual(0, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(ipmi, 'check_cipher_suite_errors', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_try_different_cipher_suite(
self, mock_exec, mock_check_cs, mock_support):
self.config(min_command_interval=1, group='ipmi')
self.config(command_retry_timeout=4, group='ipmi')
self.config(use_ipmitool_retries=False, group='ipmi')
cs_list = ['0', '1', '2', '3', '17']
self.config(cipher_suite_versions=cs_list, group='ipmi')
no_matching_error = 'Error in open session response message : ' \
'no matching cipher suite\n\nError: ' \
'Unable to establish IPMI v2 / RMCP+ session\n'
unsupported_error = 'Unsupported cipher suite ID : 17\n\n' \
'Error: Unable to establish IPMI v2 / RMCP+ session\n'
ipmi.LAST_CMD_TIME = {}
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
args_cs17 = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-C', '17',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
args_cs3 = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-C', '3',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.side_effect = [
processutils.ProcessExecutionError(
stdout='',
stderr=no_matching_error),
processutils.ProcessExecutionError(
stdout='',
stderr=unsupported_error),
processutils.ProcessExecutionError(stderr="Unknown"),
('', ''),
]
mock_check_cs.side_effect = [True, True, False]
ipmi._exec_ipmitool(self.info, 'A B C')
mock_support.assert_called_once_with('timing')
execute_calls = [mock.call(*args), mock.call(*args_cs17),
mock.call(*args_cs3), mock.call(*args_cs3)]
mock_exec.assert_has_calls(execute_calls)
self.assertEqual(4, mock_exec.call_count)
check_calls = [mock.call(no_matching_error),
mock.call(unsupported_error), mock.call('Unknown')]
mock_check_cs.assert_has_calls(check_calls)
self.assertEqual(3, mock_check_cs.call_count)
@mock.patch.object(ipmi, '_is_option_supported', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__exec_ipmitool_with_check_exit_code(self, mock_exec,
mock_support):
args = [
'ipmitool',
'-I', 'lanplus',
'-H', self.info['address'],
'-L', self.info['priv_level'],
'-U', self.info['username'],
'-v',
'-E', 'PASSWORD',
'A', 'B', 'C',
]
mock_support.return_value = False
mock_exec.return_value = (None, None)
ipmi._exec_ipmitool(self.info, 'A B C', check_exit_code=[0, 1])
mock_support.assert_called_once_with('timing')
mock_exec.assert_called_once_with(*args, check_exit_code=[0, 1])
class IPMIToolDriverTestCase(Base):
@mock.patch.object(ipmi, "_parse_driver_info", autospec=True)

View File

@ -0,0 +1,6 @@
---
features:
- |
Adds a new configuration option ``ipmi_store_cred_in_env`` to allow
switching between file-based and environment variable persistence for
IPMI credentials. Defaults to ``False``.