Retry _ensure_aggregates a limited number of times
Use the oslo_db provided wrap_db_retry to retry _ensure_aggregates a maximum of 5 times when DBDuplicateEntry happens. This replaces previous looping which had no guard to prevent unlimited looping if for some reason DBDuplicateEntry kept happening. This is possible, rarely, when there is a small number of threads talking to the same database, and a small number of aggregates being ensured a huge number of times (thousands) in a very short amount of time. Under those circumstances a maximum recursion error was possible. The test of the new behavior is shamefully over-mocked but manages to confirm the behavior against the real database, thus it is in the functional tree. Change-Id: I67ab2b9a44264c9fd3a8e69a6fa17466473326f1 Closes-Bug: #1804453
This commit is contained in:
parent
7c7eb5021f
commit
cca30d5c4a
|
@ -511,9 +511,15 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
|
|||
return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
|
||||
|
||||
|
||||
@oslo_db_api.wrap_db_retry(
    max_retries=5, jitter=True,
    exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
def _ensure_aggregate(ctx, agg_uuid):
    """Finds an aggregate and returns its internal ID. If not found, creates
    the aggregate and returns the new aggregate's internal ID.

    If there is a race to create the aggregate (which can happen under rare
    high load conditions), retry up to 5 times: the wrap_db_retry decorator
    re-invokes this function whenever the INSERT below raises
    DBDuplicateEntry, replacing the earlier unbounded recursive retry that
    could hit a maximum recursion error.

    :param ctx: request context with an active database session
    :param agg_uuid: UUID of the aggregate to look up or create
    :returns: the internal integer ID of the aggregate row
    :raises db_exc.DBDuplicateEntry: if the insert still collides after the
        decorator's retries are exhausted
    """
    sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
    res = ctx.session.execute(sel).fetchone()
    if res:
        return res[0]

    LOG.debug("_ensure_aggregate() did not find aggregate %s. "
              "Attempting to create it.", agg_uuid)
    # No try/except here on purpose: a DBDuplicateEntry from a concurrent
    # creator propagates to wrap_db_retry, which re-runs the whole function
    # so the SELECT above picks up the row the other thread inserted.
    ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
    res = ctx.session.execute(ins_stmt)
    agg_id = res.inserted_primary_key[0]
    LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
              agg_uuid, agg_id)
    return agg_id
|
||||
|
||||
|
||||
@db_api.placement_context_manager.writer
|
||||
|
|
|
@ -2386,3 +2386,69 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase):
|
|||
100,
|
||||
)
|
||||
self.assertEqual([ss.id], got_ids)
|
||||
|
||||
|
||||
# We don't want to waste time sleeping in these tests. It would add
# tens of seconds.
@mock.patch('time.sleep', return_value=None)
class TestEnsureAggregateRetry(tb.PlacementDbBaseTestCase):
    """Functional tests for the wrap_db_retry guard on _ensure_aggregate.

    The session's execute() is mocked with scripted side effects so we can
    force DBDuplicateEntry on the INSERT path a controlled number of times.
    """

    @mock.patch('placement.objects.resource_provider.LOG')
    def test_retry_happens(self, mock_log, mock_time):
        """Confirm that retrying on DBDuplicateEntry happens when ensuring
        aggregates.
        """
        magic_fetch_one_attrs = {'fetchone.return_value': None}
        expected_id = 1
        # The expected calls to the debug log, used to track the
        # internal behavior of _ensure_aggregate: two create attempts
        # (the first loses the race) followed by one success.
        expected_calls = [
            mock.call('_ensure_aggregate() did not find aggregate %s. '
                      'Attempting to create it.',
                      uuidsentinel.agg1),
            mock.call('_ensure_aggregate() did not find aggregate %s. '
                      'Attempting to create it.',
                      uuidsentinel.agg1),
            mock.call('_ensure_aggregate() created new aggregate %s (id=%d).',
                      uuidsentinel.agg1, mock.ANY),
        ]
        side_effects = [
            # Fail to get an agg from db
            mock.MagicMock(**magic_fetch_one_attrs),
            # Fake a duplicate entry when creating
            db_exc.DBDuplicateEntry,
            # Fail to get an agg from db
            mock.MagicMock(**magic_fetch_one_attrs),
            # Create agg with success
            mock.DEFAULT
        ]

        facade = self.placement_db.get_enginefacade()
        with facade.writer.using(self.context) as session:
            with mock.patch.object(session, 'execute',
                                   side_effect=side_effects):
                rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
                mock_log.debug.assert_has_calls(expected_calls)

        # A second call, unmocked, finds the row in the real database.
        agg_id = rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
        self.assertEqual(expected_id, agg_id)

    def test_retry_failsover(self, mock_time):
        """Confirm that the retry loop used when ensuring aggregates only
        retries 5 times. After that it lets DBDuplicateEntry raise.
        """
        magic_fetch_one_attrs = {'fetchone.return_value': None}
        # Fail to create an aggregate six times in a row (the initial
        # attempt plus the decorator's five retries), exhausting the guard.
        side_effects = [
            # Fail to get an agg from db
            mock.MagicMock(**magic_fetch_one_attrs),
            # Fake a duplicate entry when creating
            db_exc.DBDuplicateEntry,
        ] * 6

        facade = self.placement_db.get_enginefacade()
        with facade.writer.using(self.context) as session:
            with mock.patch.object(session, 'execute',
                                   side_effect=side_effects):
                self.assertRaises(
                    db_exc.DBDuplicateEntry, rp_obj._ensure_aggregate,
                    self.context, uuidsentinel.agg1)
|
||||
|
|
Loading…
Reference in New Issue