Merge "Fix how Mistral calculates workflow output"
This commit is contained in:
commit
8720a2711b
|
@ -62,6 +62,10 @@ def refresh(model):
|
|||
IMPL.refresh(model)
|
||||
|
||||
|
||||
def expire_all():
|
||||
IMPL.expire_all()
|
||||
|
||||
|
||||
# Locking.
|
||||
|
||||
|
||||
|
|
|
@ -125,6 +125,11 @@ def refresh(model, session=None):
|
|||
session.refresh(model)
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def expire_all(session=None):
|
||||
session.expire_all()
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def acquire_lock(model, id, session=None):
|
||||
# Expire all so all objects queried after lock is acquired
|
||||
|
|
|
@ -414,6 +414,16 @@ class Workflow(object):
|
|||
if incomplete_tasks_count > 0:
|
||||
return incomplete_tasks_count
|
||||
|
||||
LOG.debug("Workflow completed [id=%s]", self.wf_ex.id)
|
||||
|
||||
# NOTE(rakhmerov): Once we know that the workflow has completed,
|
||||
# we need to expire all the objects in the DB session to make sure
|
||||
# to read the most relevant data from the DB (that's already been
|
||||
# committed in parallel transactions). Otherwise, some data like
|
||||
# workflow context may be stale and decisions made upon it will be
|
||||
# wrong.
|
||||
db_api.expire_all()
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
if wf_ctrl.any_cancels():
|
||||
|
|
|
@ -816,6 +816,44 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
|||
|
||||
self.assertDictEqual({}, wf_ex.output)
|
||||
|
||||
def test_output_expression(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
output:
|
||||
continue_flag: <% $.continue_flag %>
|
||||
|
||||
task-defaults:
|
||||
on-error:
|
||||
- task2
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.fail
|
||||
on-success: task3
|
||||
|
||||
task2:
|
||||
action: std.noop
|
||||
publish:
|
||||
continue_flag: false
|
||||
|
||||
task3:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf')
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(2, len(wf_ex.task_executions))
|
||||
self.assertDictEqual({'continue_flag': False}, wf_ex.output)
|
||||
|
||||
def test_triggered_by(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
---
|
||||
|
||||
fixes:
|
||||
- |
|
||||
Workflow output sometimes was not calculated correctly due to
|
||||
the race condition between different transactions: the one that
|
||||
checks workflow completion (i.e. calls "check_and_complete") and
|
||||
the one that processes action execution completion (i.e. calls
|
||||
"on_action_complete"). Calculating output sometimes was based on
|
||||
stale data cached by the SQLAlchemy session. To fix this, we just
|
||||
need to expire all objects in the session so that they are
|
||||
refreshed automatically if we read their state in order to make
|
||||
required calculations. The corresponding change was made.
|
Loading…
Reference in New Issue