Remove environment data from task inbound context
* It's redundant to keep environment data in task inbound context,
it is immutable and we can always take it from workflow execution
object which is more efficient from DB space consumption
standpoint. The only case when it's allowed to modify data
in a workflow environment is when we either resume or re-run
a workflow and in this case we can change it for the whole
workflow execution.
Change-Id: I244c1768aaa306f8ad41084325107a40005d874c
(cherry picked from commit bdf49b7193
)
This commit is contained in:
parent
026e9d6c99
commit
d9e734fa5b
|
@ -14,7 +14,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
import operator
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
@ -241,7 +240,7 @@ class Task(object):
|
|||
if not action_name:
|
||||
return {}
|
||||
|
||||
env = self.task_ex.in_context.get('__env', {})
|
||||
env = self.wf_ex.context.get('__env', {})
|
||||
|
||||
return env.get('__actions', {}).get(action_name, {})
|
||||
|
||||
|
@ -351,12 +350,16 @@ class RegularTask(Task):
|
|||
)
|
||||
|
||||
def _get_target(self, input_dict):
|
||||
ctx_view = data_flow.ContextView(
|
||||
input_dict,
|
||||
self.ctx,
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
|
||||
return expr.evaluate_recursively(
|
||||
self.task_spec.get_target(),
|
||||
utils.merge_dicts(
|
||||
copy.deepcopy(input_dict),
|
||||
copy.deepcopy(self.ctx)
|
||||
)
|
||||
ctx_view
|
||||
)
|
||||
|
||||
def _get_action_input(self, ctx=None):
|
||||
|
|
|
@ -140,11 +140,6 @@ class Workflow(object):
|
|||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
|
||||
if env:
|
||||
for cmd in cmds:
|
||||
if isinstance(cmd, commands.RunExistingTask):
|
||||
_update_task_environment(cmd.task_ex, env)
|
||||
|
||||
self._continue_workflow(cmds)
|
||||
|
||||
def rerun(self, task_ex, reset=True, env=None):
|
||||
|
@ -167,8 +162,6 @@ class Workflow(object):
|
|||
|
||||
self.set_state(states.RUNNING, recursive=True)
|
||||
|
||||
_update_task_environment(task_ex, env)
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||
|
||||
# Calculate commands to process next.
|
||||
|
@ -379,16 +372,6 @@ class Workflow(object):
|
|||
)
|
||||
|
||||
|
||||
def _update_task_environment(task_ex, env):
|
||||
if env is None:
|
||||
return
|
||||
|
||||
task_ex.in_context['__env'] = utils.merge_dicts(
|
||||
task_ex.in_context['__env'],
|
||||
env
|
||||
)
|
||||
|
||||
|
||||
def _get_environment(params):
|
||||
env = params.get('env', {})
|
||||
|
||||
|
|
|
@ -116,6 +116,7 @@ def update_workflow_execution_env(wf_ex, env):
|
|||
)
|
||||
|
||||
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
|
||||
|
||||
data_flow.add_environment_to_context(wf_ex)
|
||||
|
||||
return wf_ex
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
@ -182,19 +181,7 @@ class WorkflowController(object):
|
|||
# to cover 'split' (aka 'merge') use case.
|
||||
upstream_task_execs = self._get_upstream_task_executions(task_spec)
|
||||
|
||||
ctx = data_flow.evaluate_upstream_context(upstream_task_execs)
|
||||
|
||||
# TODO(rakhmerov): Seems like we can fully get rid of '__env' in
|
||||
# task context if we are OK to have it only in workflow execution
|
||||
# object (wf_ex.context). Now we can selectively modify env
|
||||
# for some tasks if we resume or re-run a workflow.
|
||||
if self.wf_ex.context:
|
||||
ctx['__env'] = u.merge_dicts(
|
||||
copy.deepcopy(ctx.get('__env', {})),
|
||||
copy.deepcopy(self.wf_ex.context.get('__env', {}))
|
||||
)
|
||||
|
||||
return ctx
|
||||
return data_flow.evaluate_upstream_context(upstream_task_execs)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _get_upstream_task_executions(self, task_spec):
|
||||
|
|
Loading…
Reference in New Issue