From dc0862510f35c2fd36e5010c3cc024fc19cc2a3f Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 1 Jun 2016 18:50:10 -0700 Subject: [PATCH] Begin adding our own thread pool executor To be able to add extended functionality without having to (just yet) upstream those changes into the stdlib, especially around thread pool execution and tracking start to add our own thread workers and the needed mechanics to clean them up appropriately. Change-Id: If9af0d905009435d91e7d8ac00005f4ca30bd987 --- futurist/_futures.py | 80 +++++++++++++----------- futurist/_thread.py | 146 +++++++++++++++++++++++++++++++++++++++++++ futurist/_utils.py | 2 + 3 files changed, 191 insertions(+), 37 deletions(-) create mode 100644 futurist/_thread.py diff --git a/futurist/_futures.py b/futurist/_futures.py index bc46813..ed0951f 100644 --- a/futurist/_futures.py +++ b/futurist/_futures.py @@ -19,10 +19,12 @@ import threading from concurrent import futures as _futures from concurrent.futures import process as _process -from concurrent.futures import thread as _thread import six +from six.moves import queue as compat_queue + from futurist import _green +from futurist import _thread from futurist import _utils @@ -37,25 +39,6 @@ class RejectedSubmission(Exception): Future = _futures.Future -class _Threading(object): - - @staticmethod - def event_object(*args, **kwargs): - return threading.Event(*args, **kwargs) - - @staticmethod - def lock_object(*args, **kwargs): - return threading.Lock(*args, **kwargs) - - @staticmethod - def rlock_object(*args, **kwargs): - return threading.RLock(*args, **kwargs) - - @staticmethod - def condition_object(*args, **kwargs): - return threading.Condition(*args, **kwargs) - - class _Gatherer(object): def __init__(self, submit_func, lock_factory, start_before_submit=False): self._submit_func = submit_func @@ -116,7 +99,7 @@ class _Gatherer(object): return fut -class ThreadPoolExecutor(_thread.ThreadPoolExecutor): +class ThreadPoolExecutor(_futures.Executor): """Executor that uses a thread pool to execute calls asynchronously. It gathers statistics about the submissions executed for post-analysis... @@ -124,13 +107,13 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): See: https://docs.python.org/dev/library/concurrent.futures.html """ - threading = _Threading() + threading = _thread.Threading() def __init__(self, max_workers=None, check_and_reject=None): """Initializes a thread pool executor. :param max_workers: maximum number of workers that can be - simulatenously active at the same time, further + simultaneously active at the same time, further submitted work will be queued up when this limit is reached. :type max_workers: int @@ -146,21 +129,15 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): """ if max_workers is None: max_workers = _utils.get_optimal_thread_count() - super(ThreadPoolExecutor, self).__init__(max_workers=max_workers) - if self._max_workers <= 0: + if max_workers <= 0: raise ValueError("Max workers must be greater than zero") - # NOTE(harlowja): this replaces the parent classes non-reentrant lock - # with a reentrant lock so that we can correctly call into the check - # and reject lock, and that it will block correctly if another - # submit call is done during that... + self._max_workers = max_workers + self._work_queue = compat_queue.Queue() self._shutdown_lock = threading.RLock() + self._shutdown = False + self._workers = [] self._check_and_reject = check_and_reject or (lambda e, waiting: None) - self._gatherer = _Gatherer( - # Since our submit will use this gatherer we have to reference - # the parent submit, bound to this instance (which is what we - # really want to use anyway). - super(ThreadPoolExecutor, self).submit, - self.threading.lock_object) + self._gatherer = _Gatherer(self._submit, self.threading.lock_object) @property def statistics(self): @@ -172,6 +149,35 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): """Accessor to determine if the executor is alive/active.""" return not self._shutdown + def _maybe_spin_up(self): + """Spin up a worker if needed.""" + if (not self._workers or + (len(self._workers) < self._max_workers and not + # Do more advanced idle checks in the future.... + any(w.idle for w in self._workers))): + w = _thread.ThreadWorker.create_and_register( + self, self._work_queue) + # Always save it before we start (so that even if we fail + # starting it we can correctly join on it). + self._workers.append(w) + w.start() + + def shutdown(self, wait=True): + with self._shutdown_lock: + if not self._shutdown: + self._shutdown = True + for w in self._workers: + w.stop() + if wait: + for w in self._workers: + _thread.join_thread(w) + + def _submit(self, fn, *args, **kwargs): + f = Future() + self._maybe_spin_up() + self._work_queue.put(_utils.WorkItem(f, fn, args, kwargs)) + return f + def submit(self, fn, *args, **kwargs): """Submit some work to be executed (and gather statistics).""" with self._shutdown_lock: @@ -190,7 +196,7 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor): See: https://docs.python.org/dev/library/concurrent.futures.html """ - threading = _Threading() + threading = _thread.Threading() def __init__(self, max_workers=None): if max_workers is None: @@ -231,7 +237,7 @@ class SynchronousExecutor(_futures.Executor): It gathers statistics about the submissions executed for post-analysis... """ - threading = _Threading() + threading = _thread.Threading() def __init__(self, green=False, run_work_func=lambda work: work.run()): """Synchronous executor constructor. diff --git a/futurist/_thread.py b/futurist/_thread.py new file mode 100644 index 0000000..dc73666 --- /dev/null +++ b/futurist/_thread.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import atexit +import sys +import threading +import weakref + +import six +from six.moves import queue as compat_queue + + +class Threading(object): + + @staticmethod + def event_object(*args, **kwargs): + return threading.Event(*args, **kwargs) + + @staticmethod + def lock_object(*args, **kwargs): + return threading.Lock(*args, **kwargs) + + @staticmethod + def rlock_object(*args, **kwargs): + return threading.RLock(*args, **kwargs) + + @staticmethod + def condition_object(*args, **kwargs): + return threading.Condition(*args, **kwargs) + + +_to_be_cleaned = weakref.WeakKeyDictionary() +_dying = False + + +if six.PY2: + # This ensures joining responds to keyboard interrupts. + join_thread = lambda thread: thread.join(sys.maxint) +else: + # Not needed on py3 or newer... + join_thread = lambda thread: thread.join() + + +_TOMBSTONE = object() + + +class ThreadWorker(threading.Thread): + MAX_IDLE_FOR = 1 + + def __init__(self, executor, work_queue): + super(ThreadWorker, self).__init__() + self.work_queue = work_queue + self.should_stop = False + self.idle = False + self.daemon = True + # Ensure that when the owning executor gets cleaned up that these + # threads also get shutdown (if they were not already shutdown). + self.executor_ref = weakref.ref( + executor, lambda _obj: work_queue.put(_TOMBSTONE)) + + @classmethod + def create_and_register(cls, executor, work_queue): + w = cls(executor, work_queue) + # Ensure that on shutdown, if threads still exist that we get + # around to cleaning them up and waiting for them to correctly stop. + # + # TODO(harlowja): use a weakrefset in the future, as we don't + # really care about the values... + _to_be_cleaned[w] = True + return w + + def _is_dying(self): + if self.should_stop or _dying: + return True + executor = self.executor_ref() + if executor is None: + return True + # Avoid confusing the GC with cycles (since each executor + # references its known workers)... + del executor + return False + + def _wait_for_work(self): + self.idle = True + work = None + while work is None: + try: + work = self.work_queue.get(True, self.MAX_IDLE_FOR) + except compat_queue.Empty: + if self._is_dying(): + work = _TOMBSTONE + self.idle = False + return work + + def stop(self, soon_as_possible=False): + if soon_as_possible: + # This will potentially leave unfinished work on queues. + self.should_stop = True + self.work_queue.put(_TOMBSTONE) + + def run(self): + while not self._is_dying(): + work = self._wait_for_work() + try: + if work is _TOMBSTONE: + # Ensure any other threads on the same queue also get + # the tombstone object... + self.work_queue.put(_TOMBSTONE) + return + else: + work.run() + finally: + # Avoid any potential (self) references to the work item + # in tracebacks or similar... + del work + + +def _clean_up(): + """Ensure all threads that were created were destroyed cleanly.""" + global _dying + _dying = True + threads_to_wait_for = [] + while _to_be_cleaned: + worker, _work_val = _to_be_cleaned.popitem() + worker.stop(soon_as_possible=True) + threads_to_wait_for.append(worker) + while threads_to_wait_for: + worker = threads_to_wait_for.pop() + try: + join_thread(worker) + finally: + del worker + + +atexit.register(_clean_up) diff --git a/futurist/_utils.py b/futurist/_utils.py index 0b9fce5..26f7155 100644 --- a/futurist/_utils.py +++ b/futurist/_utils.py @@ -31,6 +31,8 @@ except ImportError: class WorkItem(object): + """A thing to be executed by a executor.""" + def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn