From 4d3d47d6afdb6429e2ebc20da2b5dec9834de253 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 24 Feb 2016 11:26:02 -0800 Subject: [PATCH] Add what the watcher watches to the watcher as a property Also adds some new tests to ensure it works as expected. Change-Id: I4cd9af710f4d528284ea7942fc70e178916bea5a --- futurist/periodics.py | 110 +++++++++++++++++-------------- futurist/tests/test_periodics.py | 32 +++++++++ 2 files changed, 92 insertions(+), 50 deletions(-) diff --git a/futurist/periodics.py b/futurist/periodics.py index 24a5949..725f9f3 100644 --- a/futurist/periodics.py +++ b/futurist/periodics.py @@ -46,23 +46,31 @@ PERIODIC = 'periodic' IMMEDIATE = 'immediate' +class Work(collections.namedtuple("Work", + ['name', 'callback', 'args', 'kwargs'])): + """Named unit of work that can be periodically scheduled and watched.""" + + def __call__(self): + return self.callback(*self.args, **self.kwargs) + + class Watcher(object): - """A **read-only** object representing a periodics callbacks activities.""" + """A **read-only** object representing a periodic callback's activities.""" - _REPR_MSG_TPL = ("") - - def __init__(self, metrics): + def __init__(self, metrics, work): self._metrics = metrics + self._work = work def __repr__(self): - return self._REPR_MSG_TPL % dict(ident=id(self), **self._metrics) + return ("") % dict(ident=id(self), + work=self._work, + metrics=self._metrics) + + @property + def work(self): + """**Read-only** named work tuple this object watches.""" + return self._work @property def runs(self): @@ -260,11 +268,11 @@ class _Runner(object): self.now_func = now_func self.retain_traceback = retain_traceback - def run(self, cb, *args, **kwargs): + def run(self, work): failure = None started_at = self.now_func() try: - cb(*args, **kwargs) + work() except Exception: # Until https://bugs.python.org/issue24451 is merged we have to # capture and return the failure, so that we can have reliable @@ -274,11 +282,12 @@ class _Runner(object): return (started_at, finished_at, failure) -def _build(now_func, callables, next_run_scheduler): +def _build(now_func, works, next_run_scheduler): schedule = _Schedule() now = None immediates = collections.deque() - for index, (cb, _cb_name, args, kwargs) in enumerate(callables): + for index, work in enumerate(works): + cb = work.callback if cb._periodic_run_immediately: immediates.append(index) else: @@ -538,7 +547,7 @@ class PeriodicWorker(object): self._active = event_cls() self._cond_cls = cond_cls self._watchers = [] - self._callables = [] + self._works = [] for (cb, args, kwargs) in callables: if not six.callable(cb): raise ValueError("Periodic callback %r must be callable" % cb) @@ -552,11 +561,11 @@ class PeriodicWorker(object): if args is None: args = self._NO_OP_ARGS if kwargs is None: - kwargs = self._NO_OP_KWARGS - cb_name = utils.get_callback_name(cb) + kwargs = self._NO_OP_KWARGS.copy() cb_metrics = self._INITIAL_METRICS.copy() - watcher = Watcher(cb_metrics) - self._callables.append((cb, cb_name, args, kwargs)) + work = Work(utils.get_callback_name(cb), cb, args, kwargs) + watcher = Watcher(cb_metrics, work) + self._works.append(work) self._watchers.append((cb_metrics, watcher)) try: strategy = self.BUILT_IN_STRATEGIES[schedule_strategy] @@ -568,7 +577,7 @@ class PeriodicWorker(object): " %s selectable strategies" % (schedule_strategy, valid_strategies)) self._immediates, self._schedule = _build( - now_func, self._callables, self._initial_schedule_strategy) + now_func, self._works, self._initial_schedule_strategy) self._log = log or LOG if executor_factory is None: executor_factory = lambda: futurist.SynchronousExecutor() @@ -579,8 +588,8 @@ class PeriodicWorker(object): self._now_func = now_func def __len__(self): - """How many callables are currently active.""" - return len(self._callables) + """How many callables/periodic work units are currently active.""" + return len(self._works) def _run(self, executor, runner): """Main worker run loop.""" @@ -609,27 +618,26 @@ class PeriodicWorker(object): when_next = next_run - now if when_next <= 0: # Run & schedule its next execution. - cb, cb_name, args, kwargs = self._callables[index] - self._log.debug("Submitting periodic function '%s'", - cb_name) + work = self._works[index] + self._log.debug("Submitting periodic" + " callback '%s'", work.name) try: - fut = executor.submit(runner.run, cb, *args, **kwargs) + fut = executor.submit(runner.run, work) except _SCHEDULE_RETRY_EXCEPTIONS as exc: # Restart after a short delay delay = (self._RESCHEDULE_DELAY + rnd.random() * self._RESCHEDULE_JITTER) - self._log.error("Failed to submit periodic function " + self._log.error("Failed to submit periodic callback " "'%s', retrying after %.2f sec. " "Error: %s", - cb_name, delay, exc) + work.name, delay, exc) self._schedule.push(self._now_func() + delay, index) else: barrier.incr() fut.add_done_callback(functools.partial(_on_done, PERIODIC, - cb, cb_name, - index, + work, index, submitted_at)) fut.add_done_callback(lambda _fut: barrier.decr()) else: @@ -644,26 +652,28 @@ class PeriodicWorker(object): except IndexError: pass else: - cb, cb_name, args, kwargs = self._callables[index] + work = self._works[index] submitted_at = self._now_func() - self._log.debug("Submitting immediate function '%s'", cb_name) + self._log.debug("Submitting immediate" + " callback '%s'", work.name) try: - fut = executor.submit(runner.run, cb, *args, **kwargs) + fut = executor.submit(runner.run, work) except _SCHEDULE_RETRY_EXCEPTIONS as exc: - self._log.error("Failed to submit immediate function " - "'%s', retrying. Error: %s", cb_name, exc) + self._log.error("Failed to submit immediate callback " + "'%s', retrying. Error: %s", work.name, + exc) # Restart as soon as possible self._immediates.append(index) else: barrier.incr() fut.add_done_callback(functools.partial(_on_done, IMMEDIATE, - cb, cb_name, - index, + work, index, submitted_at)) fut.add_done_callback(lambda _fut: barrier.decr()) - def _on_done(kind, cb, cb_name, index, submitted_at, fut): + def _on_done(kind, work, index, submitted_at, fut): + cb = work.callback started_at, finished_at, failure = fut.result() cb_metrics, _watcher = self._watchers[index] cb_metrics['runs'] += 1 @@ -720,7 +730,7 @@ class PeriodicWorker(object): " are %s" % (c, set(_DEFAULT_COLS))) tbl_rows = [] now = self._now_func() - for index, (cb, cb_name, _args, _kwargs) in enumerate(self._callables): + for index, work in enumerate(self._works): _cb_metrics, watcher = self._watchers[index] next_run = self._schedule.fetch_next_run(index) if next_run is None: @@ -730,9 +740,9 @@ class PeriodicWorker(object): active = False runs_in = "%0.4fs" % (max(0.0, next_run - now)) cb_row = { - 'Name': cb_name, + 'Name': work.name, 'Active': active, - 'Periodicity': cb._periodic_spacing, + 'Periodicity': work.callback._periodic_spacing, 'Runs': watcher.runs, 'Runs in': runs_in, 'Failures': watcher.failures, @@ -779,11 +789,11 @@ class PeriodicWorker(object): return None now = self._now_func() with self._waiter: - cb_index = len(self._callables) - cb_name = utils.get_callback_name(cb) + cb_index = len(self._works) cb_metrics = self._INITIAL_METRICS.copy() - watcher = Watcher(cb_metrics) - self._callables.append((cb, cb_name, args, kwargs)) + work = Work(utils.get_callback_name(cb), cb, args, kwargs) + watcher = Watcher(cb_metrics, work) + self._works.append(work) self._watchers.append((cb_metrics, watcher)) if cb._periodic_run_immediately: self._immediates.append(cb_index) @@ -805,9 +815,9 @@ class PeriodicWorker(object): added) :type allow_empty: bool """ - if not self._callables and not allow_empty: + if not self._works and not allow_empty: raise RuntimeError("A periodic worker can not start" - " without any callables") + " without any callables to process") if self._active.is_set(): raise RuntimeError("A periodic worker can not be started" " twice") @@ -856,7 +866,7 @@ class PeriodicWorker(object): # keys) is able to see those changes. cb_metrics[k] = 0 self._immediates, self._schedule = _build( - self._now_func, self._callables, self._initial_schedule_strategy) + self._now_func, self._works, self._initial_schedule_strategy) def wait(self, timeout=None): """Waits for the :py:meth:`.start` method to gracefully exit. diff --git a/futurist/tests/test_periodics.py b/futurist/tests/test_periodics.py index 027ed1e..f76e71b 100644 --- a/futurist/tests/test_periodics.py +++ b/futurist/tests/test_periodics.py @@ -117,6 +117,38 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase): schedule_order = w._schedule._ordering self.assertEqual([(expected_next, 0)], schedule_order) + def _run_work_up_to(self, stop_after_num_calls): + ev = self.event_cls() + called_tracker = [] + + @periodics.periodic(0.1, run_immediately=True) + def fast_periodic(): + called_tracker.append(True) + if len(called_tracker) >= stop_after_num_calls: + ev.set() + + callables = [ + (fast_periodic, None, None), + ] + + worker_kwargs = self.worker_kwargs.copy() + w = periodics.PeriodicWorker(callables, **worker_kwargs) + + with self.create_destroy(w.start): + ev.wait() + w.stop() + + return list(w.iter_watchers())[0], fast_periodic + + def test_watching_work(self): + for i in [3, 5, 9, 11]: + watcher, cb = self._run_work_up_to(i) + self.assertEqual(cb, watcher.work.callback) + self.assertGreaterEqual(i, watcher.runs) + self.assertGreaterEqual(i, watcher.successes) + self.assertEqual((), watcher.work.args) + self.assertEqual({}, watcher.work.kwargs) + def test_last_finished_strategy(self): last_now = 3.2 nows = [