From 5e56a28608d9b3a37a26a89c4d4bf2a78cfb1683 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Thu, 26 Apr 2018 18:10:18 +0700 Subject: [PATCH] Get rid of a extra copy of workflow environment * Previously, we had two copies of the workflow environment passed by a user: one was in the 'params' fields under ke 'env' key and another one was copied into the 'context' field under the '__env' key so that we can evaluate expressions involving the env() function (YAQL or Jinja). This patch removes the copy from the 'context' field in favor of using an ad-hoc ContextView structure where we now also weave in the environment dictionary under the same key '__env'. Related-Bug: #1757966 Change-Id: I1204b082794b376787d126136a79dd204ec3af07 --- mistral/engine/actions.py | 15 ++- mistral/engine/base.py | 2 + mistral/engine/tasks.py | 4 +- mistral/engine/workflows.py | 39 ++++--- mistral/services/workflows.py | 3 - .../tests/unit/engine/test_action_defaults.py | 26 +++-- .../unit/engine/test_direct_workflow_rerun.py | 2 - mistral/tests/unit/engine/test_environment.py | 103 ++++++++++++++++-- mistral/tests/unit/engine/test_policies.py | 3 +- .../unit/engine/test_reverse_workflow.py | 22 +++- .../engine/test_reverse_workflow_rerun.py | 2 - .../tests/unit/engine/test_subworkflows.py | 2 +- .../tests/unit/engine/test_workflow_resume.py | 2 - .../tests/unit/engine/test_yaql_functions.py | 6 +- .../unit/services/test_workflow_service.py | 1 - .../unit/workflow/test_direct_workflow.py | 1 + mistral/workflow/data_flow.py | 40 +++---- mistral/workflow/direct_workflow.py | 2 + 18 files changed, 199 insertions(+), 76 deletions(-) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index a5d1a0d6b..412bf746f 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -369,7 +369,8 @@ class AdHocAction(PythonAction): self.action_spec.get_base() ) base_action_def = self._gather_base_actions( - action_def, base_action_def + action_def, + base_action_def ) super(AdHocAction, self).__init__( @@ -409,11 +410,17 @@ class AdHocAction(PythonAction): base_input_expr = action_spec.get_base_input() if base_input_expr: + wf_ex = ( + self.task_ex.workflow_execution if self.task_ex else None + ) + ctx_view = data_flow.ContextView( base_input_dict, self.task_ctx, + data_flow.get_workflow_environment_dict(wf_ex), self.wf_ctx ) + base_input_dict = expr.evaluate_recursively( base_input_expr, ctx_view @@ -435,8 +442,10 @@ class AdHocAction(PythonAction): if transformer is not None: result = ml_actions.Result( - data=expr.evaluate_recursively(transformer, - result.data), + data=expr.evaluate_recursively( + transformer, + result.data + ), error=result.error ) diff --git a/mistral/engine/base.py b/mistral/engine/base.py index d2a99e48c..8d95abb78 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -150,6 +150,7 @@ class TaskPolicy(object): ctx_view = data_flow.ContextView( task_ex.in_context, + data_flow.get_workflow_environment_dict(wf_ex), wf_ex.context, wf_ex.input ) @@ -168,6 +169,7 @@ class TaskPolicy(object): ctx_view = data_flow.ContextView( task_ex.in_context, + data_flow.get_workflow_environment_dict(wf_ex), wf_ex.context, wf_ex.input ) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 3f014fa6d..12b3f69e5 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -300,7 +300,7 @@ class Task(object): if not action_name: return {} - env = self.wf_ex.context.get('__env', {}) + env = self.wf_ex.params['env'] return env.get('__actions', {}).get(action_name, {}) @@ -431,6 +431,7 @@ class RegularTask(Task): ctx_view = data_flow.ContextView( input_dict, self.ctx, + data_flow.get_workflow_environment_dict(self.wf_ex), self.wf_ex.context, self.wf_ex.input ) @@ -460,6 +461,7 @@ class RegularTask(Task): def _evaluate_expression(self, expression, ctx=None): ctx_view = data_flow.ContextView( data_flow.get_current_task_dict(self.task_ex), + data_flow.get_workflow_environment_dict(self.wf_ex), ctx or self.ctx, self.wf_ex.context, self.wf_ex.input diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 93e2823da..9fbdddcb8 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -25,6 +25,7 @@ from mistral.engine import action_queue from mistral.engine import dispatcher from mistral.engine import utils as engine_utils from mistral import exceptions as exc +from mistral import expressions as expr from mistral.lang import parser as spec_parser from mistral.services import triggers from mistral.services import workflows as wf_service @@ -238,10 +239,12 @@ class Workflow(object): return db_api.acquire_lock(db_models.WorkflowExecution, self.wf_ex.id) def _get_final_context(self): + final_ctx = {} + wf_ctrl = wf_base.get_controller(self.wf_ex) - final_context = {} + try: - final_context = wf_ctrl.evaluate_workflow_final_context() + final_ctx = wf_ctrl.evaluate_workflow_final_context() except Exception as e: LOG.warning( 'Failed to get final context for workflow execution. ' @@ -251,7 +254,7 @@ class Workflow(object): str(e) ) - return final_context + return final_ctx def _create_execution(self, wf_def, input_dict, desc, params): self.wf_ex = db_api.create_workflow_execution({ @@ -272,16 +275,12 @@ class Workflow(object): self.wf_ex.input = input_dict or {} - env = _get_environment(params) - - if env: - params['env'] = env + params['env'] = _get_environment(params) self.wf_ex.params = params data_flow.add_openstack_data_to_context(self.wf_ex) data_flow.add_execution_to_context(self.wf_ex) - data_flow.add_environment_to_context(self.wf_ex) data_flow.add_workflow_variables_to_context(self.wf_ex, self.wf_spec) spec_parser.cache_workflow_spec_by_execution_id( @@ -486,10 +485,12 @@ class Workflow(object): def _get_environment(params): env = params.get('env', {}) - if isinstance(env, dict): - return env + if not env: + return {} - if isinstance(env, six.string_types): + if isinstance(env, dict): + env_dict = env + elif isinstance(env, six.string_types): env_db = db_api.load_environment(env) if not env_db: @@ -497,12 +498,18 @@ def _get_environment(params): 'Environment is not found: %s' % env ) - return env_db.variables + env_dict = env_db.variables + else: + raise exc.InputException( + 'Unexpected value type for environment [env=%s, type=%s]' + % (env, type(env)) + ) - raise exc.InputException( - 'Unexpected value type for environment [env=%s, type=%s]' - % (env, type(env)) - ) + if ('evaluate_env' in params and + not params['evaluate_env']): + return env_dict + else: + return expr.evaluate_recursively(env_dict, {'__env': env_dict}) def _build_fail_info_message(wf_ctrl, wf_ex): diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index 5f35fe119..a260a9674 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -16,7 +16,6 @@ from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.lang import parser as spec_parser from mistral import utils -from mistral.workflow import data_flow from mistral.workflow import states from oslo_log import log as logging @@ -136,8 +135,6 @@ def update_workflow_execution_env(wf_ex, env): wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env) - data_flow.add_environment_to_context(wf_ex) - return wf_ex diff --git a/mistral/tests/unit/engine/test_action_defaults.py b/mistral/tests/unit/engine/test_action_defaults.py index 4cdb3466c..174783933 100644 --- a/mistral/tests/unit/engine/test_action_defaults.py +++ b/mistral/tests/unit/engine/test_action_defaults.py @@ -42,8 +42,8 @@ EXPECTED_ENV_AUTH = ('librarian', 'password123') WORKFLOW1 = """ --- version: "2.0" + wf1: - type: direct tasks: task1: action: std.http url="https://api.library.org/books" @@ -54,8 +54,8 @@ wf1: WORKFLOW2 = """ --- version: "2.0" + wf2: - type: direct tasks: task1: action: std.http url="https://api.library.org/books" timeout=60 @@ -66,10 +66,11 @@ wf2: WORKFLOW1_WITH_ITEMS = """ --- version: "2.0" + wf1_with_items: - type: direct input: - links + tasks: task1: with-items: link in <% $.links %> @@ -81,10 +82,11 @@ wf1_with_items: WORKFLOW2_WITH_ITEMS = """ --- version: "2.0" + wf2_with_items: - type: direct input: - links + tasks: task1: with-items: link in <% $.links %> @@ -95,7 +97,6 @@ wf2_with_items: class ActionDefaultTest(base.EngineTestCase): - @mock.patch.object( requests, 'request', mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK'))) @@ -116,11 +117,18 @@ class ActionDefaultTest(base.EngineTestCase): self._assert_single_item(wf_ex.task_executions, name='task1') requests.request.assert_called_with( - 'GET', 'https://api.library.org/books', - params=None, data=None, headers=None, cookies=None, - allow_redirects=None, proxies=None, verify=None, + 'GET', + 'https://api.library.org/books', + params=None, + data=None, + headers=None, + cookies=None, + allow_redirects=None, + proxies=None, + verify=None, auth=EXPECTED_ENV_AUTH, - timeout=ENV['__actions']['std.http']['timeout']) + timeout=ENV['__actions']['std.http']['timeout'] + ) @mock.patch.object( requests, 'request', diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index 3613cd088..09f70d160 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -338,7 +338,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(wf_ex.state_info) self.assertEqual(3, len(task_execs)) self.assertDictEqual(env, wf_ex.params['env']) - self.assertDictEqual(env, wf_ex.context['__env']) task_10_ex = self._assert_single_item(task_execs, name='t10') task_21_ex = self._assert_single_item(task_execs, name='t21') @@ -362,7 +361,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.RUNNING, wf_ex.state) self.assertIsNone(wf_ex.state_info) self.assertDictEqual(updated_env, wf_ex.params['env']) - self.assertDictEqual(updated_env, wf_ex.context['__env']) # Await t30 success. self.await_task_success(task_30_ex.id) diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index f4bb1640b..6842d8333 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -14,6 +14,7 @@ import mock from oslo_config import cfg +import testtools from mistral.db.v2 import api as db_api from mistral.executors import default_executor as d_exe @@ -109,7 +110,6 @@ class EnvironmentTest(base.EngineTestCase): # Execution of 'wf2'. self.assertIsNotNone(wf2_ex) self.assertDictEqual({}, wf2_ex.input) - self.assertDictContainsSubset({'env': env}, wf2_ex.params) self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) @@ -122,19 +122,12 @@ class EnvironmentTest(base.EngineTestCase): wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2') wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1') - expected_start_params = { - 'task_name': 'task2', - 'task_execution_id': wf1_ex.task_execution_id, - 'env': env - } - expected_wf1_input = { 'param1': 'Bonnie', 'param2': 'Clyde' } self.assertIsNotNone(wf1_ex.task_execution_id) - self.assertDictContainsSubset(expected_start_params, wf1_ex.params) self.assertDictEqual(wf1_ex.input, expected_wf1_input) # Wait till workflow 'wf1' is completed. @@ -358,3 +351,97 @@ class EnvironmentTest(base.EngineTestCase): }, sub_wf_ex.output ) + + def test_env_not_copied_to_context(self): + wf_text = """--- + version: '2.0' + + wf: + tasks: + task1: + action: std.echo output="<% env().param1 %>" + publish: + result: <% task().result %> + """ + + wf_service.create_workflows(wf_text) + + env = { + 'param1': 'val1', + 'param2': 'val2', + 'param3': 'val3' + } + + wf_ex = self.engine.start_workflow('wf', env=env) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + t = self._assert_single_item( + wf_ex.task_executions, + name='task1' + ) + + self.assertDictEqual({'result': 'val1'}, t.published) + + self.assertNotIn('__env', wf_ex.context) + + @testtools.skip("Not implemented yet") + def test_subworkflow_env_no_duplicate(self): + wf_text = """--- + version: '2.0' + + parent_wf: + tasks: + task1: + workflow: sub_wf + + sub_wf: + output: + result: <% $.result %> + + tasks: + task1: + action: std.noop + publish: + result: <% env().param1 %> + """ + + wf_service.create_workflows(wf_text) + + env = { + 'param1': 'val1', + 'param2': 'val2', + 'param3': 'val3' + } + + parent_wf_ex = self.engine.start_workflow('parent_wf', env=env) + + self.await_workflow_success(parent_wf_ex.id) + + with db_api.transaction(): + parent_wf_ex = db_api.get_workflow_execution(parent_wf_ex.id) + + t = self._assert_single_item( + parent_wf_ex.task_executions, + name='task1' + ) + + sub_wf_ex = db_api.get_workflow_executions( + task_execution_id=t.id + )[0] + + self.assertDictEqual( + { + "result": "val1" + }, + sub_wf_ex.output + ) + + # The environment of the subworkflow must be empty. + # To evaluate expressions it should be taken from the + # parent workflow execution. + self.assertIsNone(sub_wf_ex.params['env']) + self.assertIsNone(sub_wf_ex.context['__env']) diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index 650f1ee04..ad8649a05 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -370,7 +370,8 @@ class PoliciesTest(base.EngineTestCase): wf_ex = models.WorkflowExecution( id='1-2-3-4', context={}, - input={} + input={}, + params={} ) task_ex = models.TaskExecution(in_context={'int_var': 5}) diff --git a/mistral/tests/unit/engine/test_reverse_workflow.py b/mistral/tests/unit/engine/test_reverse_workflow.py index ba5cf368d..2bfbf7da0 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow.py +++ b/mistral/tests/unit/engine/test_reverse_workflow.py @@ -68,7 +68,10 @@ class ReverseWorkflowEngineTest(base.EngineTestCase): wb_service.create_workbook_v2(WORKBOOK) def test_start_task1(self): - wf_input = {'param1': 'a', 'param2': 'b'} + wf_input = { + 'param1': 'a', + 'param2': 'b' + } wf_ex = self.engine.start_workflow( 'my_wb.wf1', @@ -81,7 +84,11 @@ class ReverseWorkflowEngineTest(base.EngineTestCase): self.assertIsNotNone(wf_ex) self.assertDictEqual(wf_input, wf_ex.input) self.assertDictEqual( - {'task_name': 'task1', 'namespace': ''}, + { + 'task_name': 'task1', + 'namespace': '', + 'env': {} + }, wf_ex.params ) @@ -105,7 +112,10 @@ class ReverseWorkflowEngineTest(base.EngineTestCase): self.assertDictEqual({'result1': 'a'}, task_ex.published) def test_start_task2(self): - wf_input = {'param1': 'a', 'param2': 'b'} + wf_input = { + 'param1': 'a', + 'param2': 'b' + } wf_ex = self.engine.start_workflow( 'my_wb.wf1', @@ -118,7 +128,11 @@ class ReverseWorkflowEngineTest(base.EngineTestCase): self.assertIsNotNone(wf_ex) self.assertDictEqual(wf_input, wf_ex.input) self.assertDictEqual( - {'task_name': 'task2', 'namespace': ''}, + { + 'task_name': 'task2', + 'namespace': '', + 'env': {} + }, wf_ex.params ) diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index df22bf44c..4c292196b 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -217,7 +217,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): self.assertIsNotNone(wf_ex.state_info) self.assertEqual(2, len(task_execs)) self.assertDictEqual(env, wf_ex.params['env']) - self.assertDictEqual(env, wf_ex.context['__env']) task_1_ex = self._assert_single_item(task_execs, name='t1') task_2_ex = self._assert_single_item(task_execs, name='t2') @@ -240,7 +239,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.RUNNING, wf_ex.state) self.assertIsNone(wf_ex.state_info) self.assertDictEqual(updated_env, wf_ex.params['env']) - self.assertDictEqual(updated_env, wf_ex.context['__env']) # Wait for the workflow to succeed. self.await_workflow_success(wf_ex.id) diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index 663f53324..d0f537f9b 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -233,7 +233,7 @@ class SubworkflowsTest(base.EngineTestCase): self.assertEqual(project_id, wf2_ex.project_id) self.assertIsNotNone(wf2_ex) self.assertDictEqual({}, wf2_ex.input) - self.assertDictEqual({'namespace': ''}, wf2_ex.params) + self.assertDictEqual({'namespace': '', 'env': {}}, wf2_ex.params) self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index 59efa0701..7d67bb7d9 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -429,7 +429,6 @@ class WorkflowResumeTest(base.EngineTestCase): self.assertEqual(states.PAUSED, wf_ex.state) self.assertEqual(2, len(task_execs)) self.assertDictEqual(env, wf_ex.params['env']) - self.assertDictEqual(env, wf_ex.context['__env']) self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.IDLE, task_2_ex.state) @@ -450,7 +449,6 @@ class WorkflowResumeTest(base.EngineTestCase): task_execs = wf_ex.task_executions self.assertDictEqual(updated_env, wf_ex.params['env']) - self.assertDictEqual(updated_env, wf_ex.context['__env']) self.assertEqual(3, len(task_execs)) # Check result of task2. diff --git a/mistral/tests/unit/engine/test_yaql_functions.py b/mistral/tests/unit/engine/test_yaql_functions.py index c5f43168d..b181a4d74 100644 --- a/mistral/tests/unit/engine/test_yaql_functions.py +++ b/mistral/tests/unit/engine/test_yaql_functions.py @@ -378,7 +378,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase): ) self.assertDictEqual( - {'param1': 'blablabla', 'namespace': ''}, + { + 'param1': 'blablabla', + 'namespace': '', + 'env': {} + }, execution['params'] ) diff --git a/mistral/tests/unit/services/test_workflow_service.py b/mistral/tests/unit/services/test_workflow_service.py index 2887b5dc8..40054ee87 100644 --- a/mistral/tests/unit/services/test_workflow_service.py +++ b/mistral/tests/unit/services/test_workflow_service.py @@ -220,7 +220,6 @@ class WorkflowServiceTest(base.DbTestCase): ) self.assertDictEqual(update_env, updated.params['env']) - self.assertDictEqual(update_env, updated.context['__env']) fetched = db_api.get_workflow_execution(created.id) diff --git a/mistral/tests/unit/workflow/test_direct_workflow.py b/mistral/tests/unit/workflow/test_direct_workflow.py index f78f8b219..3f68008f3 100644 --- a/mistral/tests/unit/workflow/test_direct_workflow.py +++ b/mistral/tests/unit/workflow/test_direct_workflow.py @@ -38,6 +38,7 @@ class DirectWorkflowControllerTest(base.DbTestCase): state=states.RUNNING, workflow_id=wfs[0].id, input={}, + params={}, context={} ) diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 6b23d8c50..fbcc05c90 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy - from oslo_config import cfg from oslo_log import log as logging @@ -193,6 +191,7 @@ def publish_variables(task_ex, task_spec): expr_ctx = ContextView( get_current_task_dict(task_ex), task_ex.in_context, + get_workflow_environment_dict(wf_ex), wf_ex.context, wf_ex.input ) @@ -264,7 +263,12 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx): """ # Evaluate workflow 'output' clause using the final workflow context. - ctx_view = ContextView(ctx, wf_ex.context, wf_ex.input) + ctx_view = ContextView( + ctx, + get_workflow_environment_dict(wf_ex), + wf_ex.context, + wf_ex.input + ) output = expr.evaluate_recursively(wf_output, ctx_view) @@ -298,30 +302,16 @@ def add_execution_to_context(wf_ex): wf_ex.context['__execution'] = {'id': wf_ex.id} -def add_environment_to_context(wf_ex): - # TODO(rakhmerov): This is redundant, we can always get env from WF params - wf_ex.context = wf_ex.context or {} - - # If env variables are provided, add an evaluated copy into the context. - if 'env' in wf_ex.params: - env = copy.deepcopy(wf_ex.params['env']) - - if ('evaluate_env' in wf_ex.params and - not wf_ex.params['evaluate_env']): - wf_ex.context['__env'] = env - else: - wf_ex.context['__env'] = expr.evaluate_recursively( - env, - {'__env': env} - ) - - def add_workflow_variables_to_context(wf_ex, wf_spec): wf_ex.context = wf_ex.context or {} # The context for calculating workflow variables is workflow input # and other data already stored in workflow initial context. - ctx_view = ContextView(wf_ex.context, wf_ex.input) + ctx_view = ContextView( + get_workflow_environment_dict(wf_ex), + wf_ex.context, + wf_ex.input + ) wf_vars = expr.evaluate_recursively(wf_spec.get_vars(), ctx_view) @@ -335,3 +325,9 @@ def evaluate_object_fields(obj, context): for k, v in evaluated_fields.items(): setattr(obj, k, v) + + +def get_workflow_environment_dict(wf_ex): + env_dict = wf_ex.params['env'] if wf_ex and 'env' in wf_ex.params else {} + + return {'__env': env_dict} diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 0b122902f..61c8e242c 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -198,6 +198,7 @@ class DirectWorkflowController(base.WorkflowController): for t_ex in lookup_utils.find_error_task_executions(self.wf_ex.id): ctx_view = data_flow.ContextView( data_flow.evaluate_task_outbound_context(t_ex), + data_flow.get_workflow_environment_dict(self.wf_ex), self.wf_ex.context, self.wf_ex.input ) @@ -252,6 +253,7 @@ class DirectWorkflowController(base.WorkflowController): ctx_view = data_flow.ContextView( data_flow.get_current_task_dict(task_ex), ctx or data_flow.evaluate_task_outbound_context(task_ex), + data_flow.get_workflow_environment_dict(self.wf_ex), self.wf_ex.context, self.wf_ex.input )