Merge "Retry _ensure_aggregates a limited number of times"
This commit is contained in:
commit
3d08aaff7f
|
@ -511,9 +511,15 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
|
|||
return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
|
||||
|
||||
|
||||
@oslo_db_api.wrap_db_retry(
|
||||
max_retries=5, jitter=True,
|
||||
exception_checker=lambda exc: isinstance(exc, db_exc.DBDuplicateEntry))
|
||||
def _ensure_aggregate(ctx, agg_uuid):
|
||||
"""Finds an aggregate and returns its internal ID. If not found, creates
|
||||
the aggregate and returns the new aggregate's internal ID.
|
||||
|
||||
If there is a race to create the aggregate (which can happen under rare
|
||||
high load conditions), retry up to 5 times.
|
||||
"""
|
||||
sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
|
||||
res = ctx.session.execute(sel).fetchone()
|
||||
|
@ -521,23 +527,14 @@ def _ensure_aggregate(ctx, agg_uuid):
|
|||
return res[0]
|
||||
|
||||
LOG.debug("_ensure_aggregate() did not find aggregate %s. "
|
||||
"Creating it.", agg_uuid)
|
||||
try:
|
||||
ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
|
||||
res = ctx.session.execute(ins_stmt)
|
||||
agg_id = res.inserted_primary_key[0]
|
||||
LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
|
||||
agg_uuid, agg_id)
|
||||
return agg_id
|
||||
except db_exc.DBDuplicateEntry:
|
||||
# Something else added this agg_uuid in between our initial
|
||||
# fetch above and when we tried flushing this session, so let's
|
||||
# grab whatever that other thing added.
|
||||
LOG.debug("_ensure_provider() failed to create new aggregate %s. "
|
||||
"Another thread already created an aggregate record. "
|
||||
"Looking up that aggregate record.",
|
||||
agg_uuid)
|
||||
return _ensure_aggregate(ctx, agg_uuid)
|
||||
"Attempting to create it.", agg_uuid)
|
||||
|
||||
ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
|
||||
res = ctx.session.execute(ins_stmt)
|
||||
agg_id = res.inserted_primary_key[0]
|
||||
LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
|
||||
agg_uuid, agg_id)
|
||||
return agg_id
|
||||
|
||||
|
||||
@db_api.placement_context_manager.writer
|
||||
|
|
|
@ -2386,3 +2386,69 @@ class SharedProviderTestCase(tb.PlacementDbBaseTestCase):
|
|||
100,
|
||||
)
|
||||
self.assertEqual([ss.id], got_ids)
|
||||
|
||||
|
||||
# We don't want to waste time sleeping in these tests. It would add
|
||||
# tens of seconds.
|
||||
@mock.patch('time.sleep', return_value=None)
|
||||
class TestEnsureAggregateRetry(tb.PlacementDbBaseTestCase):
|
||||
|
||||
@mock.patch('placement.objects.resource_provider.LOG')
|
||||
def test_retry_happens(self, mock_log, mock_time):
|
||||
"""Confirm that retrying on DBDuplicateEntry happens when ensuring
|
||||
aggregates.
|
||||
"""
|
||||
magic_fetch_one_attrs = {'fetchone.return_value': None}
|
||||
expected_id = 1
|
||||
# The expected calls to the debug log, used to track the
|
||||
# internal behavior of _ensure_aggregates.
|
||||
expected_calls = [
|
||||
mock.call('_ensure_aggregate() did not find aggregate %s. '
|
||||
'Attempting to create it.',
|
||||
uuidsentinel.agg1),
|
||||
mock.call('_ensure_aggregate() did not find aggregate %s. '
|
||||
'Attempting to create it.',
|
||||
uuidsentinel.agg1),
|
||||
mock.call('_ensure_aggregate() created new aggregate %s (id=%d).',
|
||||
uuidsentinel.agg1, mock.ANY),
|
||||
]
|
||||
side_effects = [
|
||||
# Fail to get an agg from db
|
||||
mock.MagicMock(**magic_fetch_one_attrs),
|
||||
# Fake a duplicate entry when creating
|
||||
db_exc.DBDuplicateEntry,
|
||||
# Fail to get an agg from db
|
||||
mock.MagicMock(**magic_fetch_one_attrs),
|
||||
# Create agg with success
|
||||
mock.DEFAULT
|
||||
]
|
||||
|
||||
facade = self.placement_db.get_enginefacade()
|
||||
with facade.writer.using(self.context) as session:
|
||||
with mock.patch.object(session, 'execute',
|
||||
side_effect=side_effects):
|
||||
rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
|
||||
mock_log.debug.assert_has_calls(expected_calls)
|
||||
agg_id = rp_obj._ensure_aggregate(self.context, uuidsentinel.agg1)
|
||||
self.assertEqual(expected_id, agg_id)
|
||||
|
||||
def test_retry_failsover(self, mock_time):
|
||||
"""Confirm that the retry loop used when ensuring aggregates only
|
||||
retries 5 times. After that it lets DBDuplicateEntry raise.
|
||||
"""
|
||||
magic_fetch_one_attrs = {'fetchone.return_value': None}
|
||||
# Fail to create an aggregate five times.
|
||||
side_effects = [
|
||||
# Fail to get an agg from db
|
||||
mock.MagicMock(**magic_fetch_one_attrs),
|
||||
# Fake a duplicate entry when creating
|
||||
db_exc.DBDuplicateEntry,
|
||||
] * 6
|
||||
|
||||
facade = self.placement_db.get_enginefacade()
|
||||
with facade.writer.using(self.context) as session:
|
||||
with mock.patch.object(session, 'execute',
|
||||
side_effect=side_effects):
|
||||
self.assertRaises(
|
||||
db_exc.DBDuplicateEntry, rp_obj._ensure_aggregate,
|
||||
self.context, uuidsentinel.agg1)
|
||||
|
|
Loading…
Reference in New Issue