Reduce spec parsing some more

This removes even more parsing of specification, both for workflow and
task, during a workflow execution.

Related-Bug: #1541786
Change-Id: Ib400308fabd6d62a061767956f4023b2b712e60f
This commit is contained in:
Thomas Herve 2016-03-15 13:51:23 +01:00
parent 8a2d4b29d7
commit 1cdf79f44c
5 changed files with 52 additions and 50 deletions

View File

@ -261,39 +261,31 @@ def resolve_action_definition(action_spec_name, wf_name=None,
return action_db
def transform_result(result, task_ex=None, action_ex=None):
def transform_result(result, task_ex, task_spec):
"""Transforms task result accounting for ad-hoc actions.
In case if the given result is an action result and action is
an ad-hoc action the method transforms the result according to
ad-hoc action configuration.
:param task_ex: Task DB model.
:param result: Result of task action/workflow.
:param task_ex: Task DB model.
:param task_spec: Task specification.
"""
if result.is_error():
return result
action_spec_name = None
if task_ex:
action_spec_name = spec_parser.get_task_spec(
task_ex.spec).get_action_name()
elif action_ex:
if action_ex.spec:
action_spec_name = spec_parser.get_action_spec(action_ex.spec)
else:
action_spec_name = action_ex.name
action_spec_name = task_spec.get_action_name()
if action_spec_name:
wf_ex = task_ex.workflow_execution if task_ex else None
wf_spec_name = wf_ex.spec['name'] if task_ex else None
wf_ex = task_ex.workflow_execution
wf_spec_name = wf_ex.spec['name']
return transform_action_result(
action_spec_name,
result,
wf_ex.workflow_name if wf_ex else None,
wf_spec_name if wf_ex else None,
wf_ex.workflow_name,
wf_spec_name,
)
return result

View File

@ -58,7 +58,7 @@ class DefaultEngine(base.Engine, coordination.Service):
with db_api.transaction():
# The new workflow execution will be in an IDLE
# state on initial record creation.
wf_ex_id = wf_ex_service.create_workflow_execution(
wf_ex_id, wf_spec = wf_ex_service.create_workflow_execution(
wf_identifier,
wf_input,
description,
@ -71,7 +71,6 @@ class DefaultEngine(base.Engine, coordination.Service):
# at dispatching commands.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.WorkflowController.get_controller(
@ -81,7 +80,8 @@ class DefaultEngine(base.Engine, coordination.Service):
self._dispatch_workflow_commands(
wf_ex,
wf_ctrl.continue_workflow()
wf_ctrl.continue_workflow(),
wf_spec
)
return wf_ex.get_clone()
@ -151,6 +151,7 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ex_id = task_ex.workflow_execution_id
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_trace.info(
task_ex,
@ -161,11 +162,11 @@ class DefaultEngine(base.Engine, coordination.Service):
task_ex.state = state
task_ex.state_info = state_info
self._on_task_state_change(task_ex, wf_ex)
self._on_task_state_change(task_ex, wf_ex, wf_spec)
def _on_task_state_change(self, task_ex, wf_ex, task_state=states.SUCCESS):
task_spec = spec_parser.get_task_spec(task_ex.spec)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
def _on_task_state_change(self, task_ex, wf_ex, wf_spec,
task_state=states.SUCCESS):
task_spec = wf_spec.get_tasks()[task_ex.name]
# We must be sure that if task is completed,
# it was also completed in previous transaction.
@ -184,17 +185,17 @@ class DefaultEngine(base.Engine, coordination.Service):
task_ex.processed = True
self._dispatch_workflow_commands(wf_ex, cmds)
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
self._check_workflow_completion(wf_ex, wf_ctrl)
self._check_workflow_completion(wf_ex, wf_ctrl, wf_spec)
elif task_handler.need_to_continue(task_ex, task_spec):
# Re-run existing task.
cmds = [commands.RunExistingTask(task_ex, reset=False)]
self._dispatch_workflow_commands(wf_ex, cmds)
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
@staticmethod
def _check_workflow_completion(wf_ex, wf_ctrl):
def _check_workflow_completion(wf_ex, wf_ctrl, wf_spec):
if states.is_paused_or_completed(wf_ex.state):
return
@ -211,7 +212,8 @@ class DefaultEngine(base.Engine, coordination.Service):
if wf_ctrl.all_errors_handled():
wf_handler.succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context()
wf_ctrl.evaluate_workflow_final_context(),
wf_spec
)
else:
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
@ -236,8 +238,13 @@ class DefaultEngine(base.Engine, coordination.Service):
wf_ex_id = action_ex.task_execution.workflow_execution_id
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_ex = task_handler.on_action_complete(action_ex, result)
task_ex = task_handler.on_action_complete(
action_ex,
wf_spec,
result
)
# If workflow is on pause or completed then there's no
# need to continue workflow.
@ -258,6 +265,7 @@ class DefaultEngine(base.Engine, coordination.Service):
self._on_task_state_change(
task_ex,
wf_ex,
wf_spec,
task_state=prev_task_state
)
@ -317,13 +325,15 @@ class DefaultEngine(base.Engine, coordination.Service):
if states.is_completed(t_ex.state) and not t_ex.processed:
t_ex.processed = True
self._dispatch_workflow_commands(wf_ex, cmds)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
self._dispatch_workflow_commands(wf_ex, cmds, wf_spec)
if not cmds:
if not wf_utils.find_incomplete_task_executions(wf_ex):
wf_handler.succeed_workflow(
wf_ex,
wf_ctrl.evaluate_workflow_final_context()
wf_ctrl.evaluate_workflow_final_context(),
wf_spec
)
return wf_ex.get_clone()
@ -389,9 +399,11 @@ class DefaultEngine(base.Engine, coordination.Service):
LOG.warning(
"Failed to get final context for %s: %s" % (wf_ex, e)
)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
return wf_handler.succeed_workflow(
wf_ex,
final_context,
wf_spec,
message
)
elif state == states.ERROR:
@ -404,7 +416,7 @@ class DefaultEngine(base.Engine, coordination.Service):
# TODO(rakhmerov): Implement.
raise NotImplementedError
def _dispatch_workflow_commands(self, wf_ex, wf_cmds):
def _dispatch_workflow_commands(self, wf_ex, wf_cmds, wf_spec):
if not wf_cmds:
return
@ -412,7 +424,7 @@ class DefaultEngine(base.Engine, coordination.Service):
if isinstance(cmd, commands.RunTask) and cmd.is_waiting():
task_handler.defer_task(cmd)
elif isinstance(cmd, commands.RunTask):
task_handler.run_new_task(cmd)
task_handler.run_new_task(cmd, wf_spec)
elif isinstance(cmd, commands.RunExistingTask):
task_handler.run_existing_task(
cmd.task_ex.id,

View File

@ -117,11 +117,10 @@ def defer_task(wf_cmd):
_create_task_execution(wf_ex, task_spec, ctx, state=states.WAITING)
def run_new_task(wf_cmd):
def run_new_task(wf_cmd, wf_spec):
"""Runs a task."""
ctx = wf_cmd.ctx
wf_ex = wf_cmd.wf_ex
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_spec = wf_cmd.task_spec
# NOTE(xylan): Need to think how to get rid of this weird judgment to keep
@ -155,7 +154,7 @@ def run_new_task(wf_cmd):
_run_existing_task(task_ex, task_spec, wf_spec)
def on_action_complete(action_ex, result):
def on_action_complete(action_ex, wf_spec, result):
"""Handles event of action result arrival.
Given action result this method performs analysis of the workflow
@ -163,6 +162,7 @@ def on_action_complete(action_ex, result):
scheduled for execution.
:param action_ex: Action execution objects the result belongs to.
:param wf_spec: Worflow specification.
:param result: Task action/workflow output wrapped into
mistral.workflow.utils.Result instance.
:return List of engine commands that need to be performed.
@ -175,18 +175,15 @@ def on_action_complete(action_ex, result):
isinstance(action_ex, models.WorkflowExecution)):
return task_ex
result = action_handler.transform_result(result, task_ex)
task_spec = wf_spec.get_tasks()[task_ex.name]
wf_ex = task_ex.workflow_execution
result = action_handler.transform_result(result, task_ex, task_spec)
# Ignore workflow executions because they're handled during
# workflow completion.
if not isinstance(action_ex, models.WorkflowExecution):
action_handler.store_action_result(action_ex, result)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_spec = wf_spec.get_tasks()[task_ex.name]
if result.is_success():
task_state = states.SUCCESS
task_state_info = None
@ -480,11 +477,12 @@ def _schedule_run_workflow(task_ex, task_spec, wf_input, index,
wf_params[k] = v
del wf_input[k]
wf_ex_id = wf_ex_service.create_workflow_execution(
wf_ex_id, _ = wf_ex_service.create_workflow_execution(
wf_def.name,
wf_input,
"sub-workflow execution",
wf_params
wf_params,
wf_spec
)
scheduler.schedule_call(

View File

@ -19,15 +19,12 @@ from mistral.engine import task_handler
from mistral import exceptions as exc
from mistral.services import scheduler
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
def succeed_workflow(wf_ex, final_context, state_info=None):
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
def succeed_workflow(wf_ex, final_context, wf_spec, state_info=None):
# Fail workflow if output is not successfully evaluated.
try:
wf_ex.output = data_flow.evaluate_workflow_output(

View File

@ -71,11 +71,14 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
return wf_ex
def create_workflow_execution(wf_identifier, wf_input, description, params):
def create_workflow_execution(wf_identifier, wf_input, description, params,
wf_spec=None):
params = canonize_workflow_params(params)
wf_def = db_api.get_workflow_definition(wf_identifier)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
if wf_spec is None:
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec)
@ -89,4 +92,4 @@ def create_workflow_execution(wf_identifier, wf_input, description, params):
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier)
return wf_ex.id
return wf_ex.id, wf_spec