Retry _ensure_aggregate a limited number of times

Use the oslo_db-provided wrap_db_retry to retry _ensure_aggregate a
maximum of 5 times when DBDuplicateEntry happens.
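
For context, a minimal self-contained sketch of how the decorator
behaves (the flaky/attempts names are illustrative, not part of this
change): wrap_db_retry re-invokes the decorated function, sleeping
briefly between attempts, for as long as exception_checker matches the
raised exception, up to max_retries additional attempts:

    from oslo_db import api as oslo_db_api
    from oslo_db import exception as db_exc

    attempts = []

    @oslo_db_api.wrap_db_retry(
        max_retries=5, jitter=True,
        exception_checker=lambda exc: isinstance(
            exc, db_exc.DBDuplicateEntry))
    def flaky():
        # Simulate losing the insert race twice before succeeding.
        attempts.append(1)
        if len(attempts) < 3:
            raise db_exc.DBDuplicateEntry()
        return 'created'

    print(flaky(), len(attempts))  # created 3

Once the retries are exhausted the exception propagates to the caller,
which is what the failsover test below asserts.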

This replaces the previous approach, which retried by recursively
calling _ensure_aggregate and had no guard to prevent unlimited
retries if for some reason DBDuplicateEntry kept happening. That is
possible, rarely, when a small number of threads talking to the same
database ensure a small number of aggregates a huge number of times
(thousands) in a very short span of time. Under those circumstances a
maximum recursion error was possible.
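
To illustrate the hazard (a toy sketch, not the placement code): each
recursive retry adds a stack frame, so a race that keeps recurring
eventually exceeds Python's default recursion limit of roughly 1000
frames:

    class DuplicateError(Exception):
        pass

    def ensure(collisions_left):
        try:
            if collisions_left:
                raise DuplicateError()
            return 'created'
        except DuplicateError:
            # Retrying by recursion: one more stack frame per collision.
            return ensure(collisions_left - 1)

    print(ensure(10))   # fine: created
    # ensure(10000) raises RecursionError (maximum recursion depth
    # exceeded); a bounded retry decorator avoids this entirely.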

The test of the new behavior is shamefully over-mocked, but it manages
to confirm the behavior against a real database, so it lives in the
functional tree.

Change-Id: I67ab2b9a44264c9fd3a8e69a6fa17466473326f1
Closes-Bug: #1804453
Chris Dent 2018-12-10 17:41:17 +00:00
parent 7c7eb5021f
commit cca30d5c4a
2 changed files with 80 additions and 17 deletions


@@ -511,9 +511,15 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
     return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
 
 
+@oslo_db_api.wrap_db_retry(
+    max_retries=5, jitter=True,
+    exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
 def _ensure_aggregate(ctx, agg_uuid):
     """Finds an aggregate and returns its internal ID. If not found, creates
     the aggregate and returns the new aggregate's internal ID.
+
+    If there is a race to create the aggregate (which can happen under rare
+    high load conditions), retry up to 5 times.
     """
     sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
     res = ctx.session.execute(sel).fetchone()
@@ -521,23 +527,14 @@ def _ensure_aggregate(ctx, agg_uuid):
     if res:
         return res[0]
 
     LOG.debug("_ensure_aggregate() did not find aggregate %s. "
-              "Creating it.", agg_uuid)
-    try:
-        ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
-        res = ctx.session.execute(ins_stmt)
-        agg_id = res.inserted_primary_key[0]
-        LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
-                  agg_uuid, agg_id)
-        return agg_id
-    except db_exc.DBDuplicateEntry:
-        # Something else added this agg_uuid in between our initial
-        # fetch above and when we tried flushing this session, so let's
-        # grab whatever that other thing added.
-        LOG.debug("_ensure_provider() failed to create new aggregate %s. "
-                  "Another thread already created an aggregate record. "
-                  "Looking up that aggregate record.",
-                  agg_uuid)
-        return _ensure_aggregate(ctx, agg_uuid)
+              "Attempting to create it.", agg_uuid)
+    ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
+    res = ctx.session.execute(ins_stmt)
+    agg_id = res.inserted_primary_key[0]
+    LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
+              agg_uuid, agg_id)
+    return agg_id
 
 
 @db_api.placement_context_manager.writer


@@ -2386,3 +2386,69 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase):
             100,
         )
         self.assertEqual([ss.id], got_ids)
+
+
+# We don't want to waste time sleeping in these tests. It would add
+# tens of seconds.
+@mock.patch('time.sleep', return_value=None)
+class TestEnsureAggregateRetry(tb.PlacementDbBaseTestCase):
+
+    @mock.patch('placement.objects.resource_provider.LOG')
+    def test_retry_happens(self, mock_log, mock_time):
+        """Confirm that retrying on DBDuplicateEntry happens when ensuring
+        aggregates.
+        """
+        magic_fetch_one_attrs = {'fetchone.return_value': None}
+        expected_id = 1
+        # The expected calls to the debug log, used to track the
+        # internal behavior of _ensure_aggregate.
+        expected_calls = [
+            mock.call('_ensure_aggregate() did not find aggregate %s. '
+                      'Attempting to create it.',
+                      uuidsentinel.agg1),
+            mock.call('_ensure_aggregate() did not find aggregate %s. '
+                      'Attempting to create it.',
+                      uuidsentinel.agg1),
+            mock.call('_ensure_aggregate() created new aggregate %s (id=%d).',
+                      uuidsentinel.agg1, mock.ANY),
+        ]
+        side_effects = [
+            # Fail to get an agg from the db.
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Fake a duplicate entry when creating.
+            db_exc.DBDuplicateEntry,
+            # Fail to get an agg from the db.
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Create the agg with success.
+            mock.DEFAULT,
+        ]
+        facade = self.placement_db.get_enginefacade()
+        with facade.writer.using(self.context) as session:
+            with mock.patch.object(session, 'execute',
+                                   side_effect=side_effects):
+                rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
+        mock_log.debug.assert_has_calls(expected_calls)
+
+        agg_id = rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
+        self.assertEqual(expected_id, agg_id)
+
+    def test_retry_failsover(self, mock_time):
+        """Confirm that the retry loop used when ensuring aggregates only
+        retries 5 times. After that it lets DBDuplicateEntry raise.
+        """
+        magic_fetch_one_attrs = {'fetchone.return_value': None}
+        # Fail to create an aggregate six times: the initial attempt
+        # plus the five retries.
+        side_effects = [
+            # Fail to get an agg from the db.
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Fake a duplicate entry when creating.
+            db_exc.DBDuplicateEntry,
+        ] * 6
+        facade = self.placement_db.get_enginefacade()
+        with facade.writer.using(self.context) as session:
+            with mock.patch.object(session, 'execute',
+                                   side_effect=side_effects):
+                self.assertRaises(
+                    db_exc.DBDuplicateEntry, rp_obj._ensure_aggregate,
+                    self.context, uuidsentinel.agg1)