Merge "Retry _ensure_aggregates a limited number of times"

commit 3d08aaff7f
Zuul 2018-12-18 16:04:57 +00:00, committed by Gerrit Code Review
2 changed files with 80 additions and 17 deletions


@@ -511,9 +511,15 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
     return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
 
 
+@oslo_db_api.wrap_db_retry(
+    max_retries=5, jitter=True,
+    exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
 def _ensure_aggregate(ctx, agg_uuid):
     """Finds an aggregate and returns its internal ID. If not found, creates
     the aggregate and returns the new aggregate's internal ID.
+
+    If there is a race to create the aggregate (which can happen under rare
+    high load conditions), retry up to 5 times.
     """
     sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
     res = ctx.session.execute(sel).fetchone()
@@ -521,23 +527,14 @@ def _ensure_aggregate(ctx, agg_uuid):
         return res[0]
 
     LOG.debug("_ensure_aggregate() did not find aggregate %s. "
-              "Creating it.", agg_uuid)
-    try:
-        ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
-        res = ctx.session.execute(ins_stmt)
-        agg_id = res.inserted_primary_key[0]
-        LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
-                  agg_uuid, agg_id)
-        return agg_id
-    except db_exc.DBDuplicateEntry:
-        # Something else added this agg_uuid in between our initial
-        # fetch above and when we tried flushing this session, so let's
-        # grab whatever that other thing added.
-        LOG.debug("_ensure_provider() failed to create new aggregate %s. "
-                  "Another thread already created an aggregate record. "
-                  "Looking up that aggregate record.",
-                  agg_uuid)
-        return _ensure_aggregate(ctx, agg_uuid)
+              "Attempting to create it.", agg_uuid)
+
+    ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
+    res = ctx.session.execute(ins_stmt)
+    agg_id = res.inserted_primary_key[0]
+    LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
+              agg_uuid, agg_id)
+    return agg_id
 
 
 @db_api.placement_context_manager.writer
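
For context, not part of the change itself: a minimal, self-contained sketch of how oslo_db's wrap_db_retry drives this retry loop, assuming oslo.db is installed. The decorator re-invokes the wrapped function whenever exception_checker matches the raised exception, up to max_retries times after the initial attempt. The racy_insert() helper, the retry_interval value, and the failure count are illustrative only.

    from oslo_db import api as oslo_db_api
    from oslo_db import exception as db_exc

    attempts = []

    @oslo_db_api.wrap_db_retry(
        max_retries=5, retry_interval=0.01, jitter=True,
        exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
    def racy_insert():
        # Simulate losing the INSERT race on the first two attempts.
        attempts.append(1)
        if len(attempts) < 3:
            raise db_exc.DBDuplicateEntry()
        return 'created'

    # Succeeds on the third attempt: one initial call plus two retries.
    # Six consecutive failures would exhaust max_retries=5 and let
    # DBDuplicateEntry propagate to the caller.
    assert racy_insert() == 'created'
    assert len(attempts) == 3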


@@ -2386,3 +2386,69 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase):
             100,
         )
         self.assertEqual([ss.id], got_ids)
+
+
+# We don't want to waste time sleeping in these tests. It would add
+# tens of seconds.
+@mock.patch('time.sleep', return_value=None)
+class TestEnsureAggregateRetry(tb.PlacementDbBaseTestCase):
+
+    @mock.patch('placement.objects.resource_provider.LOG')
+    def test_retry_happens(self, mock_log, mock_time):
+        """Confirm that retrying on DBDuplicateEntry happens when ensuring
+        aggregates.
+        """
+        magic_fetch_one_attrs = {'fetchone.return_value': None}
+        expected_id = 1
+        # The expected calls to the debug log, used to track the
+        # internal behavior of _ensure_aggregate().
+        expected_calls = [
+            mock.call('_ensure_aggregate() did not find aggregate %s. '
+                      'Attempting to create it.',
+                      uuidsentinel.agg1),
+            mock.call('_ensure_aggregate() did not find aggregate %s. '
+                      'Attempting to create it.',
+                      uuidsentinel.agg1),
+            mock.call('_ensure_aggregate() created new aggregate %s (id=%d).',
+                      uuidsentinel.agg1, mock.ANY),
+        ]
+        side_effects = [
+            # Fail to get an agg from db
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Fake a duplicate entry when creating
+            db_exc.DBDuplicateEntry,
+            # Fail to get an agg from db
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Create agg with success
+            mock.DEFAULT
+        ]
+        facade = self.placement_db.get_enginefacade()
+        with facade.writer.using(self.context) as session:
+            with mock.patch.object(session, 'execute',
+                                   side_effect=side_effects):
+                rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
+        mock_log.debug.assert_has_calls(expected_calls)
+
+        agg_id = rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
+        self.assertEqual(expected_id, agg_id)
+
+    def test_retry_failsover(self, mock_time):
+        """Confirm that the retry loop used when ensuring aggregates only
+        retries 5 times. After that it lets DBDuplicateEntry raise.
+        """
+        magic_fetch_one_attrs = {'fetchone.return_value': None}
+        # Fail to create the aggregate six times: the initial attempt
+        # plus five retries.
+        side_effects = [
+            # Fail to get an agg from db
+            mock.MagicMock(**magic_fetch_one_attrs),
+            # Fake a duplicate entry when creating
+            db_exc.DBDuplicateEntry,
+        ] * 6
+        facade = self.placement_db.get_enginefacade()
+        with facade.writer.using(self.context) as session:
+            with mock.patch.object(session, 'execute',
+                                   side_effect=side_effects):
+                self.assertRaises(
+                    db_exc.DBDuplicateEntry, rp_obj._ensure_aggregate,
+                    self.context, uuidsentinel.agg1)
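
For context: a minimal sketch of the mock side_effect mechanics these tests rely on; the execute name is illustrative. When side_effect is a list, each call consumes the next item: an exception class or instance is raised, while mock.DEFAULT falls back to the mock's return_value. This is also why test_retry_failsover queues the fetch/raise pair six times: one initial attempt plus max_retries=5 retries before DBDuplicateEntry escapes.

    from unittest import mock

    # A stand-in for session.execute with a queued side_effect list.
    execute = mock.MagicMock(return_value='row')
    execute.side_effect = [ValueError, mock.DEFAULT]

    try:
        execute()  # first call: the exception item is raised
    except ValueError:
        pass

    # second call: mock.DEFAULT falls through to return_value
    assert execute() == 'row'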