diff --git a/placement/objects/resource_provider.py b/placement/objects/resource_provider.py
index ff2f56af4..08ec17a2f 100644
--- a/placement/objects/resource_provider.py
+++ b/placement/objects/resource_provider.py
@@ -511,9 +511,15 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
     return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
 
 
+@oslo_db_api.wrap_db_retry(
+    max_retries=5, jitter=True,
+    exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
 def _ensure_aggregate(ctx, agg_uuid):
     """Finds an aggregate and returns its internal ID. If not found, creates
     the aggregate and returns the new aggregate's internal ID.
+
+    If there is a race to create the aggregate (which can happen under rare
+    high load conditions), retry up to 5 times.
     """
     sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
     res = ctx.session.execute(sel).fetchone()
@@ -521,23 +527,14 @@ def _ensure_aggregate(ctx, agg_uuid):
         return res[0]
 
     LOG.debug("_ensure_aggregate() did not find aggregate %s. "
-              "Creating it.", agg_uuid)
-    try:
-        ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
-        res = ctx.session.execute(ins_stmt)
-        agg_id = res.inserted_primary_key[0]
-        LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
-                  agg_uuid, agg_id)
-        return agg_id
-    except db_exc.DBDuplicateEntry:
-        # Something else added this agg_uuid in between our initial
-        # fetch above and when we tried flushing this session, so let's
-        # grab whatever that other thing added.
-        LOG.debug("_ensure_provider() failed to create new aggregate %s. "
-                  "Another thread already created an aggregate record. "
-                  "Looking up that aggregate record.",
-                  agg_uuid)
-        return _ensure_aggregate(ctx, agg_uuid)
+              "Attempting to create it.", agg_uuid)
+
+    ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
+    res = ctx.session.execute(ins_stmt)
+    agg_id = res.inserted_primary_key[0]
+    LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
+              agg_uuid, agg_id)
+    return agg_id
 
 
 @db_api.placement_context_manager.writer
diff --git a/placement/tests/functional/db/test_resource_provider.py b/placement/tests/functional/db/test_resource_provider.py
index 9cf4b59db..c1e7f5c69 100644
--- a/placement/tests/functional/db/test_resource_provider.py
+++ b/placement/tests/functional/db/test_resource_provider.py
@@ -2386,3 +2386,69 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase):
             100,
         )
         self.assertEqual([ss.id], got_ids)
+
+
+# We don't want to waste time sleeping in these tests. It would add
+# tens of seconds.
+@mock.patch('time.sleep', return_value=None)
+class TestEnsureAggregateRetry(tb.PlacementDbBaseTestCase):
+
+    @mock.patch('placement.objects.resource_provider.LOG')
+    def test_retry_happens(self, mock_log, mock_time):
+        """Confirm that retrying on DBDuplicateEntry happens when ensuring
+        aggregates.
+        """
+        magic_fetch_one_attrs = {'fetchone.return_value': None}
+        expected_id = 1
+        # The expected calls to the debug log, used to track the
+        # internal behavior of _ensure_aggregate().
+        expected_calls = [
+            mock.call('_ensure_aggregate() did not find aggregate %s. '
+                      'Attempting to create it.',
+                      uuidsentinel.agg1),
+            mock.call('_ensure_aggregate() did not find aggregate %s. '
+                      'Attempting to create it.',
+                      uuidsentinel.agg1),
+            mock.call('_ensure_aggregate() created new aggregate %s (id=%d).',
+                      uuidsentinel.agg1, mock.ANY),
+        ]
+        side_effects = [
+            # Fail to get an agg from the db
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Fake a duplicate entry when creating
+            db_exc.DBDuplicateEntry,
+            # Fail to get an agg from the db
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Create the agg successfully
+            mock.DEFAULT
+        ]
+
+        facade = self.placement_db.get_enginefacade()
+        with facade.writer.using(self.context) as session:
+            with mock.patch.object(session, 'execute',
+                                   side_effect=side_effects):
+                rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
+            mock_log.debug.assert_has_calls(expected_calls)
+            agg_id = rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
+            self.assertEqual(expected_id, agg_id)
+
+    def test_retry_failsover(self, mock_time):
+        """Confirm that the retry loop used when ensuring aggregates only
+        retries 5 times. After that it lets DBDuplicateEntry raise.
+        """
+        magic_fetch_one_attrs = {'fetchone.return_value': None}
+        # Fail on all six attempts (the initial try plus five retries).
+        side_effects = [
+            # Fail to get an agg from the db
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Fake a duplicate entry when creating
+            db_exc.DBDuplicateEntry,
+        ] * 6
+
+        facade = self.placement_db.get_enginefacade()
+        with facade.writer.using(self.context) as session:
+            with mock.patch.object(session, 'execute',
+                                   side_effect=side_effects):
+                self.assertRaises(
+                    db_exc.DBDuplicateEntry, rp_obj._ensure_aggregate,
+                    self.context, uuidsentinel.agg1)
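
Reviewer note, not part of the patch: the retry behavior the change relies on comes entirely from oslo.db's wrap_db_retry decorator, which replaces the previous hand-rolled recurse-on-DBDuplicateEntry handler while bounding the number of attempts. The following is a minimal standalone sketch of those semantics, not code from placement; the six-attempt total (one initial try plus max_retries=5 retries) mirrors what test_retry_failsover exercises with its six queued failures.

from unittest import mock

from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc

attempts = []


@oslo_db_api.wrap_db_retry(
    max_retries=5, jitter=True,
    exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
def always_duplicate():
    # Record the attempt, then fail as if another thread won the insert race.
    attempts.append(1)
    raise db_exc.DBDuplicateEntry()


# Patch time.sleep the same way the tests above do, so the retry backoff
# does not actually pause.
with mock.patch('time.sleep', return_value=None):
    try:
        always_duplicate()
    except db_exc.DBDuplicateEntry:
        # The initial attempt plus the five retries allowed by max_retries.
        print('attempts: %d' % len(attempts))  # attempts: 6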