diff --git a/futurist/_utils.py b/futurist/_utils.py index a80033b..0b9fce5 100644 --- a/futurist/_utils.py +++ b/futurist/_utils.py @@ -17,6 +17,7 @@ import inspect import multiprocessing import sys +import threading import traceback from monotonic import monotonic as now # noqa @@ -116,3 +117,26 @@ def get_optimal_thread_count(default=2): # just setup two threads since it's hard to know what else we # should do in this situation. return default + + +class Barrier(object): + """A class that ensures active <= 0 occur before unblocking.""" + + def __init__(self, cond_cls=threading.Condition): + self._active = 0 + self._cond = cond_cls() + + def incr(self): + with self._cond: + self._active += 1 + self._cond.notify_all() + + def decr(self): + with self._cond: + self._active -= 1 + self._cond.notify_all() + + def wait(self): + with self._cond: + while self._active > 0: + self._cond.wait() diff --git a/futurist/periodics.py b/futurist/periodics.py index 9356275..39324e1 100644 --- a/futurist/periodics.py +++ b/futurist/periodics.py @@ -532,6 +532,7 @@ class PeriodicWorker(object): self._waiter = cond_cls() self._dead = event_cls() self._active = event_cls() + self._cond_cls = cond_cls self._watchers = [] self._callables = [] for (cb, args, kwargs) in callables: @@ -577,6 +578,7 @@ class PeriodicWorker(object): def _run(self, executor, runner): """Main worker run loop.""" + barrier = utils.Barrier(cond_cls=self._cond_cls) def _process_scheduled(): # Figure out when we should run next (by selecting the @@ -614,11 +616,13 @@ class PeriodicWorker(object): # Restart as soon as possible self._schedule.push(now, index) else: + barrier.incr() fut.add_done_callback(functools.partial(_on_done, PERIODIC, cb, cb_name, index, submitted_at)) + fut.add_done_callback(lambda _fut: barrier.decr()) else: # Gotta wait... self._schedule.push(next_run, index) @@ -643,11 +647,13 @@ class PeriodicWorker(object): # Restart as soon as possible self._immediates.append(index) else: + barrier.incr() fut.add_done_callback(functools.partial(_on_done, IMMEDIATE, cb, cb_name, index, submitted_at)) + fut.add_done_callback(lambda _fut: barrier.decr()) def _on_done(kind, cb, cb_name, index, submitted_at, fut): started_at, finished_at, failure = fut.result() @@ -670,9 +676,12 @@ class PeriodicWorker(object): self._schedule.push(next_run, index) self._waiter.notify_all() - while not self._tombstone.is_set(): - _process_immediates() - _process_scheduled() + try: + while not self._tombstone.is_set(): + _process_immediates() + _process_scheduled() + finally: + barrier.wait() def _on_finish(self): # TODO(harlowja): this may be to verbose for people?