From da698e8511fda4840af4382457f2c9c97feaa4c5 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 9 Feb 2016 10:02:05 -0800 Subject: [PATCH] Ensure all futures have completed before run returns Now that shutdown can be skipped (which would previously wait for all submitted futures to be finished) we need to have a way to know that all submitted futures have been finished before our run method returns. A simple countdown barrier/latch does this so ensure that we use it in the periodic run loop to be able to only return from the run loop when no active futures are in progress. Change-Id: Ia3dd84ebfe2416060aaf5113cc8310a23919a3f9 --- futurist/_utils.py | 24 ++++++++++++++++++++++++ futurist/periodics.py | 15 ++++++++++++--- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/futurist/_utils.py b/futurist/_utils.py index a80033b..0b9fce5 100644 --- a/futurist/_utils.py +++ b/futurist/_utils.py @@ -17,6 +17,7 @@ import inspect import multiprocessing import sys +import threading import traceback from monotonic import monotonic as now # noqa @@ -116,3 +117,26 @@ def get_optimal_thread_count(default=2): # just setup two threads since it's hard to know what else we # should do in this situation. return default + + +class Barrier(object): + """A class that ensures active <= 0 occur before unblocking.""" + + def __init__(self, cond_cls=threading.Condition): + self._active = 0 + self._cond = cond_cls() + + def incr(self): + with self._cond: + self._active += 1 + self._cond.notify_all() + + def decr(self): + with self._cond: + self._active -= 1 + self._cond.notify_all() + + def wait(self): + with self._cond: + while self._active > 0: + self._cond.wait() diff --git a/futurist/periodics.py b/futurist/periodics.py index 9356275..39324e1 100644 --- a/futurist/periodics.py +++ b/futurist/periodics.py @@ -532,6 +532,7 @@ class PeriodicWorker(object): self._waiter = cond_cls() self._dead = event_cls() self._active = event_cls() + self._cond_cls = cond_cls self._watchers = [] self._callables = [] for (cb, args, kwargs) in callables: @@ -577,6 +578,7 @@ class PeriodicWorker(object): def _run(self, executor, runner): """Main worker run loop.""" + barrier = utils.Barrier(cond_cls=self._cond_cls) def _process_scheduled(): # Figure out when we should run next (by selecting the @@ -614,11 +616,13 @@ class PeriodicWorker(object): # Restart as soon as possible self._schedule.push(now, index) else: + barrier.incr() fut.add_done_callback(functools.partial(_on_done, PERIODIC, cb, cb_name, index, submitted_at)) + fut.add_done_callback(lambda _fut: barrier.decr()) else: # Gotta wait... self._schedule.push(next_run, index) @@ -643,11 +647,13 @@ class PeriodicWorker(object): # Restart as soon as possible self._immediates.append(index) else: + barrier.incr() fut.add_done_callback(functools.partial(_on_done, IMMEDIATE, cb, cb_name, index, submitted_at)) + fut.add_done_callback(lambda _fut: barrier.decr()) def _on_done(kind, cb, cb_name, index, submitted_at, fut): started_at, finished_at, failure = fut.result() @@ -670,9 +676,12 @@ class PeriodicWorker(object): self._schedule.push(next_run, index) self._waiter.notify_all() - while not self._tombstone.is_set(): - _process_immediates() - _process_scheduled() + try: + while not self._tombstone.is_set(): + _process_immediates() + _process_scheduled() + finally: + barrier.wait() def _on_finish(self): # TODO(harlowja): this may be to verbose for people?