Merge "DB operations retry on deadlock"
This commit is contained in:
commit
6884cf5a19
|
@ -25,6 +25,7 @@ from sqlalchemy.orm import exc as db_exc
|
|||
from congress.db import api as db
|
||||
from congress.db import db_ds_table_data as table_data
|
||||
from congress.db import model_base
|
||||
from congress.db import utils as db_utils
|
||||
from congress import encryption
|
||||
|
||||
|
||||
|
@ -77,6 +78,7 @@ def _decrypt_secret_config_fields(ds_db_obj):
|
|||
return ds_db_obj
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def add_datasource(id_, name, driver, config, description,
|
||||
enabled, session=None, secret_config_fields=None):
|
||||
secret_config_fields = secret_config_fields or []
|
||||
|
@ -94,12 +96,14 @@ def add_datasource(id_, name, driver, config, description,
|
|||
return datasource
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_datasource(id_, session=None):
|
||||
session = session or db.get_session()
|
||||
return session.query(Datasource).filter(
|
||||
Datasource.id == id_).delete()
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_datasource_with_data(id_, session=None):
|
||||
session = session or db.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
|
@ -108,6 +112,7 @@ def delete_datasource_with_data(id_, session=None):
|
|||
return deleted
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_datasource_name(name_or_id, session=None):
|
||||
session = session or db.get_session()
|
||||
datasource_obj = get_datasource(name_or_id, session)
|
||||
|
@ -123,6 +128,7 @@ def get_datasource(name_or_id, session=None):
|
|||
return db_object
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_datasource_by_id(id_, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -133,6 +139,7 @@ def get_datasource_by_id(id_, session=None):
|
|||
pass
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_datasource_by_name(name, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -143,6 +150,7 @@ def get_datasource_by_name(name, session=None):
|
|||
pass
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_datasources(session=None, deleted=False):
|
||||
session = session or db.get_session()
|
||||
return [_decrypt_secret_config_fields(ds_obj)
|
||||
|
|
|
@ -23,6 +23,7 @@ from sqlalchemy.orm import exc as db_exc
|
|||
|
||||
from congress.db import api as db
|
||||
from congress.db import model_base
|
||||
from congress.db import utils as db_utils
|
||||
|
||||
|
||||
class DSTableData(model_base.BASE):
|
||||
|
@ -34,6 +35,7 @@ class DSTableData(model_base.BASE):
|
|||
tabledata = sa.Column(sa.Text(), nullable=False)
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def store_ds_table_data(ds_id, tablename, tabledata, session=None):
|
||||
session = session or db.get_session()
|
||||
tabledata = _json_encode_table_data(tabledata)
|
||||
|
@ -45,6 +47,7 @@ def store_ds_table_data(ds_id, tablename, tabledata, session=None):
|
|||
return new_row
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_ds_table_data(ds_id, tablename=None, session=None):
|
||||
session = session or db.get_session()
|
||||
if tablename is None:
|
||||
|
@ -56,6 +59,7 @@ def delete_ds_table_data(ds_id, tablename=None, session=None):
|
|||
DSTableData.tablename == tablename).delete()
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_ds_table_data(ds_id, tablename=None, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
|
|
@ -24,6 +24,7 @@ from sqlalchemy.orm import exc as db_exc
|
|||
|
||||
from congress.db import api as db
|
||||
from congress.db import model_base
|
||||
from congress.db import utils as db_utils
|
||||
|
||||
|
||||
class LibraryPolicy(model_base.BASE, model_base.HasId):
|
||||
|
@ -58,6 +59,7 @@ class LibraryPolicy(model_base.BASE, model_base.HasId):
|
|||
return d
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def add_policy(policy_dict, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -75,6 +77,7 @@ def add_policy(policy_dict, session=None):
|
|||
"Policy with name %s already exists" % policy_dict['name'])
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def replace_policy(id_, policy_dict, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -94,17 +97,20 @@ def replace_policy(id_, policy_dict, session=None):
|
|||
raise KeyError('No policy found with policy id %s' % id_)
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_policy(id_, session=None):
|
||||
session = session or db.get_session()
|
||||
return session.query(LibraryPolicy).filter(
|
||||
LibraryPolicy.id == id_).delete()
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_policies(session=None):
|
||||
session = session or db.get_session()
|
||||
return session.query(LibraryPolicy).delete()
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy(id_, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -114,6 +120,7 @@ def get_policy(id_, session=None):
|
|||
raise KeyError('No policy found with policy id %s' % id_)
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy_by_name(name, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -123,6 +130,7 @@ def get_policy_by_name(name, session=None):
|
|||
raise KeyError('No policy found with policy name %s' % name)
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policies(session=None):
|
||||
session = session or db.get_session()
|
||||
return (session.query(LibraryPolicy).all())
|
||||
|
|
|
@ -25,6 +25,7 @@ from sqlalchemy.orm import exc as db_exc
|
|||
|
||||
from congress.db import api as db
|
||||
from congress.db import model_base
|
||||
from congress.db import utils as db_utils
|
||||
|
||||
|
||||
class Policy(model_base.BASE, model_base.HasId, model_base.HasAudit):
|
||||
|
@ -88,6 +89,7 @@ class PolicyDeleted(model_base.BASE, model_base.HasId, model_base.HasAudit):
|
|||
self.updated_at = policy_obj.updated_at
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def add_policy(id_, name, abbreviation, description, owner, kind,
|
||||
deleted=False, session=None):
|
||||
if session:
|
||||
|
@ -114,6 +116,7 @@ def add_policy(id_, name, abbreviation, description, owner, kind,
|
|||
raise KeyError("Policy with name %s already exists" % name)
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_policy(id_, session=None):
|
||||
session = session or db.get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
|
@ -133,6 +136,7 @@ def delete_policy(id_, session=None):
|
|||
PolicyDeleted.id == id_).soft_delete()
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy_by_id(id_, session=None, deleted=False):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -144,6 +148,7 @@ def get_policy_by_id(id_, session=None, deleted=False):
|
|||
pass
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy_by_name(name, session=None, deleted=False):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -155,6 +160,7 @@ def get_policy_by_name(name, session=None, deleted=False):
|
|||
pass
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy(name_or_id, session=None, deleted=False):
|
||||
# Try to retrieve policy either by id or name
|
||||
db_object = (get_policy_by_id(name_or_id, session, deleted) or
|
||||
|
@ -164,6 +170,7 @@ def get_policy(name_or_id, session=None, deleted=False):
|
|||
return db_object
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policies(session=None, deleted=False):
|
||||
session = session or db.get_session()
|
||||
return (session.query(Policy).
|
||||
|
@ -171,6 +178,7 @@ def get_policies(session=None, deleted=False):
|
|||
all())
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def policy_name(name_or_id, session=None):
|
||||
session = session or db.get_session()
|
||||
try:
|
||||
|
@ -216,6 +224,7 @@ class PolicyRule(model_base.BASE, model_base.HasId, model_base.HasAudit):
|
|||
return d
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def add_policy_rule(id, policy_name, rule, comment, deleted=False,
|
||||
rule_name="", session=None):
|
||||
if session:
|
||||
|
@ -236,12 +245,14 @@ def add_policy_rule(id, policy_name, rule, comment, deleted=False,
|
|||
return policy_rule
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def delete_policy_rule(id, session=None):
|
||||
"""Specify either the ID or the NAME, and that policy is deleted."""
|
||||
session = session or db.get_session()
|
||||
return session.query(PolicyRule).filter(PolicyRule.id == id).soft_delete()
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy_rule(id, policy_name, session=None, deleted=False):
|
||||
session = session or db.get_session()
|
||||
rule_query = (session.query(PolicyRule).
|
||||
|
@ -256,6 +267,7 @@ def get_policy_rule(id, policy_name, session=None, deleted=False):
|
|||
pass
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
def get_policy_rules(policy_name=None, session=None,
|
||||
deleted=False):
|
||||
session = session or db.get_session()
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright 2018 - VMware Inc
|
||||
# Copyright 2016 - Nokia Networks
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import functools
|
||||
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
import tenacity
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_RETRY_ERRORS = (db_exc.DBDeadlock,)
|
||||
|
||||
|
||||
def retry_on_db_error(func):
|
||||
"""Decorates the given function so that it retries on DB deadlock errors.
|
||||
|
||||
:param func: Function to decorate.
|
||||
:return: Decorated function.
|
||||
"""
|
||||
@functools.wraps(func)
|
||||
@tenacity.retry(
|
||||
reraise=True,
|
||||
retry=tenacity.retry_if_exception_type(_RETRY_ERRORS),
|
||||
stop=tenacity.stop_after_attempt(10),
|
||||
wait=tenacity.wait_incrementing(start=0, increment=0.1, max=2)
|
||||
)
|
||||
def decorate(*args, **kw):
|
||||
try:
|
||||
return func(*args, **kw)
|
||||
except db_exc.DBDeadlock:
|
||||
LOG.exception(
|
||||
"DB error detected, operation will be retried: %s", func)
|
||||
raise
|
||||
return decorate
|
|
@ -0,0 +1,45 @@
|
|||
# Copyright (c) 2018 VMware
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import division
|
||||
from __future__ import absolute_import
|
||||
|
||||
import mock
|
||||
from oslo_db import exception as db_exc
|
||||
import testtools
|
||||
|
||||
from congress.db import utils as db_utils
|
||||
|
||||
|
||||
class TestUtils(testtools.TestCase):
|
||||
|
||||
def _get_fail_then_succeed_func(self, failure_exception):
|
||||
fail_then_succeed = mock.Mock(
|
||||
side_effect=[failure_exception, True])
|
||||
|
||||
# set name as required by functools.wrap
|
||||
fail_then_succeed.__name__ = 'fail_then_suceed'
|
||||
return fail_then_succeed
|
||||
|
||||
def test_no_retry_on_unknown_db_error(self):
|
||||
fail_then_succeed = db_utils.retry_on_db_error(
|
||||
self._get_fail_then_succeed_func(db_exc.DBError))
|
||||
self.assertRaises(db_exc.DBError, fail_then_succeed)
|
||||
|
||||
def test_retry_on_db_deadlock_error(self):
|
||||
fail_then_succeed = db_utils.retry_on_db_error(
|
||||
self._get_fail_then_succeed_func(db_exc.DBDeadlock))
|
||||
self.assertTrue(fail_then_succeed())
|
|
@ -28,6 +28,7 @@ python-dateutil>=2.4.2 # BSD
|
|||
python-glanceclient>=2.8.0 # Apache-2.0
|
||||
Routes>=2.3.1 # MIT
|
||||
six>=1.10.0 # MIT
|
||||
tenacity>=3.2.1 # Apache-2.0
|
||||
oslo.concurrency>=3.25.0 # Apache-2.0
|
||||
oslo.config>=5.1.0 # Apache-2.0
|
||||
oslo.context>=2.19.2 # Apache-2.0
|
||||
|
|
|
@ -13,7 +13,6 @@ python-subunit>=1.0.0 # Apache-2.0/BSD
|
|||
testrepository>=0.0.18 # Apache-2.0/BSD
|
||||
testscenarios>=0.4 # Apache-2.0/BSD
|
||||
testtools>=2.2.0 # MIT
|
||||
tenacity>=3.2.1 # Apache-2.0
|
||||
|
||||
# Doc requirements
|
||||
openstackdocstheme>=1.18.1 # Apache-2.0
|
||||
|
|
Loading…
Reference in New Issue