Fix jitter strategies

Seems like the wrong function signature was being decorated
so this fixes that and adds an initial test to make sure it
continues to work as expected.

Also fixes the invalid usage of random in the jitter
decorator.

Change-Id: If30fcc1430b84078dc5fc104aa28f06471e4c358
This commit is contained in:
Joshua Harlow 2016-02-24 12:06:36 -08:00
parent bd9f7ca8e8
commit 81991b6a09
2 changed files with 32 additions and 9 deletions

View File

@ -21,11 +21,9 @@ import heapq
import inspect
import logging
import math
import random
import threading
# For: https://wiki.openstack.org/wiki/Security/Projects/Bandit
from random import SystemRandom as random
from concurrent import futures
import six
@ -167,12 +165,13 @@ def _add_jitter(max_percent_jitter):
" equal to 0.0 and less than or equal to 1.0")
def wrapper(func):
rnd = random.SystemRandom()
@six.wraps(func)
def decorator(cb, metrics, now=None):
next_run = func(cb, metrics, now=now)
def decorator(cb, started_at, finished_at, metrics):
next_run = func(cb, started_at, finished_at, metrics)
how_often = cb._periodic_spacing
jitter = how_often * (random.random() * max_percent_jitter)
jitter = how_often * (rnd.random() * max_percent_jitter)
return next_run + jitter
decorator.__name__ += "_with_jitter"
@ -583,6 +582,7 @@ class PeriodicWorker(object):
def _run(self, executor, runner):
"""Main worker run loop."""
barrier = utils.Barrier(cond_cls=self._cond_cls)
rnd = random.SystemRandom()
def _process_scheduled():
# Figure out when we should run next (by selecting the
@ -616,7 +616,7 @@ class PeriodicWorker(object):
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
# Restart after a short delay
delay = (self._RESCHEDULE_DELAY +
random().random() * self._RESCHEDULE_JITTER)
rnd.random() * self._RESCHEDULE_JITTER)
self._log.error("Failed to submit periodic function "
"'%s', retrying after %.2f sec. "
"Error: %s",
@ -677,8 +677,7 @@ class PeriodicWorker(object):
elapsed_waiting = max(0, started_at - submitted_at)
cb_metrics['elapsed'] += elapsed
cb_metrics['elapsed_waiting'] += elapsed_waiting
next_run = self._schedule_strategy(cb,
started_at, finished_at,
next_run = self._schedule_strategy(cb, started_at, finished_at,
cb_metrics)
with self._waiter:
self._schedule.push(next_run, index)

View File

@ -134,6 +134,30 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
nows = list(reversed(nows))
self._test_strategy('last_finished', nows, last_now, 5.0)
def test_last_finished_strategy_jitter(self):
ev = self.event_cls()
calls = []
stop_after_calls = 3
@periodics.periodic(0.1, run_immediately=False)
def fast_periodic():
calls.append(True)
if len(calls) > stop_after_calls:
ev.set()
worker_kwargs = self.worker_kwargs.copy()
worker_kwargs['schedule_strategy'] = 'last_finished_jitter'
callables = [
(fast_periodic, None, None),
]
w = periodics.PeriodicWorker(callables, **worker_kwargs)
with self.create_destroy(w.start):
ev.wait()
w.stop()
self.assertGreaterEqual(len(calls), stop_after_calls)
def test_waiting_immediate_add_processed(self):
ran_at = []