Provide a thread differentiation attribute on executors

Have each executor expose a 'threading' attribute on itself
or its class which can be used by executor consumers to be able
to construct locks, rlocks, conditions, and event objects that
are most compatible with the executors execution style.

This allows for creating libraries that can use the executor defined
objects internally, and avoids the need to have to monkey patch all
the things (since typically libraries just stick to the built-in locks
and conditions and such, and therefore require monkey patching to work
correctly under different executor models).

Change-Id: I6c09a7660cf6a89142064d0db31846481ae7d1be
This commit is contained in:
Joshua Harlow 2015-07-20 17:36:15 -07:00
parent d03cc4f05f
commit 7b08c0a2ad
1 changed files with 64 additions and 10 deletions

View File

@ -44,11 +44,29 @@ class RejectedSubmission(Exception):
Future = _futures.Future
class _Threading(object):
@staticmethod
def event_object(*args, **kwargs):
return threading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return threading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return threading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return threading.Condition(*args, **kwargs)
class _Gatherer(object):
def __init__(self, submit_func,
lock_cls=threading.Lock, start_before_submit=False):
def __init__(self, submit_func, lock_factory, start_before_submit=False):
self._submit_func = submit_func
self._stats_lock = lock_cls()
self._stats_lock = lock_factory()
self._stats = ExecutorStatistics()
self._start_before_submit = start_before_submit
@ -112,6 +130,9 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
def __init__(self, max_workers=None, check_and_reject=None):
"""Initializes a thread pool executor.
@ -145,7 +166,8 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ThreadPoolExecutor, self).submit)
super(ThreadPoolExecutor, self).submit,
self.threading.lock_object)
@property
def statistics(self):
@ -174,6 +196,9 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
def __init__(self, max_workers=None):
if max_workers is None:
max_workers = _utils.get_optimal_thread_count()
@ -184,7 +209,8 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ProcessPoolExecutor, self).submit)
super(ProcessPoolExecutor, self).submit,
self.threading.lock_object)
@property
def alive(self):
@ -226,6 +252,31 @@ class _WorkItem(object):
self.future.set_result(result)
if _utils.EVENTLET_AVAILABLE:
class _GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
_green_threading = _GreenThreading()
else:
_green_threading = None
class SynchronousExecutor(_futures.Executor):
"""Executor that uses the caller to execute calls synchronously.
@ -237,6 +288,8 @@ class SynchronousExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _Threading()
def __init__(self, green=False):
"""Synchronous executor constructor.
@ -250,14 +303,13 @@ class SynchronousExecutor(_futures.Executor):
' synchronous executor')
self._shutoff = False
if green:
lock_cls = greenthreading.Lock
self.threading = _green_threading
self._future_cls = GreenFuture
else:
lock_cls = threading.Lock
self._future_cls = Future
self._gatherer = _Gatherer(self._submit,
start_before_submit=True,
lock_cls=lock_cls)
self.threading.lock_object,
start_before_submit=True)
@property
def alive(self):
@ -347,6 +399,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _green_threading
def __init__(self, max_workers=1000, check_and_reject=None):
"""Initializes a green thread pool executor.
@ -376,7 +430,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
self._shutdown_lock = greenthreading.Lock()
self._shutdown = False
self._gatherer = _Gatherer(self._submit,
lock_cls=greenthreading.Lock)
self.threading.lock_object)
@property
def alive(self):