Add wait step

Adds a wait step to allow for finer grained workflows
and forcing interruptions which may be needed in some
cases with specialized hardware.

Change-Id: Idc338b761ebe35a4635022a324ca5acbf29fc462
This commit is contained in:
Julia Kreger 2023-03-30 09:18:33 -07:00
parent 2d8986bda4
commit 8fc8372e74
6 changed files with 119 additions and 1 deletions

View File

@ -63,6 +63,13 @@ perform specific actions.
| | driver specific code may request an CPU interrupt based | | | driver specific code may request an CPU interrupt based |
| | reset. This step can be executed on child nodes. | | | reset. This step can be executed on child nodes. |
+-----------+----------------------------------------------------------+ +-----------+----------------------------------------------------------+
| wait | Causes a brief pause in the overall step execution which |
| | pauses until the next heartbeat operation, unless a |
| | seconds argument is provided. If a *seconds* argument is |
| | provided, then the step execution will pause for the |
| | requested amount of time. |
+-----------+----------------------------------------------------------+
In the these cases, the interface upon which the method is expected is In the these cases, the interface upon which the method is expected is
ignored, and the step is acted upon based upon just the step's name. ignored, and the step is acted upon based upon just the step's name.

View File

@ -59,9 +59,28 @@ def warn_about_sqlite():
'autocommit support.') 'autocommit support.')
def warn_about_max_wait_parameters(conf):
max_wait = conf.conductor.max_conductor_wait_step_seconds
max_deploy_timeout = conf.conductor.deploy_callback_timeout
max_clean_timeout = conf.conductor.clean_callback_timeout
error_with = None
if max_wait >= max_deploy_timeout:
error_with = 'deploy_callback_timeout'
if max_wait >= max_clean_timeout:
error_with = 'clean_callback_timeout'
if error_with:
LOG.warning('The [conductor]max_conductor_wait_step_seconds '
'configuration parameter exceeds the value of '
'[conductor]%s, which could create a condition where '
'tasks may timeout. Ironic recommends a low default '
'value for [conductor]max_conductor_wait_step_seconds ',
'please re-evaluate your configuration.', error_with)
def issue_startup_warnings(conf): def issue_startup_warnings(conf):
warn_about_unsafe_shred_parameters(conf) warn_about_unsafe_shred_parameters(conf)
warn_about_sqlite() warn_about_sqlite()
warn_about_max_wait_parameters(conf)
def main(): def main():

View File

@ -11,6 +11,7 @@
# under the License. # under the License.
import collections import collections
import time
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@ -825,6 +826,7 @@ def use_reserved_step_handler(task, step):
:param step: The requested step. :param step: The requested step.
""" """
step_name = step.get('step') step_name = step.get('step')
step_args = step.get('args', {})
if step_name and step_name in RESERVED_STEP_HANDLER_MAPPING.keys(): if step_name and step_name in RESERVED_STEP_HANDLER_MAPPING.keys():
call_to_use = RESERVED_STEP_HANDLER_MAPPING[step_name] call_to_use = RESERVED_STEP_HANDLER_MAPPING[step_name]
method = call_to_use[0] method = call_to_use[0]
@ -837,3 +839,30 @@ def use_reserved_step_handler(task, step):
# If we've reached this point, we're going to return None as # If we've reached this point, we're going to return None as
# there is no work for us to do. This allows the caller to # there is no work for us to do. This allows the caller to
# take its normal path. # take its normal path.
if step_name == 'wait':
# By default, we enter a wait state.
task.process_event('wait')
if 'seconds' in step_args:
# If we have a seconds argument, just pause.
rec_seconds = int(step_args['seconds'])
if rec_seconds > CONF.conductor.max_conductor_wait_step_seconds:
warning = (
_('A wait time exceeding the configured maximum '
'has been requested. Holding for %s, got %s.') %
(rec_seconds,
CONF.conductor.max_conductor_wait_step_seconds)
)
utils.node_history_record(task.node, event=warning,
event_type=task.node.provision_state)
LOG.warning(warning)
rec_seconds = CONF.conductor.max_conductor_wait_step_seconds
_sleep_wrapper(rec_seconds)
# Explicitly resume.
task.process_event('resume')
# Return True, which closed out execution until the next heartbeat.
return EXIT_STEPS
def _sleep_wrapper(seconds):
"""Wrapper for sleep to allow for unit testing."""
time.sleep(seconds)

