From df944dfb6299baef62710ee321f0b1c21c319d1d Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Wed, 12 Sep 2018 15:05:54 +0700 Subject: [PATCH] Fix how Mistral calculates workflow output * Workflow output sometimes is not calculated correctly due to the race condition between different transactions: the one that checks workflow completion (i.e. calls "check_and_complete") and the one that processes action execution completion (i.e. calls "on_action_complete"). Calculating output sometimes was based on stale data cached by the SQLAlchemy session. To fix this, we just need to expire all objects in the session so that they are refreshed automatically if we read their state in order to make required calculations. See the bug description for more details on how the problem was observed. * Added another test for direct workflow that formally checks calculation of workflow output. It doesn't pretend to test the aforementioned issue (it can be reproduced only with a big number of attempts, and/or under load). It's for the sake of the test module completeness. Change-Id: I4a7e7fd9a4bbb6e93df169b4b40bc2d83ccfce89 Closes-Bug: #1792090 (cherry picked from commit dfdff78315f72999dbd269e5fc4c4065a1b13013) --- mistral/db/v2/api.py | 4 ++ mistral/db/v2/sqlalchemy/api.py | 5 +++ mistral/engine/workflows.py | 10 +++++ .../tests/unit/engine/test_direct_workflow.py | 38 +++++++++++++++++++ .../fix_workflow_output-cee5df431679de6b.yaml | 13 +++++++ 5 files changed, 70 insertions(+) create mode 100644 releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 05262a974..954d989db 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -62,6 +62,10 @@ def refresh(model): IMPL.refresh(model) +def expire_all(): + IMPL.expire_all() + + # Locking. diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index dbd9acc25..35722e9af 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -122,6 +122,11 @@ def refresh(model, session=None): session.refresh(model) +@b.session_aware() +def expire_all(session=None): + session.expire_all() + + @b.session_aware() def acquire_lock(model, id, session=None): # Expire all so all objects queried after lock is acquired diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 9fbdddcb8..e2b139834 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -369,6 +369,16 @@ class Workflow(object): if incomplete_tasks_count > 0: return incomplete_tasks_count + LOG.debug("Workflow completed [id=%s]", self.wf_ex.id) + + # NOTE(rakhmerov): Once we know that the workflow has completed, + # we need to expire all the objects in the DB session to make sure + # to read the most relevant data from the DB (that's already been + # committed in parallel transactions). Otherwise, some data like + # workflow context may be stale and decisions made upon it will be + # wrong. + db_api.expire_all() + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) if wf_ctrl.any_cancels(): diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index 257e36a54..f0652fb18 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -682,6 +682,44 @@ class DirectWorkflowEngineTest(base.EngineTestCase): self.assertDictEqual({}, wf_ex.output) + def test_output_expression(self): + wf_text = """--- + version: '2.0' + + wf: + output: + continue_flag: <% $.continue_flag %> + + task-defaults: + on-error: + - task2 + + tasks: + task1: + action: std.fail + on-success: task3 + + task2: + action: std.noop + publish: + continue_flag: false + + task3: + action: std.noop + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(2, len(wf_ex.task_executions)) + self.assertDictEqual({'continue_flag': False}, wf_ex.output) + def test_triggered_by(self): wf_text = """--- version: '2.0' diff --git a/releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml b/releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml new file mode 100644 index 000000000..46dec0a4d --- /dev/null +++ b/releasenotes/notes/fix_workflow_output-cee5df431679de6b.yaml @@ -0,0 +1,13 @@ +--- + +fixes: + - | + Workflow output sometimes was not calculated correctly due to + the race condition between different transactions: the one that + checks workflow completion (i.e. calls "check_and_complete") and + the one that processes action execution completion (i.e. calls + "on_action_complete"). Calculating output sometimes was based on + stale data cached by the SQLAlchemy session. To fix this, we just + need to expire all objects in the session so that they are + refreshed automatically if we read their state in order to make + required calculations. The corresponding change was made.