From 711e5bb9d5ebb42dec12c51ca2b742df58f830be Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 30 Sep 2015 16:23:35 -0700 Subject: [PATCH] Move most of green helper classes -> '_green.py' Instead of having the green executor(s) helper classes be inline with the rest of the other executors move its helper classes/objects into its own module and use them from there instead. This also moves the work item helper into '_utils.py' for similar/equivalent reasons. Change-Id: I028738bf7d64f97320450d63debbd5116b505a72 --- futurist/_futures.py | 108 +++++-------------------------------------- futurist/_green.py | 84 +++++++++++++++++++++++++++++++++ futurist/_utils.py | 26 +++++++++++ 3 files changed, 121 insertions(+), 97 deletions(-) create mode 100644 futurist/_green.py diff --git a/futurist/_futures.py b/futurist/_futures.py index 4d54dd1..585d6aa 100644 --- a/futurist/_futures.py +++ b/futurist/_futures.py @@ -15,24 +15,13 @@ # under the License. import functools -import sys import threading from concurrent import futures as _futures from concurrent.futures import process as _process from concurrent.futures import thread as _thread -import six - -try: - from eventlet import greenpool - from eventlet import patcher as greenpatcher - from eventlet import queue as greenqueue - - from eventlet.green import threading as greenthreading -except ImportError: - greenpatcher, greenpool, greenqueue, greenthreading = (None, None, - None, None) +from futurist import _green from futurist import _utils @@ -227,56 +216,6 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor): return self._gatherer.submit(fn, *args, **kwargs) -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - - def run(self): - if not self.future.set_running_or_notify_cancel(): - return - try: - result = self.fn(*self.args, **self.kwargs) - except BaseException: - exc_type, exc_value, exc_tb = sys.exc_info() - try: - if six.PY2: - self.future.set_exception_info(exc_value, exc_tb) - else: - self.future.set_exception(exc_value) - finally: - del(exc_type, exc_value, exc_tb) - else: - self.future.set_result(result) - - -if _utils.EVENTLET_AVAILABLE: - - class _GreenThreading(object): - - @staticmethod - def event_object(*args, **kwargs): - return greenthreading.Event(*args, **kwargs) - - @staticmethod - def lock_object(*args, **kwargs): - return greenthreading.Lock(*args, **kwargs) - - @staticmethod - def rlock_object(*args, **kwargs): - return greenthreading.RLock(*args, **kwargs) - - @staticmethod - def condition_object(*args, **kwargs): - return greenthreading.Condition(*args, **kwargs) - - _green_threading = _GreenThreading() -else: - _green_threading = None - - class SynchronousExecutor(_futures.Executor): """Executor that uses the caller to execute calls synchronously. @@ -303,7 +242,7 @@ class SynchronousExecutor(_futures.Executor): ' synchronous executor') self._shutoff = False if green: - self.threading = _green_threading + self.threading = _green.threading self._future_cls = GreenFuture else: self._future_cls = Future @@ -342,36 +281,11 @@ class SynchronousExecutor(_futures.Executor): def _submit(self, fn, *args, **kwargs): fut = self._future_cls() - runner = _WorkItem(fut, fn, args, kwargs) + runner = _utils.WorkItem(fut, fn, args, kwargs) runner.run() return fut -class _GreenWorker(object): - def __init__(self, work, work_queue): - self.work = work - self.work_queue = work_queue - - def __call__(self): - # Run our main piece of work. - try: - self.work.run() - finally: - # Consume any delayed work before finishing (this is how we finish - # work that was to big for the pool size, but needs to be finished - # no matter). - while True: - try: - w = self.work_queue.get_nowait() - except greenqueue.Empty: - break - else: - try: - w.run() - finally: - self.work_queue.task_done() - - class GreenFuture(Future): __doc__ = Future.__doc__ @@ -384,8 +298,8 @@ class GreenFuture(Future): # functions will correctly yield to eventlet. If this is not done then # waiting on the future never actually causes the greenthreads to run # and thus you wait for infinity. - if not greenpatcher.is_monkey_patched('threading'): - self._condition = greenthreading.Condition() + if not _green.is_monkey_patched('threading'): + self._condition = _green.threading.condition_object() class GreenThreadPoolExecutor(_futures.Executor): @@ -398,7 +312,7 @@ class GreenThreadPoolExecutor(_futures.Executor): It gathers statistics about the submissions executed for post-analysis... """ - threading = _green_threading + threading = _green.threading def __init__(self, max_workers=1000, check_and_reject=None): """Initializes a green thread pool executor. @@ -423,10 +337,10 @@ class GreenThreadPoolExecutor(_futures.Executor): if max_workers <= 0: raise ValueError("Max workers must be greater than zero") self._max_workers = max_workers - self._pool = greenpool.GreenPool(self._max_workers) - self._delayed_work = greenqueue.Queue() + self._pool = _green.Pool(self._max_workers) + self._delayed_work = _green.Queue() self._check_and_reject = check_and_reject or (lambda e, waiting: None) - self._shutdown_lock = greenthreading.Lock() + self._shutdown_lock = self.threading.lock_object() self._shutdown = False self._gatherer = _Gatherer(self._submit, self.threading.lock_object) @@ -458,7 +372,7 @@ class GreenThreadPoolExecutor(_futures.Executor): def _submit(self, fn, *args, **kwargs): f = GreenFuture() - work = _WorkItem(f, fn, args, kwargs) + work = _utils.WorkItem(f, fn, args, kwargs) if not self._spin_up(work): self._delayed_work.put(work) return f @@ -472,7 +386,7 @@ class GreenThreadPoolExecutor(_futures.Executor): """ alive = self._pool.running() + self._pool.waiting() if alive < self._max_workers: - self._pool.spawn_n(_GreenWorker(work, self._delayed_work)) + self._pool.spawn_n(_green.GreenWorker(work, self._delayed_work)) return True return False diff --git a/futurist/_green.py b/futurist/_green.py new file mode 100644 index 0000000..ef2c2d5 --- /dev/null +++ b/futurist/_green.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from futurist import _utils + +try: + from eventlet import greenpool + from eventlet import patcher as greenpatcher + from eventlet import queue as greenqueue + + from eventlet.green import threading as greenthreading +except ImportError: + greenpatcher, greenpool, greenqueue, greenthreading = (None, None, + None, None) + + +if _utils.EVENTLET_AVAILABLE: + # Aliases that we use and only expose vs the whole of eventlet... + Pool = greenpool.GreenPool + Queue = greenqueue.Queue + is_monkey_patched = greenpatcher.is_monkey_patched + + class GreenThreading(object): + + @staticmethod + def event_object(*args, **kwargs): + return greenthreading.Event(*args, **kwargs) + + @staticmethod + def lock_object(*args, **kwargs): + return greenthreading.Lock(*args, **kwargs) + + @staticmethod + def rlock_object(*args, **kwargs): + return greenthreading.RLock(*args, **kwargs) + + @staticmethod + def condition_object(*args, **kwargs): + return greenthreading.Condition(*args, **kwargs) + + threading = GreenThreading() +else: + threading = None + Pool = None + Queue = None + is_monkey_patched = lambda mod: False + + +class GreenWorker(object): + def __init__(self, work, work_queue): + self.work = work + self.work_queue = work_queue + + def __call__(self): + # Run our main piece of work. + try: + self.work.run() + finally: + # Consume any delayed work before finishing (this is how we finish + # work that was to big for the pool size, but needs to be finished + # no matter). + while True: + try: + w = self.work_queue.get_nowait() + except greenqueue.Empty: + break + else: + try: + w.run() + finally: + self.work_queue.task_done() diff --git a/futurist/_utils.py b/futurist/_utils.py index 1cf0c50..a80033b 100644 --- a/futurist/_utils.py +++ b/futurist/_utils.py @@ -20,6 +20,7 @@ import sys import traceback from monotonic import monotonic as now # noqa +import six try: import eventlet as _eventlet # noqa @@ -28,6 +29,31 @@ except ImportError: EVENTLET_AVAILABLE = False +class WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException: + exc_type, exc_value, exc_tb = sys.exc_info() + try: + if six.PY2: + self.future.set_exception_info(exc_value, exc_tb) + else: + self.future.set_exception(exc_value) + finally: + del(exc_type, exc_value, exc_tb) + else: + self.future.set_result(result) + + class Failure(object): """Object that captures a exception (and its associated information)."""