diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py
index aa5645310..7f62a0b6a 100644
--- a/mistral/engine/action_handler.py
+++ b/mistral/engine/action_handler.py
@@ -261,39 +261,31 @@ def resolve_action_definition(action_spec_name, wf_name=None,
     return action_db


-def transform_result(result, task_ex=None, action_ex=None):
+def transform_result(result, task_ex, task_spec):
     """Transforms task result accounting for ad-hoc actions.

     In case if the given result is an action result and action is
     an ad-hoc action the method transforms the result according to
     ad-hoc action configuration.

-    :param task_ex: Task DB model.
     :param result: Result of task action/workflow.
+    :param task_ex: Task DB model.
+    :param task_spec: Task specification.
     """
     if result.is_error():
         return result

-    action_spec_name = None
-
-    if task_ex:
-        action_spec_name = spec_parser.get_task_spec(
-            task_ex.spec).get_action_name()
-    elif action_ex:
-        if action_ex.spec:
-            action_spec_name = spec_parser.get_action_spec(action_ex.spec)
-        else:
-            action_spec_name = action_ex.name
+    action_spec_name = task_spec.get_action_name()

     if action_spec_name:
-        wf_ex = task_ex.workflow_execution if task_ex else None
-        wf_spec_name = wf_ex.spec['name'] if task_ex else None
+        wf_ex = task_ex.workflow_execution
+        wf_spec_name = wf_ex.spec['name']

         return transform_action_result(
             action_spec_name,
             result,
-            wf_ex.workflow_name if wf_ex else None,
-            wf_spec_name if wf_ex else None,
+            wf_ex.workflow_name,
+            wf_spec_name,
         )

     return result
diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py
index ea2f789ed..46cdcb37f 100644
--- a/mistral/engine/default_engine.py
+++ b/mistral/engine/default_engine.py
@@ -58,7 +58,7 @@ class DefaultEngine(base.Engine, coordination.Service):
         with db_api.transaction():
             # The new workflow execution will be in an IDLE
             # state on initial record creation.
-            wf_ex_id = wf_ex_service.create_workflow_execution(
+            wf_ex_id, wf_spec = wf_ex_service.create_workflow_execution(
                 wf_identifier,
                 wf_input,
                 description,
@@ -71,7 +71,6 @@ class DefaultEngine(base.Engine, coordination.Service):
             # at dispatching commands.
             with db_api.transaction():
                 wf_ex = db_api.get_workflow_execution(wf_ex_id)
-                wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

                 wf_handler.set_execution_state(wf_ex, states.RUNNING)

                 wf_ctrl = wf_base.WorkflowController.get_controller(
@@ -81,7 +80,8 @@ class DefaultEngine(base.Engine, coordination.Service):

                 self._dispatch_workflow_commands(
                     wf_ex,
-                    wf_ctrl.continue_workflow()
+                    wf_ctrl.continue_workflow(),
+                    wf_spec
                 )

             return wf_ex.get_clone()
@@ -151,6 +151,7 @@ class DefaultEngine(base.Engine, coordination.Service):
             wf_ex_id = task_ex.workflow_execution_id

             wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
+            wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

             wf_trace.info(
                 task_ex,
@@ -161,11 +162,11 @@ class DefaultEngine(base.Engine, coordination.Service):
             task_ex.state = state
             task_ex.state_info = state_info

-            self._on_task_state_change(task_ex, wf_ex)
+            self._on_task_state_change(task_ex, wf_ex, wf_spec)

-    def _on_task_state_change(self, task_ex, wf_ex, task_state=states.SUCCESS):
-        task_spec = spec_parser.get_task_spec(task_ex.spec)
-        wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
+    def _on_task_state_change(self, task_ex, wf_ex, wf_spec,
+                              task_state=states.SUCCESS):
+        task_spec = wf_spec.get_tasks()[task_ex.name]

         # We must be sure that if task is completed,
         # it was also completed in previous transaction.
@@ -184,17 +185,17 @@ class DefaultEngine(base.Engine, coordination.Service):

             task_ex.processed = True

-            self._dispatch_workflow_commands(wf_ex, cmds)
+            self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)

-            self._check_workflow_completion(wf_ex, wf_ctrl)
+            self._check_workflow_completion(wf_ex, wf_ctrl, wf_spec)

         elif task_handler.need_to_continue(task_ex, task_spec):
             # Re-run existing task.
             cmds = [commands.RunExistingTask(task_ex, reset=False)]

-            self._dispatch_workflow_commands(wf_ex, cmds)
+            self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)

     @staticmethod
-    def _check_workflow_completion(wf_ex, wf_ctrl):
+    def _check_workflow_completion(wf_ex, wf_ctrl, wf_spec):
         if states.is_paused_or_completed(wf_ex.state):
             return
@@ -211,7 +212,8 @@ class DefaultEngine(base.Engine, coordination.Service):
         if wf_ctrl.all_errors_handled():
             wf_handler.succeed_workflow(
                 wf_ex,
-                wf_ctrl.evaluate_workflow_final_context()
+                wf_ctrl.evaluate_workflow_final_context(),
+                wf_spec
             )
         else:
             state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
@@ -236,8 +238,13 @@ class DefaultEngine(base.Engine, coordination.Service):
             wf_ex_id = action_ex.task_execution.workflow_execution_id

             wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
+            wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)

-            task_ex = task_handler.on_action_complete(action_ex, result)
+            task_ex = task_handler.on_action_complete(
+                action_ex,
+                wf_spec,
+                result
+            )

             # If workflow is on pause or completed then there's no
             # need to continue workflow.
@@ -258,6 +265,7 @@ class DefaultEngine(base.Engine, coordination.Service):
             self._on_task_state_change(
                 task_ex,
                 wf_ex,
+                wf_spec,
                 task_state=prev_task_state
             )
@@ -317,13 +325,15 @@ class DefaultEngine(base.Engine, coordination.Service):
                 if states.is_completed(t_ex.state) and not t_ex.processed:
                     t_ex.processed = True

-            self._dispatch_workflow_commands(wf_ex, cmds)
+            wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
+            self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)

             if not cmds:
                 if not wf_utils.find_incomplete_task_executions(wf_ex):
                     wf_handler.succeed_workflow(
                         wf_ex,
-                        wf_ctrl.evaluate_workflow_final_context()
+                        wf_ctrl.evaluate_workflow_final_context(),
+                        wf_spec
                     )

             return wf_ex.get_clone()
@@ -389,9 +399,11 @@ class DefaultEngine(base.Engine, coordination.Service):
                 LOG.warning(
                     "Failed to get final context for %s: %s" % (wf_ex, e)
                 )
+            wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
             return wf_handler.succeed_workflow(
                 wf_ex,
                 final_context,
+                wf_spec,
                 message
             )
         elif state == states.ERROR:
@@ -404,7 +416,7 @@ class DefaultEngine(base.Engine, coordination.Service):
             # TODO(rakhmerov): Implement.
             raise NotImplementedError

-    def _dispatch_workflow_commands(self, wf_ex, wf_cmds):
+    def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec):
         if not wf_cmds:
             return
@@ -412,7 +424,7 @@ class DefaultEngine(base.Engine, coordination.Service):
             if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
                 task_handler.defer_task(cmd)
             elif isinstance(cmd, commands.RunTask):
-                task_handler.run_new_task(cmd)
+                task_handler.run_new_task(cmd, wf_spec)
             elif isinstance(cmd, commands.RunExistingTask):
                 task_handler.run_existing_task(
                     cmd.task_ex.id,
diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py
index 370c39156..f9d100946 100644
--- a/mistral/engine/task_handler.py
+++ b/mistral/engine/task_handler.py
@@ -117,11 +117,10 @@ def defer_task(wf_cmd):
     _create_task_execution(wf_ex, task_spec, ctx, state=states.WAITING)


-def run_new_task(wf_cmd):
+def run_new_task(wf_cmd, wf_spec):
     """Runs a task."""
     ctx = wf_cmd.ctx
     wf_ex = wf_cmd.wf_ex
-    wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
     task_spec = wf_cmd.task_spec

     # NOTE(xylan): Need to think how to get rid of this weird judgment to keep
@@ -155,7 +154,7 @@ def run_new_task(wf_cmd):
     _run_existing_task(task_ex, task_spec, wf_spec)


-def on_action_complete(action_ex, result):
+def on_action_complete(action_ex, wf_spec, result):
     """Handles event of action result arrival.

     Given action result this method performs analysis of the workflow
     execution and identifies commands (including tasks) that need to be
     scheduled for execution.

     :param action_ex: Action execution objects the result belongs to.
+    :param wf_spec: Workflow specification.
     :param result: Task action/workflow output wrapped into
         mistral.workflow.utils.Result instance.
     :return List of engine commands that need to be performed.
     """
@@ -175,18 +175,15 @@ def on_action_complete(action_ex, result):
             isinstance(action_ex, models.WorkflowExecution)):
         return task_ex

-    result = action_handler.transform_result(result, task_ex)
+    task_spec = wf_spec.get_tasks()[task_ex.name]

-    wf_ex = task_ex.workflow_execution
+    result = action_handler.transform_result(result, task_ex, task_spec)

     # Ignore workflow executions because they're handled during
     # workflow completion.
     if not isinstance(action_ex, models.WorkflowExecution):
         action_handler.store_action_result(action_ex, result)

-    wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
-    task_spec = wf_spec.get_tasks()[task_ex.name]
-
     if result.is_success():
         task_state = states.SUCCESS
         task_state_info = None
@@ -480,11 +477,12 @@ def _schedule_run_workflow(task_ex, task_spec, wf_input, index,
             wf_params[k] = v
             del wf_input[k]

-    wf_ex_id = wf_ex_service.create_workflow_execution(
+    wf_ex_id, _ = wf_ex_service.create_workflow_execution(
         wf_def.name,
         wf_input,
         "sub-workflow execution",
-        wf_params
+        wf_params,
+        wf_spec
     )

     scheduler.schedule_call(
diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py
index 184893841..2255677b7 100644
--- a/mistral/engine/workflow_handler.py
+++ b/mistral/engine/workflow_handler.py
@@ -19,15 +19,12 @@ from mistral.engine import task_handler
 from mistral import exceptions as exc
 from mistral.services import scheduler
 from mistral.utils import wf_trace
-from mistral.workbook import parser as spec_parser
 from mistral.workflow import data_flow
 from mistral.workflow import states
 from mistral.workflow import utils as wf_utils


-def succeed_workflow(wf_ex, final_context, state_info=None):
-    wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
-
+def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
     # Fail workflow if output is not successfully evaluated.
     try:
         wf_ex.output = data_flow.evaluate_workflow_output(
diff --git a/mistral/services/executions.py b/mistral/services/executions.py
index 9b8e26d92..89aae84b0 100644
--- a/mistral/services/executions.py
+++ b/mistral/services/executions.py
@@ -71,11 +71,14 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
     return wf_ex


-def create_workflow_execution(wf_identifier, wf_input, description, params):
+def create_workflow_execution(wf_identifier, wf_input, description, params,
+                              wf_spec=None):
     params = canonize_workflow_params(params)

     wf_def = db_api.get_workflow_definition(wf_identifier)
-    wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
+
+    if wf_spec is None:
+        wf_spec = spec_parser.get_workflow_spec(wf_def.spec)

     eng_utils.validate_input(wf_def, wf_input, wf_spec)

@@ -89,4 +92,4 @@ def create_workflow_execution(wf_identifier, wf_input, description, params):

     wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)

-    return wf_ex.id
+    return wf_ex.id, wf_spec
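
Reviewer note, not part of the patch: the common thread in the hunks above is that the workflow specification is parsed once (create_workflow_execution() now returns it together with the execution id) and is then passed explicitly through the engine, task handler and workflow handler, instead of each layer re-parsing wf_ex.spec via spec_parser. The snippet below is a minimal, self-contained Python sketch of that "parse once, pass down" shape; WorkflowSpec and the simplified signatures are illustrative stand-ins, not Mistral's real classes or APIs.

class WorkflowSpec(object):
    """Stand-in for a parsed workflow specification."""

    def __init__(self, raw):
        # Pretend this constructor does the expensive parsing/validation.
        self.raw = raw
        self._tasks = dict(raw.get('tasks', {}))

    def get_tasks(self):
        return self._tasks


def create_workflow_execution(raw_spec, wf_spec=None):
    # Parse the spec only if the caller has not already done so, and hand
    # the parsed object back so it can be reused downstream.
    if wf_spec is None:
        wf_spec = WorkflowSpec(raw_spec)

    wf_ex_id = 'wf-ex-1'  # pretend a DB record was created here

    return wf_ex_id, wf_spec


def on_action_complete(task_name, wf_spec, result):
    # The task spec is looked up on the already-parsed workflow spec rather
    # than being re-parsed from a stored per-task spec.
    task_spec = wf_spec.get_tasks()[task_name]

    return task_spec, result


if __name__ == '__main__':
    raw = {'name': 'wf1', 'tasks': {'task1': {'action': 'std.noop'}}}

    wf_ex_id, wf_spec = create_workflow_execution(raw)

    # The same wf_spec object is threaded through later engine calls.
    print(on_action_complete('task1', wf_spec, {'status': 'ok'}))

In the patch itself the same idea appears as wf_spec.get_tasks()[task_ex.name] replacing spec_parser.get_task_spec(task_ex.spec), presumably to avoid re-parsing the specification on every task state change and action completion.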