Add what the watcher watches to the watcher as a property

Also adds some new tests to ensure it works as expected.

Change-Id: I4cd9af710f4d528284ea7942fc70e178916bea5a
This commit is contained in:
Joshua Harlow 2016-02-24 11:26:02 -08:00
parent 8166a435f7
commit 4d3d47d6af
2 changed files with 92 additions and 50 deletions

View File

@ -46,23 +46,31 @@ PERIODIC = 'periodic'
IMMEDIATE = 'immediate' IMMEDIATE = 'immediate'
class Work(collections.namedtuple("Work",
['name', 'callback', 'args', 'kwargs'])):
"""Named unit of work that can be periodically scheduled and watched."""
def __call__(self):
return self.callback(*self.args, **self.kwargs)
class Watcher(object): class Watcher(object):
"""A **read-only** object representing a periodics callbacks activities.""" """A **read-only** object representing a periodic callback's activities."""
_REPR_MSG_TPL = ("<Watcher object at 0x%(ident)x " def __init__(self, metrics, work):
"("
"runs=%(runs)s,"
" successes=%(successes)s,"
" failures=%(failures)s,"
" elapsed=%(elapsed)0.2f,"
" elapsed_waiting=%(elapsed_waiting)0.2f"
")>")
def __init__(self, metrics):
self._metrics = metrics self._metrics = metrics
self._work = work
def __repr__(self): def __repr__(self):
return self._REPR_MSG_TPL % dict(ident=id(self), **self._metrics) return ("<Watcher(metrics=%(metrics)s, work=%(work)s)"
" object at 0x%(ident)x>") % dict(ident=id(self),
work=self._work,
metrics=self._metrics)
@property
def work(self):
"""**Read-only** named work tuple this object watches."""
return self._work
@property @property
def runs(self): def runs(self):
@ -260,11 +268,11 @@ class _Runner(object):
self.now_func = now_func self.now_func = now_func
self.retain_traceback = retain_traceback self.retain_traceback = retain_traceback
def run(self, cb, *args, **kwargs): def run(self, work):
failure = None failure = None
started_at = self.now_func() started_at = self.now_func()
try: try:
cb(*args, **kwargs) work()
except Exception: except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to # Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable # capture and return the failure, so that we can have reliable
@ -274,11 +282,12 @@ class _Runner(object):
return (started_at, finished_at, failure) return (started_at, finished_at, failure)
def _build(now_func, callables, next_run_scheduler): def _build(now_func, works, next_run_scheduler):
schedule = _Schedule() schedule = _Schedule()
now = None now = None
immediates = collections.deque() immediates = collections.deque()
for index, (cb, _cb_name, args, kwargs) in enumerate(callables): for index, work in enumerate(works):
cb = work.callback
if cb._periodic_run_immediately: if cb._periodic_run_immediately:
immediates.append(index) immediates.append(index)
else: else:
@ -538,7 +547,7 @@ class PeriodicWorker(object):
self._active = event_cls() self._active = event_cls()
self._cond_cls = cond_cls self._cond_cls = cond_cls
self._watchers = [] self._watchers = []
self._callables = [] self._works = []
for (cb, args, kwargs) in callables: for (cb, args, kwargs) in callables:
if not six.callable(cb): if not six.callable(cb):
raise ValueError("Periodic callback %r must be callable" % cb) raise ValueError("Periodic callback %r must be callable" % cb)
@ -552,11 +561,11 @@ class PeriodicWorker(object):
if args is None: if args is None:
args = self._NO_OP_ARGS args = self._NO_OP_ARGS
if kwargs is None: if kwargs is None:
kwargs = self._NO_OP_KWARGS kwargs = self._NO_OP_KWARGS.copy()
cb_name = utils.get_callback_name(cb)
cb_metrics = self._INITIAL_METRICS.copy() cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics) work = Work(utils.get_callback_name(cb), cb, args, kwargs)
self._callables.append((cb, cb_name, args, kwargs)) watcher = Watcher(cb_metrics, work)
self._works.append(work)
self._watchers.append((cb_metrics, watcher)) self._watchers.append((cb_metrics, watcher))
try: try:
strategy = self.BUILT_IN_STRATEGIES[schedule_strategy] strategy = self.BUILT_IN_STRATEGIES[schedule_strategy]
@ -568,7 +577,7 @@ class PeriodicWorker(object):
" %s selectable strategies" " %s selectable strategies"
% (schedule_strategy, valid_strategies)) % (schedule_strategy, valid_strategies))
self._immediates, self._schedule = _build( self._immediates, self._schedule = _build(
now_func, self._callables, self._initial_schedule_strategy) now_func, self._works, self._initial_schedule_strategy)
self._log = log or LOG self._log = log or LOG
if executor_factory is None: if executor_factory is None:
executor_factory = lambda: futurist.SynchronousExecutor() executor_factory = lambda: futurist.SynchronousExecutor()
@ -579,8 +588,8 @@ class PeriodicWorker(object):
self._now_func = now_func self._now_func = now_func
def __len__(self): def __len__(self):
"""How many callables are currently active.""" """How many callables/periodic work units are currently active."""
return len(self._callables) return len(self._works)
def _run(self, executor, runner): def _run(self, executor, runner):
"""Main worker run loop.""" """Main worker run loop."""
@ -609,27 +618,26 @@ class PeriodicWorker(object):
when_next = next_run - now when_next = next_run - now
if when_next <= 0: if when_next <= 0:
# Run & schedule its next execution. # Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index] work = self._works[index]
self._log.debug("Submitting periodic function '%s'", self._log.debug("Submitting periodic"
cb_name) " callback '%s'", work.name)
try: try:
fut = executor.submit(runner.run, cb, *args, **kwargs) fut = executor.submit(runner.run, work)
except _SCHEDULE_RETRY_EXCEPTIONS as exc: except _SCHEDULE_RETRY_EXCEPTIONS as exc:
# Restart after a short delay # Restart after a short delay
delay = (self._RESCHEDULE_DELAY + delay = (self._RESCHEDULE_DELAY +
rnd.random() * self._RESCHEDULE_JITTER) rnd.random() * self._RESCHEDULE_JITTER)
self._log.error("Failed to submit periodic function " self._log.error("Failed to submit periodic callback "
"'%s', retrying after %.2f sec. " "'%s', retrying after %.2f sec. "
"Error: %s", "Error: %s",
cb_name, delay, exc) work.name, delay, exc)
self._schedule.push(self._now_func() + delay, self._schedule.push(self._now_func() + delay,
index) index)
else: else:
barrier.incr() barrier.incr()
fut.add_done_callback(functools.partial(_on_done, fut.add_done_callback(functools.partial(_on_done,
PERIODIC, PERIODIC,
cb, cb_name, work, index,
index,
submitted_at)) submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr()) fut.add_done_callback(lambda _fut: barrier.decr())
else: else:
@ -644,26 +652,28 @@ class PeriodicWorker(object):
except IndexError: except IndexError:
pass pass
else: else:
cb, cb_name, args, kwargs = self._callables[index] work = self._works[index]
submitted_at = self._now_func() submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'", cb_name) self._log.debug("Submitting immediate"
" callback '%s'", work.name)
try: try:
fut = executor.submit(runner.run, cb, *args, **kwargs) fut = executor.submit(runner.run, work)
except _SCHEDULE_RETRY_EXCEPTIONS as exc: except _SCHEDULE_RETRY_EXCEPTIONS as exc:
self._log.error("Failed to submit immediate function " self._log.error("Failed to submit immediate callback "
"'%s', retrying. Error: %s", cb_name, exc) "'%s', retrying. Error: %s", work.name,
exc)
# Restart as soon as possible # Restart as soon as possible
self._immediates.append(index) self._immediates.append(index)
else: else:
barrier.incr() barrier.incr()
fut.add_done_callback(functools.partial(_on_done, fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE, IMMEDIATE,
cb, cb_name, work, index,
index,
submitted_at)) submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr()) fut.add_done_callback(lambda _fut: barrier.decr())
def _on_done(kind, cb, cb_name, index, submitted_at, fut): def _on_done(kind, work, index, submitted_at, fut):
cb = work.callback
started_at, finished_at, failure = fut.result() started_at, finished_at, failure = fut.result()
cb_metrics, _watcher = self._watchers[index] cb_metrics, _watcher = self._watchers[index]
cb_metrics['runs'] += 1 cb_metrics['runs'] += 1
@ -720,7 +730,7 @@ class PeriodicWorker(object):
" are %s" % (c, set(_DEFAULT_COLS))) " are %s" % (c, set(_DEFAULT_COLS)))
tbl_rows = [] tbl_rows = []
now = self._now_func() now = self._now_func()
for index, (cb, cb_name, _args, _kwargs) in enumerate(self._callables): for index, work in enumerate(self._works):
_cb_metrics, watcher = self._watchers[index] _cb_metrics, watcher = self._watchers[index]
next_run = self._schedule.fetch_next_run(index) next_run = self._schedule.fetch_next_run(index)
if next_run is None: if next_run is None:
@ -730,9 +740,9 @@ class PeriodicWorker(object):
active = False active = False
runs_in = "%0.4fs" % (max(0.0, next_run - now)) runs_in = "%0.4fs" % (max(0.0, next_run - now))
cb_row = { cb_row = {
'Name': cb_name, 'Name': work.name,
'Active': active, 'Active': active,
'Periodicity': cb._periodic_spacing, 'Periodicity': work.callback._periodic_spacing,
'Runs': watcher.runs, 'Runs': watcher.runs,
'Runs in': runs_in, 'Runs in': runs_in,
'Failures': watcher.failures, 'Failures': watcher.failures,
@ -779,11 +789,11 @@ class PeriodicWorker(object):
return None return None
now = self._now_func() now = self._now_func()
with self._waiter: with self._waiter:
cb_index = len(self._callables) cb_index = len(self._works)
cb_name = utils.get_callback_name(cb)
cb_metrics = self._INITIAL_METRICS.copy() cb_metrics = self._INITIAL_METRICS.copy()
watcher = Watcher(cb_metrics) work = Work(utils.get_callback_name(cb), cb, args, kwargs)
self._callables.append((cb, cb_name, args, kwargs)) watcher = Watcher(cb_metrics, work)
self._works.append(work)
self._watchers.append((cb_metrics, watcher)) self._watchers.append((cb_metrics, watcher))
if cb._periodic_run_immediately: if cb._periodic_run_immediately:
self._immediates.append(cb_index) self._immediates.append(cb_index)
@ -805,9 +815,9 @@ class PeriodicWorker(object):
added) added)
:type allow_empty: bool :type allow_empty: bool
""" """
if not self._callables and not allow_empty: if not self._works and not allow_empty:
raise RuntimeError("A periodic worker can not start" raise RuntimeError("A periodic worker can not start"
" without any callables") " without any callables to process")
if self._active.is_set(): if self._active.is_set():
raise RuntimeError("A periodic worker can not be started" raise RuntimeError("A periodic worker can not be started"
" twice") " twice")
@ -856,7 +866,7 @@ class PeriodicWorker(object):
# keys) is able to see those changes. # keys) is able to see those changes.
cb_metrics[k] = 0 cb_metrics[k] = 0
self._immediates, self._schedule = _build( self._immediates, self._schedule = _build(
self._now_func, self._callables, self._initial_schedule_strategy) self._now_func, self._works, self._initial_schedule_strategy)
def wait(self, timeout=None): def wait(self, timeout=None):
"""Waits for the :py:meth:`.start` method to gracefully exit. """Waits for the :py:meth:`.start` method to gracefully exit.

