Merge "DB operations retry on deadlock"

This commit is contained in:
Zuul 2018-02-07 23:49:15 +00:00 committed by Gerrit Code Review
commit 6884cf5a19
8 changed files with 125 additions and 1 deletions

View File

@ -25,6 +25,7 @@ from sqlalchemy.orm import exc as db_exc
from congress.db import api as db
from congress.db import db_ds_table_data as table_data
from congress.db import model_base
from congress.db import utils as db_utils
from congress import encryption
@ -77,6 +78,7 @@ def _decrypt_secret_config_fields(ds_db_obj):
return ds_db_obj
@db_utils.retry_on_db_error
def add_datasource(id_, name, driver, config, description,
enabled, session=None, secret_config_fields=None):
secret_config_fields = secret_config_fields or []
@ -94,12 +96,14 @@ def add_datasource(id_, name, driver, config, description,
return datasource
@db_utils.retry_on_db_error
def delete_datasource(id_, session=None):
session = session or db.get_session()
return session.query(Datasource).filter(
Datasource.id == id_).delete()
@db_utils.retry_on_db_error
def delete_datasource_with_data(id_, session=None):
session = session or db.get_session()
with session.begin(subtransactions=True):
@ -108,6 +112,7 @@ def delete_datasource_with_data(id_, session=None):
return deleted
@db_utils.retry_on_db_error
def get_datasource_name(name_or_id, session=None):
session = session or db.get_session()
datasource_obj = get_datasource(name_or_id, session)
@ -123,6 +128,7 @@ def get_datasource(name_or_id, session=None):
return db_object
@db_utils.retry_on_db_error
def get_datasource_by_id(id_, session=None):
session = session or db.get_session()
try:
@ -133,6 +139,7 @@ def get_datasource_by_id(id_, session=None):
pass
@db_utils.retry_on_db_error
def get_datasource_by_name(name, session=None):
session = session or db.get_session()
try:
@ -143,6 +150,7 @@ def get_datasource_by_name(name, session=None):
pass
@db_utils.retry_on_db_error
def get_datasources(session=None, deleted=False):
session = session or db.get_session()
return [_decrypt_secret_config_fields(ds_obj)

View File

@ -23,6 +23,7 @@ from sqlalchemy.orm import exc as db_exc
from congress.db import api as db
from congress.db import model_base
from congress.db import utils as db_utils
class DSTableData(model_base.BASE):
@ -34,6 +35,7 @@ class DSTableData(model_base.BASE):
tabledata = sa.Column(sa.Text(), nullable=False)
@db_utils.retry_on_db_error
def store_ds_table_data(ds_id, tablename, tabledata, session=None):
session = session or db.get_session()
tabledata = _json_encode_table_data(tabledata)
@ -45,6 +47,7 @@ def store_ds_table_data(ds_id, tablename, tabledata, session=None):
return new_row
@db_utils.retry_on_db_error
def delete_ds_table_data(ds_id, tablename=None, session=None):
session = session or db.get_session()
if tablename is None:
@ -56,6 +59,7 @@ def delete_ds_table_data(ds_id, tablename=None, session=None):
DSTableData.tablename == tablename).delete()
@db_utils.retry_on_db_error
def get_ds_table_data(ds_id, tablename=None, session=None):
session = session or db.get_session()
try:

View File

@ -24,6 +24,7 @@ from sqlalchemy.orm import exc as db_exc
from congress.db import api as db
from congress.db import model_base
from congress.db import utils as db_utils
class LibraryPolicy(model_base.BASE, model_base.HasId):
@ -58,6 +59,7 @@ class LibraryPolicy(model_base.BASE, model_base.HasId):
return d
@db_utils.retry_on_db_error
def add_policy(policy_dict, session=None):
session = session or db.get_session()
try:
@ -75,6 +77,7 @@ def add_policy(policy_dict, session=None):
"Policy with name %s already exists" % policy_dict['name'])
@db_utils.retry_on_db_error
def replace_policy(id_, policy_dict, session=None):
session = session or db.get_session()
try:
@ -94,17 +97,20 @@ def replace_policy(id_, policy_dict, session=None):
raise KeyError('No policy found with policy id %s' % id_)
@db_utils.retry_on_db_error
def delete_policy(id_, session=None):
session = session or db.get_session()
return session.query(LibraryPolicy).filter(
LibraryPolicy.id == id_).delete()
@db_utils.retry_on_db_error
def delete_policies(session=None):
session = session or db.get_session()
return session.query(LibraryPolicy).delete()
@db_utils.retry_on_db_error
def get_policy(id_, session=None):
session = session or db.get_session()
try:
@ -114,6 +120,7 @@ def get_policy(id_, session=None):
raise KeyError('No policy found with policy id %s' % id_)
@db_utils.retry_on_db_error
def get_policy_by_name(name, session=None):
session = session or db.get_session()
try:
@ -123,6 +130,7 @@ def get_policy_by_name(name, session=None):
raise KeyError('No policy found with policy name %s' % name)
@db_utils.retry_on_db_error
def get_policies(session=None):
session = session or db.get_session()
return (session.query(LibraryPolicy).all())

View File

