diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 64624f82e..5ea886471 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -211,6 +211,11 @@ class Action(object): class PythonAction(Action): """Regular Python action.""" + def __init__(self, action_def, action_ex=None, task_ex=None): + super(PythonAction, self).__init__(action_def, action_ex, task_ex) + + self._prepared_input = None + @profiler.trace('action-complete', hide_args=True) def complete(self, result): assert self.action_ex @@ -238,6 +243,8 @@ class PythonAction(Action): timeout=None): assert not self.action_ex + self.validate_input(input_dict) + # Assign the action execution ID here to minimize database calls. # Otherwise, the input property of the action execution DB object needs # to be updated with the action execution ID after the action execution @@ -277,8 +284,9 @@ class PythonAction(Action): safe_rerun=False, timeout=None): assert not self.action_ex - input_dict = self._prepare_input(input_dict) - runtime_ctx = self._prepare_runtime_context(index, safe_rerun) + self.validate_input(input_dict) + + prepared_input_dict = self._prepare_input(input_dict) # Assign the action execution ID here to minimize database calls. # Otherwise, the input property of the action execution DB object needs @@ -288,8 +296,8 @@ class PythonAction(Action): if save: self._create_action_execution( - input_dict, - runtime_ctx, + prepared_input_dict, + self._prepare_runtime_context(index, safe_rerun), self.is_sync(input_dict), desc=desc, action_ex_id=action_ex_id @@ -303,7 +311,7 @@ class PythonAction(Action): self.action_ex.id if self.action_ex else None, self.action_def.action_class, self.action_def.attributes or {}, - input_dict, + prepared_input_dict, safe_rerun, execution_context, target=target, @@ -314,14 +322,13 @@ class PythonAction(Action): return self._prepare_output(result) def is_sync(self, input_dict): - input_dict = self._prepare_input(input_dict) + prepared_input_dict = self._prepare_input(input_dict) - a = a_m.get_action_class(self.action_def.name)(**input_dict) + a = a_m.get_action_class(self.action_def.name)(**prepared_input_dict) return a.is_sync() def validate_input(self, input_dict): - # NOTE(kong): Don't validate action input if action initialization # method contains ** argument. if '**' in self.action_def.input: @@ -337,19 +344,20 @@ class PythonAction(Action): ) def _prepare_execution_context(self): - exc_ctx = {} if self.task_ex: wf_ex = self.task_ex.workflow_execution + exc_ctx['workflow_execution_id'] = wf_ex.id exc_ctx['task_execution_id'] = self.task_ex.id exc_ctx['workflow_name'] = wf_ex.name if self.action_ex: exc_ctx['action_execution_id'] = self.action_ex.id - callback_url = '/v2/action_executions/%s' % self.action_ex.id - exc_ctx['callback_url'] = callback_url + exc_ctx['callback_url'] = ( + '/v2/action_executions/%s' % self.action_ex.id + ) return exc_ctx @@ -379,6 +387,7 @@ class PythonAction(Action): class AdHocAction(PythonAction): """Ad-hoc action.""" + @profiler.trace('ad-hoc-action-init', hide_args=True) def __init__(self, action_def, action_ex=None, task_ex=None, task_ctx=None, wf_ctx=None): self.action_spec = spec_parser.get_action_spec(action_def.spec) @@ -408,6 +417,7 @@ class AdHocAction(PythonAction): self.task_ctx = task_ctx or {} self.wf_ctx = wf_ctx or {} + @profiler.trace('ad-hoc-action-validate-input', hide_args=True) def validate_input(self, input_dict): expected_input = self.action_spec.get_input() @@ -422,11 +432,16 @@ class AdHocAction(PythonAction): self._prepare_input(input_dict) ) + @profiler.trace('ad-hoc-action-prepare-input', hide_args=True) def _prepare_input(self, input_dict): + if self._prepared_input is not None: + return self._prepared_input + base_input_dict = input_dict for action_def in self.adhoc_action_defs: action_spec = spec_parser.get_action_spec(action_def.spec) + for k, v in action_spec.get_input().items(): if (k not in base_input_dict or base_input_dict[k] is utils.NotDefined): @@ -453,8 +468,13 @@ class AdHocAction(PythonAction): else: base_input_dict = {} - return super(AdHocAction, self)._prepare_input(base_input_dict) + self._prepared_input = super(AdHocAction, self)._prepare_input( + base_input_dict + ) + return self._prepared_input + + @profiler.trace('ad-hoc-action-prepare-output', hide_args=True) def _prepare_output(self, result): # In case of error, we don't transform a result. if not result.is_error(): @@ -476,6 +496,7 @@ class AdHocAction(PythonAction): return result + @profiler.trace('ad-hoc-action-prepare-runtime-context', hide_args=True) def _prepare_runtime_context(self, index, safe_rerun): ctx = super(AdHocAction, self)._prepare_runtime_context( index, @@ -489,6 +510,7 @@ class AdHocAction(PythonAction): {'adhoc_action_name': self.adhoc_action_def.name} ) + @profiler.trace('ad-hoc-action-gather-base-actions', hide_args=True) def _gather_base_actions(self, action_def, base_action_def): """Find all base ad-hoc actions and store them @@ -537,6 +559,7 @@ class WorkflowAction(Action): def __init__(self, wf_name, **kwargs): super(WorkflowAction, self).__init__(None, **kwargs) + self.wf_name = wf_name @profiler.trace('workflow-action-complete', hide_args=True) @@ -544,11 +567,13 @@ class WorkflowAction(Action): # No-op because in case of workflow result is already processed. pass - @profiler.trace('workflkow-action-schedule', hide_args=True) + @profiler.trace('workflow-action-schedule', hide_args=True) def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, timeout=None): assert not self.action_ex + self.validate_input(input_dict) + parent_wf_ex = self.task_ex.workflow_execution parent_wf_spec = spec_parser.get_workflow_spec_by_execution_id( parent_wf_ex.id diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index cb6c9eede..4a9123f21 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -103,8 +103,12 @@ class DefaultEngine(base.Engine): return action.action_ex.get_clone() - output = action.run(action_input, target, save=False, - timeout=timeout) + output = action.run( + action_input, + target, + save=False, + timeout=timeout + ) state = states.SUCCESS if output.is_success() else states.ERROR diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index b1cc9f6b5..2dc5b5373 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -571,6 +571,9 @@ class RegularTask(Task): @profiler.trace('regular-task-get-target', hide_args=True) def _get_target(self, input_dict): + if not self.task_spec.get_target(): + return None + ctx_view = data_flow.ContextView( input_dict, self.ctx, @@ -586,7 +589,11 @@ class RegularTask(Task): @profiler.trace('regular-task-get-action-input', hide_args=True) def _get_action_input(self, ctx=None): - input_dict = self._evaluate_expression(self.task_spec.get_input(), ctx) + input_spec = self.task_spec.get_input() + + input_dict = ( + self._evaluate_expression(input_spec, ctx) if input_spec else {} + ) if not isinstance(input_dict, dict): raise exc.InputException( @@ -610,10 +617,7 @@ class RegularTask(Task): self.wf_ex.input ) - return expr.evaluate_recursively( - expression, - ctx_view - ) + return expr.evaluate_recursively(expression, ctx_view) def _build_action(self): action_name = self.task_spec.get_action_name()