Process all task batches in wf output evaluation

All batches must be processed in workflow output evaluation. An
empty batch means only that no tasks were end tasks in the queried
slice.

Closes-Bug: 1811775
Change-Id: I0ed4e690f67966ba2d145ad6430b517bd896ced6
This commit is contained in:
Andras Kovi 2019-01-15 09:33:06 +01:00
parent e907e0ed69
commit 81af1b4838
2 changed files with 21 additions and 24 deletions

View File

@ -29,53 +29,52 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
class DirectWorkflowWithCyclesTest(base.EngineTestCase):
def test_simple_cycle(self):
wf_text = """
version: '2.0'
version: 2.0
wf:
vars:
cnt: 0
workflow_cycle_in_out_verify:
input:
- num_of_cycles
output:
cnt: <% $.cnt %>
abc: <% $.counter %>
tasks:
task1:
on-complete:
- task2
task2:
action: std.echo output=2
initialize:
publish:
cnt: <% $.cnt + 1 %>
counter: 0
on-success:
- task3
- increment
task3:
action: std.echo output=3
increment:
action: std.noop
publish:
counter: <% $.counter + 1 %>
on-success:
- task2: <% $.cnt < 2 %>
- increment: <% $.counter < $.num_of_cycles %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
wf_ex = self.engine.start_workflow(
'workflow_cycle_in_out_verify',
wf_input={"num_of_cycles": 21}
)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual({'cnt': 2}, wf_ex.output)
self.assertDictEqual({'abc': 21}, wf_ex.output)
t_execs = wf_ex.task_executions
# Expecting one execution for task1 and two executions
# for task2 and task3 because of the cycle 'task2 <-> task3'.
self._assert_single_item(t_execs, name='task1')
self._assert_multiple_items(t_execs, 2, name='task2')
self._assert_multiple_items(t_execs, 2, name='task3')
self._assert_single_item(t_execs, name='initialize')
self._assert_multiple_items(t_execs, 21, name='increment')
self.assertEqual(5, len(t_execs))
self.assertEqual(22, len(t_execs))
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertTrue(all(states.SUCCESS == t_ex.state for t_ex in t_execs))

View File

@ -169,8 +169,6 @@ class DirectWorkflowController(base.WorkflowController):
ctx = {}
for batch in self._find_end_task_executions_as_batches():
if not batch:
break
for t_ex in batch:
ctx = utils.merge_dicts(