View File

@ -366,6 +366,20 @@ opts = [
'for operators to use if needed to perform specific ' 'for operators to use if needed to perform specific '
'tasks where this is known acceptable. Use at your' 'tasks where this is known acceptable. Use at your'
'own risk!')), 'own risk!')),
cfg.IntOpt('max_conductor_wait_step_seconds',
default=30,
min=0,
max=1800,
mutable=True,
help=_('The maximum number of seconds which a step can '
'be requested to explicitly sleep or wait. This '
'value should be changed sparingly as it holds a '
'conductor thread and if used across many nodes at '
'once can exhaust a conductor\'s resources. This'
'capability has a hard coded maximum wait of 1800 '
'seconds, or 30 minutes. If you need to wait longer '
'than the maximum value, we recommend exploring '
'hold steps.')),
] ]

View File

@ -1324,7 +1324,7 @@ class ReservedStepsHandlerTestCase(db_base.DbTestCase):
def _test_reserved_step(self, step, mock_power_action): def _test_reserved_step(self, step, mock_power_action):
node = obj_utils.create_test_node( node = obj_utils.create_test_node(
self.context, driver='fake-hardware', self.context, driver='fake-hardware',
provision_state=states.VERIFYING, provision_state=states.CLEANING,
target_provision_state=states.MANAGEABLE, target_provision_state=states.MANAGEABLE,
last_error=None, last_error=None,
clean_step=None) clean_step=None)
@ -1341,3 +1341,45 @@ class ReservedStepsHandlerTestCase(db_base.DbTestCase):
def test_reserved_step_power_reboot(self): def test_reserved_step_power_reboot(self):
self._test_reserved_step({'step': 'reboot'}) self._test_reserved_step({'step': 'reboot'})
class ReservedStepHandlerByNameTestCase(db_base.DbTestCase):
def setUp(self):
super(ReservedStepHandlerByNameTestCase, self).setUp()
@mock.patch.object(conductor_steps, '_sleep_wrapper', autospec=True)
def _test_reserved_step(self, step, mock_sleep):
CONF.set_override('max_conductor_wait_step_seconds', 2,
group='conductor')
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.CLEANING,
target_provision_state=states.MANAGEABLE,
last_error=None,
clean_step=None)
with task_manager.acquire(
self.context, node.uuid, shared=False) as task:
res = conductor_steps.use_reserved_step_handler(task, step)
self.assertTrue(res)
if step.get('step') == 'wait':
if 'args' in step:
self.assertEqual(task.node.provision_state,
states.CLEANING)
if step['args']['seconds'] == 3:
mock_sleep.assert_called_once_with(2)
else:
mock_sleep.assert_called_once_with(
step['args']['seconds'])
else:
self.assertEqual(task.node.provision_state,
states.CLEANWAIT)
mock_sleep.assert_not_called()
def test_reserved_step_wait(self):
self._test_reserved_step({'step': 'wait'})
def test_reserved_step_wait_time_to_long(self):
self._test_reserved_step({'step': 'wait', 'args': {'seconds': 3}})
def test_reserved_step_wait_time(self):
self._test_reserved_step({'step': 'wait', 'args': {'seconds': 1}})

View File

@ -0,0 +1,7 @@
---
features:
- |
Adds a ``wait`` clean/deploy step, which takes an optional argument,
passed in a step definition of ``seconds`` to force an explicit
pause of the current process. Otherwise the next heartbeat action
triggers resumption of the process.