Add '__task_execution' structure to task execution context on the fly
* Previously we stored the data structure describing the current
task execution (id and name) in the inbound task execution context
directly so that it'd be saved to DB. This was needed to evaluate
YAQL/Jinja function task() without parameters properly. However,
it's not needed, we can just build a context view on the fly
just before evaluating an expression.
Closes-Bug: #1764704
(cherry picked from commit 6b7b58ed6c
)
Change-Id: I9ea71cdc169c0c6f537429c75b5f1bd4079b87bf
This commit is contained in:
parent
cc63730a04
commit
20248cc2a7
|
@ -311,6 +311,7 @@ class RetryPolicy(base.TaskPolicy):
|
|||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
data_flow.evaluate_task_outbound_context(task_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
|
|
|
@ -265,8 +265,6 @@ class Task(object):
|
|||
task_name = self.task_spec.get_name()
|
||||
task_type = self.task_spec.get_type()
|
||||
|
||||
data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
|
||||
|
||||
values = {
|
||||
'id': task_id,
|
||||
'name': task_name,
|
||||
|
@ -379,16 +377,13 @@ class RegularTask(Task):
|
|||
self._schedule_actions()
|
||||
|
||||
def _update_inbound_context(self):
|
||||
task_ex = self.task_ex
|
||||
assert task_ex
|
||||
assert self.task_ex
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
self.ctx = wf_ctrl.get_task_inbound_context(self.task_spec)
|
||||
data_flow.add_current_task_to_context(self.ctx, task_ex.id,
|
||||
task_ex.name)
|
||||
|
||||
utils.update_dict(task_ex.in_context, self.ctx)
|
||||
utils.update_dict(self.task_ex.in_context, self.ctx)
|
||||
|
||||
def _update_triggered_by(self):
|
||||
assert self.task_ex
|
||||
|
@ -463,17 +458,17 @@ class RegularTask(Task):
|
|||
)
|
||||
|
||||
def _evaluate_expression(self, expression, ctx=None):
|
||||
ctx = ctx or self.ctx
|
||||
ctx_view = data_flow.ContextView(
|
||||
ctx,
|
||||
data_flow.get_current_task_dict(self.task_ex),
|
||||
ctx or self.ctx,
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
input_dict = expr.evaluate_recursively(
|
||||
|
||||
return expr.evaluate_recursively(
|
||||
expression,
|
||||
ctx_view
|
||||
)
|
||||
return input_dict
|
||||
|
||||
def _build_action(self):
|
||||
action_name = self.task_spec.get_action_name()
|
||||
|
|
|
@ -813,14 +813,6 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
task2_1_ex = self._assert_single_item(tasks_execs, name='task2_1')
|
||||
task2_2_ex = self._assert_single_item(tasks_execs, name='task2_2')
|
||||
|
||||
# TODO(rakhmerov): Find out why '__task_execution' is still
|
||||
# in the inbound context
|
||||
del task0_ex.in_context['__task_execution']
|
||||
del task1_1_ex.in_context['__task_execution']
|
||||
del task1_2_ex.in_context['__task_execution']
|
||||
del task2_1_ex.in_context['__task_execution']
|
||||
del task2_2_ex.in_context['__task_execution']
|
||||
|
||||
self.assertDictEqual({}, task0_ex.in_context)
|
||||
self.assertDictEqual({'var0': 'val0'}, task1_1_ex.in_context)
|
||||
self.assertDictEqual(
|
||||
|
|
|
@ -206,10 +206,12 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
|||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task1_ex = self._assert_single_item(
|
||||
wf_ex.task_executions, name='task1'
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
task2_ex = self._assert_single_item(
|
||||
wf_ex.task_executions, name='task2'
|
||||
wf_ex.task_executions,
|
||||
name='task2'
|
||||
)
|
||||
|
||||
self.assertDictEqual(
|
||||
|
@ -229,6 +231,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
|||
task2_ex.published
|
||||
)
|
||||
|
||||
# The internal data needed for evaluation of the task() function
|
||||
# should not be persisted to DB.
|
||||
self.assertNotIn('__task_execution', task1_ex.in_context)
|
||||
self.assertNotIn('__task_execution', task2_ex.in_context)
|
||||
|
||||
def test_task_function_no_name_on_complete_case(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
|
|
@ -190,7 +190,12 @@ def publish_variables(task_ex, task_spec):
|
|||
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
expr_ctx = ContextView(task_ex.in_context, wf_ex.context, wf_ex.input)
|
||||
expr_ctx = ContextView(
|
||||
get_current_task_dict(task_ex),
|
||||
task_ex.in_context,
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
||||
if task_ex.name in expr_ctx:
|
||||
LOG.warning(
|
||||
|
@ -268,19 +273,14 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx):
|
|||
return output or ctx
|
||||
|
||||
|
||||
def add_current_task_to_context(ctx, task_id, task_name):
|
||||
ctx['__task_execution'] = {
|
||||
'id': task_id,
|
||||
'name': task_name
|
||||
def get_current_task_dict(task_ex):
|
||||
return {
|
||||
'__task_execution': {
|
||||
'id': task_ex.id,
|
||||
'name': task_ex.name
|
||||
}
|
||||
}
|
||||
|
||||
return ctx
|
||||
|
||||
|
||||
def remove_internal_data_from_context(ctx):
|
||||
if '__task_execution' in ctx:
|
||||
del ctx['__task_execution']
|
||||
|
||||
|
||||
def add_openstack_data_to_context(wf_ex):
|
||||
wf_ex.context = wf_ex.context or {}
|
||||
|
|
|
@ -125,8 +125,6 @@ class DirectWorkflowController(base.WorkflowController):
|
|||
elif not t_s:
|
||||
t_s = self.wf_spec.get_tasks()[task_ex.name]
|
||||
|
||||
data_flow.remove_internal_data_from_context(ctx)
|
||||
|
||||
triggered_by = [
|
||||
{
|
||||
'task_id': task_ex.id,
|
||||
|
@ -176,8 +174,6 @@ class DirectWorkflowController(base.WorkflowController):
|
|||
data_flow.evaluate_task_outbound_context(t_ex)
|
||||
)
|
||||
|
||||
data_flow.remove_internal_data_from_context(ctx)
|
||||
|
||||
return ctx
|
||||
|
||||
def get_logical_task_state(self, task_ex):
|
||||
|
@ -248,6 +244,7 @@ class DirectWorkflowController(base.WorkflowController):
|
|||
t_name = task_ex.name
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
ctx or data_flow.evaluate_task_outbound_context(task_ex),
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
|
|
Loading…
Reference in New Issue