Ensure all futures have completed before run returns

Now that shutdown can be skipped (which would previously
wait for all submitted futures to be finished) we need to have
a way to know that all submitted futures have been finished
before our run method returns.

A simple countdown barrier/latch does this so ensure that we use
it in the periodic run loop to be able to only return from the
run loop when no active futures are in progress.

Change-Id: Ia3dd84ebfe2416060aaf5113cc8310a23919a3f9
This commit is contained in:
Joshua Harlow 2016-02-09 10:02:05 -08:00
parent e7bbc502fe
commit da698e8511
2 changed files with 36 additions and 3 deletions

View File

@ -17,6 +17,7 @@
import inspect
import multiprocessing
import sys
import threading
import traceback
from monotonic import monotonic as now # noqa
@ -116,3 +117,26 @@ def get_optimal_thread_count(default=2):
# just setup two threads since it's hard to know what else we
# should do in this situation.
return default
class Barrier(object):
"""A class that ensures active <= 0 occur before unblocking."""
def __init__(self, cond_cls=threading.Condition):
self._active = 0
self._cond = cond_cls()
def incr(self):
with self._cond:
self._active += 1
self._cond.notify_all()
def decr(self):
with self._cond:
self._active -= 1
self._cond.notify_all()
def wait(self):
with self._cond:
while self._active > 0:
self._cond.wait()

View File

@ -532,6 +532,7 @@ class PeriodicWorker(object):
self._waiter = cond_cls()
self._dead = event_cls()
self._active = event_cls()
self._cond_cls = cond_cls
self._watchers = []
self._callables = []
for (cb, args, kwargs) in callables:
@ -577,6 +578,7 @@ class PeriodicWorker(object):
def _run(self, executor, runner):
"""Main worker run loop."""
barrier = utils.Barrier(cond_cls=self._cond_cls)
def _process_scheduled():
# Figure out when we should run next (by selecting the
@ -614,11 +616,13 @@ class PeriodicWorker(object):
# Restart as soon as possible
self._schedule.push(now, index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
else:
# Gotta wait...
self._schedule.push(next_run, index)
@ -643,11 +647,13 @@ class PeriodicWorker(object):
# Restart as soon as possible
self._immediates.append(index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
started_at, finished_at, failure = fut.result()
@ -670,9 +676,12 @@ class PeriodicWorker(object):
self._schedule.push(next_run, index)
self._waiter.notify_all()
while not self._tombstone.is_set():
_process_immediates()
_process_scheduled()
try:
while not self._tombstone.is_set():
_process_immediates()
_process_scheduled()
finally:
barrier.wait()
def _on_finish(self):
# TODO(harlowja): this may be to verbose for people?