Merge "Propagated a task timeout to an action execution"
This commit is contained in:
commit
23828f6a33
|
@ -57,7 +57,8 @@ def _process_queue(queue):
|
|||
|
||||
for operation, args in queue:
|
||||
if operation == _RUN_ACTION:
|
||||
action_ex, action_def, target, execution_context = args
|
||||
action_ex, action_def, target, execution_context, \
|
||||
timeout = args
|
||||
|
||||
executor.run_action(
|
||||
action_ex.id,
|
||||
|
@ -67,6 +68,7 @@ def _process_queue(queue):
|
|||
action_ex.runtime_context.get('safe_rerun', False),
|
||||
execution_context,
|
||||
target=target,
|
||||
timeout=timeout
|
||||
)
|
||||
elif operation == _ON_ACTION_COMPLETE:
|
||||
action_ex_id, result, wf_action = args
|
||||
|
@ -120,8 +122,9 @@ def process(func):
|
|||
return decorate
|
||||
|
||||
|
||||
def schedule_run_action(action_ex, action_def, target, execution_context):
|
||||
args = (action_ex, action_def, target, execution_context)
|
||||
def schedule_run_action(action_ex, action_def, target, execution_context,
|
||||
timeout):
|
||||
args = (action_ex, action_def, target, execution_context, timeout)
|
||||
_get_queue().append((_RUN_ACTION, args))
|
||||
|
||||
|
||||
|
|
|
@ -91,7 +91,8 @@ class Action(object):
|
|||
self.action_ex.state = state
|
||||
|
||||
@abc.abstractmethod
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
"""Schedule action run.
|
||||
|
||||
This method is needed to schedule action run so its result can
|
||||
|
@ -100,6 +101,8 @@ class Action(object):
|
|||
executor asynchrony when executor doesn't immediately send a
|
||||
result).
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param input_dict: Action input.
|
||||
:param target: Target (group of action executors).
|
||||
:param index: Action execution index. Makes sense for some types.
|
||||
|
@ -111,13 +114,15 @@ class Action(object):
|
|||
|
||||
@abc.abstractmethod
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=False):
|
||||
safe_rerun=False, timeout=None):
|
||||
"""Immediately run action.
|
||||
|
||||
This method runs method w/o scheduling its run for a later time.
|
||||
From engine perspective action will be processed in synchronous
|
||||
mode.
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param input_dict: Action input.
|
||||
:param target: Target (group of action executors).
|
||||
:param index: Action execution index. Makes sense for some types.
|
||||
|
@ -225,7 +230,8 @@ class PythonAction(Action):
|
|||
self._log_result(prev_state, result)
|
||||
|
||||
@profiler.trace('action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
|
@ -248,11 +254,12 @@ class PythonAction(Action):
|
|||
self.action_def,
|
||||
target,
|
||||
execution_context,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
@profiler.trace('action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=False):
|
||||
safe_rerun=False, timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
input_dict = self._prepare_input(input_dict)
|
||||
|
@ -284,7 +291,8 @@ class PythonAction(Action):
|
|||
safe_rerun,
|
||||
execution_context,
|
||||
target=target,
|
||||
async_=False
|
||||
async_=False,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return self._prepare_output(result)
|
||||
|
@ -512,7 +520,8 @@ class WorkflowAction(Action):
|
|||
pass
|
||||
|
||||
@profiler.trace('workflkow-action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
parent_wf_ex = self.task_ex.workflow_execution
|
||||
|
@ -564,7 +573,7 @@ class WorkflowAction(Action):
|
|||
|
||||
@profiler.trace('workflow-action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=True):
|
||||
safe_rerun=True, timeout=None):
|
||||
raise NotImplementedError('Does not apply to this WorkflowAction.')
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
|
|
|
@ -76,6 +76,7 @@ class DefaultEngine(base.Engine):
|
|||
sync = params.get('run_sync')
|
||||
save = params.get('save_result')
|
||||
target = params.get('target')
|
||||
timeout = params.get('timeout')
|
||||
|
||||
is_action_sync = action.is_sync(action_input)
|
||||
|
||||
|
@ -84,11 +85,12 @@ class DefaultEngine(base.Engine):
|
|||
"Action does not support synchronous execution.")
|
||||
|
||||
if not sync and (save or not is_action_sync):
|
||||
action.schedule(action_input, target)
|
||||
action.schedule(action_input, target, timeout=timeout)
|
||||
|
||||
return action.action_ex.get_clone()
|
||||
|
||||
output = action.run(action_input, target, save=False)
|
||||
output = action.run(action_input, target, save=False,
|
||||
timeout=timeout)
|
||||
|
||||
state = states.SUCCESS if output.is_success() else states.ERROR
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ import six
|
|||
|
||||
_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task'
|
||||
_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task'
|
||||
_FAIL_IF_INCOMPLETE_TASK_PATH = \
|
||||
'mistral.engine.policies._fail_task_if_incomplete'
|
||||
|
||||
|
||||
def _log_task_delay(task_ex, delay_sec):
|
||||
|
@ -423,7 +425,7 @@ class TimeoutPolicy(base.TaskPolicy):
|
|||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.policies._fail_task_if_incomplete',
|
||||
_FAIL_IF_INCOMPLETE_TASK_PATH,
|
||||
self.delay,
|
||||
task_ex_id=task_ex.id,
|
||||
timeout=self.delay
|
||||
|
|
|
@ -424,7 +424,8 @@ class RegularTask(Task):
|
|||
action.schedule(
|
||||
input_dict,
|
||||
target,
|
||||
safe_rerun=self.task_spec.get_safe_rerun()
|
||||
safe_rerun=self.task_spec.get_safe_rerun(),
|
||||
timeout=self._get_timeout()
|
||||
)
|
||||
|
||||
@profiler.trace('regular-task-get-target', hide_args=True)
|
||||
|
@ -502,6 +503,22 @@ class RegularTask(Task):
|
|||
|
||||
return actions.PythonAction(action_def, task_ex=self.task_ex)
|
||||
|
||||
def _get_timeout(self):
    """Resolve the task's timeout policy into a value for the action run.

    The raw value comes from the task spec's policies. If it is not
    already an int or float it is evaluated against a context view built
    from the task's inbound context, the workflow execution context and
    the workflow input.
    NOTE(review): the non-numeric case is presumably a YAQL/Jinja
    expression string - confirm against the policies spec.

    :return: The timeout if it is greater than zero, otherwise None.
        None is also returned when the policy value (or the result of
        evaluating it) is None.
    """
    timeout = self.task_spec.get_policies().get_timeout()

    if not isinstance(timeout, (int, float)):
        wf_ex = self.task_ex.workflow_execution

        ctx_view = data_flow.ContextView(
            self.task_ex.in_context,
            wf_ex.context,
            wf_ex.input
        )

        timeout = expr.evaluate_recursively(data=timeout, context=ctx_view)

    # An expression may evaluate to None; comparing None with 0 raises
    # TypeError on Python 3, so treat it as "no timeout" explicitly.
    if timeout is None:
        return None

    return timeout if timeout > 0 else None
|
||||
|
||||
|
||||
class WithItemsTask(RegularTask):
|
||||
"""With-items task.
|
||||
|
@ -588,7 +605,8 @@ class WithItemsTask(RegularTask):
|
|||
input_dict,
|
||||
target,
|
||||
index=i,
|
||||
safe_rerun=self.task_spec.get_safe_rerun()
|
||||
safe_rerun=self.task_spec.get_safe_rerun(),
|
||||
timeout=self._get_timeout()
|
||||
)
|
||||
|
||||
self._decrease_capacity(1)
|
||||
|
|
|
@ -50,9 +50,11 @@ class Executor(object):
|
|||
@abc.abstractmethod
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
target=None, async_=True, timeout=None):
|
||||
"""Runs action.
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param action_ex_id: Corresponding action execution id.
|
||||
:param action_cls_str: Path to action class in dot notation.
|
||||
:param action_cls_attrs: Attributes of action class which
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from eventlet import timeout as ev_timeout
|
||||
from mistral_lib import actions as mistral_lib
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
@ -35,7 +36,7 @@ class DefaultExecutor(base.Executor):
|
|||
@profiler.trace('default-executor-run-action', hide_args=True)
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
target=None, async_=True, timeout=None):
|
||||
"""Runs action.
|
||||
|
||||
:param action_ex_id: Action execution id.
|
||||
|
@ -51,6 +52,8 @@ class DefaultExecutor(base.Executor):
|
|||
:param target: Target (group of action executors).
|
||||
:param async_: If True, run action in asynchronous mode (w/o waiting
|
||||
for completion).
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:return: Action result.
|
||||
"""
|
||||
|
||||
|
@ -102,14 +105,15 @@ class DefaultExecutor(base.Executor):
|
|||
|
||||
# Run action.
|
||||
try:
|
||||
|
||||
# NOTE(d0ugal): If the action is a subclass of mistral-lib we know
|
||||
# that it expects to be passed the context.
|
||||
if isinstance(action, mistral_lib.Action):
|
||||
action_ctx = context.create_action_context(execution_context)
|
||||
result = action.run(action_ctx)
|
||||
else:
|
||||
result = action.run()
|
||||
with ev_timeout.Timeout(seconds=timeout):
|
||||
# NOTE(d0ugal): If the action is a subclass of mistral-lib we
|
||||
# know that it expects to be passed the context.
|
||||
if isinstance(action, mistral_lib.Action):
|
||||
action_ctx = context.create_action_context(
|
||||
execution_context)
|
||||
result = action.run(action_ctx)
|
||||
else:
|
||||
result = action.run()
|
||||
|
||||
# Note: it's made for backwards compatibility with already
|
||||
# existing Mistral actions which don't return result as
|
||||
|
@ -117,7 +121,7 @@ class DefaultExecutor(base.Executor):
|
|||
if not isinstance(result, mistral_lib.Result):
|
||||
result = mistral_lib.Result(data=result)
|
||||
|
||||
except Exception as e:
|
||||
except BaseException as e:
|
||||
msg = (
|
||||
"Failed to run action [action_ex_id=%s, action_cls='%s', "
|
||||
"attributes='%s', params='%s']\n %s" % (
|
||||
|
|
|
@ -60,9 +60,14 @@ class ExecutorServer(service_base.MistralService):
|
|||
self._rpc_server.stop(graceful)
|
||||
|
||||
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
|
||||
action_cls_attrs, params, safe_rerun, execution_context):
|
||||
action_cls_attrs, params, safe_rerun, execution_context,
|
||||
timeout):
|
||||
"""Receives calls over RPC to run action on executor.
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param execution_context: A dict of values providing information about
|
||||
the current execution.
|
||||
:param rpc_ctx: RPC request context dictionary.
|
||||
:param action_ex_id: Action execution id.
|
||||
:param action_cls_str: Action class name.
|
||||
|
@ -74,11 +79,13 @@ class ExecutorServer(service_base.MistralService):
|
|||
|
||||
LOG.info(
|
||||
"Received RPC request 'run_action'[action_ex_id=%s, "
|
||||
"action_cls_str=%s, action_cls_attrs=%s, params=%s]",
|
||||
"action_cls_str=%s, action_cls_attrs=%s, params=%s, "
|
||||
"timeout=%s]",
|
||||
action_ex_id,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
utils.cut(params)
|
||||
utils.cut(params),
|
||||
timeout
|
||||
)
|
||||
|
||||
redelivered = rpc_ctx.redelivered or False
|
||||
|
@ -90,7 +97,8 @@ class ExecutorServer(service_base.MistralService):
|
|||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered
|
||||
redelivered,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -313,7 +313,7 @@ class ExecutorClient(exe.Executor):
|
|||
@profiler.trace('executor-client-run-action')
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
target=None, async_=True, timeout=None):
|
||||
"""Sends a request to run action to executor.
|
||||
|
||||
:param action_ex_id: Action execution id.
|
||||
|
@ -322,11 +322,15 @@ class ExecutorClient(exe.Executor):
|
|||
:param params: Action input parameters.
|
||||
:param safe_rerun: If true, action would be re-run if executor dies
|
||||
during execution.
|
||||
:param execution_context: A dict of values providing information about
|
||||
the current execution.
|
||||
:param redelivered: Tells if given action was run before on another
|
||||
executor.
|
||||
:param target: Target (group of action executors).
|
||||
:param async_: If True, run action in asynchronous mode (w/o waiting
|
||||
for completion).
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:return: Action result.
|
||||
"""
|
||||
|
||||
|
@ -337,6 +341,7 @@ class ExecutorClient(exe.Executor):
|
|||
'params': params,
|
||||
'safe_rerun': safe_rerun,
|
||||
'execution_context': execution_context,
|
||||
'timeout': timeout
|
||||
}
|
||||
|
||||
rpc_client_method = (self._client.async_call
|
||||
|
|
|
@ -288,6 +288,33 @@ class TestActionExecutionsController(base.APITest):
|
|||
run_sync=True
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
def test_post_with_timeout(self, f):
    """POSTing an action execution forwards the 'timeout' param."""
    f.return_value = ACTION_EX_DB.to_dict()

    # Request body: 'params' is a JSON string carrying the timeout.
    payload = {
        'name': 'std.echo',
        'input': "{}",
        'params': '{"timeout": 2}'
    }
    response = self.app.post_json('/v2/action_executions', payload)

    self.assertEqual(201, response.status_int)

    # The expected body is ACTION_EX without its 'task_name' key.
    expected = copy.deepcopy(ACTION_EX)
    del expected['task_name']

    self.assertDictEqual(expected, response.json)

    # The engine client must receive the timeout as a keyword argument.
    expected_input = json.loads(expected['input'])
    f.assert_called_once_with(
        expected['name'],
        expected_input,
        description=None,
        timeout=2
    )
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
|
||||
def test_post_json(self, f):
|
||||
f.return_value = ACTION_EX_DB.to_dict()
|
||||
|
|
|
@ -80,8 +80,7 @@ workflows:
|
|||
|
||||
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, target=None,
|
||||
async_=True):
|
||||
|
||||
async_=True, timeout=None):
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = d_exe.DefaultExecutor()
|
||||
|
||||
|
@ -91,7 +90,10 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
|
|||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context=execution_context
|
||||
execution_context=execution_context,
|
||||
target=target,
|
||||
async_=async_,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
|
||||
|
@ -188,7 +190,8 @@ class EnvironmentTest(base.EngineTestCase):
|
|||
'workflow_name': wf1_ex.name,
|
||||
'action_execution_id': a_ex.id,
|
||||
},
|
||||
target=TARGET
|
||||
target=TARGET,
|
||||
timeout=None
|
||||
)
|
||||
|
||||
def test_subworkflow_env_task_input(self):
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from eventlet import timeout
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
import requests
|
||||
|
@ -1435,6 +1436,29 @@ class PoliciesTest(base.EngineTestCase):
|
|||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
def test_action_timeout(self):
|
||||
wb = """---
|
||||
version: '2.0'
|
||||
wf1:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.sleep seconds=10
|
||||
timeout: 2
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wb)
|
||||
wf_ex = self.engine.start_workflow('wf1')
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
action_ex = task_ex.action_executions[0]
|
||||
|
||||
with timeout.Timeout(8):
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
self.await_task_error(task_ex.id)
|
||||
self.await_action_error(action_ex.id)
|
||||
|
||||
def test_pause_before_policy(self):
|
||||
wb_service.create_workbook_v2(PAUSE_BEFORE_WB)
|
||||
|
||||
|
|
|
@ -334,5 +334,6 @@ class RunActionEngineTest(base.EngineTestCase):
|
|||
run_mock.assert_called_once_with(
|
||||
{'input': 'Hello'},
|
||||
None,
|
||||
save=False
|
||||
save=False,
|
||||
timeout=None
|
||||
)
|
||||
|
|
|
@ -26,7 +26,7 @@ from mistral.workflow import states
|
|||
|
||||
def _run_at_target(action_ex_id, action_class_str, attributes,
|
||||
action_params, safe_rerun, execution_context, target=None,
|
||||
async_=True):
|
||||
async_=True, timeout=None):
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = d_exe.DefaultExecutor()
|
||||
|
||||
|
|
Loading…
Reference in New Issue