Do not copy workflow environment into subworkflows

* We previously always copied a workflow environment of a parent
  workflow into a subworkflow when starting it. However, this is
  redundant because we now have 'root_execution_id' field in the
  the workflow execution model so that we can always get an
  environment of a subworkflow just by accessing the root execution.
  It saves a lot of space in DB and increases performance in cases
  when we have a large workflow environment and many subworkflows.

Related-Bug: #1757966
Change-Id: I15077240ba53663a6267b886ab7b081a7dde2710
This commit is contained in:
Renat Akhmerov 2018-04-27 17:01:31 +07:00
parent b77769cf44
commit f2a9bd45ab
4 changed files with 13 additions and 16 deletions

View File

@ -565,10 +565,6 @@ class WorkflowAction(Action):
'namespace': parent_wf_ex.params['namespace']
}
if 'env' in parent_wf_ex.params:
wf_params['env'] = parent_wf_ex.params['env']
wf_params['evaluate_env'] = parent_wf_ex.params.get('evaluate_env')
if 'notify' in parent_wf_ex.params:
wf_params['notify'] = parent_wf_ex.params['notify']

View File

@ -14,7 +14,6 @@
import mock
from oslo_config import cfg
import testtools
from mistral.db.v2 import api as db_api
from mistral.executors import default_executor as d_exe
@ -394,7 +393,6 @@ class EnvironmentTest(base.EngineTestCase):
self.assertNotIn('__env', wf_ex.context)
@testtools.skip("Not implemented yet")
def test_subworkflow_env_no_duplicate(self):
wf_text = """---
version: '2.0'
@ -449,5 +447,5 @@ class EnvironmentTest(base.EngineTestCase):
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertIsNone(sub_wf_ex.params['env'])
self.assertIsNone(sub_wf_ex.context['__env'])
self.assertDictEqual({}, sub_wf_ex.params['env'])
self.assertNotIn('__env', sub_wf_ex.context)

View File

@ -367,14 +367,8 @@ class SubworkflowsTest(base.EngineTestCase):
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
expected_start_params = {
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id,
'env': env
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
self.assertDictContainsSubset({}, wf1_ex.params)
# Wait till workflow 'wf1' is completed.
self.await_workflow_success(wf1_ex.id)

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_log import log as logging
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral import expressions as expr
@ -328,6 +329,14 @@ def evaluate_object_fields(obj, context):
def get_workflow_environment_dict(wf_ex):
env_dict = wf_ex.params['env'] if wf_ex and 'env' in wf_ex.params else {}
if not wf_ex:
return {}
if wf_ex.root_execution_id:
return get_workflow_environment_dict(
db_api.get_workflow_execution(wf_ex.root_execution_id)
)
env_dict = wf_ex.params['env'] if 'env' in wf_ex.params else {}
return {'__env': env_dict}