From 7a7059da00005e9153846d59044b6f9d98398d55 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 23 Jul 2015 12:42:23 -0700 Subject: [PATCH] Allow providing a callback to certain executors to reject new work Change-Id: Id71b9ed45cd9b71c82eed1562cd3f40701a3a983 --- doc/source/api.rst | 10 ++++++++ futurist/__init__.py | 2 ++ futurist/_futures.py | 57 ++++++++++++++++++++++++++++++++++++++++--- futurist/rejection.py | 32 ++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 futurist/rejection.py diff --git a/doc/source/api.rst b/doc/source/api.rst index 38db6b7..b9897a9 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -8,9 +8,11 @@ Executors .. autoclass:: futurist.GreenThreadPoolExecutor :members: + :special-members: __init__ .. autoclass:: futurist.ProcessPoolExecutor :members: + :special-members: __init__ .. autoclass:: futurist.SynchronousExecutor :members: @@ -18,6 +20,7 @@ Executors .. autoclass:: futurist.ThreadPoolExecutor :members: + :special-members: __init__ ------- Futures @@ -46,6 +49,13 @@ Miscellaneous .. autoclass:: futurist.ExecutorStatistics :members: +---------- +Exceptions +---------- + +.. autoclass:: futurist.RejectedSubmission + :members: + ------- Waiters ------- diff --git a/futurist/__init__.py b/futurist/__init__.py index 1fad1cb..023c086 100644 --- a/futurist/__init__.py +++ b/futurist/__init__.py @@ -24,4 +24,6 @@ from futurist._futures import ProcessPoolExecutor # noqa from futurist._futures import SynchronousExecutor # noqa from futurist._futures import ThreadPoolExecutor # noqa +from futurist._futures import RejectedSubmission # noqa + from futurist._futures import ExecutorStatistics # noqa diff --git a/futurist/_futures.py b/futurist/_futures.py index f880ee6..8bf23cb 100644 --- a/futurist/_futures.py +++ b/futurist/_futures.py @@ -36,6 +36,10 @@ except ImportError: from futurist import _utils +class RejectedSubmission(Exception): + """Exception raised when a submitted call is rejected (for some reason).""" + + # NOTE(harlowja): Allows for simpler access to this type... Future = _futures.Future @@ -108,12 +112,35 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): See: https://docs.python.org/dev/library/concurrent.futures.html """ - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, check_and_reject=None): + """Initializes a thread pool executor. + + :param max_workers: maximum number of workers that can be + simulatenously active at the same time, further + submitted work will be queued up when this limit + is reached. + :type max_workers: int + :param check_and_reject: a callback function that will be provided + two position arguments, the first argument + will be this executor instance, and the second + will be the number of currently queued work + items in this executors backlog; the callback + should raise a :py:class:`.RejectedSubmission` + exception if it wants to have this submission + rejected. + :type check_and_reject: callback + """ if max_workers is None: max_workers = _utils.get_optimal_thread_count() super(ThreadPoolExecutor, self).__init__(max_workers=max_workers) if self._max_workers <= 0: raise ValueError("Max workers must be greater than zero") + # NOTE(harlowja): this replaces the parent classes non-reentrant lock + # with a reentrant lock so that we can correctly call into the check + # and reject lock, and that it will block correctly if another + # submit call is done during that... + self._shutdown_lock = threading.RLock() + self._check_and_reject = check_and_reject or (lambda e, waiting: None) self._gatherer = _Gatherer( # Since our submit will use this gatherer we have to reference # the parent submit, bound to this instance (which is what we @@ -132,7 +159,12 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): def submit(self, fn, *args, **kwargs): """Submit some work to be executed (and gather statistics).""" - return self._gatherer.submit(fn, *args, **kwargs) + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('Can not schedule new futures' + ' after being shutdown') + self._check_and_reject(self, self._work_queue.qsize()) + return self._gatherer.submit(fn, *args, **kwargs) class ProcessPoolExecutor(_process.ProcessPoolExecutor): @@ -315,7 +347,24 @@ class GreenThreadPoolExecutor(_futures.Executor): It gathers statistics about the submissions executed for post-analysis... """ - def __init__(self, max_workers=1000): + def __init__(self, max_workers=1000, check_and_reject=None): + """Initializes a green thread pool executor. + + :param max_workers: maximum number of workers that can be + simulatenously active at the same time, further + submitted work will be queued up when this limit + is reached. + :type max_workers: int + :param check_and_reject: a callback function that will be provided + two position arguments, the first argument + will be this executor instance, and the second + will be the number of currently queued work + items in this executors backlog; the callback + should raise a :py:class:`.RejectedSubmission` + exception if it wants to have this submission + rejected. + :type check_and_reject: callback + """ if not _utils.EVENTLET_AVAILABLE: raise RuntimeError('Eventlet is needed to use a green executor') if max_workers <= 0: @@ -323,6 +372,7 @@ class GreenThreadPoolExecutor(_futures.Executor): self._max_workers = max_workers self._pool = greenpool.GreenPool(self._max_workers) self._delayed_work = greenqueue.Queue() + self._check_and_reject = check_and_reject or (lambda e, waiting: None) self._shutdown_lock = greenthreading.Lock() self._shutdown = False self._gatherer = _Gatherer(self._submit, @@ -350,6 +400,7 @@ class GreenThreadPoolExecutor(_futures.Executor): if self._shutdown: raise RuntimeError('Can not schedule new futures' ' after being shutdown') + self._check_and_reject(self, self._delayed_work.qsize()) return self._gatherer.submit(fn, *args, **kwargs) def _submit(self, fn, *args, **kwargs): diff --git a/futurist/rejection.py b/futurist/rejection.py new file mode 100644 index 0000000..3ea19f8 --- /dev/null +++ b/futurist/rejection.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Executor rejection strategies.""" + +import futurist + + +def reject_when_reached(max_backlog): + """Returns a function that will raise when backlog goes past max size.""" + + def _rejector(executor, backlog): + if backlog + 1 >= max_backlog: + raise futurist.RejectedSubmission("Current backlog %s is not" + " allowed to go" + " beyond %s" % (backlog, + max_backlog)) + + return _rejector