Reduce spec parsing in workflow lifecycle
During workflow creation and execution, its specification is parsed multiple times uselessly. This reduces the load by simply passing around the specification when it's available. Change-Id: I5babdbf8e9bc0cf7f69cf724bc7f6b0b270d1667 Closes-Bug: #1541786
This commit is contained in:
parent
6e3ce52ee1
commit
65a84b469a
|
@ -286,8 +286,7 @@ def transform_result(result, task_ex=None, action_ex=None):
|
|||
|
||||
if action_spec_name:
|
||||
wf_ex = task_ex.workflow_execution if task_ex else None
|
||||
wf_spec_name = (spec_parser.get_workflow_spec(
|
||||
wf_ex.spec).get_name() if task_ex else None)
|
||||
wf_spec_name = wf_ex.spec['name'] if task_ex else None
|
||||
|
||||
return transform_action_result(
|
||||
action_spec_name,
|
||||
|
|
|
@ -176,7 +176,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
|||
if task_ex.state == states.RUNNING_DELAYED:
|
||||
return
|
||||
|
||||
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
|
||||
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex, wf_spec)
|
||||
|
||||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
|
|
|
@ -96,9 +96,15 @@ def _run_existing_task(task_ex, task_spec, wf_spec):
|
|||
# In some cases we can have no input, e.g. in case of 'with-items'.
|
||||
if input_dicts:
|
||||
for index, input_d in input_dicts:
|
||||
_run_action_or_workflow(task_ex, task_spec, input_d, index)
|
||||
_run_action_or_workflow(
|
||||
task_ex,
|
||||
task_spec,
|
||||
input_d,
|
||||
index,
|
||||
wf_spec
|
||||
)
|
||||
else:
|
||||
_schedule_noop_action(task_ex, task_spec)
|
||||
_schedule_noop_action(task_ex, task_spec, wf_spec)
|
||||
|
||||
|
||||
def defer_task(wf_cmd):
|
||||
|
@ -365,7 +371,7 @@ def _get_workflow_input(task_spec, ctx):
|
|||
return expr.evaluate_recursively(task_spec.get_input(), ctx)
|
||||
|
||||
|
||||
def _run_action_or_workflow(task_ex, task_spec, input_dict, index):
|
||||
def _run_action_or_workflow(task_ex, task_spec, input_dict, index, wf_spec):
|
||||
t_name = task_ex.name
|
||||
|
||||
if task_spec.get_action_name():
|
||||
|
@ -375,14 +381,14 @@ def _run_action_or_workflow(task_ex, task_spec, input_dict, index):
|
|||
(t_name, task_spec.get_action_name())
|
||||
)
|
||||
|
||||
_schedule_run_action(task_ex, task_spec, input_dict, index)
|
||||
_schedule_run_action(task_ex, task_spec, input_dict, index, wf_spec)
|
||||
elif task_spec.get_workflow_name():
|
||||
wf_trace.info(
|
||||
task_ex,
|
||||
"Task '%s' is RUNNING [workflow_name = %s]" %
|
||||
(t_name, task_spec.get_workflow_name()))
|
||||
|
||||
_schedule_run_workflow(task_ex, task_spec, input_dict, index)
|
||||
_schedule_run_workflow(task_ex, task_spec, input_dict, index, wf_spec)
|
||||
|
||||
|
||||
def _get_action_defaults(task_ex, task_spec):
|
||||
|
@ -391,10 +397,7 @@ def _get_action_defaults(task_ex, task_spec):
|
|||
return actions.get(task_spec.get_action_name(), {})
|
||||
|
||||
|
||||
def _schedule_run_action(task_ex, task_spec, action_input, index):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
def _schedule_run_action(task_ex, task_spec, action_input, index, wf_spec):
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
action_def = action_handler.resolve_definition(
|
||||
|
@ -424,9 +427,8 @@ def _schedule_run_action(task_ex, task_spec, action_input, index):
|
|||
)
|
||||
|
||||
|
||||
def _schedule_noop_action(task_ex, task_spec):
|
||||
def _schedule_noop_action(task_ex, task_spec, wf_spec):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
'std.noop',
|
||||
|
@ -450,9 +452,9 @@ def _schedule_noop_action(task_ex, task_spec):
|
|||
)
|
||||
|
||||
|
||||
def _schedule_run_workflow(task_ex, task_spec, wf_input, index):
|
||||
def _schedule_run_workflow(task_ex, task_spec, wf_input, index,
|
||||
parent_wf_spec):
|
||||
parent_wf_ex = task_ex.workflow_execution
|
||||
parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec)
|
||||
|
||||
wf_spec_name = task_spec.get_workflow_name()
|
||||
|
||||
|
|
|
@ -56,4 +56,4 @@ class WorkflowControllerTest(base.BaseTest):
|
|||
|
||||
mock_get_spec.assert_called_once_with("spec")
|
||||
mock_get_class.assert_called_once_with("direct")
|
||||
mock_handler_cls.assert_called_once_with(wf_ex)
|
||||
mock_handler_cls.assert_called_once_with(wf_ex, mock_wf_spec)
|
||||
|
|
|
@ -88,7 +88,7 @@ def instantiate_spec(spec_cls, data):
|
|||
|
||||
spec.validate_semantics()
|
||||
|
||||
return cls(data)
|
||||
return spec
|
||||
|
||||
raise exc.DSLParsingException(
|
||||
'Failed to find a specification class to instantiate '
|
||||
|
|
|
@ -114,8 +114,6 @@ class TaskSpec(base.BaseSpec):
|
|||
def validate_schema(self):
|
||||
super(TaskSpec, self).validate_schema()
|
||||
|
||||
self._transform_with_items()
|
||||
|
||||
action = self._data.get('action')
|
||||
workflow = self._data.get('workflow')
|
||||
|
||||
|
|
|
@ -40,13 +40,17 @@ class WorkflowController(object):
|
|||
by Mistral.
|
||||
"""
|
||||
|
||||
def __init__(self, wf_ex):
|
||||
def __init__(self, wf_ex, wf_spec=None):
|
||||
"""Creates a new workflow controller.
|
||||
|
||||
:param wf_ex: Workflow execution.
|
||||
|
||||
:param wf_spec: Workflow specification.
|
||||
"""
|
||||
self.wf_ex = wf_ex
|
||||
self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
if wf_spec is None:
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
self.wf_spec = wf_spec
|
||||
|
||||
def _update_task_ex_env(self, task_ex, env):
|
||||
if not env:
|
||||
|
@ -194,4 +198,7 @@ class WorkflowController(object):
|
|||
if not wf_spec:
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex['spec'])
|
||||
|
||||
return WorkflowController._get_class(wf_spec.get_type())(wf_ex)
|
||||
return WorkflowController._get_class(wf_spec.get_type())(
|
||||
wf_ex,
|
||||
wf_spec
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue