Gracefully handle DB connection errors
When the DB is disconnected, the Mistral API should retry the operation for a predefined amount of time, at least for GET-type requests, since such errors are most likely caused by temporary failures. The handling of operational errors was already implemented.

Change-Id: I3adb94dd695aeaa40d37956beae088d5618422c3
parent 6aec017fec
commit 7184596443
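As a rough sanity check on "a predefined amount of time": the engine-side policy in this diff (50 attempts, waits growing from 0 s in 0.1 s steps, capped at 2 s) bounds the cumulative back-off at about 77 seconds. A few lines of Python make the arithmetic explicit (treat it as an estimate; tenacity's exact wait accounting may differ slightly):

    # Worst-case cumulative back-off for stop_after_attempt(50) combined with
    # wait_incrementing(start=0, increment=0.1, max=2): 49 waits between attempts.
    waits = [min(0.1 * n, 2.0) for n in range(49)]
    print(round(sum(waits), 1))  # ~77.0 seconds of waiting, excluding call time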
@@ -25,9 +25,11 @@ from mistral.services import security
 LOG = logging.getLogger(__name__)

+_RETRY_ERRORS = (db_exc.DBDeadlock, db_exc.DBConnectionError)
+

 @tenacity.retry(
-    retry=tenacity.retry_if_exception_type(db_exc.DBDeadlock),
+    retry=tenacity.retry_if_exception_type(_RETRY_ERRORS),
     stop=tenacity.stop_after_attempt(50),
     wait=tenacity.wait_incrementing(start=0, increment=0.1, max=2)
 )
@@ -46,18 +48,22 @@ def _with_auth_context(auth_ctx, func, *args, **kw):
     try:
         return func(*args, **kw)
-    except db_exc.DBDeadlock as e:
+    except _RETRY_ERRORS:
         LOG.exception(
-            "DB deadlock detected, operation will be retried: %s", func
+            "DB error detected, operation will be retried: %s", func
         )

-        raise e
+        raise
     finally:
         context.set_ctx(old_auth_ctx)


-def retry_on_deadlock(func):
-    """Decorates the given function so that it retries on a DB deadlock.
+def retry_on_db_error(func):
+    """Decorates the given function so that it retries on DB errors.
+
+    Note that the decorator retries the function/method only on some
+    of the DB errors that are considered to be worth retrying, like
+    deadlocks and disconnections.

     :param func: Function to decorate.
     :return: Decorated function.
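For context, here is a minimal sketch of how a tenacity-based retry_on_db_error decorator can be assembled. The retry parameters mirror the values in this diff; the wrapper shape is illustrative and not a copy of Mistral's actual helper:

    import functools

    from oslo_db import exception as db_exc
    import tenacity

    _RETRY_ERRORS = (db_exc.DBDeadlock, db_exc.DBConnectionError)


    def retry_on_db_error(func):
        """Retry the decorated function on deadlocks and disconnections."""
        @functools.wraps(func)
        @tenacity.retry(
            retry=tenacity.retry_if_exception_type(_RETRY_ERRORS),
            stop=tenacity.stop_after_attempt(50),
            wait=tenacity.wait_incrementing(start=0, increment=0.1, max=2)
        )
        def decorated(*args, **kw):
            return func(*args, **kw)

        return decorated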
@@ -34,7 +34,7 @@ from mistral.workflow import states


 class DefaultEngine(base.Engine):
-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     @profiler.trace('engine-start-workflow', hide_args=True)
     def start_workflow(self, wf_identifier, wf_namespace='', wf_input=None,
@@ -53,7 +53,7 @@ class DefaultEngine(base.Engine):
         return wf_ex.get_clone()

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     def start_action(self, action_name, action_input,
                      description=None, **params):
@@ -105,7 +105,7 @@ class DefaultEngine(base.Engine):
         return db_api.create_action_execution(values)

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     @profiler.trace('engine-on-action-complete', hide_args=True)
     def on_action_complete(self, action_ex_id, result, wf_action=False,
@@ -120,7 +120,7 @@ class DefaultEngine(base.Engine):
         return action_ex.get_clone()

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     @profiler.trace('engine-on-action-update', hide_args=True)
     def on_action_update(self, action_ex_id, state, wf_action=False,
@@ -135,7 +135,7 @@ class DefaultEngine(base.Engine):
         return action_ex.get_clone()

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     def pause_workflow(self, wf_ex_id):
         with db_api.transaction():
@@ -145,7 +145,7 @@ class DefaultEngine(base.Engine):
         return wf_ex.get_clone()

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     def rerun_workflow(self, task_ex_id, reset=True, env=None):
         with db_api.transaction():
@@ -157,7 +157,7 @@ class DefaultEngine(base.Engine):
         return wf_ex.get_clone()

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     def resume_workflow(self, wf_ex_id, env=None):
         with db_api.transaction():
@@ -167,7 +167,7 @@ class DefaultEngine(base.Engine):
         return wf_ex.get_clone()

-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     @action_queue.process
     def stop_workflow(self, wf_ex_id, state, message=None):
         with db_api.transaction():
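One detail worth keeping in mind across these hunks: stacked decorators apply bottom-up, so placing @db_utils.retry_on_db_error above @action_queue.process means every retry attempt re-enters the queue processing too. A standalone illustration of that ordering (log_call is a made-up stand-in, not a Mistral helper):

    def log_call(tag):
        # Return a decorator that prints entry/exit so the nesting is visible.
        def deco(func):
            def wrapper(*args, **kw):
                print(tag, "enter")
                try:
                    return func(*args, **kw)
                finally:
                    print(tag, "exit")
            return wrapper
        return deco


    @log_call("retry")   # plays the role of @db_utils.retry_on_db_error
    @log_call("queue")   # plays the role of @action_queue.process
    def start_workflow():
        print("body")


    start_workflow()
    # Output: retry enter, queue enter, body, queue exit, retry exit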
@@ -491,7 +491,7 @@ class ConcurrencyPolicy(base.TaskPolicy):
     task_ex.runtime_context = runtime_context


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 def _continue_task(task_ex_id):
     from mistral.engine import task_handler
@@ -500,7 +500,7 @@ def _continue_task(task_ex_id):
     task_handler.continue_task(db_api.get_task_execution(task_ex_id))


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 def _complete_task(task_ex_id, state, state_info):
     from mistral.engine import task_handler
@@ -513,7 +513,7 @@ def _complete_task(task_ex_id, state, state_info):
     )


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 def _fail_task_if_incomplete(task_ex_id, timeout):
     from mistral.engine import task_handler
@@ -321,7 +321,7 @@ def _create_task(wf_ex, wf_spec, task_spec, ctx, task_ex=None,
     )


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 @profiler.trace('task-handler-refresh-task-state', hide_args=True)
 def _refresh_task_state(task_ex_id):
@@ -404,7 +404,7 @@ def _schedule_refresh_task_state(task_ex, delay=0):
     )


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 def _scheduled_on_action_complete(action_ex_id, wf_action):
     with db_api.transaction():
@@ -450,7 +450,7 @@ def schedule_on_action_complete(action_ex, delay=0):
     )


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 def _scheduled_on_action_update(action_ex_id, wf_action):
     with db_api.transaction():
@@ -86,7 +86,7 @@ def cancel_workflow(wf_ex, msg=None):
     stop_workflow(wf_ex, states.CANCELLED, msg)


-@db_utils.retry_on_deadlock
+@db_utils.retry_on_db_error
 @action_queue.process
 @profiler.trace('workflow-handler-check-and-complete', hide_args=True)
 def _check_and_complete(wf_ex_id):
@@ -166,7 +166,7 @@ class Scheduler(object):
         self.delete_calls(db_calls)

     @staticmethod
-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     def _capture_calls():
         """Captures delayed calls eligible for processing (based on time).

@@ -280,7 +280,7 @@ class Scheduler(object):
         context.set_ctx(None)

     @staticmethod
-    @db_utils.retry_on_deadlock
+    @db_utils.retry_on_db_error
     def delete_calls(db_calls):
         """Deletes delayed calls.
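In the Scheduler hunks, @staticmethod stays above the retry decorator. That ordering matters: a function decorator like retry_on_db_error needs a plain callable, and on Python 2 (which Mistral still supported) a staticmethod object is not directly callable, so the retry wrapper must be applied first. A minimal sketch with a trivial stand-in decorator (not Mistral's real one):

    import functools


    def retry_on_db_error(func):
        # Toy stand-in: a real version would retry on DB exceptions.
        @functools.wraps(func)
        def wrapper(*args, **kw):
            return func(*args, **kw)
        return wrapper


    class Scheduler(object):
        @staticmethod          # applied last, wrapping the decorated function
        @retry_on_db_error     # applied first, to the plain function
        def _capture_calls():
            return []


    print(Scheduler._capture_calls())  # []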
@@ -16,6 +16,7 @@
 import functools
 import json

+from oslo_db import exception as db_exc
 from oslo_log import log as logging
 import pecan
 import six
@@ -24,10 +25,12 @@ import tenacity
 import webob
 from wsme import exc as wsme_exc

 from mistral import context as auth_ctx
+from mistral.db.v2.sqlalchemy import api as db_api
+from mistral import exceptions as exc


 LOG = logging.getLogger(__name__)

@@ -237,9 +240,19 @@ def get_all(list_cls, cls, get_all_function, get_function,
     )


+class MistralRetrying(tenacity.Retrying):
+    def call(self, fn, *args, **kwargs):
+        try:
+            return super(MistralRetrying, self).call(fn, *args, **kwargs)
+        except tenacity.RetryError:
+            raise exc.MistralError("The service is temporarily unavailable")
+
+
 def create_db_retry_object():
-    return tenacity.Retrying(
-        retry=tenacity.retry_if_exception_type(sa.exc.OperationalError),
+    return MistralRetrying(
+        retry=tenacity.retry_if_exception_type(
+            (sa.exc.OperationalError, db_exc.DBConnectionError)
+        ),
         stop=tenacity.stop_after_attempt(10),
-        wait=tenacity.wait_incrementing(increment=100)  # 0.1 seconds
+        wait=tenacity.wait_incrementing(increment=0.2)  # 0.2 seconds
     )
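The effect of MistralRetrying is that, once all ten attempts are exhausted, REST handlers raise a clean MistralError ("The service is temporarily unavailable") instead of leaking tenacity.RetryError. A hypothetical GET handler would use it roughly like this (get_workflow_executions stands in for whichever db_api call the handler needs):

    # Hypothetical usage inside a GET controller method.
    r = create_db_retry_object()

    # Retries the read on OperationalError / DBConnectionError; after 10 failed
    # attempts, MistralRetrying.call() raises exc.MistralError rather than
    # tenacity.RetryError.
    db_models = r.call(db_api.get_workflow_executions, limit=100)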