From 95e9efc5f249f90a4c24c7fdf59c23b56ccffae5 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 6 Oct 2015 21:59:26 -0700 Subject: [PATCH] Some minor refactoring and comment/note addition In the internal '_run' instead of having a loop that has various cases with quite a bit of code per case instead have the loop call into smaller helper functions that handle the different types of processing the loop does. Also adds a note to the 'reset' method that explains why the metrics keys are overwritten vs being copied/replaced with a new copy. Change-Id: I5ac1794d5803850120be4e67b85efe0449aa1891 --- futurist/periodics.py | 115 ++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 55 deletions(-) diff --git a/futurist/periodics.py b/futurist/periodics.py index a629b67..1fd7a41 100644 --- a/futurist/periodics.py +++ b/futurist/periodics.py @@ -530,6 +530,61 @@ class PeriodicWorker(object): def _run(self, executor, runner): """Main worker run loop.""" + def _process_scheduled(): + # Figure out when we should run next (by selecting the + # minimum item from the heap, where the minimum should be + # the callable that needs to run next and has the lowest + # next desired run time). + with self._waiter: + while (not self._schedule and + not self._tombstone.is_set() and + not self._immediates): + self._waiter.wait(self.MAX_LOOP_IDLE) + if self._tombstone.is_set(): + # We were requested to stop, so stop. + return + if self._immediates: + # This will get processed in _process_immediates() + # in the next loop call. + return + submitted_at = now = self._now_func() + next_run, index = self._schedule.pop() + when_next = next_run - now + if when_next <= 0: + # Run & schedule its next execution. + cb, cb_name, args, kwargs = self._callables[index] + self._log.debug("Submitting periodic function '%s'", + cb_name) + fut = executor.submit(runner, + self._now_func, + cb, *args, **kwargs) + fut.add_done_callback(functools.partial(_on_done, + PERIODIC, + cb, cb_name, + index, + submitted_at)) + else: + # Gotta wait... + self._schedule.push(next_run, index) + when_next = min(when_next, self.MAX_LOOP_IDLE) + self._waiter.wait(when_next) + + def _process_immediates(): + try: + index = self._immediates.popleft() + except IndexError: + pass + else: + cb, cb_name, args, kwargs = self._callables[index] + submitted_at = self._now_func() + self._log.debug("Submitting immediate function '%s'", cb_name) + fut = executor.submit(runner, self._now_func, + cb, *args, **kwargs) + fut.add_done_callback(functools.partial(_on_done, + IMMEDIATE, + cb, cb_name, + index, submitted_at)) + def _on_done(kind, cb, cb_name, index, submitted_at, fut): started_at, finished_at, failure = fut.result() cb_metrics, _watcher = self._watchers[index] @@ -552,61 +607,8 @@ class PeriodicWorker(object): self._waiter.notify_all() while not self._tombstone.is_set(): - if self._immediates: - # Run & schedule its next execution. - try: - index = self._immediates.popleft() - except IndexError: - pass - else: - cb, cb_name, args, kwargs = self._callables[index] - submitted_at = self._now_func() - self._log.debug("Submitting immediate function '%s'", - cb_name) - fut = executor.submit(runner, - self._now_func, - cb, *args, **kwargs) - fut.add_done_callback(functools.partial(_on_done, - IMMEDIATE, - cb, cb_name, - index, - submitted_at)) - else: - # Figure out when we should run next (by selecting the - # minimum item from the heap, where the minimum should be - # the callable that needs to run next and has the lowest - # next desired run time). - with self._waiter: - while (not self._schedule and - not self._tombstone.is_set() and - not self._immediates): - self._waiter.wait(self.MAX_LOOP_IDLE) - if self._tombstone.is_set(): - break - if self._immediates: - # This will get popped in the prior condition... - continue - submitted_at = now = self._now_func() - next_run, index = self._schedule.pop() - when_next = next_run - now - if when_next <= 0: - # Run & schedule its next execution. - cb, cb_name, args, kwargs = self._callables[index] - self._log.debug("Submitting periodic function '%s'", - cb_name) - fut = executor.submit(runner, - self._now_func, - cb, *args, **kwargs) - fut.add_done_callback(functools.partial(_on_done, - PERIODIC, - cb, cb_name, - index, - submitted_at)) - else: - # Gotta wait... - self._schedule.push(next_run, index) - when_next = min(when_next, self.MAX_LOOP_IDLE) - self._waiter.wait(when_next) + _process_immediates() + _process_scheduled() def _on_finish(self): # TODO(harlowja): this may be to verbose for people? @@ -721,6 +723,9 @@ class PeriodicWorker(object): self._dead.clear() for cb_metrics, _watcher in self._watchers: for k in list(six.iterkeys(cb_metrics)): + # NOTE(harlowja): mutate the original dictionaries keys + # so that the watcher (which references the same dictionary + # keys) is able to see those changes. cb_metrics[k] = 0 self._immediates, self._schedule = _build( self._now_func, self._callables, self._initial_schedule_strategy)