From cca30d5c4a0cf5bd702ac5c5b9611e59197f828f Mon Sep 17 00:00:00 2001
From: Chris Dent
Date: Mon, 10 Dec 2018 17:41:17 +0000
Subject: [PATCH] Retry _ensure_aggregate a limited number of times

Use the oslo_db-provided wrap_db_retry to retry _ensure_aggregate a
maximum of 5 times when DBDuplicateEntry happens. This replaces the
previous recursive retry, which had no guard against unlimited retries
if for some reason DBDuplicateEntry kept happening. This is possible,
rarely, when a small number of threads talking to the same database
ensure a small number of aggregates a huge number of times (thousands)
in a very small amount of time. Under those circumstances a maximum
recursion error was possible.

The test of the new behavior is shamefully over-mocked but manages to
confirm the behavior against the real database, so it lives in the
functional tree.

Change-Id: I67ab2b9a44264c9fd3a8e69a6fa17466473326f1
Closes-Bug: #1804453
---
 placement/objects/resource_provider.py       | 31 ++++-----
 .../functional/db/test_resource_provider.py  | 66 +++++++++++++++++++
 2 files changed, 80 insertions(+), 17 deletions(-)

diff --git a/placement/objects/resource_provider.py b/placement/objects/resource_provider.py
index ff2f56af4..08ec17a2f 100644
--- a/placement/objects/resource_provider.py
+++ b/placement/objects/resource_provider.py
@@ -511,9 +511,15 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
     return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
 
 
+@oslo_db_api.wrap_db_retry(
+    max_retries=5, jitter=True,
+    exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
 def _ensure_aggregate(ctx, agg_uuid):
     """Finds an aggregate and returns its internal ID. If not found, creates
     the aggregate and returns the new aggregate's internal ID.
+
+    If there is a race to create the aggregate (which can happen under rare
+    high load conditions), retry up to 5 times.
     """
     sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
     res = ctx.session.execute(sel).fetchone()
@@ -521,23 +527,14 @@ def _ensure_aggregate(ctx, agg_uuid):
         return res[0]
 
     LOG.debug("_ensure_aggregate() did not find aggregate %s. "
-              "Creating it.", agg_uuid)
-    try:
-        ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
-        res = ctx.session.execute(ins_stmt)
-        agg_id = res.inserted_primary_key[0]
-        LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
-                  agg_uuid, agg_id)
-        return agg_id
-    except db_exc.DBDuplicateEntry:
-        # Something else added this agg_uuid in between our initial
-        # fetch above and when we tried flushing this session, so let's
-        # grab whatever that other thing added.
-        LOG.debug("_ensure_provider() failed to create new aggregate %s. "
-                  "Another thread already created an aggregate record. "
" - "Looking up that aggregate record.", - agg_uuid) - return _ensure_aggregate(ctx, agg_uuid) + "Attempting to create it.", agg_uuid) + + ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid) + res = ctx.session.execute(ins_stmt) + agg_id = res.inserted_primary_key[0] + LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).", + agg_uuid, agg_id) + return agg_id @db_api.placement_context_manager.writer diff --git a/placement/tests/functional/db/test_resource_provider.py b/placement/tests/functional/db/test_resource_provider.py index 9cf4b59db..c1e7f5c69 100644 --- a/placement/tests/functional/db/test_resource_provider.py +++ b/placement/tests/functional/db/test_resource_provider.py @@ -2386,3 +2386,69 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase): 100, ) self.assertEqual([ss.id], got_ids) + + +# We don't want to waste time sleeping in these tests. It would add +# tens of seconds. +@mock.patch('time.sleep', return_value=None) +class TestEnsureAggregateRetry(tb.PlacementDbBaseTestCase): + + @mock.patch('placement.objects.resource_provider.LOG') + def test_retry_happens(self, mock_log, mock_time): + """Confirm that retrying on DBDuplicateEntry happens when ensuring + aggregates. + """ + magic_fetch_one_attrs = {'fetchone.return_value': None} + expected_id = 1 + # The expected calls to the debug log, used to track the + # internal behavior of _ensure_aggregates. + expected_calls = [ + mock.call('_ensure_aggregate() did not find aggregate %s. ' + 'Attempting to create it.', + uuidsentinel.agg1), + mock.call('_ensure_aggregate() did not find aggregate %s. ' + 'Attempting to create it.', + uuidsentinel.agg1), + mock.call('_ensure_aggregate() created new aggregate %s (id=%d).', + uuidsentinel.agg1, mock.ANY), + ] + side_effects = [ + # Fail to get an agg from db + mock.MagicMock(**magic_fetch_one_attrs), + # Fake a duplicate entry when creating + db_exc.DBDuplicateEntry, + # Fail to get an agg from db + mock.MagicMock(**magic_fetch_one_attrs), + # Create agg with success + mock.DEFAULT + ] + + facade = self.placement_db.get_enginefacade() + with facade.writer.using(self.context) as session: + with mock.patch.object(session, 'execute', + side_effect=side_effects): + rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1) + mock_log.debug.assert_has_calls(expected_calls) + agg_id = rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1) + self.assertEqual(expected_id, agg_id) + + def test_retry_failsover(self, mock_time): + """Confirm that the retry loop used when ensuring aggregates only + retries 5 times. After that it lets DBDuplicateEntry raise. + """ + magic_fetch_one_attrs = {'fetchone.return_value': None} + # Fail to create an aggregate five times. + side_effects = [ + # Fail to get an agg from db + mock.MagicMock(**magic_fetch_one_attrs), + # Fake a duplicate entry when creating + db_exc.DBDuplicateEntry, + ] * 6 + + facade = self.placement_db.get_enginefacade() + with facade.writer.using(self.context) as session: + with mock.patch.object(session, 'execute', + side_effect=side_effects): + self.assertRaises( + db_exc.DBDuplicateEntry, rp_obj._ensure_aggregate, + self.context, uuidsentinel.agg1)