@ -25,6 +25,7 @@ from sqlalchemy.orm import exc as db_exc
from congress.db import api as db
from congress.db import model_base
from congress.db import utils as db_utils
class Policy(model_base.BASE, model_base.HasId, model_base.HasAudit):
@ -88,6 +89,7 @@ class PolicyDeleted(model_base.BASE, model_base.HasId, model_base.HasAudit):
self.updated_at = policy_obj.updated_at
@db_utils.retry_on_db_error
def add_policy(id_, name, abbreviation, description, owner, kind,
deleted=False, session=None):
if session:
@ -114,6 +116,7 @@ def add_policy(id_, name, abbreviation, description, owner, kind,
raise KeyError("Policy with name %s already exists" % name)
@db_utils.retry_on_db_error
def delete_policy(id_, session=None):
session = session or db.get_session()
with session.begin(subtransactions=True):
@ -133,6 +136,7 @@ def delete_policy(id_, session=None):
PolicyDeleted.id == id_).soft_delete()
@db_utils.retry_on_db_error
def get_policy_by_id(id_, session=None, deleted=False):
session = session or db.get_session()
try:
@ -144,6 +148,7 @@ def get_policy_by_id(id_, session=None, deleted=False):
pass
@db_utils.retry_on_db_error
def get_policy_by_name(name, session=None, deleted=False):
session = session or db.get_session()
try:
@ -155,6 +160,7 @@ def get_policy_by_name(name, session=None, deleted=False):
pass
@db_utils.retry_on_db_error
def get_policy(name_or_id, session=None, deleted=False):
# Try to retrieve policy either by id or name
db_object = (get_policy_by_id(name_or_id, session, deleted) or
@ -164,6 +170,7 @@ def get_policy(name_or_id, session=None, deleted=False):
return db_object
@db_utils.retry_on_db_error
def get_policies(session=None, deleted=False):
session = session or db.get_session()
return (session.query(Policy).
@ -171,6 +178,7 @@ def get_policies(session=None, deleted=False):
all())
@db_utils.retry_on_db_error
def policy_name(name_or_id, session=None):
session = session or db.get_session()
try:
@ -216,6 +224,7 @@ class PolicyRule(model_base.BASE, model_base.HasId, model_base.HasAudit):
return d
@db_utils.retry_on_db_error
def add_policy_rule(id, policy_name, rule, comment, deleted=False,
rule_name="", session=None):
if session:
@ -236,12 +245,14 @@ def add_policy_rule(id, policy_name, rule, comment, deleted=False,
return policy_rule
@db_utils.retry_on_db_error
def delete_policy_rule(id, session=None):
"""Specify either the ID or the NAME, and that policy is deleted."""
session = session or db.get_session()
return session.query(PolicyRule).filter(PolicyRule.id == id).soft_delete()
@db_utils.retry_on_db_error
def get_policy_rule(id, policy_name, session=None, deleted=False):
session = session or db.get_session()
rule_query = (session.query(PolicyRule).
@ -256,6 +267,7 @@ def get_policy_rule(id, policy_name, session=None, deleted=False):
pass
@db_utils.retry_on_db_error
def get_policy_rules(policy_name=None, session=None,
deleted=False):
session = session or db.get_session()

47
congress/db/utils.py Normal file
View File

@ -0,0 +1,47 @@
# Copyright 2018 - VMware Inc
# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from oslo_db import exception as db_exc
from oslo_log import log as logging
import tenacity
LOG = logging.getLogger(__name__)
_RETRY_ERRORS = (db_exc.DBDeadlock,)
def retry_on_db_error(func):
"""Decorates the given function so that it retries on DB deadlock errors.
:param func: Function to decorate.
:return: Decorated function.
"""
@functools.wraps(func)
@tenacity.retry(
reraise=True,
retry=tenacity.retry_if_exception_type(_RETRY_ERRORS),
stop=tenacity.stop_after_attempt(10),
wait=tenacity.wait_incrementing(start=0, increment=0.1, max=2)
)
def decorate(*args, **kw):
try:
return func(*args, **kw)
except db_exc.DBDeadlock:
LOG.exception(
"DB error detected, operation will be retried: %s", func)
raise
return decorate

View File

@ -0,0 +1,45 @@
# Copyright (c) 2018 VMware
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import mock
from oslo_db import exception as db_exc
import testtools
from congress.db import utils as db_utils
class TestUtils(testtools.TestCase):
def _get_fail_then_succeed_func(self, failure_exception):
fail_then_succeed = mock.Mock(
side_effect=[failure_exception, True])
# set name as required by functools.wrap
fail_then_succeed.__name__ = 'fail_then_suceed'
return fail_then_succeed
def test_no_retry_on_unknown_db_error(self):
fail_then_succeed = db_utils.retry_on_db_error(
self._get_fail_then_succeed_func(db_exc.DBError))
self.assertRaises(db_exc.DBError, fail_then_succeed)
def test_retry_on_db_deadlock_error(self):
fail_then_succeed = db_utils.retry_on_db_error(
self._get_fail_then_succeed_func(db_exc.DBDeadlock))
self.assertTrue(fail_then_succeed())

View File

@ -28,6 +28,7 @@ python-dateutil>=2.4.2 # BSD
python-glanceclient>=2.8.0 # Apache-2.0
Routes>=2.3.1 # MIT
six>=1.10.0 # MIT
tenacity>=3.2.1 # Apache-2.0
oslo.concurrency>=3.25.0 # Apache-2.0
oslo.config>=5.1.0 # Apache-2.0
oslo.context>=2.19.2 # Apache-2.0

View File

@ -13,7 +13,6 @@ python-subunit>=1.0.0 # Apache-2.0/BSD
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=2.2.0 # MIT
tenacity>=3.2.1 # Apache-2.0
# Doc requirements
openstackdocstheme>=1.18.1 # Apache-2.0