Merge "Add future waiting helper module"
This commit is contained in:
commit
ddad717042
|
@ -45,3 +45,11 @@ Miscellaneous
|
|||
|
||||
.. autoclass:: futurist.ExecutorStatistics
|
||||
:members:
|
||||
|
||||
-------
|
||||
Waiters
|
||||
-------
|
||||
|
||||
.. autofunction:: futurist.waiters.wait_for_any
|
||||
.. autofunction:: futurist.waiters.wait_for_all
|
||||
.. autoclass:: futurist.waiters.DoneAndNotDoneFutures
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
import eventlet
|
||||
import testscenarios
|
||||
|
||||
import futurist
|
||||
from futurist.tests import base
|
||||
from futurist import waiters
|
||||
|
||||
|
||||
# Module level functions need to be used since the process pool
|
||||
# executor can not access instance or lambda level functions (since those
|
||||
# are not pickleable).
|
||||
|
||||
def mini_delay(use_eventlet_sleep=False):
|
||||
if use_eventlet_sleep:
|
||||
eventlet.sleep(0.1)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
return 1
|
||||
|
||||
|
||||
class TestWaiters(testscenarios.TestWithScenarios, base.TestCase):
|
||||
scenarios = [
|
||||
('sync', {'executor_cls': futurist.SynchronousExecutor,
|
||||
'executor_kwargs': {}, 'use_eventlet_sleep': False}),
|
||||
('green_sync', {'executor_cls': futurist.SynchronousExecutor,
|
||||
'executor_kwargs': {'green': True},
|
||||
'use_eventlet_sleep': True}),
|
||||
('green', {'executor_cls': futurist.GreenThreadPoolExecutor,
|
||||
'executor_kwargs': {}, 'use_eventlet_sleep': True}),
|
||||
('thread', {'executor_cls': futurist.ThreadPoolExecutor,
|
||||
'executor_kwargs': {}, 'use_eventlet_sleep': False}),
|
||||
('process', {'executor_cls': futurist.ProcessPoolExecutor,
|
||||
'executor_kwargs': {}, 'use_eventlet_sleep': False}),
|
||||
]
|
||||
|
||||
def setUp(self):
|
||||
super(TestWaiters, self).setUp()
|
||||
self.executor = self.executor_cls(**self.executor_kwargs)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestWaiters, self).tearDown()
|
||||
self.executor.shutdown()
|
||||
self.executor = None
|
||||
|
||||
def test_wait_for_any(self):
|
||||
fs = []
|
||||
for _i in range(0, 10):
|
||||
fs.append(self.executor.submit(
|
||||
mini_delay, use_eventlet_sleep=self.use_eventlet_sleep))
|
||||
all_done_fs = []
|
||||
total_fs = len(fs)
|
||||
while len(all_done_fs) != total_fs:
|
||||
done, not_done = waiters.wait_for_any(fs)
|
||||
all_done_fs.extend(done)
|
||||
fs = not_done
|
||||
self.assertEqual(total_fs, sum(f.result() for f in all_done_fs))
|
||||
|
||||
def test_wait_for_all(self):
|
||||
fs = []
|
||||
for _i in range(0, 10):
|
||||
fs.append(self.executor.submit(
|
||||
mini_delay, use_eventlet_sleep=self.use_eventlet_sleep))
|
||||
done_fs, not_done_fs = waiters.wait_for_all(fs)
|
||||
self.assertEqual(len(fs), sum(f.result() for f in done_fs))
|
||||
self.assertEqual(0, len(not_done_fs))
|
||||
|
||||
def test_no_mixed_wait_for_any(self):
|
||||
fs = [futurist.GreenFuture(), futurist.Future()]
|
||||
self.assertRaises(RuntimeError, waiters.wait_for_any, fs)
|
||||
|
||||
def test_no_mixed_wait_for_all(self):
|
||||
fs = [futurist.GreenFuture(), futurist.Future()]
|
||||
self.assertRaises(RuntimeError, waiters.wait_for_all, fs)
|
|
@ -0,0 +1,214 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
try:
|
||||
from contextlib import ExitStack
|
||||
except ImportError:
|
||||
from contextlib2 import ExitStack
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
|
||||
from concurrent import futures
|
||||
from concurrent.futures import _base
|
||||
import six
|
||||
|
||||
import futurist
|
||||
from futurist import _utils
|
||||
|
||||
try:
|
||||
from eventlet.green import threading as greenthreading
|
||||
except ImportError:
|
||||
greenthreading = None
|
||||
|
||||
|
||||
#: Named tuple returned from ``wait_for*`` calls.
|
||||
DoneAndNotDoneFutures = collections.namedtuple(
|
||||
'DoneAndNotDoneFutures', 'done not_done')
|
||||
|
||||
_DONE_STATES = frozenset([
|
||||
_base.CANCELLED_AND_NOTIFIED,
|
||||
_base.FINISHED,
|
||||
])
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _acquire_and_release_futures(fs):
|
||||
# Do this to ensure that we always get the futures in the same order (aka
|
||||
# always acquire the conditions in the same order, no matter what; a way
|
||||
# to avoid dead-lock).
|
||||
fs = sorted(fs, key=id)
|
||||
with ExitStack() as stack:
|
||||
for fut in fs:
|
||||
stack.enter_context(fut._condition)
|
||||
yield
|
||||
|
||||
|
||||
def _ensure_eventlet(func):
|
||||
"""Decorator that verifies we have the needed eventlet components."""
|
||||
|
||||
@six.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if not _utils.EVENTLET_AVAILABLE or greenthreading is None:
|
||||
raise RuntimeError('Eventlet is needed to wait on green futures')
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def _wait_for(fs, no_green_return_when, on_all_green_cb,
|
||||
caller_name, timeout=None):
|
||||
green_fs = sum(1 for f in fs if isinstance(f, futurist.GreenFuture))
|
||||
if not green_fs:
|
||||
done, not_done = futures.wait(fs, timeout=timeout,
|
||||
return_when=no_green_return_when)
|
||||
return DoneAndNotDoneFutures(done, not_done)
|
||||
else:
|
||||
non_green_fs = len(fs) - green_fs
|
||||
if non_green_fs:
|
||||
raise RuntimeError("Can not wait on %s green futures and %s"
|
||||
" non-green futures in the same"
|
||||
" `%s` call" % (green_fs, non_green_fs,
|
||||
caller_name))
|
||||
else:
|
||||
return on_all_green_cb(fs, timeout=timeout)
|
||||
|
||||
|
||||
def wait_for_all(fs, timeout=None):
|
||||
"""Wait for all of the futures to complete.
|
||||
|
||||
Works correctly with both green and non-green futures (but not both
|
||||
together, since this can't be guaranteed to avoid dead-lock due to how
|
||||
the waiting implementations are different when green threads are being
|
||||
used).
|
||||
|
||||
Returns pair (done futures, not done futures).
|
||||
"""
|
||||
return _wait_for(fs, futures.ALL_COMPLETED, _wait_for_all_green,
|
||||
'wait_for_all', timeout=timeout)
|
||||
|
||||
|
||||
def wait_for_any(fs, timeout=None):
|
||||
"""Wait for one (**any**) of the futures to complete.
|
||||
|
||||
Works correctly with both green and non-green futures (but not both
|
||||
together, since this can't be guaranteed to avoid dead-lock due to how
|
||||
the waiting implementations are different when green threads are being
|
||||
used).
|
||||
|
||||
Returns pair (done futures, not done futures).
|
||||
"""
|
||||
return _wait_for(fs, futures.FIRST_COMPLETED, _wait_for_any_green,
|
||||
'wait_for_any', timeout=timeout)
|
||||
|
||||
|
||||
class _AllGreenWaiter(object):
|
||||
"""Provides the event that ``_wait_for_all_green`` blocks on."""
|
||||
|
||||
def __init__(self, pending):
|
||||
self.event = greenthreading.Event()
|
||||
self.lock = greenthreading.Lock()
|
||||
self.pending = pending
|
||||
|
||||
def _decrement_pending(self):
|
||||
with self.lock:
|
||||
self.pending -= 1
|
||||
if self.pending <= 0:
|
||||
self.event.set()
|
||||
|
||||
def add_result(self, future):
|
||||
self._decrement_pending()
|
||||
|
||||
def add_exception(self, future):
|
||||
self._decrement_pending()
|
||||
|
||||
def add_cancelled(self, future):
|
||||
self._decrement_pending()
|
||||
|
||||
|
||||
class _AnyGreenWaiter(object):
|
||||
"""Provides the event that ``_wait_for_any_green`` blocks on."""
|
||||
|
||||
def __init__(self):
|
||||
self.event = greenthreading.Event()
|
||||
|
||||
def add_result(self, future):
|
||||
self.event.set()
|
||||
|
||||
def add_exception(self, future):
|
||||
self.event.set()
|
||||
|
||||
def add_cancelled(self, future):
|
||||
self.event.set()
|
||||
|
||||
|
||||
def _partition_futures(fs):
|
||||
done = set()
|
||||
not_done = set()
|
||||
for f in fs:
|
||||
if f._state in _DONE_STATES:
|
||||
done.add(f)
|
||||
else:
|
||||
not_done.add(f)
|
||||
return done, not_done
|
||||
|
||||
|
||||
def _create_and_install_waiters(fs, waiter_cls, *args, **kwargs):
|
||||
waiter = waiter_cls(*args, **kwargs)
|
||||
for f in fs:
|
||||
f._waiters.append(waiter)
|
||||
return waiter
|
||||
|
||||
|
||||
@_ensure_eventlet
|
||||
def _wait_for_all_green(fs, timeout=None):
|
||||
if not fs:
|
||||
return DoneAndNotDoneFutures(set(), set())
|
||||
|
||||
with _acquire_and_release_futures(fs):
|
||||
done, not_done = _partition_futures(fs)
|
||||
if len(done) == len(fs):
|
||||
return DoneAndNotDoneFutures(done, not_done)
|
||||
waiter = _create_and_install_waiters(not_done,
|
||||
_AllGreenWaiter,
|
||||
len(not_done))
|
||||
waiter.event.wait(timeout)
|
||||
for f in not_done:
|
||||
f._waiters.remove(waiter)
|
||||
|
||||
with _acquire_and_release_futures(fs):
|
||||
done, not_done = _partition_futures(fs)
|
||||
return DoneAndNotDoneFutures(done, not_done)
|
||||
|
||||
|
||||
@_ensure_eventlet
|
||||
def _wait_for_any_green(fs, timeout=None):
|
||||
if not fs:
|
||||
return DoneAndNotDoneFutures(set(), set())
|
||||
|
||||
with _acquire_and_release_futures(fs):
|
||||
done, not_done = _partition_futures(fs)
|
||||
if done:
|
||||
return DoneAndNotDoneFutures(done, not_done)
|
||||
waiter = _create_and_install_waiters(fs, _AnyGreenWaiter)
|
||||
|
||||
waiter.event.wait(timeout)
|
||||
for f in fs:
|
||||
f._waiters.remove(waiter)
|
||||
|
||||
with _acquire_and_release_futures(fs):
|
||||
done, not_done = _partition_futures(fs)
|
||||
return DoneAndNotDoneFutures(done, not_done)
|
|
@ -6,3 +6,4 @@ pbr<2.0,>=0.11
|
|||
six>=1.9.0
|
||||
monotonic>=0.1 # Apache-2.0
|
||||
futures>=3.0;python_version=='2.7' or python_version=='2.6'
|
||||
contextlib2>=0.4.0 # PSF License
|
||||
|
|
Loading…
Reference in New Issue