Merge "Provide a thread differentiation attribute on executors"
This commit is contained in:
commit
32c3fa2a7a
|
@ -44,11 +44,29 @@ class RejectedSubmission(Exception):
|
|||
Future = _futures.Future
|
||||
|
||||
|
||||
class _Threading(object):
|
||||
|
||||
@staticmethod
|
||||
def event_object(*args, **kwargs):
|
||||
return threading.Event(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def lock_object(*args, **kwargs):
|
||||
return threading.Lock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def rlock_object(*args, **kwargs):
|
||||
return threading.RLock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def condition_object(*args, **kwargs):
|
||||
return threading.Condition(*args, **kwargs)
|
||||
|
||||
|
||||
class _Gatherer(object):
|
||||
def __init__(self, submit_func,
|
||||
lock_cls=threading.Lock, start_before_submit=False):
|
||||
def __init__(self, submit_func, lock_factory, start_before_submit=False):
|
||||
self._submit_func = submit_func
|
||||
self._stats_lock = lock_cls()
|
||||
self._stats_lock = lock_factory()
|
||||
self._stats = ExecutorStatistics()
|
||||
self._start_before_submit = start_before_submit
|
||||
|
||||
|
@ -112,6 +130,9 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
|
|||
|
||||
See: https://docs.python.org/dev/library/concurrent.futures.html
|
||||
"""
|
||||
|
||||
threading = _Threading()
|
||||
|
||||
def __init__(self, max_workers=None, check_and_reject=None):
|
||||
"""Initializes a thread pool executor.
|
||||
|
||||
|
@ -145,7 +166,8 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
|
|||
# Since our submit will use this gatherer we have to reference
|
||||
# the parent submit, bound to this instance (which is what we
|
||||
# really want to use anyway).
|
||||
super(ThreadPoolExecutor, self).submit)
|
||||
super(ThreadPoolExecutor, self).submit,
|
||||
self.threading.lock_object)
|
||||
|
||||
@property
|
||||
def statistics(self):
|
||||
|
@ -174,6 +196,9 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
|
|||
|
||||
See: https://docs.python.org/dev/library/concurrent.futures.html
|
||||
"""
|
||||
|
||||
threading = _Threading()
|
||||
|
||||
def __init__(self, max_workers=None):
|
||||
if max_workers is None:
|
||||
max_workers = _utils.get_optimal_thread_count()
|
||||
|
@ -184,7 +209,8 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
|
|||
# Since our submit will use this gatherer we have to reference
|
||||
# the parent submit, bound to this instance (which is what we
|
||||
# really want to use anyway).
|
||||
super(ProcessPoolExecutor, self).submit)
|
||||
super(ProcessPoolExecutor, self).submit,
|
||||
self.threading.lock_object)
|
||||
|
||||
@property
|
||||
def alive(self):
|
||||
|
@ -226,6 +252,31 @@ class _WorkItem(object):
|
|||
self.future.set_result(result)
|
||||
|
||||
|
||||
if _utils.EVENTLET_AVAILABLE:
|
||||
|
||||
class _GreenThreading(object):
|
||||
|
||||
@staticmethod
|
||||
def event_object(*args, **kwargs):
|
||||
return greenthreading.Event(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def lock_object(*args, **kwargs):
|
||||
return greenthreading.Lock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def rlock_object(*args, **kwargs):
|
||||
return greenthreading.RLock(*args, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def condition_object(*args, **kwargs):
|
||||
return greenthreading.Condition(*args, **kwargs)
|
||||
|
||||
_green_threading = _GreenThreading()
|
||||
else:
|
||||
_green_threading = None
|
||||
|
||||
|
||||
class SynchronousExecutor(_futures.Executor):
|
||||
"""Executor that uses the caller to execute calls synchronously.
|
||||
|
||||
|
@ -237,6 +288,8 @@ class SynchronousExecutor(_futures.Executor):
|
|||
It gathers statistics about the submissions executed for post-analysis...
|
||||
"""
|
||||
|
||||
threading = _Threading()
|
||||
|
||||
def __init__(self, green=False):
|
||||
"""Synchronous executor constructor.
|
||||
|
||||
|
@ -250,14 +303,13 @@ class SynchronousExecutor(_futures.Executor):
|
|||
' synchronous executor')
|
||||
self._shutoff = False
|
||||
if green:
|
||||
lock_cls = greenthreading.Lock
|
||||
self.threading = _green_threading
|
||||
self._future_cls = GreenFuture
|
||||
else:
|
||||
lock_cls = threading.Lock
|
||||
self._future_cls = Future
|
||||
self._gatherer = _Gatherer(self._submit,
|
||||
start_before_submit=True,
|
||||
lock_cls=lock_cls)
|
||||
self.threading.lock_object,
|
||||
start_before_submit=True)
|
||||
|
||||
@property
|
||||
def alive(self):
|
||||
|
@ -347,6 +399,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
|||
It gathers statistics about the submissions executed for post-analysis...
|
||||
"""
|
||||
|
||||
threading = _green_threading
|
||||
|
||||
def __init__(self, max_workers=1000, check_and_reject=None):
|
||||
"""Initializes a green thread pool executor.
|
||||
|
||||
|
@ -376,7 +430,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
|||
self._shutdown_lock = greenthreading.Lock()
|
||||
self._shutdown = False
|
||||
self._gatherer = _Gatherer(self._submit,
|
||||
lock_cls=greenthreading.Lock)
|
||||
self.threading.lock_object)
|
||||
|
||||
@property
|
||||
def alive(self):
|
||||
|
|
Loading…
Reference in New Issue