Merge "Support tenacity exponential backoff retry on resource sync"

This commit is contained in:
Zuul 2018-02-10 01:15:27 +00:00 committed by Gerrit Code Review
commit 98636290c5
2 changed files with 45 additions and 15 deletions

View File

@ -13,9 +13,8 @@
# limitations under the License.
import ast
import eventlet
import random
import six
import tenacity
from oslo_log import log as logging
@ -116,25 +115,55 @@ def serialize_input_data(input_data):
return {'input_data': _serialize(input_data)}
class wait_random_exponential(tenacity.wait_exponential):
"""Random wait strategy with a geometrically increasing amount of jitter.
Implements the truncated binary exponential backoff algorithm as used in
e.g. CSMA media access control. The retry occurs at a random time in a
(geometrically) expanding interval constrained by minimum and maximum
limits.
"""
def __init__(self, min=0, multiplier=1, max=tenacity._utils.MAX_WAIT,
exp_base=2):
super(wait_random_exponential, self).__init__(multiplier=multiplier,
max=(max-min),
exp_base=exp_base)
self._random = tenacity.wait_random(min=min, max=(min + multiplier))
def __call__(self, previous_attempt_number, delay_since_first_attempt):
jitter = super(wait_random_exponential,
self).__call__(previous_attempt_number,
delay_since_first_attempt)
self._random.wait_random_max = self._random.wait_random_min + jitter
return self._random(previous_attempt_number, delay_since_first_attempt)
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
predecessors, new_data):
rows_updated = None
sync_point = None
input_data = None
nconflicts = max(0, len(predecessors) - 2)
# limit to 10 seconds
max_wt = min(nconflicts * 0.01, 10)
while not rows_updated:
# Retry waits up to 60 seconds at most, with exponentially increasing
# amounts of jitter per resource still outstanding
wait_strategy = wait_random_exponential(max=60)
def init_jitter(existing_input_data):
nconflicts = max(0, len(predecessors) - len(existing_input_data) - 1)
# 10ms per potential conflict, up to a max of 10s in total
return min(nconflicts, 1000) * 0.01
@tenacity.retry(
retry=tenacity.retry_if_result(lambda r: r is None),
wait=wait_strategy
)
def _sync():
sync_point = get(cnxt, entity_id, current_traversal, is_update)
input_data = deserialize_input_data(sync_point.input_data)
wait_strategy.multiplier = init_jitter(input_data)
input_data.update(new_data)
rows_updated = update_input_data(
cnxt, entity_id, current_traversal, is_update,
sync_point.atomic_key, serialize_input_data(input_data))
# don't aggressively spin; induce some sleep
if not rows_updated:
eventlet.sleep(random.uniform(0, max_wt))
return input_data if rows_updated else None
input_data = _sync()
waiting = predecessors - set(input_data)
key = make_key(entity_id, current_traversal, is_update)
if waiting:

View File

@ -74,15 +74,16 @@ class SyncPointTestCase(common.HeatTestCase):
self.assertEqual({'input_data': {u'tuple:(3, 8)': None}}, res)
@mock.patch('heat.engine.sync_point.update_input_data', return_value=None)
@mock.patch('eventlet.sleep', side_effect=exception.DBError)
@mock.patch('time.sleep', side_effect=exception.DBError)
def sync_with_sleep(self, ctx, stack, mock_sleep_time, mock_uid):
resource = stack['C']
graph = stack.convergence_dependencies.graph()
mock_callback = mock.Mock()
sender = (3, True)
self.assertRaises(exception.DBError, sync_point.sync, ctx, resource.id,
stack.current_traversal, True, mock_callback,
set(graph[(resource.id, True)]), {})
set(graph[(resource.id, True)]), {sender: None})
return mock_sleep_time
def test_sync_with_time_throttle(self):
@ -92,4 +93,4 @@ class SyncPointTestCase(common.HeatTestCase):
convergence=True)
stack.converge_stack(stack.t, action=stack.CREATE)
mock_sleep_time = self.sync_with_sleep(ctx, stack)
mock_sleep_time.assert_called_once_with(mock.ANY)
self.assertTrue(mock_sleep_time.called)