diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py index 3806ec567..d527d6ccc 100644 --- a/mistral/engine/action_queue.py +++ b/mistral/engine/action_queue.py @@ -57,7 +57,8 @@ def _process_queue(queue): for operation, args in queue: if operation == _RUN_ACTION: - action_ex, action_def, target, execution_context = args + action_ex, action_def, target, execution_context, \ + timeout = args executor.run_action( action_ex.id, @@ -67,6 +68,7 @@ def _process_queue(queue): action_ex.runtime_context.get('safe_rerun', False), execution_context, target=target, + timeout=timeout ) elif operation == _ON_ACTION_COMPLETE: action_ex_id, result, wf_action = args @@ -120,8 +122,9 @@ def process(func): return decorate -def schedule_run_action(action_ex, action_def, target, execution_context): - args = (action_ex, action_def, target, execution_context) +def schedule_run_action(action_ex, action_def, target, execution_context, + timeout): + args = (action_ex, action_def, target, execution_context, timeout) _get_queue().append((_RUN_ACTION, args)) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index baa34bd3b..a0b0a4a46 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -91,7 +91,8 @@ class Action(object): self.action_ex.state = state @abc.abstractmethod - def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): + def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, + timeout=None): """Schedule action run. This method is needed to schedule action run so its result can @@ -100,6 +101,8 @@ class Action(object): executor asynchrony when executor doesn't immediately send a result). + :param timeout: a period of time in seconds after which execution of + action will be interrupted :param input_dict: Action input. :param target: Target (group of action executors). :param index: Action execution index. Makes sense for some types. @@ -111,13 +114,15 @@ class Action(object): @abc.abstractmethod def run(self, input_dict, target, index=0, desc='', save=True, - safe_rerun=False): + safe_rerun=False, timeout=None): """Immediately run action. This method runs method w/o scheduling its run for a later time. From engine perspective action will be processed in synchronous mode. + :param timeout: a period of time in seconds after which execution of + action will be interrupted :param input_dict: Action input. :param target: Target (group of action executors). :param index: Action execution index. Makes sense for some types. @@ -225,7 +230,8 @@ class PythonAction(Action): self._log_result(prev_state, result) @profiler.trace('action-schedule', hide_args=True) - def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): + def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, + timeout=None): assert not self.action_ex # Assign the action execution ID here to minimize database calls. @@ -248,11 +254,12 @@ class PythonAction(Action): self.action_def, target, execution_context, + timeout=timeout ) @profiler.trace('action-run', hide_args=True) def run(self, input_dict, target, index=0, desc='', save=True, - safe_rerun=False): + safe_rerun=False, timeout=None): assert not self.action_ex input_dict = self._prepare_input(input_dict) @@ -284,7 +291,8 @@ class PythonAction(Action): safe_rerun, execution_context, target=target, - async_=False + async_=False, + timeout=timeout ) return self._prepare_output(result) @@ -512,7 +520,8 @@ class WorkflowAction(Action): pass @profiler.trace('workflkow-action-schedule', hide_args=True) - def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): + def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, + timeout=None): assert not self.action_ex parent_wf_ex = self.task_ex.workflow_execution @@ -564,7 +573,7 @@ class WorkflowAction(Action): @profiler.trace('workflow-action-run', hide_args=True) def run(self, input_dict, target, index=0, desc='', save=True, - safe_rerun=True): + safe_rerun=True, timeout=None): raise NotImplementedError('Does not apply to this WorkflowAction.') def is_sync(self, input_dict): diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 47223b763..85583e01d 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -76,6 +76,7 @@ class DefaultEngine(base.Engine): sync = params.get('run_sync') save = params.get('save_result') target = params.get('target') + timeout = params.get('timeout') is_action_sync = action.is_sync(action_input) @@ -84,11 +85,12 @@ class DefaultEngine(base.Engine): "Action does not support synchronous execution.") if not sync and (save or not is_action_sync): - action.schedule(action_input, target) + action.schedule(action_input, target, timeout=timeout) return action.action_ex.get_clone() - output = action.run(action_input, target, save=False) + output = action.run(action_input, target, save=False, + timeout=timeout) state = states.SUCCESS if output.is_success() else states.ERROR diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 036f90823..42890cdaf 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -28,6 +28,8 @@ import six _CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task' _COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task' +_FAIL_IF_INCOMPLETE_TASK_PATH = \ + 'mistral.engine.policies._fail_task_if_incomplete' def _log_task_delay(task_ex, delay_sec): @@ -423,7 +425,7 @@ class TimeoutPolicy(base.TaskPolicy): scheduler.schedule_call( None, - 'mistral.engine.policies._fail_task_if_incomplete', + _FAIL_IF_INCOMPLETE_TASK_PATH, self.delay, task_ex_id=task_ex.id, timeout=self.delay diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 146c7bb67..7b60c94a2 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -424,7 +424,8 @@ class RegularTask(Task): action.schedule( input_dict, target, - safe_rerun=self.task_spec.get_safe_rerun() + safe_rerun=self.task_spec.get_safe_rerun(), + timeout=self._get_timeout() ) @profiler.trace('regular-task-get-target', hide_args=True) @@ -502,6 +503,22 @@ class RegularTask(Task): return actions.PythonAction(action_def, task_ex=self.task_ex) + def _get_timeout(self): + timeout = self.task_spec.get_policies().get_timeout() + + if not isinstance(timeout, (int, float)): + wf_ex = self.task_ex.workflow_execution + + ctx_view = data_flow.ContextView( + self.task_ex.in_context, + wf_ex.context, + wf_ex.input + ) + + timeout = expr.evaluate_recursively(data=timeout, context=ctx_view) + + return timeout if timeout > 0 else None + class WithItemsTask(RegularTask): """With-items task. @@ -588,7 +605,8 @@ class WithItemsTask(RegularTask): input_dict, target, index=i, - safe_rerun=self.task_spec.get_safe_rerun() + safe_rerun=self.task_spec.get_safe_rerun(), + timeout=self._get_timeout() ) self._decrease_capacity(1) diff --git a/mistral/executors/base.py b/mistral/executors/base.py index 5941824c0..474d4e40b 100644 --- a/mistral/executors/base.py +++ b/mistral/executors/base.py @@ -50,9 +50,11 @@ class Executor(object): @abc.abstractmethod def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, redelivered=False, - target=None, async_=True): + target=None, async_=True, timeout=None): """Runs action. + :param timeout: a period of time in seconds after which execution of + action will be interrupted :param action_ex_id: Corresponding action execution id. :param action_cls_str: Path to action class in dot notation. :param action_cls_attrs: Attributes of action class which diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index c1fdebe04..a2674fbad 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from eventlet import timeout as ev_timeout from mistral_lib import actions as mistral_lib from oslo_log import log as logging from osprofiler import profiler @@ -35,7 +36,7 @@ class DefaultExecutor(base.Executor): @profiler.trace('default-executor-run-action', hide_args=True) def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, redelivered=False, - target=None, async_=True): + target=None, async_=True, timeout=None): """Runs action. :param action_ex_id: Action execution id. @@ -51,6 +52,8 @@ class DefaultExecutor(base.Executor): :param target: Target (group of action executors). :param async_: If True, run action in asynchronous mode (w/o waiting for completion). + :param timeout: a period of time in seconds after which execution of + action will be interrupted :return: Action result. """ @@ -102,14 +105,15 @@ class DefaultExecutor(base.Executor): # Run action. try: - - # NOTE(d0ugal): If the action is a subclass of mistral-lib we know - # that it expects to be passed the context. - if isinstance(action, mistral_lib.Action): - action_ctx = context.create_action_context(execution_context) - result = action.run(action_ctx) - else: - result = action.run() + with ev_timeout.Timeout(seconds=timeout): + # NOTE(d0ugal): If the action is a subclass of mistral-lib we + # know that it expects to be passed the context. + if isinstance(action, mistral_lib.Action): + action_ctx = context.create_action_context( + execution_context) + result = action.run(action_ctx) + else: + result = action.run() # Note: it's made for backwards compatibility with already # existing Mistral actions which don't return result as @@ -117,7 +121,7 @@ class DefaultExecutor(base.Executor): if not isinstance(result, mistral_lib.Result): result = mistral_lib.Result(data=result) - except Exception as e: + except BaseException as e: msg = ( "Failed to run action [action_ex_id=%s, action_cls='%s', " "attributes='%s', params='%s']\n %s" % ( diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index a0dfdf4fd..a07e3a770 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -60,9 +60,14 @@ class ExecutorServer(service_base.MistralService): self._rpc_server.stop(graceful) def run_action(self, rpc_ctx, action_ex_id, action_cls_str, - action_cls_attrs, params, safe_rerun, execution_context): + action_cls_attrs, params, safe_rerun, execution_context, + timeout): """Receives calls over RPC to run action on executor. + :param timeout: a period of time in seconds after which execution of + action will be interrupted + :param execution_context: A dict of values providing information about + the current execution. :param rpc_ctx: RPC request context dictionary. :param action_ex_id: Action execution id. :param action_cls_str: Action class name. @@ -74,11 +79,13 @@ class ExecutorServer(service_base.MistralService): LOG.info( "Received RPC request 'run_action'[action_ex_id=%s, " - "action_cls_str=%s, action_cls_attrs=%s, params=%s]", + "action_cls_str=%s, action_cls_attrs=%s, params=%s, " + "timeout=%s]", action_ex_id, action_cls_str, action_cls_attrs, - utils.cut(params) + utils.cut(params), + timeout ) redelivered = rpc_ctx.redelivered or False @@ -90,7 +97,8 @@ class ExecutorServer(service_base.MistralService): params, safe_rerun, execution_context, - redelivered + redelivered, + timeout=timeout ) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index aafc9ccb2..865936c0a 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -313,7 +313,7 @@ class ExecutorClient(exe.Executor): @profiler.trace('executor-client-run-action') def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, redelivered=False, - target=None, async_=True): + target=None, async_=True, timeout=None): """Sends a request to run action to executor. :param action_ex_id: Action execution id. @@ -322,11 +322,15 @@ class ExecutorClient(exe.Executor): :param params: Action input parameters. :param safe_rerun: If true, action would be re-run if executor dies during execution. + :param execution_context: A dict of values providing information about + the current execution. :param redelivered: Tells if given action was run before on another executor. :param target: Target (group of action executors). :param async_: If True, run action in asynchronous mode (w/o waiting for completion). + :param timeout: a period of time in seconds after which execution of + action will be interrupted :return: Action result. """ @@ -337,6 +341,7 @@ class ExecutorClient(exe.Executor): 'params': params, 'safe_rerun': safe_rerun, 'execution_context': execution_context, + 'timeout': timeout } rpc_client_method = (self._client.async_call diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index 9f2b6acfd..cfa82f370 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -288,6 +288,33 @@ class TestActionExecutionsController(base.APITest): run_sync=True ) + @mock.patch.object(rpc_clients.EngineClient, 'start_action') + def test_post_with_timeout(self, f): + f.return_value = ACTION_EX_DB.to_dict() + + resp = self.app.post_json( + '/v2/action_executions', + { + 'name': 'std.echo', + 'input': "{}", + 'params': '{"timeout": 2}' + } + ) + + self.assertEqual(201, resp.status_int) + + action_exec = copy.deepcopy(ACTION_EX) + del action_exec['task_name'] + + self.assertDictEqual(action_exec, resp.json) + + f.assert_called_once_with( + action_exec['name'], + json.loads(action_exec['input']), + description=None, + timeout=2 + ) + @mock.patch.object(rpc_clients.EngineClient, 'start_action') def test_post_json(self, f): f.return_value = ACTION_EX_DB.to_dict() diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 90056db61..43197c1da 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -80,8 +80,7 @@ workflows: def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, target=None, - async_=True): - + async_=True, timeout=None): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor() @@ -91,7 +90,10 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, action_cls_attrs, params, safe_rerun, - execution_context=execution_context + execution_context=execution_context, + target=target, + async_=async_, + timeout=timeout ) @@ -188,7 +190,8 @@ class EnvironmentTest(base.EngineTestCase): 'workflow_name': wf1_ex.name, 'action_execution_id': a_ex.id, }, - target=TARGET + target=TARGET, + timeout=None ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index 7fa7ccc77..c65a54f3d 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from eventlet import timeout import mock from oslo_config import cfg import requests @@ -1435,6 +1436,29 @@ class PoliciesTest(base.EngineTestCase): self.await_workflow_error(wf_ex.id) + def test_action_timeout(self): + wb = """--- + version: '2.0' + wf1: + tasks: + task1: + action: std.sleep seconds=10 + timeout: 2 + """ + + wf_service.create_workflows(wb) + wf_ex = self.engine.start_workflow('wf1') + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + action_ex = task_ex.action_executions[0] + + with timeout.Timeout(8): + self.await_workflow_error(wf_ex.id) + self.await_task_error(task_ex.id) + self.await_action_error(action_ex.id) + def test_pause_before_policy(self): wb_service.create_workbook_v2(PAUSE_BEFORE_WB) diff --git a/mistral/tests/unit/engine/test_run_action.py b/mistral/tests/unit/engine/test_run_action.py index 040ab4412..2ee7d3209 100644 --- a/mistral/tests/unit/engine/test_run_action.py +++ b/mistral/tests/unit/engine/test_run_action.py @@ -334,5 +334,6 @@ class RunActionEngineTest(base.EngineTestCase): run_mock.assert_called_once_with( {'input': 'Hello'}, None, - save=False + save=False, + timeout=None ) diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 2aae32539..6e0439e62 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -26,7 +26,7 @@ from mistral.workflow import states def _run_at_target(action_ex_id, action_class_str, attributes, action_params, safe_rerun, execution_context, target=None, - async_=True): + async_=True, timeout=None): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor()