Fix deletion of delayed calls
* Deletion of delayed calls is incorrect. A list of delayed calls gets deleted within one DB transaction, and if at least one object is not deleted because of a DBDeadlock exception (on MySQL), then the entire transaction fails and, what's more important, the exception is swallowed by the try-finally block without being reraised, so it can never be handled by the "retry_on_deadlock" decorator. This patch fixes this problem by reraising the initial exception. * Added the "retry_on_deadlock" decorator to all methods that open DB transactions and where we have a risk of hitting a deadlock. Change-Id: I816c8c2a940e38cf1698d76e1019671249238598
This commit is contained in:
parent
826760fbdc
commit
397a562788
|
@ -34,6 +34,7 @@ from mistral.workflow import states
|
|||
|
||||
|
||||
class DefaultEngine(base.Engine):
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
@profiler.trace('engine-start-workflow', hide_args=True)
|
||||
def start_workflow(self, wf_identifier, wf_namespace='', wf_input=None,
|
||||
|
@ -52,6 +53,7 @@ class DefaultEngine(base.Engine):
|
|||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
|
@ -133,6 +135,7 @@ class DefaultEngine(base.Engine):
|
|||
|
||||
return action_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def pause_workflow(self, wf_ex_id):
|
||||
with db_api.transaction():
|
||||
|
@ -142,6 +145,7 @@ class DefaultEngine(base.Engine):
|
|||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def rerun_workflow(self, task_ex_id, reset=True, env=None):
|
||||
with db_api.transaction():
|
||||
|
@ -153,6 +157,7 @@ class DefaultEngine(base.Engine):
|
|||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def resume_workflow(self, wf_ex_id, env=None):
|
||||
with db_api.transaction():
|
||||
|
@ -162,6 +167,7 @@ class DefaultEngine(base.Engine):
|
|||
|
||||
return wf_ex.get_clone()
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def stop_workflow(self, wf_ex_id, state, message=None):
|
||||
with db_api.transaction():
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import base
|
||||
|
@ -453,6 +454,7 @@ class ConcurrencyPolicy(base.TaskPolicy):
|
|||
task_ex.runtime_context = runtime_context
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def _continue_task(task_ex_id):
|
||||
from mistral.engine import task_handler
|
||||
|
@ -461,6 +463,7 @@ def _continue_task(task_ex_id):
|
|||
task_handler.continue_task(db_api.get_task_execution(task_ex_id))
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def _complete_task(task_ex_id, state, state_info):
|
||||
from mistral.engine import task_handler
|
||||
|
@ -473,6 +476,7 @@ def _complete_task(task_ex_id, state, state_info):
|
|||
)
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def _fail_task_if_incomplete(task_ex_id, timeout):
|
||||
from mistral.engine import task_handler
|
||||
|
|
|
@ -321,6 +321,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
|
|||
)
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
@profiler.trace('task-handler-refresh-task-state', hide_args=True)
|
||||
def _refresh_task_state(task_ex_id):
|
||||
|
@ -449,6 +450,7 @@ def schedule_on_action_complete(action_ex, delay=0):
|
|||
)
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
def _scheduled_on_action_update(action_ex_id, wf_action):
|
||||
with db_api.transaction():
|
||||
|
|
|
@ -17,6 +17,7 @@ from oslo_log import log as logging
|
|||
from osprofiler import profiler
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_queue
|
||||
from mistral.engine import workflows
|
||||
|
@ -82,6 +83,7 @@ def cancel_workflow(wf_ex, msg=None):
|
|||
stop_workflow(wf_ex, states.CANCELLED, msg)
|
||||
|
||||
|
||||
@db_utils.retry_on_deadlock
|
||||
@action_queue.process
|
||||
@profiler.trace('workflow-handler-check-and-complete', hide_args=True)
|
||||
def _check_and_complete(wf_ex_id):
|
||||
|
|
|
@ -146,6 +146,9 @@ class Scheduler(object):
|
|||
# Select and capture calls matching time criteria.
|
||||
db_calls = self._capture_calls()
|
||||
|
||||
if not db_calls:
|
||||
return
|
||||
|
||||
# Determine target methods, deserialize arguments etc.
|
||||
prepared_calls = self._prepare_calls(db_calls)
|
||||
|
||||
|
@ -287,6 +290,11 @@ class Scheduler(object):
|
|||
"exception=%s]", call, e
|
||||
)
|
||||
|
||||
# We have to re-raise any exception because the transaction
|
||||
# would be already invalid anyway. If it's a deadlock then
|
||||
# it will be handled.
|
||||
raise e
|
||||
|
||||
LOG.debug("Scheduler deleted %s delayed calls.", len(db_calls))
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue