Merge "Ensure we check callback '_is_periodic' on add"

This commit is contained in:
Jenkins 2015-07-24 00:35:54 +00:00 committed by Gerrit Code Review
commit 7a8679f834
3 changed files with 151 additions and 32 deletions

View File

@ -35,10 +35,13 @@ Periodics
.. autoclass:: futurist.periodics.PeriodicWorker
:members:
:special-members: __init__
:special-members: __init__, __len__
.. autofunction:: futurist.periodics.periodic
.. autoclass:: futurist.periodics.Watcher
:members:
-------------
Miscellaneous
-------------

View File

@ -37,6 +37,54 @@ _REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
'_periodic_run_immediately')
class Watcher(object):
"""A **read-only** object representing a periodics callbacks activities."""
def __init__(self, metrics):
self._metrics = metrics
@property
def runs(self):
"""How many times the periodic callback has been ran."""
return self._metrics['runs']
@property
def successes(self):
"""How many times the periodic callback ran successfully."""
return self._metrics['successes']
@property
def failures(self):
"""How many times the periodic callback ran unsuccessfully."""
return self._metrics['failures']
@property
def elapsed(self):
"""Total amount of time the periodic callback has ran for."""
return self._metrics['elapsed']
@property
def elapsed_waiting(self):
"""Total amount of time the periodic callback has waited to run for."""
return self._metrics['elapsed_waiting']
@property
def average_elapsed_waiting(self):
"""Avg. amount of time the periodic callback has waited to run for.
This may raise a ``ZeroDivisionError`` if there has been no runs.
"""
return self._metrics['elapsed_waiting'] / self._metrics['runs']
@property
def average_elapsed(self):
"""Avg. amount of time the periodic callback has ran for.
This may raise a ``ZeroDivisionError`` if there has been no runs.
"""
return self._metrics['elapsed'] / self._metrics['runs']
def _check_attrs(obj):
"""Checks that a periodic function/method has all the expected attributes.
@ -136,10 +184,10 @@ def _now_plus_periodicity(cb, now):
class _Schedule(object):
"""Internal heap-based structure that maintains the schedule/ordering.
This stores a heap composed of the following (next_run, index) where
next_run is the next desired runtime for the callback that is stored
This stores a heap composed of the following ``(next_run, index)`` where
``next_run`` is the next desired runtime for the callback that is stored
somewhere with the index provided. The index is saved so that if two
functions with the same next_run time are inserted, that the one with
functions with the same ``next_run`` time are inserted, that the one with
the smaller index is preferred (it is also saved so that on pop we can
know what the index of the callback we should call is).
"""
@ -299,7 +347,8 @@ class PeriodicWorker(object):
:type schedule_strategy: string
:param now_func: callable that can return the current time offset
from some point (used in calculating elapsed times
and next times to run)
and next times to run); preferably this is
monotonically increasing
:type now_func: callable
"""
callables = []
@ -357,13 +406,14 @@ class PeriodicWorker(object):
:type schedule_strategy: string
:param now_func: callable that can return the current time offset
from some point (used in calculating elapsed times
and next times to run)
and next times to run); preferably this is
monotonically increasing
:type now_func: callable
"""
self._tombstone = event_cls()
self._waiter = cond_cls()
self._dead = event_cls()
self._metrics = []
self._watchers = []
self._callables = []
for (cb, args, kwargs) in callables:
if not six.callable(cb):
@ -380,8 +430,10 @@ class PeriodicWorker(object):
if kwargs is None:
kwargs = self._NO_OP_KWARGS
cb_name = utils.get_callback_name(cb)
cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics)
self._callables.append((cb, cb_name, args, kwargs))
self._metrics.append(self._INITIAL_METRICS.copy())
self._watchers.append((cb_metrics, watcher))
try:
strategy = self.BUILT_IN_STRATEGIES[schedule_strategy]
self._schedule_strategy = strategy[0]
@ -400,6 +452,7 @@ class PeriodicWorker(object):
self._now_func = now_func
def __len__(self):
"""How many callables are currently active."""
return len(self._callables)
def _run(self, executor):
@ -407,23 +460,23 @@ class PeriodicWorker(object):
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
started_at, finished_at, pretty_tb = fut.result()
metrics = self._metrics[index]
metrics['runs'] += 1
cb_metrics, _watcher = self._watchers[index]
cb_metrics['runs'] += 1
if pretty_tb is not None:
how_often = cb._periodic_spacing
self._log.error("Failed to call %s '%s' (it runs every"
" %0.2f seconds):\n%s", kind, cb_name,
how_often, pretty_tb)
metrics['failures'] += 1
cb_metrics['failures'] += 1
else:
metrics['successes'] += 1
cb_metrics['successes'] += 1
elapsed = max(0, finished_at - started_at)
elapsed_waiting = max(0, started_at - submitted_at)
metrics['elapsed'] += elapsed
metrics['elapsed_waiting'] += elapsed_waiting
cb_metrics['elapsed'] += elapsed
cb_metrics['elapsed_waiting'] += elapsed_waiting
next_run = self._schedule_strategy(cb,
started_at, finished_at,
metrics)
cb_metrics)
with self._waiter:
self._schedule.push(next_run, index)
self._waiter.notify_all()
@ -485,25 +538,30 @@ class PeriodicWorker(object):
# TODO(harlowja): this may be to verbose for people?
if not self._log.isEnabledFor(logging.DEBUG):
return
for index, metrics in enumerate(self._metrics):
watcher_it = self.iter_watchers()
for index, watcher in enumerate(watcher_it):
cb, cb_name, _args, _kwargs = self._callables[index]
runs = metrics['runs']
self._log.debug("Stopped running callback[%s] '%s' periodically:",
index, cb_name)
self._log.debug(" Periodicity = %ss", cb._periodic_spacing)
self._log.debug(" Runs = %s", runs)
self._log.debug(" Failures = %s", metrics['failures'])
self._log.debug(" Successes = %s", metrics['successes'])
if runs > 0:
avg_elapsed = metrics['elapsed'] / runs
avg_elapsed_waiting = metrics['elapsed_waiting'] / runs
self._log.debug(" Average elapsed = %0.4fs", avg_elapsed)
self._log.debug(" Runs = %s", watcher.runs)
self._log.debug(" Failures = %s", watcher.failures)
self._log.debug(" Successes = %s", watcher.successes)
try:
self._log.debug(" Average elapsed = %0.4fs",
watcher.average_elapsed)
self._log.debug(" Average elapsed waiting = %0.4fs",
avg_elapsed_waiting)
watcher.average_elapsed_waiting)
except ZeroDivisionError:
pass
def add(self, cb, *args, **kwargs):
"""Adds a new periodic callback to the current worker.
Returns a :py:class:`.Watcher` if added successfully or the value
``None`` if not (or raises a ``ValueError`` if the callback is not
correctly formed and/or decorated).
:param cb: a callable object/method/function previously decorated
with the :py:func:`.periodic` decorator
:type cb: callable
@ -514,18 +572,23 @@ class PeriodicWorker(object):
if missing_attrs:
raise ValueError("Periodic callback %r missing required"
" attributes %s" % (cb, missing_attrs))
if not cb._is_periodic:
return None
now = self._now_func()
with self._waiter:
index = len(self._callables)
cb_index = len(self._callables)
cb_name = utils.get_callback_name(cb)
cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics)
self._callables.append((cb, cb_name, args, kwargs))
self._metrics.append(self._INITIAL_METRICS.copy())
self._watchers.append((cb_metrics, watcher))
if cb._periodic_run_immediately:
self._immediates.append(index)
self._immediates.append(cb_index)
else:
next_run = self._initial_schedule_strategy(cb, now)
self._schedule.push(next_run, index)
self._schedule.push(next_run, cb_index)
self._waiter.notify_all()
return watcher
def start(self, allow_empty=False):
"""Starts running (will not return until :py:meth:`.stop` is called).
@ -557,13 +620,18 @@ class PeriodicWorker(object):
self._tombstone.set()
self._waiter.notify_all()
def iter_watchers(self):
"""Iterator/generator over all the currently maintained watchers."""
for _cb_metrics, watcher in self._watchers:
yield watcher
def reset(self):
"""Resets the workers internal state."""
self._tombstone.clear()
self._dead.clear()
for metrics in self._metrics:
for k in list(six.iterkeys(metrics)):
metrics[k] = 0
for cb_metrics, _watcher in self._watchers:
for k in list(six.iterkeys(cb_metrics)):
cb_metrics[k] = 0
self._immediates, self._schedule = _build(
self._now_func, self._callables, self._initial_schedule_strategy)

View File

@ -191,6 +191,54 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
self.sleep(0.1)
w.stop()
def test_not_added(self):
@periodics.periodic(0.5)
def no_add_me():
pass
no_add_me._is_periodic = False
@periodics.periodic(0.5)
def add_me():
pass
w = periodics.PeriodicWorker([], **self.worker_kwargs)
self.assertEqual(0, len(w))
self.assertIsNone(w.add(no_add_me))
self.assertEqual(0, len(w))
self.assertIsNotNone(w.add(add_me))
self.assertEqual(1, len(w))
def test_watcher(self):
def cb():
pass
callables = [
(every_one_sec, (cb,), None),
(every_half_sec, (cb,), None),
]
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start):
self.sleep(2.0)
w.stop()
for watcher in w.iter_watchers():
self.assertGreaterEqual(watcher.runs, 1)
w.reset()
for watcher in w.iter_watchers():
self.assertEqual(watcher.runs, 0)
self.assertEqual(watcher.successes, 0)
self.assertEqual(watcher.failures, 0)
self.assertEqual(watcher.elapsed, 0)
self.assertEqual(watcher.elapsed_waiting, 0)
def test_worker(self):
called = []