Merge "Fix "with-items" locking"

This commit is contained in:
Jenkins 2017-09-07 11:32:25 +00:00 committed by Gerrit Code Review
commit 6b5a9981dd
2 changed files with 30 additions and 39 deletions

View File

@ -19,6 +19,7 @@ from oslo_log import log as logging
from osprofiler import profiler
import traceback as tb
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.engine import action_queue
@ -402,6 +403,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
)
@db_utils.retry_on_deadlock
@action_queue.process
def _scheduled_on_action_complete(action_ex_id, wf_action):
with db_api.transaction():

View File

@ -492,52 +492,41 @@ class WithItemsTask(RegularTask):
def on_action_complete(self, action_ex):
assert self.task_ex
if (not self._get_concurrency() and
not self.task_spec.get_policies().get_retry()):
self._on_action_complete(action_ex)
else:
# If we need to control 'concurrency' we need to do atomic
# reads/writes to task runtime context. Locking prevents us
# from modifying runtime context simultaneously by multiple
# transactions.
with db_api.named_lock('with-items-%s' % self.task_ex.id):
# NOTE: We need to refresh task execution object right
# after the lock is acquired to make sure that we're
# working with a fresh state of its runtime context.
# Otherwise, SQLAlchemy session can contain a stale
# cached version of it so that we don't modify actual
# values (i.e. capacity).
db_api.refresh(self.task_ex)
with db_api.named_lock('with-items-%s' % self.task_ex.id):
# NOTE: We need to refresh task execution object right
# after the lock is acquired to make sure that we're
# working with a fresh state of its runtime context.
# Otherwise, SQLAlchemy session can contain a stale
# cached version of it so that we don't modify actual
# values (i.e. capacity).
db_api.refresh(self.task_ex)
self._on_action_complete(action_ex)
if self.is_completed():
return
def _on_action_complete(self, action_ex):
if self.is_completed():
return
self._increase_capacity()
self._increase_capacity()
if self.is_with_items_completed():
state = self._get_final_state()
if self.is_with_items_completed():
state = self._get_final_state()
# TODO(rakhmerov): Here we can define more informative messages
# in cases when action is successful and when it's not.
# For example, in state_info we can specify the cause action.
# The use of action_ex.output.get('result') for state_info is
# not accurate because there could be action executions that
# had failed or was cancelled prior to this action execution.
state_info = {
states.SUCCESS: None,
states.ERROR: 'One or more actions had failed.',
states.CANCELLED: 'One or more actions was cancelled.'
}
# TODO(rakhmerov): Here we can define more informative messages
# in cases when action is successful and when it's not.
# For example, in state_info we can specify the cause action.
# The use of action_ex.output.get('result') for state_info is not
# accurate because there could be action executions that had
# failed or was cancelled prior to this action execution.
state_info = {
states.SUCCESS: None,
states.ERROR: 'One or more actions had failed.',
states.CANCELLED: 'One or more actions was cancelled.'
}
self.complete(state, state_info[state])
self.complete(state, state_info[state])
return
return
if self._has_more_iterations() and self._get_concurrency():
self._schedule_actions()
if self._has_more_iterations() and self._get_concurrency():
self._schedule_actions()
def _schedule_actions(self):
with_items_values = self._get_with_items_values()