From 7b45a50a42f88f137ea6fa72959f3731bff2974c Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Tue, 5 Sep 2017 17:08:11 +0700 Subject: [PATCH] Fix "with-items" locking * Previously we thought that we can avoid locking task execution when we process completion of an individual action inside WithItemsTask. But that was wrong because even when we don't need to control "concurrency" property we still can get a situation when two different action completions will be processed via scheduler and both will see all actions in state SUCCESS (because scheduler handles it in a different later transaction) and hence both will complete the task and run tasks from "on-xxx" clauses. This patch makes WithItemsTask always use locking. * Added db_utils.retry_on_deadlock for _scheduled_on_action_complete() method since it opens a new DB transaction and can potentially end up in a dead lock (due to MySQL nature) Closes-Bug: #1715116 Change-Id: I6f34409e7182af3ca5b13c17e6d6fb4302f9efed --- mistral/engine/task_handler.py | 2 + mistral/engine/tasks.py | 67 ++++++++++++++-------------------- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index c1106cf28..500464d7e 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -19,6 +19,7 @@ from oslo_log import log as logging from osprofiler import profiler import traceback as tb +from mistral.db import utils as db_utils from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models from mistral.engine import action_queue @@ -402,6 +403,7 @@ def _schedule_refresh_task_state(task_ex, delay=0): ) +@db_utils.retry_on_deadlock @action_queue.process def _scheduled_on_action_complete(action_ex_id, wf_action): with db_api.transaction(): diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 31cb8580a..390a53eaf 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -492,52 +492,41 @@ class WithItemsTask(RegularTask): def on_action_complete(self, action_ex): assert self.task_ex - if (not self._get_concurrency() and - not self.task_spec.get_policies().get_retry()): - self._on_action_complete(action_ex) - else: - # If we need to control 'concurrency' we need to do atomic - # reads/writes to task runtime context. Locking prevents us - # from modifying runtime context simultaneously by multiple - # transactions. - with db_api.named_lock('with-items-%s' % self.task_ex.id): - # NOTE: We need to refresh task execution object right - # after the lock is acquired to make sure that we're - # working with a fresh state of its runtime context. - # Otherwise, SQLAlchemy session can contain a stale - # cached version of it so that we don't modify actual - # values (i.e. capacity). - db_api.refresh(self.task_ex) + with db_api.named_lock('with-items-%s' % self.task_ex.id): + # NOTE: We need to refresh task execution object right + # after the lock is acquired to make sure that we're + # working with a fresh state of its runtime context. + # Otherwise, SQLAlchemy session can contain a stale + # cached version of it so that we don't modify actual + # values (i.e. capacity). + db_api.refresh(self.task_ex) - self._on_action_complete(action_ex) + if self.is_completed(): + return - def _on_action_complete(self, action_ex): - if self.is_completed(): - return + self._increase_capacity() - self._increase_capacity() + if self.is_with_items_completed(): + state = self._get_final_state() - if self.is_with_items_completed(): - state = self._get_final_state() + # TODO(rakhmerov): Here we can define more informative messages + # in cases when action is successful and when it's not. + # For example, in state_info we can specify the cause action. + # The use of action_ex.output.get('result') for state_info is + # not accurate because there could be action executions that + # had failed or was cancelled prior to this action execution. + state_info = { + states.SUCCESS: None, + states.ERROR: 'One or more actions had failed.', + states.CANCELLED: 'One or more actions was cancelled.' + } - # TODO(rakhmerov): Here we can define more informative messages - # in cases when action is successful and when it's not. - # For example, in state_info we can specify the cause action. - # The use of action_ex.output.get('result') for state_info is not - # accurate because there could be action executions that had - # failed or was cancelled prior to this action execution. - state_info = { - states.SUCCESS: None, - states.ERROR: 'One or more actions had failed.', - states.CANCELLED: 'One or more actions was cancelled.' - } + self.complete(state, state_info[state]) - self.complete(state, state_info[state]) + return - return - - if self._has_more_iterations() and self._get_concurrency(): - self._schedule_actions() + if self._has_more_iterations() and self._get_concurrency(): + self._schedule_actions() def _schedule_actions(self): with_items_values = self._get_with_items_values()