Fixing result ordering in 'with-items'

Closes-Bug: #1442102

Change-Id: Iacc948c2d797c33b90523e272e37d34820bab006
Nikolay Mahotkin 2015-04-14 15:23:57 +03:00
parent 95f3e57216
commit aa187d14ef
10 changed files with 106 additions and 24 deletions
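The change records, for every action or sub-workflow execution spawned by a 'with-items' task, the index of the item it was scheduled for (runtime_context['with_items_index']) and sorts on that index when the task result is assembled, so published results follow item order rather than completion order. A minimal, self-contained sketch of that idea (plain Python with stand-in dicts; illustrative only, not the engine code itself):

    # Stand-in for action executions as plain dicts; the real objects are
    # SQLAlchemy models carrying the same fields.
    def ordered_results(action_execs):
        # Sort by the item index recorded at scheduling time, then collect
        # each execution's output.
        ordered = sorted(
            action_execs,
            key=lambda ex: ex['runtime_context']['with_items_index']
        )
        return [ex['output']['result'] for ex in ordered]

    # Executions may finish (and be appended) in any order ...
    execs = [
        {'runtime_context': {'with_items_index': 2}, 'output': {'result': 3}},
        {'runtime_context': {'with_items_index': 0}, 'output': {'result': 1}},
        {'runtime_context': {'with_items_index': 1}, 'output': {'result': 2}},
    ]
    # ... but the collected result still follows the original item order.
    assert ordered_results(execs) == [1, 2, 3]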

View File

@@ -102,6 +102,11 @@ class Execution(mb.MistralSecureModelBase):
state_info = sa.Column(sa.String(1024), nullable=True)
tags = sa.Column(st.JsonListType())
# Runtime context like iteration_no of a repeater.
# Effectively internal engine properties which will be used to determine
# execution of a task.
runtime_context = sa.Column(st.JsonDictType())
class ActionExecution(Execution):
"""Contains action execution information."""
@@ -150,11 +155,6 @@ class TaskExecution(Execution):
in_context = sa.Column(st.JsonLongDictType())
published = sa.Column(st.JsonDictType())
# Runtime context like iteration_no of a repeater.
# Effectively internal engine properties which will be used to determine
# execution of a task.
runtime_context = sa.Column(st.JsonDictType())
for cls in utils.iter_subclasses(Execution):
event.listen(
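The hunks above move runtime_context from TaskExecution up to the shared Execution base class, so action executions can carry the with-items index as well. A heavily reduced sketch of the resulting inheritance layout (SQLAlchemy 1.4+ declarative style, simplified column types and table details; not the actual Mistral models):

    import sqlalchemy as sa
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class Execution(Base):
        __tablename__ = 'executions_v2'
        id = sa.Column(sa.Integer, primary_key=True)
        type = sa.Column(sa.String(50))
        # Moved up from TaskExecution so ActionExecution rows carry it too.
        runtime_context = sa.Column(sa.JSON)
        __mapper_args__ = {'polymorphic_identity': 'execution',
                           'polymorphic_on': type}

    class ActionExecution(Execution):
        __mapper_args__ = {'polymorphic_identity': 'action_execution'}

    class TaskExecution(Execution):
        __mapper_args__ = {'polymorphic_identity': 'task_execution'}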

View File

@@ -353,6 +353,9 @@ class DefaultEngine(base.Engine):
'output': {},
'context': copy.copy(wf_input) or {},
'task_execution_id': params.get('task_execution_id'),
'runtime_context': {
'with_items_index': params.get('with_items_index', 0)
},
})
data_flow.add_openstack_data_to_context(wf_ex.context)

View File

@@ -66,8 +66,8 @@ def _run_existing_task(task_ex, task_spec, wf_spec):
# In some cases we can have no input, e.g. in case of 'with-items'.
if input_dicts:
for input_d in input_dicts:
_run_action_or_workflow(task_ex, task_spec, input_d)
for index, input_d in enumerate(input_dicts):
_run_action_or_workflow(task_ex, task_spec, input_d, index)
else:
_schedule_noop_action(task_ex, task_spec)
@@ -163,7 +163,7 @@ def _create_task_execution(wf_ex, task_spec, ctx):
return task_ex
def _create_action_execution(task_ex, action_def, action_input):
def _create_action_execution(task_ex, action_def, action_input, index=0):
action_ex = db_api.create_action_execution({
'name': action_def.name,
'task_execution_id': task_ex.id,
@@ -171,7 +171,8 @@ def _create_action_execution(task_ex, action_def, action_input):
'spec': action_def.spec,
'project_id': task_ex.project_id,
'state': states.RUNNING,
'input': action_input}
'input': action_input,
'runtime_context': {'with_items_index': index}}
)
# Add to collection explicitly so that it's in a proper
@@ -332,7 +333,7 @@ def _get_workflow_input(task_spec, ctx):
return expr.evaluate_recursively(task_spec.get_input(), ctx)
def _run_action_or_workflow(task_ex, task_spec, input_dict):
def _run_action_or_workflow(task_ex, task_spec, input_dict, index):
t_name = task_ex.name
if task_spec.get_action_name():
@@ -342,14 +343,14 @@ def _run_action_or_workflow(task_ex, task_spec, input_dict):
(t_name, task_spec.get_action_name())
)
_schedule_run_action(task_ex, task_spec, input_dict)
_schedule_run_action(task_ex, task_spec, input_dict, index)
elif task_spec.get_workflow_name():
wf_trace.info(
task_ex,
"Task '%s' is RUNNING [workflow_name = %s]" %
(t_name, task_spec.get_workflow_name()))
_schedule_run_workflow(task_ex, task_spec, input_dict)
_schedule_run_workflow(task_ex, task_spec, input_dict, index)
def _get_action_defaults(task_ex, task_spec):
@@ -358,7 +359,7 @@ def _get_action_defaults(task_ex, task_spec):
return actions.get(task_spec.get_action_name(), {})
def _schedule_run_action(task_ex, task_spec, action_input):
def _schedule_run_action(task_ex, task_spec, action_input, index):
wf_ex = task_ex.workflow_execution
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
@@ -383,7 +384,9 @@ def _schedule_run_action(task_ex, task_spec, action_input):
base_name
)
action_ex = _create_action_execution(task_ex, action_def, action_input)
action_ex = _create_action_execution(
task_ex, action_def, action_input, index
)
target = expr.evaluate_recursively(
task_spec.get_target(),
@@ -441,7 +444,7 @@ def run_action(action_ex_id, target):
)
def _schedule_run_workflow(task_ex, task_spec, wf_input):
def _schedule_run_workflow(task_ex, task_spec, wf_input, index):
parent_wf_ex = task_ex.workflow_execution
parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec)
@@ -455,7 +458,10 @@ def _schedule_run_workflow(task_ex, task_spec, wf_input):
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
wf_params = {'task_execution_id': task_ex.id}
wf_params = {
'task_execution_id': task_ex.id,
'with_items_index': index
}
if 'env' in parent_wf_ex.params:
wf_params['env'] = parent_wf_ex.params['env']
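For the sub-workflow branch of a 'with-items' task the index travels through the start parameters instead of being written straight onto an action execution: _schedule_run_workflow adds it to wf_params, and DefaultEngine.start_workflow (see the engine hunk above) copies it into the new execution's runtime_context, defaulting to 0 for ordinary starts. A rough sketch of that hand-off, with plain dicts and hypothetical helper names standing in for the real objects:

    # Illustrative helpers only; names mirror the diff but these functions
    # are not the engine code.
    def build_subworkflow_params(task_execution_id, index):
        # What _schedule_run_workflow now passes when starting a sub-workflow.
        return {'task_execution_id': task_execution_id,
                'with_items_index': index}

    def runtime_context_for_new_execution(params):
        # What DefaultEngine.start_workflow stores on the new workflow
        # execution; a start outside 'with-items' falls back to index 0.
        return {'with_items_index': params.get('with_items_index', 0)}

    params = build_subworkflow_params('task-uuid', 2)
    assert runtime_context_for_new_execution(params) == {'with_items_index': 2}
    assert runtime_context_for_new_execution({}) == {'with_items_index': 0}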

View File

@@ -294,6 +294,7 @@ class ExecutionTestsV2(base.TestCase):
[ex_id['id'] for ex_id in body['executions']])
resp, body = self.client.get_object('executions', exec_id)
# TODO(nmakhotkin): Fix this loop. It is infinite now.
while body['state'] != 'SUCCESS':
resp, body = self.client.get_object('executions', exec_id)
self.assertEqual(200, resp.status)

View File

@@ -467,7 +467,8 @@ class DataFlowTest(test_base.BaseTest):
task_ex.executions.append(models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=True
accepted=True,
runtime_context={'with_items_index': 0}
))
self.assertEqual(1, data_flow.get_task_execution_result(task_ex))
@@ -475,12 +476,14 @@
task_ex.executions.append(models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=True
accepted=True,
runtime_context={'with_items_index': 0}
))
task_ex.executions.append(models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=False
accepted=False,
runtime_context={'with_items_index': 0}
))
self.assertEqual([1, 1], data_flow.get_task_execution_result(task_ex))

View File

@@ -111,7 +111,7 @@ class SubworkflowsTest(base.EngineTestCase):
# Execution of 'wf2'.
self.assertIsNotNone(wf2_ex)
self.assertDictEqual({}, wf2_ex.input)
self.assertDictEqual({'env': env}, wf2_ex.params)
self.assertDictContainsSubset({'env': env}, wf2_ex.params)
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
@@ -136,7 +136,7 @@ class SubworkflowsTest(base.EngineTestCase):
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(wf1_ex.params, expected_start_params)
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.

View File

@@ -100,7 +100,7 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertEqual(project_id, wf1_ex.project_id)
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(
self.assertDictContainsSubset(
{
'task_name': 'task2',
'task_execution_id': wf1_ex.task_execution_id
@@ -198,7 +198,7 @@ class SubworkflowsTest(base.EngineTestCase):
}
self.assertIsNotNone(wf1_ex.task_execution_id)
self.assertDictEqual(wf1_ex.params, expected_start_params)
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_success(wf1_ex.id))

View File

@@ -15,11 +15,14 @@
import copy
from oslo.config import cfg
from mistral.actions import base as action_base
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.services import action_manager
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base
from mistral import utils
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
@@ -139,6 +142,18 @@ WF_INPUT_URLS = {
}
class RandomSleepEchoAction(action_base.Action):
def __init__(self, output):
self.output = output
def run(self):
utils.random_sleep(1)
return self.output
def test(self):
utils.random_sleep(1)
class WithItemsEngineTest(base.EngineTestCase):
def test_with_items_simple(self):
wb_service.create_workbook_v2(WORKBOOK)
@@ -375,3 +390,51 @@ class WithItemsEngineTest(base.EngineTestCase):
)
self.assertIn("Invalid array in 'with-items'", exception.message)
def test_with_items_results_order(self):
workbook = """---
version: "2.0"
name: wb1
workflows:
with_items:
type: direct
tasks:
task1:
with-items: i in [1, 2, 3]
action: sleep_echo output=<% $.i %>
publish:
one_two_three: <% $.task1 %>
"""
# Register random sleep action in the DB.
action_manager.register_action_class(
'sleep_echo',
'%s.%s' % (
RandomSleepEchoAction.__module__,
RandomSleepEchoAction.__name__
), {}
)
wb_service.create_workbook_v2(workbook)
# Start workflow.
wf_ex = self.engine.start_workflow('wb1.with_items', {})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.SUCCESS, task1.state)
published = task1.published
# Now we can check order of results explicitly.
self.assertEqual([1, 2, 3], published['one_two_three'])

View File

@@ -110,7 +110,8 @@ class DirectWorkflowControllerTest(base.DbTestCase):
workflow_name='wf',
state=states.SUCCESS,
output={'result': 'Hey'},
accepted=True
accepted=True,
runtime_context={'with_items_index': 0}
)
)

View File

@@ -80,6 +80,11 @@ def _extract_execution_result(ex):
def get_task_execution_result(task_ex):
action_execs = task_ex.executions
action_execs.sort(
key=lambda x: x.runtime_context.get('with_items_index')
)
results = [
_extract_execution_result(ex)
for ex in task_ex.executions