Refactor retry mechanism used in some DB operations
Use oslo_db helper that will allow to restart the whole
transaction in case it needs a certain operation to be repeated.
This is a workaround for the REPEATABLE READ problem where
retrying logic will not work because queries inside a transation
will not see updates made by other transactions.
So, run every attempt in a separate transaction.
Conflicts:
neutron/plugins/ml2/drivers/helpers.py
neutron/plugins/ml2/plugin.py
neutron/tests/unit/ml2/test_ml2_plugin.py
(cherry picked from commit 5dbb34b56f
)
Change-Id: I68f9ae8019879725df58f5da2c83bb699a548255
Closes-Bug: #1382064
This commit is contained in:
parent
d9c78880d2
commit
4cd1b58c8c
|
@ -335,3 +335,12 @@ class DeviceIDNotOwnedByTenant(Conflict):
|
|||
|
||||
class InvalidCIDR(BadRequest):
|
||||
message = _("Invalid CIDR %(input)s given as IP prefix")
|
||||
|
||||
|
||||
class RetryRequest(Exception):
|
||||
"""Error raised when DB operation needs to be retried.
|
||||
|
||||
That could be intentionally raised by the code without any real DB errors.
|
||||
"""
|
||||
def __init__(self, inner_exc):
|
||||
self.inner_exc = inner_exc
|
||||
|
|
|
@ -13,11 +13,24 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
import sys
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.db import exception as oslo_db_exc
|
||||
from oslo.db.sqlalchemy import session
|
||||
|
||||
from neutron.common import exceptions as exc
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_FACADE = None
|
||||
|
||||
MAX_RETRIES = 10
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
|
@ -39,3 +52,75 @@ def get_session(autocommit=True, expire_on_commit=False):
|
|||
facade = _create_facade_lazily()
|
||||
return facade.get_session(autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit)
|
||||
|
||||
|
||||
class wrap_db_retry(object):
|
||||
"""Retry db.api methods, if db_error raised
|
||||
|
||||
Retry decorated db.api methods. This decorator catches db_error and retries
|
||||
function in a loop until it succeeds, or until maximum retries count
|
||||
will be reached.
|
||||
|
||||
Keyword arguments:
|
||||
|
||||
:param retry_interval: seconds between transaction retries
|
||||
:type retry_interval: int
|
||||
|
||||
:param max_retries: max number of retries before an error is raised
|
||||
:type max_retries: int
|
||||
|
||||
:param inc_retry_interval: determine increase retry interval or not
|
||||
:type inc_retry_interval: bool
|
||||
|
||||
:param max_retry_interval: max interval value between retries
|
||||
:type max_retry_interval: int
|
||||
"""
|
||||
|
||||
def __init__(self, retry_interval=0, max_retries=0, inc_retry_interval=0,
|
||||
max_retry_interval=0, retry_on_disconnect=False,
|
||||
retry_on_deadlock=False, retry_on_request=False):
|
||||
super(wrap_db_retry, self).__init__()
|
||||
|
||||
self.db_error = ()
|
||||
if retry_on_disconnect:
|
||||
self.db_error += (oslo_db_exc.DBConnectionError, )
|
||||
if retry_on_deadlock:
|
||||
self.db_error += (oslo_db_exc.DBDeadlock, )
|
||||
if retry_on_request:
|
||||
self.db_error += (exc.RetryRequest, )
|
||||
self.retry_interval = retry_interval
|
||||
self.max_retries = max_retries
|
||||
self.inc_retry_interval = inc_retry_interval
|
||||
self.max_retry_interval = max_retry_interval
|
||||
|
||||
def __call__(self, f):
|
||||
@six.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
next_interval = self.retry_interval
|
||||
remaining = self.max_retries
|
||||
db_error = self.db_error
|
||||
|
||||
while True:
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except db_error as e:
|
||||
if remaining == 0:
|
||||
LOG.exception(_('DB exceeded retry limit.'))
|
||||
if isinstance(e, exc.RetryRequest):
|
||||
six.reraise(type(e.inner_exc),
|
||||
e.inner_exc,
|
||||
sys.exc_info()[2])
|
||||
raise e
|
||||
if remaining != -1:
|
||||
remaining -= 1
|
||||
LOG.exception(_('DB error.'))
|
||||
# NOTE(vsergeyev): We are using patched time module, so
|
||||
# this effectively yields the execution
|
||||
# context to another green thread.
|
||||
time.sleep(next_interval)
|
||||
if self.inc_retry_interval:
|
||||
next_interval = min(
|
||||
next_interval * 2,
|
||||
self.max_retry_interval
|
||||
)
|
||||
return wrapper
|
||||
|
|
|
@ -107,37 +107,32 @@ class TypeDriverHelper(api.TypeDriver):
|
|||
filter_by(allocated=False, **filters))
|
||||
|
||||
# Selected segment can be allocated before update by someone else,
|
||||
# We retry until update success or DB_MAX_RETRIES retries
|
||||
for attempt in range(1, DB_MAX_RETRIES + 1):
|
||||
alloc = select.first()
|
||||
alloc = select.first()
|
||||
|
||||
if not alloc:
|
||||
# No resource available
|
||||
return
|
||||
if not alloc:
|
||||
# No resource available
|
||||
return
|
||||
|
||||
raw_segment = dict((k, alloc[k]) for k in self.primary_keys)
|
||||
LOG.debug("%(type)s segment allocate from pool, attempt "
|
||||
"%(attempt)s started with %(segment)s ",
|
||||
{"type": network_type, "attempt": attempt,
|
||||
raw_segment = dict((k, alloc[k]) for k in self.primary_keys)
|
||||
LOG.debug("%(type)s segment allocate from pool "
|
||||
"started with %(segment)s ",
|
||||
{"type": network_type,
|
||||
"segment": raw_segment})
|
||||
count = (session.query(self.model).
|
||||
filter_by(allocated=False, **raw_segment).
|
||||
update({"allocated": True}))
|
||||
if count:
|
||||
LOG.debug("%(type)s segment allocate from pool "
|
||||
"success with %(segment)s ",
|
||||
{"type": network_type,
|
||||
"segment": raw_segment})
|
||||
count = (session.query(self.model).
|
||||
filter_by(allocated=False, **raw_segment).
|
||||
update({"allocated": True}))
|
||||
if count:
|
||||
LOG.debug("%(type)s segment allocate from pool, attempt "
|
||||
"%(attempt)s success with %(segment)s ",
|
||||
{"type": network_type, "attempt": attempt,
|
||||
"segment": raw_segment})
|
||||
return alloc
|
||||
return alloc
|
||||
|
||||
# Segment allocated since select
|
||||
LOG.debug("Allocate %(type)s segment from pool, "
|
||||
"attempt %(attempt)s failed with segment "
|
||||
"%(segment)s",
|
||||
{"type": network_type, "attempt": attempt,
|
||||
"segment": raw_segment})
|
||||
|
||||
LOG.warning(_("Allocate %(type)s segment from pool failed "
|
||||
"after %(number)s failed attempts"),
|
||||
{"type": network_type, "number": DB_MAX_RETRIES})
|
||||
raise exc.NoNetworkFoundInMaximumAllowedAttempts()
|
||||
# Segment allocated since select
|
||||
LOG.debug("Allocate %(type)s segment from pool "
|
||||
"failed with segment %(segment)s",
|
||||
{"type": network_type,
|
||||
"segment": raw_segment})
|
||||
# saving real exception in case we exceeded amount of attempts
|
||||
raise exc.RetryRequest(
|
||||
exc.NoNetworkFoundInMaximumAllowedAttempts())
|
||||
|
|
|
@ -484,7 +484,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
|
||||
# TODO(apech): Need to override bulk operations
|
||||
|
||||
def create_network(self, context, network):
|
||||
def _create_network_db(self, context, network):
|
||||
net_data = network['network']
|
||||
tenant_id = self._get_tenant_id_for_create(context, net_data)
|
||||
session = context.session
|
||||
|
@ -501,7 +501,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
mech_context = driver_context.NetworkContext(self, context,
|
||||
result)
|
||||
self.mechanism_manager.create_network_precommit(mech_context)
|
||||
return result, mech_context
|
||||
|
||||
@db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
|
||||
retry_on_request=True)
|
||||
def _create_network_with_retries(self, context, network):
|
||||
return self._create_network_db(context, network)
|
||||
|
||||
def create_network(self, context, network):
|
||||
result, mech_context = self._create_network_with_retries(context,
|
||||
network)
|
||||
try:
|
||||
self.mechanism_manager.create_network_postcommit(mech_context)
|
||||
except ml2_exc.MechanismDriverError:
|
||||
|
|
|
@ -134,15 +134,10 @@ class HelpersTest(testlib_api.SqlTestCase):
|
|||
def test_allocate_partial_segment_first_attempt_fails(self):
|
||||
expected = dict(physical_network=TENANT_NET)
|
||||
with mock.patch.object(query.Query, 'update', side_effect=[0, 1]):
|
||||
self.assertRaises(
|
||||
exc.RetryRequest,
|
||||
self.driver.allocate_partially_specified_segment,
|
||||
self.session, **expected)
|
||||
observed = self.driver.allocate_partially_specified_segment(
|
||||
self.session, **expected)
|
||||
self.check_raw_segment(expected, observed)
|
||||
|
||||
def test_allocate_partial_segment_all_attempts_fail(self):
|
||||
with mock.patch.object(query.Query, 'update', return_value=0):
|
||||
with mock.patch.object(helpers.LOG, 'warning') as log_warning:
|
||||
self.assertRaises(
|
||||
exc.NoNetworkFoundInMaximumAllowedAttempts,
|
||||
self.driver.allocate_partially_specified_segment,
|
||||
self.session)
|
||||
log_warning.assert_called_once_with(mock.ANY, mock.ANY)
|
||||
|
|
|
@ -23,6 +23,7 @@ from neutron.common import constants
|
|||
from neutron.common import exceptions as exc
|
||||
from neutron.common import utils
|
||||
from neutron import context
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db import db_base_plugin_v2 as base_plugin
|
||||
from neutron.extensions import external_net as external_net
|
||||
from neutron.extensions import l3agentscheduler
|
||||
|
@ -129,6 +130,19 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
|
|||
side_effect=exc.SubnetNotFound(subnet_id="1")):
|
||||
plugin._delete_subnets(None, [mock.MagicMock()])
|
||||
|
||||
def test_create_network_segment_allocation_fails(self):
|
||||
plugin = manager.NeutronManager.get_plugin()
|
||||
with mock.patch.object(plugin.type_manager, 'create_network_segments',
|
||||
side_effect=exc.RetryRequest(ValueError())) as f:
|
||||
self.assertRaises(ValueError,
|
||||
plugin.create_network,
|
||||
context.get_admin_context(),
|
||||
{'network': {'tenant_id': 'sometenant',
|
||||
'name': 'dummy',
|
||||
'admin_state_up': True,
|
||||
'shared': False}})
|
||||
self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count)
|
||||
|
||||
|
||||
class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
|
||||
Ml2PluginV2TestCase):
|
||||
|
|
Loading…
Reference in New Issue