Merge "Allow providing a callback to certain executors to reject new work"

This commit is contained in:
Jenkins 2015-07-30 20:52:56 +00:00 committed by Gerrit Code Review
commit 5c5801bee0
4 changed files with 98 additions and 3 deletions

View File

@ -8,9 +8,11 @@ Executors
.. autoclass:: futurist.GreenThreadPoolExecutor
:members:
:special-members: __init__
.. autoclass:: futurist.ProcessPoolExecutor
:members:
:special-members: __init__
.. autoclass:: futurist.SynchronousExecutor
:members:
@ -18,6 +20,7 @@ Executors
.. autoclass:: futurist.ThreadPoolExecutor
:members:
:special-members: __init__
-------
Futures
@ -49,6 +52,13 @@ Miscellaneous
.. autoclass:: futurist.ExecutorStatistics
:members:
----------
Exceptions
----------
.. autoclass:: futurist.RejectedSubmission
:members:
-------
Waiters
-------

View File

@ -24,4 +24,6 @@ from futurist._futures import ProcessPoolExecutor # noqa
from futurist._futures import SynchronousExecutor # noqa
from futurist._futures import ThreadPoolExecutor # noqa
from futurist._futures import RejectedSubmission # noqa
from futurist._futures import ExecutorStatistics # noqa

View File

@ -36,6 +36,10 @@ except ImportError:
from futurist import _utils
class RejectedSubmission(Exception):
"""Exception raised when a submitted call is rejected (for some reason)."""
# NOTE(harlowja): Allows for simpler access to this type...
Future = _futures.Future
@ -108,12 +112,35 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
See: https://docs.python.org/dev/library/concurrent.futures.html
"""
def __init__(self, max_workers=None):
def __init__(self, max_workers=None, check_and_reject=None):
"""Initializes a thread pool executor.
:param max_workers: maximum number of workers that can be
simulatenously active at the same time, further
submitted work will be queued up when this limit
is reached.
:type max_workers: int
:param check_and_reject: a callback function that will be provided
two position arguments, the first argument
will be this executor instance, and the second
will be the number of currently queued work
items in this executors backlog; the callback
should raise a :py:class:`.RejectedSubmission`
exception if it wants to have this submission
rejected.
:type check_and_reject: callback
"""
if max_workers is None:
max_workers = _utils.get_optimal_thread_count()
super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
if self._max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
# NOTE(harlowja): this replaces the parent classes non-reentrant lock
# with a reentrant lock so that we can correctly call into the check
# and reject lock, and that it will block correctly if another
# submit call is done during that...
self._shutdown_lock = threading.RLock()
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._gatherer = _Gatherer(
# Since our submit will use this gatherer we have to reference
# the parent submit, bound to this instance (which is what we
@ -132,7 +159,12 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
def submit(self, fn, *args, **kwargs):
"""Submit some work to be executed (and gather statistics)."""
return self._gatherer.submit(fn, *args, **kwargs)
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('Can not schedule new futures'
' after being shutdown')
self._check_and_reject(self, self._work_queue.qsize())
return self._gatherer.submit(fn, *args, **kwargs)
class ProcessPoolExecutor(_process.ProcessPoolExecutor):
@ -315,7 +347,24 @@ class GreenThreadPoolExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
def __init__(self, max_workers=1000):
def __init__(self, max_workers=1000, check_and_reject=None):
"""Initializes a green thread pool executor.
:param max_workers: maximum number of workers that can be
simulatenously active at the same time, further
submitted work will be queued up when this limit
is reached.
:type max_workers: int
:param check_and_reject: a callback function that will be provided
two position arguments, the first argument
will be this executor instance, and the second
will be the number of currently queued work
items in this executors backlog; the callback
should raise a :py:class:`.RejectedSubmission`
exception if it wants to have this submission
rejected.
:type check_and_reject: callback
"""
if not _utils.EVENTLET_AVAILABLE:
raise RuntimeError('Eventlet is needed to use a green executor')
if max_workers <= 0:
@ -323,6 +372,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
self._max_workers = max_workers
self._pool = greenpool.GreenPool(self._max_workers)
self._delayed_work = greenqueue.Queue()
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._shutdown_lock = greenthreading.Lock()
self._shutdown = False
self._gatherer = _Gatherer(self._submit,
@ -350,6 +400,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
if self._shutdown:
raise RuntimeError('Can not schedule new futures'
' after being shutdown')
self._check_and_reject(self, self._delayed_work.qsize())
return self._gatherer.submit(fn, *args, **kwargs)
def _submit(self, fn, *args, **kwargs):

32
futurist/rejection.py Normal file
View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Executor rejection strategies."""
import futurist
def reject_when_reached(max_backlog):
"""Returns a function that will raise when backlog goes past max size."""
def _rejector(executor, backlog):
if backlog + 1 >= max_backlog:
raise futurist.RejectedSubmission("Current backlog %s is not"
" allowed to go"
" beyond %s" % (backlog,
max_backlog))
return _rejector