From 4cd1b58c8c4ae2a9da31afc1a87647003d6ac128 Mon Sep 17 00:00:00 2001 From: Eugene Nikanorov Date: Thu, 22 Jan 2015 15:54:29 +0300 Subject: [PATCH] Refactor retry mechanism used in some DB operations Use oslo_db helper that will allow to restart the whole transaction in case it needs a certain operation to be repeated. This is a workaround for the REPEATABLE READ problem where retrying logic will not work because queries inside a transation will not see updates made by other transactions. So, run every attempt in a separate transaction. Conflicts: neutron/plugins/ml2/drivers/helpers.py neutron/plugins/ml2/plugin.py neutron/tests/unit/ml2/test_ml2_plugin.py (cherry picked from commit 5dbb34b56fc42d9c68bf6647910a437a2ad6b29e) Change-Id: I68f9ae8019879725df58f5da2c83bb699a548255 Closes-Bug: #1382064 --- neutron/common/exceptions.py | 9 +++ neutron/db/api.py | 85 +++++++++++++++++++++++ neutron/plugins/ml2/drivers/helpers.py | 55 +++++++-------- neutron/plugins/ml2/plugin.py | 11 ++- neutron/tests/unit/ml2/test_helpers.py | 13 ++-- neutron/tests/unit/ml2/test_ml2_plugin.py | 14 ++++ 6 files changed, 147 insertions(+), 40 deletions(-) diff --git a/neutron/common/exceptions.py b/neutron/common/exceptions.py index be62388aa6e..536f92885f5 100644 --- a/neutron/common/exceptions.py +++ b/neutron/common/exceptions.py @@ -335,3 +335,12 @@ class DeviceIDNotOwnedByTenant(Conflict): class InvalidCIDR(BadRequest): message = _("Invalid CIDR %(input)s given as IP prefix") + + +class RetryRequest(Exception): + """Error raised when DB operation needs to be retried. + + That could be intentionally raised by the code without any real DB errors. + """ + def __init__(self, inner_exc): + self.inner_exc = inner_exc diff --git a/neutron/db/api.py b/neutron/db/api.py index 777c2b6eead..87d0f0e8314 100644 --- a/neutron/db/api.py +++ b/neutron/db/api.py @@ -13,11 +13,24 @@ # License for the specific language governing permissions and limitations # under the License. +import six +import sys +import time + from oslo.config import cfg +from oslo.db import exception as oslo_db_exc from oslo.db.sqlalchemy import session +from neutron.common import exceptions as exc +from neutron.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + _FACADE = None +MAX_RETRIES = 10 + def _create_facade_lazily(): global _FACADE @@ -39,3 +52,75 @@ def get_session(autocommit=True, expire_on_commit=False): facade = _create_facade_lazily() return facade.get_session(autocommit=autocommit, expire_on_commit=expire_on_commit) + + +class wrap_db_retry(object): + """Retry db.api methods, if db_error raised + + Retry decorated db.api methods. This decorator catches db_error and retries + function in a loop until it succeeds, or until maximum retries count + will be reached. + + Keyword arguments: + + :param retry_interval: seconds between transaction retries + :type retry_interval: int + + :param max_retries: max number of retries before an error is raised + :type max_retries: int + + :param inc_retry_interval: determine increase retry interval or not + :type inc_retry_interval: bool + + :param max_retry_interval: max interval value between retries + :type max_retry_interval: int + """ + + def __init__(self, retry_interval=0, max_retries=0, inc_retry_interval=0, + max_retry_interval=0, retry_on_disconnect=False, + retry_on_deadlock=False, retry_on_request=False): + super(wrap_db_retry, self).__init__() + + self.db_error = () + if retry_on_disconnect: + self.db_error += (oslo_db_exc.DBConnectionError, ) + if retry_on_deadlock: + self.db_error += (oslo_db_exc.DBDeadlock, ) + if retry_on_request: + self.db_error += (exc.RetryRequest, ) + self.retry_interval = retry_interval + self.max_retries = max_retries + self.inc_retry_interval = inc_retry_interval + self.max_retry_interval = max_retry_interval + + def __call__(self, f): + @six.wraps(f) + def wrapper(*args, **kwargs): + next_interval = self.retry_interval + remaining = self.max_retries + db_error = self.db_error + + while True: + try: + return f(*args, **kwargs) + except db_error as e: + if remaining == 0: + LOG.exception(_('DB exceeded retry limit.')) + if isinstance(e, exc.RetryRequest): + six.reraise(type(e.inner_exc), + e.inner_exc, + sys.exc_info()[2]) + raise e + if remaining != -1: + remaining -= 1 + LOG.exception(_('DB error.')) + # NOTE(vsergeyev): We are using patched time module, so + # this effectively yields the execution + # context to another green thread. + time.sleep(next_interval) + if self.inc_retry_interval: + next_interval = min( + next_interval * 2, + self.max_retry_interval + ) + return wrapper diff --git a/neutron/plugins/ml2/drivers/helpers.py b/neutron/plugins/ml2/drivers/helpers.py index d3050989ddd..938a2441533 100644 --- a/neutron/plugins/ml2/drivers/helpers.py +++ b/neutron/plugins/ml2/drivers/helpers.py @@ -107,37 +107,32 @@ class TypeDriverHelper(api.TypeDriver): filter_by(allocated=False, **filters)) # Selected segment can be allocated before update by someone else, - # We retry until update success or DB_MAX_RETRIES retries - for attempt in range(1, DB_MAX_RETRIES + 1): - alloc = select.first() + alloc = select.first() - if not alloc: - # No resource available - return + if not alloc: + # No resource available + return - raw_segment = dict((k, alloc[k]) for k in self.primary_keys) - LOG.debug("%(type)s segment allocate from pool, attempt " - "%(attempt)s started with %(segment)s ", - {"type": network_type, "attempt": attempt, + raw_segment = dict((k, alloc[k]) for k in self.primary_keys) + LOG.debug("%(type)s segment allocate from pool " + "started with %(segment)s ", + {"type": network_type, + "segment": raw_segment}) + count = (session.query(self.model). + filter_by(allocated=False, **raw_segment). + update({"allocated": True})) + if count: + LOG.debug("%(type)s segment allocate from pool " + "success with %(segment)s ", + {"type": network_type, "segment": raw_segment}) - count = (session.query(self.model). - filter_by(allocated=False, **raw_segment). - update({"allocated": True})) - if count: - LOG.debug("%(type)s segment allocate from pool, attempt " - "%(attempt)s success with %(segment)s ", - {"type": network_type, "attempt": attempt, - "segment": raw_segment}) - return alloc + return alloc - # Segment allocated since select - LOG.debug("Allocate %(type)s segment from pool, " - "attempt %(attempt)s failed with segment " - "%(segment)s", - {"type": network_type, "attempt": attempt, - "segment": raw_segment}) - - LOG.warning(_("Allocate %(type)s segment from pool failed " - "after %(number)s failed attempts"), - {"type": network_type, "number": DB_MAX_RETRIES}) - raise exc.NoNetworkFoundInMaximumAllowedAttempts() + # Segment allocated since select + LOG.debug("Allocate %(type)s segment from pool " + "failed with segment %(segment)s", + {"type": network_type, + "segment": raw_segment}) + # saving real exception in case we exceeded amount of attempts + raise exc.RetryRequest( + exc.NoNetworkFoundInMaximumAllowedAttempts()) diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index b370b54bb04..9dcf6f84526 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -484,7 +484,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, # TODO(apech): Need to override bulk operations - def create_network(self, context, network): + def _create_network_db(self, context, network): net_data = network['network'] tenant_id = self._get_tenant_id_for_create(context, net_data) session = context.session @@ -501,7 +501,16 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, mech_context = driver_context.NetworkContext(self, context, result) self.mechanism_manager.create_network_precommit(mech_context) + return result, mech_context + @db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES, + retry_on_request=True) + def _create_network_with_retries(self, context, network): + return self._create_network_db(context, network) + + def create_network(self, context, network): + result, mech_context = self._create_network_with_retries(context, + network) try: self.mechanism_manager.create_network_postcommit(mech_context) except ml2_exc.MechanismDriverError: diff --git a/neutron/tests/unit/ml2/test_helpers.py b/neutron/tests/unit/ml2/test_helpers.py index 2297fb9f744..fbdfbae9fec 100644 --- a/neutron/tests/unit/ml2/test_helpers.py +++ b/neutron/tests/unit/ml2/test_helpers.py @@ -134,15 +134,10 @@ class HelpersTest(testlib_api.SqlTestCase): def test_allocate_partial_segment_first_attempt_fails(self): expected = dict(physical_network=TENANT_NET) with mock.patch.object(query.Query, 'update', side_effect=[0, 1]): + self.assertRaises( + exc.RetryRequest, + self.driver.allocate_partially_specified_segment, + self.session, **expected) observed = self.driver.allocate_partially_specified_segment( self.session, **expected) self.check_raw_segment(expected, observed) - - def test_allocate_partial_segment_all_attempts_fail(self): - with mock.patch.object(query.Query, 'update', return_value=0): - with mock.patch.object(helpers.LOG, 'warning') as log_warning: - self.assertRaises( - exc.NoNetworkFoundInMaximumAllowedAttempts, - self.driver.allocate_partially_specified_segment, - self.session) - log_warning.assert_called_once_with(mock.ANY, mock.ANY) diff --git a/neutron/tests/unit/ml2/test_ml2_plugin.py b/neutron/tests/unit/ml2/test_ml2_plugin.py index 831dad15ddc..28e38c7223e 100644 --- a/neutron/tests/unit/ml2/test_ml2_plugin.py +++ b/neutron/tests/unit/ml2/test_ml2_plugin.py @@ -23,6 +23,7 @@ from neutron.common import constants from neutron.common import exceptions as exc from neutron.common import utils from neutron import context +from neutron.db import api as db_api from neutron.db import db_base_plugin_v2 as base_plugin from neutron.extensions import external_net as external_net from neutron.extensions import l3agentscheduler @@ -129,6 +130,19 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2, side_effect=exc.SubnetNotFound(subnet_id="1")): plugin._delete_subnets(None, [mock.MagicMock()]) + def test_create_network_segment_allocation_fails(self): + plugin = manager.NeutronManager.get_plugin() + with mock.patch.object(plugin.type_manager, 'create_network_segments', + side_effect=exc.RetryRequest(ValueError())) as f: + self.assertRaises(ValueError, + plugin.create_network, + context.get_admin_context(), + {'network': {'tenant_id': 'sometenant', + 'name': 'dummy', + 'admin_state_up': True, + 'shared': False}}) + self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count) + class TestMl2SubnetsV2(test_plugin.TestSubnetsV2, Ml2PluginV2TestCase):