Merge "Optimize action scheduling"

This commit is contained in:
Zuul 2019-03-18 17:19:25 +00:00 committed by Gerrit Code Review
commit 54c34492d0
3 changed files with 53 additions and 20 deletions

View File

@ -211,6 +211,11 @@ class Action(object):
class PythonAction(Action):
"""Regular Python action."""
def __init__(self, action_def, action_ex=None, task_ex=None):
super(PythonAction, self).__init__(action_def, action_ex, task_ex)
self._prepared_input = None
@profiler.trace('action-complete', hide_args=True)
def complete(self, result):
assert self.action_ex
@ -238,6 +243,8 @@ class PythonAction(Action):
timeout=None):
assert not self.action_ex
self.validate_input(input_dict)
# Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs
# to be updated with the action execution ID after the action execution
@ -277,8 +284,9 @@ class PythonAction(Action):
safe_rerun=False, timeout=None):
assert not self.action_ex
input_dict = self._prepare_input(input_dict)
runtime_ctx = self._prepare_runtime_context(index, safe_rerun)
self.validate_input(input_dict)
prepared_input_dict = self._prepare_input(input_dict)
# Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs
@ -288,8 +296,8 @@ class PythonAction(Action):
if save:
self._create_action_execution(
input_dict,
runtime_ctx,
prepared_input_dict,
self._prepare_runtime_context(index, safe_rerun),
self.is_sync(input_dict),
desc=desc,
action_ex_id=action_ex_id
@ -303,7 +311,7 @@ class PythonAction(Action):
self.action_ex.id if self.action_ex else None,
self.action_def.action_class,
self.action_def.attributes or {},
input_dict,
prepared_input_dict,
safe_rerun,
execution_context,
target=target,
@ -314,14 +322,13 @@ class PythonAction(Action):
return self._prepare_output(result)
def is_sync(self, input_dict):
input_dict = self._prepare_input(input_dict)
prepared_input_dict = self._prepare_input(input_dict)
a = a_m.get_action_class(self.action_def.name)(**input_dict)
a = a_m.get_action_class(self.action_def.name)(**prepared_input_dict)
return a.is_sync()
def validate_input(self, input_dict):
# NOTE(kong): Don't validate action input if action initialization
# method contains ** argument.
if '**' in self.action_def.input:
@ -337,19 +344,20 @@ class PythonAction(Action):
)
def _prepare_execution_context(self):
exc_ctx = {}
if self.task_ex:
wf_ex = self.task_ex.workflow_execution
exc_ctx['workflow_execution_id'] = wf_ex.id
exc_ctx['task_execution_id'] = self.task_ex.id
exc_ctx['workflow_name'] = wf_ex.name
if self.action_ex:
exc_ctx['action_execution_id'] = self.action_ex.id
callback_url = '/v2/action_executions/%s' % self.action_ex.id
exc_ctx['callback_url'] = callback_url
exc_ctx['callback_url'] = (
'/v2/action_executions/%s' % self.action_ex.id
)
return exc_ctx
@ -379,6 +387,7 @@ class PythonAction(Action):
class AdHocAction(PythonAction):
"""Ad-hoc action."""
@profiler.trace('ad-hoc-action-init', hide_args=True)
def __init__(self, action_def, action_ex=None, task_ex=None, task_ctx=None,
wf_ctx=None):
self.action_spec = spec_parser.get_action_spec(action_def.spec)
@ -408,6 +417,7 @@ class AdHocAction(PythonAction):
self.task_ctx = task_ctx or {}
self.wf_ctx = wf_ctx or {}
@profiler.trace('ad-hoc-action-validate-input', hide_args=True)
def validate_input(self, input_dict):
expected_input = self.action_spec.get_input()
@ -422,11 +432,16 @@ class AdHocAction(PythonAction):
self._prepare_input(input_dict)
)
@profiler.trace('ad-hoc-action-prepare-input', hide_args=True)
def _prepare_input(self, input_dict):
if self._prepared_input is not None:
return self._prepared_input
base_input_dict = input_dict
for action_def in self.adhoc_action_defs:
action_spec = spec_parser.get_action_spec(action_def.spec)
for k, v in action_spec.get_input().items():
if (k not in base_input_dict or
base_input_dict[k] is utils.NotDefined):
@ -453,8 +468,13 @@ class AdHocAction(PythonAction):
else:
base_input_dict = {}
return super(AdHocAction, self)._prepare_input(base_input_dict)
self._prepared_input = super(AdHocAction, self)._prepare_input(
base_input_dict
)
return self._prepared_input
@profiler.trace('ad-hoc-action-prepare-output', hide_args=True)
def _prepare_output(self, result):
# In case of error, we don't transform a result.
if not result.is_error():
@ -476,6 +496,7 @@ class AdHocAction(PythonAction):
return result
@profiler.trace('ad-hoc-action-prepare-runtime-context', hide_args=True)
def _prepare_runtime_context(self, index, safe_rerun):
ctx = super(AdHocAction, self)._prepare_runtime_context(
index,
@ -489,6 +510,7 @@ class AdHocAction(PythonAction):
{'adhoc_action_name': self.adhoc_action_def.name}
)
@profiler.trace('ad-hoc-action-gather-base-actions', hide_args=True)
def _gather_base_actions(self, action_def, base_action_def):
"""Find all base ad-hoc actions and store them
@ -537,6 +559,7 @@ class WorkflowAction(Action):
def __init__(self, wf_name, **kwargs):
super(WorkflowAction, self).__init__(None, **kwargs)
self.wf_name = wf_name
@profiler.trace('workflow-action-complete', hide_args=True)
@ -544,11 +567,13 @@ class WorkflowAction(Action):
# No-op because in case of workflow result is already processed.
pass
@profiler.trace('workflkow-action-schedule', hide_args=True)
@profiler.trace('workflow-action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
assert not self.action_ex
self.validate_input(input_dict)
parent_wf_ex = self.task_ex.workflow_execution
parent_wf_spec = spec_parser.get_workflow_spec_by_execution_id(
parent_wf_ex.id

View File

@ -103,8 +103,12 @@ class DefaultEngine(base.Engine):
return action.action_ex.get_clone()
output = action.run(action_input, target, save=False,
timeout=timeout)
output = action.run(
action_input,
target,
save=False,
timeout=timeout
)
state = states.SUCCESS if output.is_success() else states.ERROR

View File

@ -571,6 +571,9 @@ class RegularTask(Task):
@profiler.trace('regular-task-get-target', hide_args=True)
def _get_target(self, input_dict):
if not self.task_spec.get_target():
return None
ctx_view = data_flow.ContextView(
input_dict,
self.ctx,
@ -586,7 +589,11 @@ class RegularTask(Task):
@profiler.trace('regular-task-get-action-input', hide_args=True)
def _get_action_input(self, ctx=None):
input_dict = self._evaluate_expression(self.task_spec.get_input(), ctx)
input_spec = self.task_spec.get_input()
input_dict = (
self._evaluate_expression(input_spec, ctx) if input_spec else {}
)
if not isinstance(input_dict, dict):
raise exc.InputException(
@ -610,10 +617,7 @@ class RegularTask(Task):
self.wf_ex.input
)
return expr.evaluate_recursively(
expression,
ctx_view
)
return expr.evaluate_recursively(expression, ctx_view)
def _build_action(self):
action_name = self.task_spec.get_action_name()