Add what the watcher watches to the watcher as a property

Also adds some new tests to ensure it works as expected.

Change-Id: I4cd9af710f4d528284ea7942fc70e178916bea5a
This commit is contained in:
Joshua Harlow 2016-02-24 11:26:02 -08:00
parent 8166a435f7
commit 4d3d47d6af
2 changed files with 92 additions and 50 deletions

View File

@ -46,23 +46,31 @@ PERIODIC = 'periodic'
IMMEDIATE = 'immediate'
class Work(collections.namedtuple("Work",
['name', 'callback', 'args', 'kwargs'])):
"""Named unit of work that can be periodically scheduled and watched."""
def __call__(self):
return self.callback(*self.args, **self.kwargs)
class Watcher(object):
"""A **read-only** object representing a periodics callbacks activities."""
"""A **read-only** object representing a periodic callback's activities."""
_REPR_MSG_TPL = ("<Watcher object at 0x%(ident)x "
"("
"runs=%(runs)s,"
" successes=%(successes)s,"
" failures=%(failures)s,"
" elapsed=%(elapsed)0.2f,"
" elapsed_waiting=%(elapsed_waiting)0.2f"
")>")
def __init__(self, metrics):
def __init__(self, metrics, work):
self._metrics = metrics
self._work = work
def __repr__(self):
return self._REPR_MSG_TPL % dict(ident=id(self), **self._metrics)
return ("<Watcher(metrics=%(metrics)s, work=%(work)s)"
" object at 0x%(ident)x>") % dict(ident=id(self),
work=self._work,
metrics=self._metrics)
@property
def work(self):
"""**Read-only** named work tuple this object watches."""
return self._work
@property
def runs(self):
@ -260,11 +268,11 @@ class _Runner(object):
self.now_func = now_func
self.retain_traceback = retain_traceback
def run(self, cb, *args, **kwargs):
def run(self, work):
failure = None
started_at = self.now_func()
try:
cb(*args, **kwargs)
work()
except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable
@ -274,11 +282,12 @@ class _Runner(object):
return (started_at, finished_at, failure)
def _build(now_func, callables, next_run_scheduler):
def _build(now_func, works, next_run_scheduler):
schedule = _Schedule()
now = None
immediates = collections.deque()
for index, (cb, _cb_name, args, kwargs) in enumerate(callables):
for index, work in enumerate(works):
cb = work.callback
if cb._periodic_run_immediately:
immediates.append(index)
else:
@ -538,7 +547,7 @@ class PeriodicWorker(object):
self._active = event_cls()
self._cond_cls = cond_cls
self._watchers = []
self._callables = []
self._works = []
for (cb, args, kwargs) in callables:
if not six.callable(cb):
raise ValueError("Periodic callback %r must be callable" % cb)
@ -552,11 +561,11 @@ class PeriodicWorker(object):
if args is None:
args = self._NO_OP_ARGS
if kwargs is None:
kwargs = self._NO_OP_KWARGS
cb_name = utils.get_callback_name(cb)
kwargs = self._NO_OP_KWARGS.copy()
cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics)
self._callables.append((cb, cb_name, args, kwargs))
work = Work(utils.get_callback_name(cb), cb, args, kwargs)
watcher = Watcher(cb_metrics, work)
self._works.append(work)
self._watchers.append((cb_metrics, watcher))
try:
strategy = self.BUILT_IN_STRATEGIES[schedule_strategy]
@ -568,7 +577,7 @@ class PeriodicWorker(object):
" %s selectable strategies"
% (schedule_strategy, valid_strategies))
self._immediates, self._schedule = _build(
now_func, self._callables, self._initial_schedule_strategy)
now_func, self._works, self._initial_schedule_strategy)
self._log = log or LOG
if executor_factory is None:
executor_factory = lambda: futurist.SynchronousExecutor()
@ -579,8 +588,8 @@ class PeriodicWorker(object):
self._now_func = now_func
def __len__(self):
"""How many callables are currently active."""
return len(self._callables)
"""How many callables/periodic work units are currently active."""
return len(self._works)
def _run(self, executor, runner):
"""Main worker run loop."""
@ -609,27 +618,26 @@ class PeriodicWorker(object):
when_next = next_run - now
if when_next <= 0:
# Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index]
self._log.debug("Submitting periodic function '%s'",
cb_name)
work = self._works[index]
self._log.debug("Submitting periodic"
" callback '%s'", work.name)
try:
fut = executor.submit(runner.run, cb, *args, **kwargs)
fut = executor.submit(runner.run, work)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
# Restart after a short delay
delay = (self._RESCHEDULE_DELAY +
rnd.random() * self._RESCHEDULE_JITTER)
self._log.error("Failed to submit periodic function "
self._log.error("Failed to submit periodic callback "
"'%s', retrying after %.2f sec. "
"Error: %s",
cb_name, delay, exc)
work.name, delay, exc)
self._schedule.push(self._now_func() + delay,
index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
work, index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
else:
@ -644,26 +652,28 @@ class PeriodicWorker(object):
except IndexError:
pass
else:
cb, cb_name, args, kwargs = self._callables[index]
work = self._works[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'", cb_name)
self._log.debug("Submitting immediate"
" callback '%s'", work.name)
try:
fut = executor.submit(runner.run, cb, *args, **kwargs)
fut = executor.submit(runner.run, work)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
self._log.error("Failed to submit immediate function "
"'%s', retrying. Error: %s", cb_name, exc)
self._log.error("Failed to submit immediate callback "
"'%s', retrying. Error: %s", work.name,
exc)
# Restart as soon as possible
self._immediates.append(index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index,
work, index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
def _on_done(kind, work, index, submitted_at, fut):
cb = work.callback
started_at, finished_at, failure = fut.result()
cb_metrics, _watcher = self._watchers[index]
cb_metrics['runs'] += 1
@ -720,7 +730,7 @@ class PeriodicWorker(object):
" are %s" % (c, set(_DEFAULT_COLS)))
tbl_rows = []
now = self._now_func()
for index, (cb, cb_name, _args, _kwargs) in enumerate(self._callables):
for index, work in enumerate(self._works):
_cb_metrics, watcher = self._watchers[index]
next_run = self._schedule.fetch_next_run(index)
if next_run is None:
@ -730,9 +740,9 @@ class PeriodicWorker(object):
active = False
runs_in = "%0.4fs" % (max(0.0, next_run - now))
cb_row = {
'Name': cb_name,
'Name': work.name,
'Active': active,
'Periodicity': cb._periodic_spacing,
'Periodicity': work.callback._periodic_spacing,
'Runs': watcher.runs,
'Runs in': runs_in,
'Failures': watcher.failures,
@ -779,11 +789,11 @@ class PeriodicWorker(object):
return None
now = self._now_func()
with self._waiter:
cb_index = len(self._callables)
cb_name = utils.get_callback_name(cb)
cb_index = len(self._works)
cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics)
self._callables.append((cb, cb_name, args, kwargs))
work = Work(utils.get_callback_name(cb), cb, args, kwargs)
watcher = Watcher(cb_metrics, work)
self._works.append(work)
self._watchers.append((cb_metrics, watcher))
if cb._periodic_run_immediately:
self._immediates.append(cb_index)
@ -805,9 +815,9 @@ class PeriodicWorker(object):
added)
:type allow_empty: bool
"""
if not self._callables and not allow_empty:
if not self._works and not allow_empty:
raise RuntimeError("A periodic worker can not start"
" without any callables")
" without any callables to process")
if self._active.is_set():
raise RuntimeError("A periodic worker can not be started"
" twice")
@ -856,7 +866,7 @@ class PeriodicWorker(object):
# keys) is able to see those changes.
cb_metrics[k] = 0
self._immediates, self._schedule = _build(
self._now_func, self._callables, self._initial_schedule_strategy)
self._now_func, self._works, self._initial_schedule_strategy)
def wait(self, timeout=None):
"""Waits for the :py:meth:`.start` method to gracefully exit.

View File

@ -117,6 +117,38 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
schedule_order = w._schedule._ordering
self.assertEqual([(expected_next, 0)], schedule_order)
def _run_work_up_to(self, stop_after_num_calls):
ev = self.event_cls()
called_tracker = []
@periodics.periodic(0.1, run_immediately=True)
def fast_periodic():
called_tracker.append(True)
if len(called_tracker) >= stop_after_num_calls:
ev.set()
callables = [
(fast_periodic, None, None),
]
worker_kwargs = self.worker_kwargs.copy()
w = periodics.PeriodicWorker(callables, **worker_kwargs)
with self.create_destroy(w.start):
ev.wait()
w.stop()
return list(w.iter_watchers())[0], fast_periodic
def test_watching_work(self):
for i in [3, 5, 9, 11]:
watcher, cb = self._run_work_up_to(i)
self.assertEqual(cb, watcher.work.callback)
self.assertGreaterEqual(i, watcher.runs)
self.assertGreaterEqual(i, watcher.successes)
self.assertEqual((), watcher.work.args)
self.assertEqual({}, watcher.work.kwargs)
def test_last_finished_strategy(self):
last_now = 3.2
nows = [