View File

@ -117,6 +117,38 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
schedule_order = w._schedule._ordering schedule_order = w._schedule._ordering
self.assertEqual([(expected_next, 0)], schedule_order) self.assertEqual([(expected_next, 0)], schedule_order)
def _run_work_up_to(self, stop_after_num_calls):
ev = self.event_cls()
called_tracker = []
@periodics.periodic(0.1, run_immediately=True)
def fast_periodic():
called_tracker.append(True)
if len(called_tracker) >= stop_after_num_calls:
ev.set()
callables = [
(fast_periodic, None, None),
]
worker_kwargs = self.worker_kwargs.copy()
w = periodics.PeriodicWorker(callables, **worker_kwargs)
with self.create_destroy(w.start):
ev.wait()
w.stop()
return list(w.iter_watchers())[0], fast_periodic
def test_watching_work(self):
for i in [3, 5, 9, 11]:
watcher, cb = self._run_work_up_to(i)
self.assertEqual(cb, watcher.work.callback)
self.assertGreaterEqual(i, watcher.runs)
self.assertGreaterEqual(i, watcher.successes)
self.assertEqual((), watcher.work.args)
self.assertEqual({}, watcher.work.kwargs)
def test_last_finished_strategy(self): def test_last_finished_strategy(self):
last_now = 3.2 last_now = 3.2
nows = [ nows = [