Merge "Begin adding our own thread pool executor"
This commit is contained in:
commit
3ccc3c447a
|
@ -19,10 +19,12 @@ import threading
|
|||
|
||||
from concurrent import futures as _futures
|
||||
from concurrent.futures import process as _process
|
||||
from concurrent.futures import thread as _thread
|
||||
import six
|
||||
|
||||
from six.moves import queue as compat_queue
|
||||
|
||||
from futurist import _green
|
||||
from futurist import _thread
|
||||
from futurist import _utils
|
||||
|
||||
|
||||
|
@ -37,25 +39,6 @@ class RejectedSubmission(Exception):
|
|||
Future = _futures.Future
|
||||
|
||||
|
||||
class _Threading(object):
|
||||
|
||||
@staticmethod
|
||||
def event_object(*args, **kwargs):
|
||||
return threading.Event(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def lock_object(*args, **kwargs):
|
||||
return threading.Lock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def rlock_object(*args, **kwargs):
|
||||
return threading.RLock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def condition_object(*args, **kwargs):
|
||||
return threading.Condition(*args, **kwargs)
|
||||
|
||||
|
||||
class _Gatherer(object):
|
||||
def __init__(self, submit_func, lock_factory, start_before_submit=False):
|
||||
self._submit_func = submit_func
|
||||
|
@ -116,7 +99,7 @@ class _Gatherer(object):
|
|||
return fut
|
||||
|
||||
|
||||
class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
|
||||
class ThreadPoolExecutor(_futures.Executor):
|
||||
"""Executor that uses a thread pool to execute calls asynchronously.
|
||||
|
||||
It gathers statistics about the submissions executed for post-analysis...
|
||||
|
@ -124,13 +107,13 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
|
|||
See: https://docs.python.org/dev/library/concurrent.futures.html
|
||||
"""
|
||||
|
||||
threading = _Threading()
|
||||
threading = _thread.Threading()
|
||||
|
||||
def __init__(self, max_workers=None, check_and_reject=None):
|
||||
"""Initializes a thread pool executor.
|
||||
|
||||
:param max_workers: maximum number of workers that can be
|
||||
simulatenously active at the same time, further
|
||||
simultaneously active at the same time, further
|
||||
submitted work will be queued up when this limit
|
||||
is reached.
|
||||
:type max_workers: int
|
||||
|
@ -146,21 +129,15 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
|
|||
"""
|
||||
if max_workers is None:
|
||||
max_workers = _utils.get_optimal_thread_count()
|
||||
super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
|
||||
if self._max_workers <= 0:
|
||||
if max_workers <= 0:
|
||||
raise ValueError("Max workers must be greater than zero")
|
||||
# NOTE(harlowja): this replaces the parent classes non-reentrant lock
|
||||
# with a reentrant lock so that we can correctly call into the check
|
||||
# and reject lock, and that it will block correctly if another
|
||||
# submit call is done during that...
|
||||
self._max_workers = max_workers
|
||||
self._work_queue = compat_queue.Queue()
|
||||
self._shutdown_lock = threading.RLock()
|
||||
self._shutdown = False
|
||||
self._workers = []
|
||||
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
|
||||
self._gatherer = _Gatherer(
|
||||
# Since our submit will use this gatherer we have to reference
|
||||
# the parent submit, bound to this instance (which is what we
|
||||
# really want to use anyway).
|
||||
super(ThreadPoolExecutor, self).submit,
|
||||
self.threading.lock_object)
|
||||
self._gatherer = _Gatherer(self._submit, self.threading.lock_object)
|
||||
|
||||
@property
|
||||
def statistics(self):
|
||||
|
@ -172,6 +149,35 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
|
|||
"""Accessor to determine if the executor is alive/active."""
|
||||
return not self._shutdown
|
||||
|
||||
def _maybe_spin_up(self):
|
||||
"""Spin up a worker if needed."""
|
||||
if (not self._workers or
|
||||
(len(self._workers) < self._max_workers and not
|
||||
# Do more advanced idle checks in the future....
|
||||
any(w.idle for w in self._workers))):
|
||||
w = _thread.ThreadWorker.create_and_register(
|
||||
self, self._work_queue)
|
||||
# Always save it before we start (so that even if we fail
|
||||
# starting it we can correctly join on it).
|
||||
self._workers.append(w)
|
||||
w.start()
|
||||
|
||||
def shutdown(self, wait=True):
|
||||
with self._shutdown_lock:
|
||||
if not self._shutdown:
|
||||
self._shutdown = True
|
||||
for w in self._workers:
|
||||
w.stop()
|
||||
if wait:
|
||||
for w in self._workers:
|
||||
_thread.join_thread(w)
|
||||
|
||||
def _submit(self, fn, *args, **kwargs):
|
||||
f = Future()
|
||||
self._maybe_spin_up()
|
||||
self._work_queue.put(_utils.WorkItem(f, fn, args, kwargs))
|
||||
return f
|
||||
|
||||
def submit(self, fn, *args, **kwargs):
|
||||
"""Submit some work to be executed (and gather statistics)."""
|
||||
with self._shutdown_lock:
|
||||
|
@ -190,7 +196,7 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
|
|||
See: https://docs.python.org/dev/library/concurrent.futures.html
|
||||
"""
|
||||
|
||||
threading = _Threading()
|
||||
threading = _thread.Threading()
|
||||
|
||||
def __init__(self, max_workers=None):
|
||||
if max_workers is None:
|
||||
|
@ -231,7 +237,7 @@ class SynchronousExecutor(_futures.Executor):
|
|||
It gathers statistics about the submissions executed for post-analysis...
|
||||
"""
|
||||
|
||||
threading = _Threading()
|
||||
threading = _thread.Threading()
|
||||
|
||||
def __init__(self, green=False, run_work_func=lambda work: work.run()):
|
||||
"""Synchronous executor constructor.
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import atexit
|
||||
import sys
|
||||
import threading
|
||||
import weakref
|
||||
|
||||
import six
|
||||
from six.moves import queue as compat_queue
|
||||
|
||||
|
||||
class Threading(object):
|
||||
|
||||
@staticmethod
|
||||
def event_object(*args, **kwargs):
|
||||
return threading.Event(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def lock_object(*args, **kwargs):
|
||||
return threading.Lock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def rlock_object(*args, **kwargs):
|
||||
return threading.RLock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def condition_object(*args, **kwargs):
|
||||
return threading.Condition(*args, **kwargs)
|
||||
|
||||
|
||||
_to_be_cleaned = weakref.WeakKeyDictionary()
|
||||
_dying = False
|
||||
|
||||
|
||||
if six.PY2:
|
||||
# This ensures joining responds to keyboard interrupts.
|
||||
join_thread = lambda thread: thread.join(sys.maxint)
|
||||
else:
|
||||
# Not needed on py3 or newer...
|
||||
join_thread = lambda thread: thread.join()
|
||||
|
||||
|
||||
_TOMBSTONE = object()
|
||||
|
||||
|
||||
class ThreadWorker(threading.Thread):
|
||||
MAX_IDLE_FOR = 1
|
||||
|
||||
def __init__(self, executor, work_queue):
|
||||
super(ThreadWorker, self).__init__()
|
||||
self.work_queue = work_queue
|
||||
self.should_stop = False
|
||||
self.idle = False
|
||||
self.daemon = True
|
||||
# Ensure that when the owning executor gets cleaned up that these
|
||||
# threads also get shutdown (if they were not already shutdown).
|
||||
self.executor_ref = weakref.ref(
|
||||
executor, lambda _obj: work_queue.put(_TOMBSTONE))
|
||||
|
||||
@classmethod
|
||||
def create_and_register(cls, executor, work_queue):
|
||||
w = cls(executor, work_queue)
|
||||
# Ensure that on shutdown, if threads still exist that we get
|
||||
# around to cleaning them up and waiting for them to correctly stop.
|
||||
#
|
||||
# TODO(harlowja): use a weakrefset in the future, as we don't
|
||||
# really care about the values...
|
||||
_to_be_cleaned[w] = True
|
||||
return w
|
||||
|
||||
def _is_dying(self):
|
||||
if self.should_stop or _dying:
|
||||
return True
|
||||
executor = self.executor_ref()
|
||||
if executor is None:
|
||||
return True
|
||||
# Avoid confusing the GC with cycles (since each executor
|
||||
# references its known workers)...
|
||||
del executor
|
||||
return False
|
||||
|
||||
def _wait_for_work(self):
|
||||
self.idle = True
|
||||
work = None
|
||||
while work is None:
|
||||
try:
|
||||
work = self.work_queue.get(True, self.MAX_IDLE_FOR)
|
||||
except compat_queue.Empty:
|
||||
if self._is_dying():
|
||||
work = _TOMBSTONE
|
||||
self.idle = False
|
||||
return work
|
||||
|
||||
def stop(self, soon_as_possible=False):
|
||||
if soon_as_possible:
|
||||
# This will potentially leave unfinished work on queues.
|
||||
self.should_stop = True
|
||||
self.work_queue.put(_TOMBSTONE)
|
||||
|
||||
def run(self):
|
||||
while not self._is_dying():
|
||||
work = self._wait_for_work()
|
||||
try:
|
||||
if work is _TOMBSTONE:
|
||||
# Ensure any other threads on the same queue also get
|
||||
# the tombstone object...
|
||||
self.work_queue.put(_TOMBSTONE)
|
||||
return
|
||||
else:
|
||||
work.run()
|
||||
finally:
|
||||
# Avoid any potential (self) references to the work item
|
||||
# in tracebacks or similar...
|
||||
del work
|
||||
|
||||
|
||||
def _clean_up():
|
||||
"""Ensure all threads that were created were destroyed cleanly."""
|
||||
global _dying
|
||||
_dying = True
|
||||
threads_to_wait_for = []
|
||||
while _to_be_cleaned:
|
||||
worker, _work_val = _to_be_cleaned.popitem()
|
||||
worker.stop(soon_as_possible=True)
|
||||
threads_to_wait_for.append(worker)
|
||||
while threads_to_wait_for:
|
||||
worker = threads_to_wait_for.pop()
|
||||
try:
|
||||
join_thread(worker)
|
||||
finally:
|
||||
del worker
|
||||
|
||||
|
||||
atexit.register(_clean_up)
|
|
@ -31,6 +31,8 @@ except ImportError:
|
|||
|
||||
|
||||
class WorkItem(object):
|
||||
"""A thing to be executed by a executor."""
|
||||
|
||||
def __init__(self, future, fn, args, kwargs):
|
||||
self.future = future
|
||||
self.fn = fn
|
||||
|
|
Loading…
Reference in New Issue