Do not copy workflow environment into subworkflows

* We previously always copied a workflow environment of a parent
  workflow into a subworkflow when starting it. However, this is
  redundant because we now have 'root_execution_id' field in the
  the workflow execution model so that we can always get an
  environment of a subworkflow just by accessing the root execution.
  It saves a lot of space in DB and increases performance in cases
  when we have a large workflow environment and many subworkflows.

Change-Id: Id8f5e917b45fedf09ecf2d28798a6949cf5f5c99
Related-Bug: #1757966
This commit is contained in:
Renat Akhmerov 2018-04-27 17:01:31 +07:00
parent e29e182679
commit 7ec98b457c
4 changed files with 13 additions and 16 deletions

View File

@ -547,10 +547,6 @@ class WorkflowAction(Action):
'namespace': parent_wf_ex.params['namespace']
}
if 'env' in parent_wf_ex.params:
wf_params['env'] = parent_wf_ex.params['env']
wf_params['evaluate_env'] = parent_wf_ex.params.get('evaluate_env')
for k, v in list(input_dict.items()):
if k not in wf_spec.get_input():
wf_params[k] = v

View File

@ -14,7 +14,6 @@
import mock
from oslo_config import cfg
import testtools
from mistral.db.v2 import api as db_api
from mistral.executors import default_executor as d_exe
@ -388,7 +387,6 @@ class EnvironmentTest(base.EngineTestCase):
self.assertNotIn('__env', wf_ex.context)
@testtools.skip("Not implemented yet")
def test_subworkflow_env_no_duplicate(self):
wf_text = """---
version: '2.0'
@ -443,5 +441,5 @@ class EnvironmentTest(base.EngineTestCase):
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertIsNone(sub_wf_ex.params['env'])
self.assertIsNone(sub_wf_ex.context['__env'])
self.assertDictEqual({}, sub_wf_ex.params['env'])
self.assertNotIn('__env', sub_wf_ex.context)

View File

@ -367,14 +367,8 @@ class SubworkflowsTest(base.EngineTestCase):
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
expected_start_params = {
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id,
'env': env
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
self.assertDictContainsSubset({}, wf1_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral import expressions as expr
@ -328,6 +329,14 @@ def evaluate_object_fields(obj, context):
def get_workflow_environment_dict(wf_ex):
env_dict = wf_ex.params['env'] if wf_ex and 'env' in wf_ex.params else {}
if not wf_ex:
return {}
if wf_ex.root_execution_id:
return get_workflow_environment_dict(
db_api.get_workflow_execution(wf_ex.root_execution_id)
)
env_dict = wf_ex.params['env'] if 'env' in wf_ex.params else {}
return {'__env': env_dict}