Merge "Move most of green helper classes -> '_green.py'"

This commit is contained in:
Jenkins 2015-10-08 22:25:34 +00:00 committed by Gerrit Code Review
commit 4da07660a0
3 changed files with 121 additions and 97 deletions

View File

@ -15,24 +15,13 @@
# under the License.
import functools
import sys
import threading
from concurrent import futures as _futures
from concurrent.futures import process as _process
from concurrent.futures import thread as _thread
import six
try:
from eventlet import greenpool
from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue
from eventlet.green import threading as greenthreading
except ImportError:
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
None, None)
from futurist import _green
from futurist import _utils
@ -227,56 +216,6 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
return self._gatherer.submit(fn, *args, **kwargs)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
del(exc_type, exc_value, exc_tb)
else:
self.future.set_result(result)
if _utils.EVENTLET_AVAILABLE:
class _GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
_green_threading = _GreenThreading()
else:
_green_threading = None
class SynchronousExecutor(_futures.Executor):
"""Executor that uses the caller to execute calls synchronously.
@ -303,7 +242,7 @@ class SynchronousExecutor(_futures.Executor):
' synchronous executor')
self._shutoff = False
if green:
self.threading = _green_threading
self.threading = _green.threading
self._future_cls = GreenFuture
else:
self._future_cls = Future
@ -342,36 +281,11 @@ class SynchronousExecutor(_futures.Executor):
def _submit(self, fn, *args, **kwargs):
fut = self._future_cls()
runner = _WorkItem(fut, fn, args, kwargs)
runner = _utils.WorkItem(fut, fn, args, kwargs)
runner.run()
return fut
class _GreenWorker(object):
def __init__(self, work, work_queue):
self.work = work
self.work_queue = work_queue
def __call__(self):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
try:
w.run()
finally:
self.work_queue.task_done()
class GreenFuture(Future):
__doc__ = Future.__doc__
@ -384,8 +298,8 @@ class GreenFuture(Future):
# functions will correctly yield to eventlet. If this is not done then
# waiting on the future never actually causes the greenthreads to run
# and thus you wait for infinity.
if not greenpatcher.is_monkey_patched('threading'):
self._condition = greenthreading.Condition()
if not _green.is_monkey_patched('threading'):
self._condition = _green.threading.condition_object()
class GreenThreadPoolExecutor(_futures.Executor):
@ -398,7 +312,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
It gathers statistics about the submissions executed for post-analysis...
"""
threading = _green_threading
threading = _green.threading
def __init__(self, max_workers=1000, check_and_reject=None):
"""Initializes a green thread pool executor.
@ -423,10 +337,10 @@ class GreenThreadPoolExecutor(_futures.Executor):
if max_workers <= 0:
raise ValueError("Max workers must be greater than zero")
self._max_workers = max_workers
self._pool = greenpool.GreenPool(self._max_workers)
self._delayed_work = greenqueue.Queue()
self._pool = _green.Pool(self._max_workers)
self._delayed_work = _green.Queue()
self._check_and_reject = check_and_reject or (lambda e, waiting: None)
self._shutdown_lock = greenthreading.Lock()
self._shutdown_lock = self.threading.lock_object()
self._shutdown = False
self._gatherer = _Gatherer(self._submit,
self.threading.lock_object)
@ -458,7 +372,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
def _submit(self, fn, *args, **kwargs):
f = GreenFuture()
work = _WorkItem(f, fn, args, kwargs)
work = _utils.WorkItem(f, fn, args, kwargs)
if not self._spin_up(work):
self._delayed_work.put(work)
return f
@ -472,7 +386,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
"""
alive = self._pool.running() + self._pool.waiting()
if alive < self._max_workers:
self._pool.spawn_n(_GreenWorker(work, self._delayed_work))
self._pool.spawn_n(_green.GreenWorker(work, self._delayed_work))
return True
return False

84
futurist/_green.py Normal file
View File

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from futurist import _utils
try:
from eventlet import greenpool
from eventlet import patcher as greenpatcher
from eventlet import queue as greenqueue
from eventlet.green import threading as greenthreading
except ImportError:
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
None, None)
if _utils.EVENTLET_AVAILABLE:
# Aliases that we use and only expose vs the whole of eventlet...
Pool = greenpool.GreenPool
Queue = greenqueue.Queue
is_monkey_patched = greenpatcher.is_monkey_patched
class GreenThreading(object):
@staticmethod
def event_object(*args, **kwargs):
return greenthreading.Event(*args, **kwargs)
@staticmethod
def lock_object(*args, **kwargs):
return greenthreading.Lock(*args, **kwargs)
@staticmethod
def rlock_object(*args, **kwargs):
return greenthreading.RLock(*args, **kwargs)
@staticmethod
def condition_object(*args, **kwargs):
return greenthreading.Condition(*args, **kwargs)
threading = GreenThreading()
else:
threading = None
Pool = None
Queue = None
is_monkey_patched = lambda mod: False
class GreenWorker(object):
def __init__(self, work, work_queue):
self.work = work
self.work_queue = work_queue
def __call__(self):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
try:
w.run()
finally:
self.work_queue.task_done()

View File

@ -20,6 +20,7 @@ import sys
import traceback
from monotonic import monotonic as now # noqa
import six
try:
import eventlet as _eventlet # noqa
@ -28,6 +29,31 @@ except ImportError:
EVENTLET_AVAILABLE = False
class WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
del(exc_type, exc_value, exc_tb)
else:
self.future.set_result(result)
class Failure(object):
"""Object that captures a exception (and its associated information)."""