Merge "Fix how Mistral calculates workflow output"

This commit is contained in:
Zuul 2018-09-13 10:16:12 +00:00 committed by Gerrit Code Review
commit 8720a2711b
5 changed files with 70 additions and 0 deletions

View File

@ -62,6 +62,10 @@ def refresh(model):
IMPL.refresh(model)
def expire_all():
IMPL.expire_all()
# Locking.

View File

@ -125,6 +125,11 @@ def refresh(model, session=None):
session.refresh(model)
@b.session_aware()
def expire_all(session=None):
session.expire_all()
@b.session_aware()
def acquire_lock(model, id, session=None):
# Expire all so all objects queried after lock is acquired

View File

@ -414,6 +414,16 @@ class Workflow(object):
if incomplete_tasks_count > 0:
return incomplete_tasks_count
LOG.debug("Workflow completed [id=%s]", self.wf_ex.id)
# NOTE(rakhmerov): Once we know that the workflow has completed,
# we need to expire all the objects in the DB session to make sure
# to read the most relevant data from the DB (that's already been
# committed in parallel transactions). Otherwise, some data like
# workflow context may be stale and decisions made upon it will be
# wrong.
db_api.expire_all()
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
if wf_ctrl.any_cancels():

View File

@ -816,6 +816,44 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
self.assertDictEqual({}, wf_ex.output)
def test_output_expression(self):
wf_text = """---
version: '2.0'
wf:
output:
continue_flag: <% $.continue_flag %>
task-defaults:
on-error:
- task2
tasks:
task1:
action: std.fail
on-success: task3
task2:
action: std.noop
publish:
continue_flag: false
task3:
action: std.noop
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertDictEqual({'continue_flag': False}, wf_ex.output)
def test_triggered_by(self):
wf_text = """---
version: '2.0'

View File

@ -0,0 +1,13 @@
---
fixes:
- |
Workflow output sometimes was not calculated correctly due to
the race condition between different transactions: the one that
checks workflow completion (i.e. calls "check_and_complete") and
the one that processes action execution completion (i.e. calls
"on_action_complete"). Calculating output sometimes was based on
stale data cached by the SQLAlchemy session. To fix this, we just
need to expire all objects in the session so that they are
refreshed automatically if we read their state in order to make
required calculations. The corresponding change was made.