Add '__task_execution' structure to task execution context on the fly

* Previously we stored the data structure describing the current
  task execution (id and name) in the inbound task execution context
  directly so that it'd be saved to DB. This was needed to evaluate
  YAQL/Jinja function task() without parameters properly. However,
  it's not needed, we can just build a context view on the fly
  just before evaluating an expression.

Closes-Bug: #1764704
(cherry picked from commit 6b7b58ed6c)

Change-Id: I9ea71cdc169c0c6f537429c75b5f1bd4079b87bf
This commit is contained in:
Renat Akhmerov 2018-04-17 17:38:07 +07:00 committed by Dougal Matthews
parent cc63730a04
commit 20248cc2a7
6 changed files with 29 additions and 37 deletions

View File

@ -311,6 +311,7 @@ class RetryPolicy(base.TaskPolicy):
wf_ex = task_ex.workflow_execution
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(task_ex),
data_flow.evaluate_task_outbound_context(task_ex),
wf_ex.context,
wf_ex.input

View File

@ -265,8 +265,6 @@ class Task(object):
task_name = self.task_spec.get_name()
task_type = self.task_spec.get_type()
data_flow.add_current_task_to_context(self.ctx, task_id, task_name)
values = {
'id': task_id,
'name': task_name,
@ -379,16 +377,13 @@ class RegularTask(Task):
self._schedule_actions()
def _update_inbound_context(self):
task_ex = self.task_ex
assert task_ex
assert self.task_ex
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
self.ctx = wf_ctrl.get_task_inbound_context(self.task_spec)
data_flow.add_current_task_to_context(self.ctx, task_ex.id,
task_ex.name)
utils.update_dict(task_ex.in_context, self.ctx)
utils.update_dict(self.task_ex.in_context, self.ctx)
def _update_triggered_by(self):
assert self.task_ex
@ -463,17 +458,17 @@ class RegularTask(Task):
)
def _evaluate_expression(self, expression, ctx=None):
ctx = ctx or self.ctx
ctx_view = data_flow.ContextView(
ctx,
data_flow.get_current_task_dict(self.task_ex),
ctx or self.ctx,
self.wf_ex.context,
self.wf_ex.input
)
input_dict = expr.evaluate_recursively(
return expr.evaluate_recursively(
expression,
ctx_view
)
return input_dict
def _build_action(self):
action_name = self.task_spec.get_action_name()

View File

@ -813,14 +813,6 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
task2_1_ex = self._assert_single_item(tasks_execs, name='task2_1')
task2_2_ex = self._assert_single_item(tasks_execs, name='task2_2')
# TODO(rakhmerov): Find out why '__task_execution' is still
# in the inbound context
del task0_ex.in_context['__task_execution']
del task1_1_ex.in_context['__task_execution']
del task1_2_ex.in_context['__task_execution']
del task2_1_ex.in_context['__task_execution']
del task2_2_ex.in_context['__task_execution']
self.assertDictEqual({}, task0_ex.in_context)
self.assertDictEqual({'var0': 'val0'}, task1_1_ex.in_context)
self.assertDictEqual(

View File

@ -206,10 +206,12 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task1_ex = self._assert_single_item(
wf_ex.task_executions, name='task1'
wf_ex.task_executions,
name='task1'
)
task2_ex = self._assert_single_item(
wf_ex.task_executions, name='task2'
wf_ex.task_executions,
name='task2'
)
self.assertDictEqual(
@ -229,6 +231,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
task2_ex.published
)
# The internal data needed for evaluation of the task() function
# should not be persisted to DB.
self.assertNotIn('__task_execution', task1_ex.in_context)
self.assertNotIn('__task_execution', task2_ex.in_context)
def test_task_function_no_name_on_complete_case(self):
wf_text = """---
version: '2.0'

View File

@ -190,7 +190,12 @@ def publish_variables(task_ex, task_spec):
wf_ex = task_ex.workflow_execution
expr_ctx = ContextView(task_ex.in_context, wf_ex.context, wf_ex.input)
expr_ctx = ContextView(
get_current_task_dict(task_ex),
task_ex.in_context,
wf_ex.context,
wf_ex.input
)
if task_ex.name in expr_ctx:
LOG.warning(
@ -268,19 +273,14 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx):
return output or ctx
def add_current_task_to_context(ctx, task_id, task_name):
ctx['__task_execution'] = {
'id': task_id,
'name': task_name
def get_current_task_dict(task_ex):
return {
'__task_execution': {
'id': task_ex.id,
'name': task_ex.name
}
}
return ctx
def remove_internal_data_from_context(ctx):
if '__task_execution' in ctx:
del ctx['__task_execution']
def add_openstack_data_to_context(wf_ex):
wf_ex.context = wf_ex.context or {}

View File

@ -125,8 +125,6 @@ class DirectWorkflowController(base.WorkflowController):
elif not t_s:
t_s = self.wf_spec.get_tasks()[task_ex.name]
data_flow.remove_internal_data_from_context(ctx)
triggered_by = [
{
'task_id': task_ex.id,
@ -176,8 +174,6 @@ class DirectWorkflowController(base.WorkflowController):
data_flow.evaluate_task_outbound_context(t_ex)
)
data_flow.remove_internal_data_from_context(ctx)
return ctx
def get_logical_task_state(self, task_ex):
@ -248,6 +244,7 @@ class DirectWorkflowController(base.WorkflowController):
t_name = task_ex.name
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(task_ex),
ctx or data_flow.evaluate_task_outbound_context(task_ex),
self.wf_ex.context,
self.wf_ex.input