Merge "Provide a thread differentiation attribute on executors"

This commit is contained in:
Jenkins 2015-08-22 01:41:15 +00:00 committed by Gerrit Code Review
commit 32c3fa2a7a
1 changed files with 64 additions and 10 deletions

View File

@ -44,11 +44,29 @@ class RejectedSubmission(Exception):
Future = _futures.Future
class _Threading(object):
@staticmethod
def event_object(*args, **kwargs):
return threading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return threading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return threading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return threading.Condition(*args, **kwargs)
class _Gatherer(object):
def __init__(self, submit_func,
lock_cls=threading.Lock, start_before_submit=False):
def __init__(self, submit_func, lock_factory, start_before_submit=False):
self._submit_func = submit_func
self._stats_lock = lock_cls()
self._stats_lock = lock_factory()
self._stats = ExecutorStatistics()
self._start_before_submit = start_before_submit
@ -112,6 +130,9 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
def __init__(self, max_workers=None, check_and_reject=None):
"""Initializes a thread pool executor.
@ -145,7 +166,8 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ThreadPoolExecutor, self).submit)
super(ThreadPoolExecutor, self).submit,
self.threading.lock_object)
@property
def statistics(self):
@ -174,6 +196,9 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
threading = _Threading()
def __init__(self, max_workers=None):
if max_workers is None:
max_workers = _utils.get_optimal_thread_count()
@ -184,7 +209,8 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
# really want to use anyway).
super(ProcessPoolExecutor, self).submit)
super(ProcessPoolExecutor, self).submit,
self.threading.lock_object)
@property
def alive(self):
@ -226,6 +252,31 @@ class _WorkItem(object):
self.future.set_result(result)
if _utils.EVENTLET_AVAILABLE:
class _GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
_green_threading = _GreenThreading()
else:
_green_threading = None
class SynchronousExecutor(_futures.Executor):
"""Executor that uses the caller to execute calls synchronously.
@ -237,6 +288,8 @@ class SynchronousExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _Threading()
def __init__(self, green=False):
"""Synchronous executor constructor.
@ -250,14 +303,13 @@ class SynchronousExecutor(_futures.Executor):
' synchronous executor')
self._shutoff = False
if green:
lock_cls = greenthreading.Lock
self.threading = _green_threading
self._future_cls = GreenFuture
else:
lock_cls = threading.Lock
self._future_cls = Future
self._gatherer = _Gatherer(self._submit,
start_before_submit=True,
lock_cls=lock_cls)
self.threading.lock_object,
start_before_submit=True)
@property
def alive(self):
@ -347,6 +399,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _green_threading
def __init__(self, max_workers=1000, check_and_reject=None):
"""Initializes a green thread pool executor.
@ -376,7 +430,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
self._shutdown_lock = greenthreading.Lock()
self._shutdown = False
self._gatherer = _Gatherer(self._submit,
lock_cls=greenthreading.Lock)
self.threading.lock_object)
@property
def alive(self):