From d9e734fa5b399bd72eff8c06e0264c2661cce4f6 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 26 Sep 2016 13:22:09 +0300 Subject: [PATCH] Remove environment data from task inbound context * It's redundant to keep environment data in task inbound context, it is immutable and we can always take it from workflow execution object which is more efficient from DB space consumption standpoint. The only case when it's allowed to modify data in a workflow environment is when we either resume or re-run a workflow and in this case we can change it for the whole workflow execution. Change-Id: I244c1768aaa306f8ad41084325107a40005d874c (cherry picked from commit bdf49b7193383b8e5ac8aed4d3a36c6be96e6040) --- mistral/engine/tasks.py | 15 +++++++++------ mistral/engine/workflows.py | 17 ----------------- mistral/services/workflows.py | 1 + mistral/workflow/base.py | 15 +-------------- 4 files changed, 11 insertions(+), 37 deletions(-) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index fbeeccb0b..d850ce25e 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -14,7 +14,6 @@ # limitations under the License. import abc -import copy import operator from oslo_log import log as logging from osprofiler import profiler @@ -241,7 +240,7 @@ class Task(object): if not action_name: return {} - env = self.task_ex.in_context.get('__env', {}) + env = self.wf_ex.context.get('__env', {}) return env.get('__actions', {}).get(action_name, {}) @@ -351,12 +350,16 @@ class RegularTask(Task): ) def _get_target(self, input_dict): + ctx_view = data_flow.ContextView( + input_dict, + self.ctx, + self.wf_ex.context, + self.wf_ex.input + ) + return expr.evaluate_recursively( self.task_spec.get_target(), - utils.merge_dicts( - copy.deepcopy(input_dict), - copy.deepcopy(self.ctx) - ) + ctx_view ) def _get_action_input(self, ctx=None): diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index d2e6ffdaf..9a53a94e4 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -140,11 +140,6 @@ class Workflow(object): # Calculate commands to process next. cmds = wf_ctrl.continue_workflow() - if env: - for cmd in cmds: - if isinstance(cmd, commands.RunExistingTask): - _update_task_environment(cmd.task_ex, env) - self._continue_workflow(cmds) def rerun(self, task_ex, reset=True, env=None): @@ -167,8 +162,6 @@ class Workflow(object): self.set_state(states.RUNNING, recursive=True) - _update_task_environment(task_ex, env) - wf_ctrl = wf_base.get_controller(self.wf_ex) # Calculate commands to process next. @@ -379,16 +372,6 @@ class Workflow(object): ) -def _update_task_environment(task_ex, env): - if env is None: - return - - task_ex.in_context['__env'] = utils.merge_dicts( - task_ex.in_context['__env'], - env - ) - - def _get_environment(params): env = params.get('env', {}) diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index 649edc1c7..ebfed895f 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -116,6 +116,7 @@ def update_workflow_execution_env(wf_ex, env): ) wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env) + data_flow.add_environment_to_context(wf_ex) return wf_ex diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 3015a7d61..cc7d5dd04 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -16,7 +16,6 @@ # limitations under the License. import abc -import copy from oslo_log import log as logging from osprofiler import profiler @@ -182,19 +181,7 @@ class WorkflowController(object): # to cover 'split' (aka 'merge') use case. upstream_task_execs = self._get_upstream_task_executions(task_spec) - ctx = data_flow.evaluate_upstream_context(upstream_task_execs) - - # TODO(rakhmerov): Seems like we can fully get rid of '__env' in - # task context if we are OK to have it only in workflow execution - # object (wf_ex.context). Now we can selectively modify env - # for some tasks if we resume or re-run a workflow. - if self.wf_ex.context: - ctx['__env'] = u.merge_dicts( - copy.deepcopy(ctx.get('__env', {})), - copy.deepcopy(self.wf_ex.context.get('__env', {})) - ) - - return ctx + return data_flow.evaluate_upstream_context(upstream_task_execs) @abc.abstractmethod def _get_upstream_task_executions(self, task_spec):