Get rid of a extra copy of workflow environment

* Previously, we had two copies of the workflow environment
  passed by a user: one was in the 'params' fields under ke 'env'
  key and another one was copied into the 'context' field under
  the '__env' key so that we can evaluate expressions involving
  the env() function (YAQL or Jinja).
  This patch removes the copy from the 'context' field in favor
  of using an ad-hoc ContextView structure where we now also
  weave in the environment dictionary under the same key '__env'.

Related-Bug: #1757966
Change-Id: I1204b082794b376787d126136a79dd204ec3af07
This commit is contained in:
Renat Akhmerov 2018-04-26 18:10:18 +07:00
parent 77fb66ce1e
commit 2275925302
18 changed files with 199 additions and 76 deletions

View File

@ -378,7 +378,8 @@ class AdHocAction(PythonAction):
)
base_action_def = self._gather_base_actions(
action_def, base_action_def
action_def,
base_action_def
)
super(AdHocAction, self).__init__(
@ -418,11 +419,17 @@ class AdHocAction(PythonAction):
base_input_expr = action_spec.get_base_input()
if base_input_expr:
wf_ex = (
self.task_ex.workflow_execution if self.task_ex else None
)
ctx_view = data_flow.ContextView(
base_input_dict,
self.task_ctx,
data_flow.get_workflow_environment_dict(wf_ex),
self.wf_ctx
)
base_input_dict = expr.evaluate_recursively(
base_input_expr,
ctx_view
@ -444,8 +451,10 @@ class AdHocAction(PythonAction):
if transformer is not None:
result = ml_actions.Result(
data=expr.evaluate_recursively(transformer,
result.data),
data=expr.evaluate_recursively(
transformer,
result.data
),
error=result.error
)

View File

@ -152,6 +152,7 @@ class TaskPolicy(object):
ctx_view = data_flow.ContextView(
task_ex.in_context,
data_flow.get_workflow_environment_dict(wf_ex),
wf_ex.context,
wf_ex.input
)
@ -170,6 +171,7 @@ class TaskPolicy(object):
ctx_view = data_flow.ContextView(
task_ex.in_context,
data_flow.get_workflow_environment_dict(wf_ex),
wf_ex.context,
wf_ex.input
)

View File

@ -296,7 +296,7 @@ class Task(object):
if not action_name:
return {}
env = self.wf_ex.context.get('__env', {})
env = self.wf_ex.params['env']
return env.get('__actions', {}).get(action_name, {})
@ -428,6 +428,7 @@ class RegularTask(Task):
ctx_view = data_flow.ContextView(
input_dict,
self.ctx,
data_flow.get_workflow_environment_dict(self.wf_ex),
self.wf_ex.context,
self.wf_ex.input
)
@ -457,6 +458,7 @@ class RegularTask(Task):
def _evaluate_expression(self, expression, ctx=None):
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(self.task_ex),
data_flow.get_workflow_environment_dict(self.wf_ex),
ctx or self.ctx,
self.wf_ex.context,
self.wf_ex.input

View File

@ -25,6 +25,7 @@ from mistral.engine import action_queue
from mistral.engine import dispatcher
from mistral.engine import utils as engine_utils
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.lang import parser as spec_parser
from mistral.services import triggers
from mistral.services import workflows as wf_service
@ -242,10 +243,12 @@ class Workflow(object):
return db_api.acquire_lock(db_models.WorkflowExecution, self.wf_ex.id)
def _get_final_context(self):
final_ctx = {}
wf_ctrl = wf_base.get_controller(self.wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
final_ctx = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
'Failed to get final context for workflow execution. '
@ -255,7 +258,7 @@ class Workflow(object):
str(e)
)
return final_context
return final_ctx
def _create_execution(self, wf_def, wf_ex_id, input_dict, desc, params):
self.wf_ex = db_api.create_workflow_execution({
@ -277,16 +280,12 @@ class Workflow(object):
self.wf_ex.input = input_dict or {}
env = _get_environment(params)
if env:
params['env'] = env
params['env'] = _get_environment(params)
self.wf_ex.params = params
data_flow.add_openstack_data_to_context(self.wf_ex)
data_flow.add_execution_to_context(self.wf_ex)
data_flow.add_environment_to_context(self.wf_ex)
data_flow.add_workflow_variables_to_context(self.wf_ex, self.wf_spec)
spec_parser.cache_workflow_spec_by_execution_id(
@ -502,10 +501,12 @@ class Workflow(object):
def _get_environment(params):
env = params.get('env', {})
if isinstance(env, dict):
return env
if not env:
return {}
if isinstance(env, six.string_types):
if isinstance(env, dict):
env_dict = env
elif isinstance(env, six.string_types):
env_db = db_api.load_environment(env)
if not env_db:
@ -513,12 +514,18 @@ def _get_environment(params):
'Environment is not found: %s' % env
)
return env_db.variables
env_dict = env_db.variables
else:
raise exc.InputException(
'Unexpected value type for environment [env=%s, type=%s]'
% (env, type(env))
)
raise exc.InputException(
'Unexpected value type for environment [env=%s, type=%s]'
% (env, type(env))
)
if ('evaluate_env' in params and
not params['evaluate_env']):
return env_dict
else:
return expr.evaluate_recursively(env_dict, {'__env': env_dict})
def _build_fail_info_message(wf_ctrl, wf_ex):

View File

@ -16,7 +16,6 @@ from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral import utils
from mistral.workflow import data_flow
from mistral.workflow import states
from oslo_log import log as logging
@ -136,8 +135,6 @@ def update_workflow_execution_env(wf_ex, env):
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
data_flow.add_environment_to_context(wf_ex)
return wf_ex

View File

@ -42,8 +42,8 @@ EXPECTED_ENV_AUTH = ('librarian', 'password123')
WORKFLOW1 = """
---
version: "2.0"
wf1:
type: direct
tasks:
task1:
action: std.http url="https://api.library.org/books"
@ -54,8 +54,8 @@ wf1:
WORKFLOW2 = """
---
version: "2.0"
wf2:
type: direct
tasks:
task1:
action: std.http url="https://api.library.org/books" timeout=60
@ -66,10 +66,11 @@ wf2:
WORKFLOW1_WITH_ITEMS = """
---
version: "2.0"
wf1_with_items:
type: direct
input:
- links
tasks:
task1:
with-items: link in <% $.links %>
@ -81,10 +82,11 @@ wf1_with_items:
WORKFLOW2_WITH_ITEMS = """
---
version: "2.0"
wf2_with_items:
type: direct
input:
- links
tasks:
task1:
with-items: link in <% $.links %>
@ -95,7 +97,6 @@ wf2_with_items:
class ActionDefaultTest(base.EngineTestCase):
@mock.patch.object(
requests, 'request',
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
@ -116,11 +117,18 @@ class ActionDefaultTest(base.EngineTestCase):
self._assert_single_item(wf_ex.task_executions, name='task1')
requests.request.assert_called_with(
'GET', 'https://api.library.org/books',
params=None, data=None, headers=None, cookies=None,
allow_redirects=None, proxies=None, verify=None,
'GET',
'https://api.library.org/books',
params=None,
data=None,
headers=None,
cookies=None,
allow_redirects=None,
proxies=None,
verify=None,
auth=EXPECTED_ENV_AUTH,
timeout=ENV['__actions']['std.http']['timeout'])
timeout=ENV['__actions']['std.http']['timeout']
)
@mock.patch.object(
requests, 'request',

View File

@ -338,7 +338,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
task_10_ex = self._assert_single_item(task_execs, name='t10')
task_21_ex = self._assert_single_item(task_execs, name='t21')
@ -362,7 +361,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
# Await t30 success.
self.await_task_success(task_30_ex.id)

View File

@ -14,6 +14,7 @@
import mock
from oslo_config import cfg
import testtools
from mistral.db.v2 import api as db_api
from mistral.executors import default_executor as d_exe
@ -113,7 +114,6 @@ class EnvironmentTest(base.EngineTestCase):
# Execution of 'wf2'.
self.assertIsNotNone(wf2_ex)
self.assertDictEqual({}, wf2_ex.input)
self.assertDictContainsSubset({'env': env}, wf2_ex.params)
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
@ -126,19 +126,12 @@ class EnvironmentTest(base.EngineTestCase):
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
expected_start_params = {
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id,
'env': env
}
expected_wf1_input = {
'param1': 'Bonnie',
'param2': 'Clyde'
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.
@ -364,3 +357,97 @@ class EnvironmentTest(base.EngineTestCase):
},
sub_wf_ex.output
)
def test_env_not_copied_to_context(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.echo output="<% env().param1 %>"
publish:
result: <% task().result %>
"""
wf_service.create_workflows(wf_text)
env = {
'param1': 'val1',
'param2': 'val2',
'param3': 'val3'
}
wf_ex = self.engine.start_workflow('wf', env=env)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
self.assertDictEqual({'result': 'val1'}, t.published)
self.assertNotIn('__env', wf_ex.context)
@testtools.skip("Not implemented yet")
def test_subworkflow_env_no_duplicate(self):
wf_text = """---
version: '2.0'
parent_wf:
tasks:
task1:
workflow: sub_wf
sub_wf:
output:
result: <% $.result %>
tasks:
task1:
action: std.noop
publish:
result: <% env().param1 %>
"""
wf_service.create_workflows(wf_text)
env = {
'param1': 'val1',
'param2': 'val2',
'param3': 'val3'
}
parent_wf_ex = self.engine.start_workflow('parent_wf', env=env)
self.await_workflow_success(parent_wf_ex.id)
with db_api.transaction():
parent_wf_ex = db_api.get_workflow_execution(parent_wf_ex.id)
t = self._assert_single_item(
parent_wf_ex.task_executions,
name='task1'
)
sub_wf_ex = db_api.get_workflow_executions(
task_execution_id=t.id
)[0]
self.assertDictEqual(
{
"result": "val1"
},
sub_wf_ex.output
)
# The environment of the subworkflow must be empty.
# To evaluate expressions it should be taken from the
# parent workflow execution.
self.assertIsNone(sub_wf_ex.params['env'])
self.assertIsNone(sub_wf_ex.context['__env'])

View File

@ -372,7 +372,8 @@ class PoliciesTest(base.EngineTestCase):
wf_ex = models.WorkflowExecution(
id='1-2-3-4',
context={},
input={}
input={},
params={}
)
task_ex = models.TaskExecution(in_context={'int_var': 5})

View File

@ -68,7 +68,10 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
wb_service.create_workbook_v2(WORKBOOK)
def test_start_task1(self):
wf_input = {'param1': 'a', 'param2': 'b'}
wf_input = {
'param1': 'a',
'param2': 'b'
}
wf_ex = self.engine.start_workflow(
'my_wb.wf1',
@ -80,7 +83,11 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
self.assertIsNotNone(wf_ex)
self.assertDictEqual(wf_input, wf_ex.input)
self.assertDictEqual(
{'task_name': 'task1', 'namespace': ''},
{
'task_name': 'task1',
'namespace': '',
'env': {}
},
wf_ex.params
)
@ -104,7 +111,10 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
self.assertDictEqual({'result1': 'a'}, task_ex.published)
def test_start_task2(self):
wf_input = {'param1': 'a', 'param2': 'b'}
wf_input = {
'param1': 'a',
'param2': 'b'
}
wf_ex = self.engine.start_workflow(
'my_wb.wf1',
@ -116,7 +126,11 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
self.assertIsNotNone(wf_ex)
self.assertDictEqual(wf_input, wf_ex.input)
self.assertDictEqual(
{'task_name': 'task2', 'namespace': ''},
{
'task_name': 'task2',
'namespace': '',
'env': {}
},
wf_ex.params
)

View File

@ -215,7 +215,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
task_1_ex = self._assert_single_item(task_execs, name='t1')
task_2_ex = self._assert_single_item(task_execs, name='t2')
@ -238,7 +237,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
# Wait for the workflow to succeed.
self.await_workflow_success(wf_ex.id)

View File

@ -233,7 +233,7 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertEqual(project_id, wf2_ex.project_id)
self.assertIsNotNone(wf2_ex)
self.assertDictEqual({}, wf2_ex.input)
self.assertDictEqual({'namespace': ''}, wf2_ex.params)
self.assertDictEqual({'namespace': '', 'env': {}}, wf2_ex.params)
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)

View File

@ -427,7 +427,6 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(task_execs))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.IDLE, task_2_ex.state)
@ -448,7 +447,6 @@ class WorkflowResumeTest(base.EngineTestCase):
task_execs = wf_ex.task_executions
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
self.assertEqual(3, len(task_execs))
# Check result of task2.

View File

@ -377,7 +377,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
)
self.assertDictEqual(
{'param1': 'blablabla', 'namespace': ''},
{
'param1': 'blablabla',
'namespace': '',
'env': {}
},
execution['params']
)

View File

@ -232,7 +232,6 @@ class WorkflowServiceTest(base.DbTestCase):
)
self.assertDictEqual(update_env, updated.params['env'])
self.assertDictEqual(update_env, updated.context['__env'])
fetched = db_api.get_workflow_execution(created.id)

View File

@ -38,6 +38,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
state=states.RUNNING,
workflow_id=wfs[0].id,
input={},
params={},
context={}
)

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from oslo_config import cfg
from oslo_log import log as logging
@ -193,6 +191,7 @@ def publish_variables(task_ex, task_spec):
expr_ctx = ContextView(
get_current_task_dict(task_ex),
task_ex.in_context,
get_workflow_environment_dict(wf_ex),
wf_ex.context,
wf_ex.input
)
@ -264,7 +263,12 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx):
"""
# Evaluate workflow 'output' clause using the final workflow context.
ctx_view = ContextView(ctx, wf_ex.context, wf_ex.input)
ctx_view = ContextView(
ctx,
get_workflow_environment_dict(wf_ex),
wf_ex.context,
wf_ex.input
)
output = expr.evaluate_recursively(wf_output, ctx_view)
@ -298,30 +302,16 @@ def add_execution_to_context(wf_ex):
wf_ex.context['__execution'] = {'id': wf_ex.id}
def add_environment_to_context(wf_ex):
# TODO(rakhmerov): This is redundant, we can always get env from WF params
wf_ex.context = wf_ex.context or {}
# If env variables are provided, add an evaluated copy into the context.
if 'env' in wf_ex.params:
env = copy.deepcopy(wf_ex.params['env'])
if ('evaluate_env' in wf_ex.params and
not wf_ex.params['evaluate_env']):
wf_ex.context['__env'] = env
else:
wf_ex.context['__env'] = expr.evaluate_recursively(
env,
{'__env': env}
)
def add_workflow_variables_to_context(wf_ex, wf_spec):
wf_ex.context = wf_ex.context or {}
# The context for calculating workflow variables is workflow input
# and other data already stored in workflow initial context.
ctx_view = ContextView(wf_ex.context, wf_ex.input)
ctx_view = ContextView(
get_workflow_environment_dict(wf_ex),
wf_ex.context,
wf_ex.input
)
wf_vars = expr.evaluate_recursively(wf_spec.get_vars(), ctx_view)
@ -335,3 +325,9 @@ def evaluate_object_fields(obj, context):
for k, v in evaluated_fields.items():
setattr(obj, k, v)
def get_workflow_environment_dict(wf_ex):
env_dict = wf_ex.params['env'] if wf_ex and 'env' in wf_ex.params else {}
return {'__env': env_dict}

View File

@ -198,6 +198,7 @@ class DirectWorkflowController(base.WorkflowController):
for t_ex in lookup_utils.find_error_task_executions(self.wf_ex.id):
ctx_view = data_flow.ContextView(
data_flow.evaluate_task_outbound_context(t_ex),
data_flow.get_workflow_environment_dict(self.wf_ex),
self.wf_ex.context,
self.wf_ex.input
)
@ -252,6 +253,7 @@ class DirectWorkflowController(base.WorkflowController):
ctx_view = data_flow.ContextView(
data_flow.get_current_task_dict(task_ex),
ctx or data_flow.evaluate_task_outbound_context(task_ex),
data_flow.get_workflow_environment_dict(self.wf_ex),
self.wf_ex.context,
self.wf_ex.input
)