Type check Semaphore, GreenPool arguments; Thanks to Matthew D. Pagel

- export Event, *Semaphore in `eventlet.` top level namespace

https://github.com/eventlet/eventlet/issues/364
This commit is contained in:
Sergey Shepelev 2016-12-23 01:36:36 +03:00
parent e1bb2fee1d
commit 79292bd16a
5 changed files with 125 additions and 95 deletions

View File

@ -7,37 +7,45 @@ __version__ = '.'.join(map(str, version_info))
# errors of greenlet so that the packager can still at least
# access the version. Also this makes easy_install a little quieter
if os.environ.get('EVENTLET_IMPORT_VERSION_ONLY') != '1':
from eventlet import greenthread
from eventlet import greenpool
from eventlet import queue
from eventlet import timeout
from eventlet import patcher
from eventlet import convenience
from eventlet import event
from eventlet import greenpool
from eventlet import greenthread
from eventlet import patcher
from eventlet import queue
from eventlet import semaphore
from eventlet import timeout
import greenlet
sleep = greenthread.sleep
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n
spawn_after = greenthread.spawn_after
kill = greenthread.kill
Timeout = timeout.Timeout
with_timeout = timeout.with_timeout
GreenPool = greenpool.GreenPool
GreenPile = greenpool.GreenPile
Queue = queue.Queue
import_patched = patcher.import_patched
monkey_patch = patcher.monkey_patch
connect = convenience.connect
listen = convenience.listen
serve = convenience.serve
StopServe = convenience.StopServe
wrap_ssl = convenience.wrap_ssl
Event = event.Event
GreenPool = greenpool.GreenPool
GreenPile = greenpool.GreenPile
sleep = greenthread.sleep
spawn = greenthread.spawn
spawn_n = greenthread.spawn_n
spawn_after = greenthread.spawn_after
kill = greenthread.kill
import_patched = patcher.import_patched
monkey_patch = patcher.monkey_patch
Queue = queue.Queue
Semaphore = semaphore.Semaphore
CappedSemaphore = semaphore.CappedSemaphore
BoundedSemaphore = semaphore.BoundedSemaphore
Timeout = timeout.Timeout
with_timeout = timeout.with_timeout
getcurrent = greenlet.greenlet.getcurrent
# deprecated

View File

@ -1,9 +1,7 @@
import traceback
from eventlet import event
from eventlet import greenthread
import eventlet
from eventlet import queue
from eventlet import semaphore
from eventlet.support import greenlets as greenlet
from eventlet.support import six
@ -17,10 +15,18 @@ class GreenPool(object):
"""
def __init__(self, size=1000):
try:
size = int(size)
except ValueError as e:
msg = 'GreenPool() expect size :: int, actual: {0} {1}'.format(type(size), str(e))
raise TypeError(msg)
if size < 0:
msg = 'GreenPool() expect size >= 0, actual: {0}'.format(repr(size))
raise ValueError(msg)
self.size = size
self.coroutines_running = set()
self.sem = semaphore.Semaphore(size)
self.no_coros_running = event.Event()
self.sem = eventlet.Semaphore(size)
self.no_coros_running = eventlet.Event()
def resize(self, new_size):
""" Change the max number of greenthreads doing work at any given time.
@ -49,7 +55,7 @@ class GreenPool(object):
def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
Returns the :class:`GreenThread <eventlet.GreenThread>`
object that is running the function, which can be used to retrieve the
results.
@ -61,17 +67,17 @@ class GreenPool(object):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
# a bit hacky to use the GT without switching to it
gt = greenthread.GreenThread(current)
gt = eventlet.greenthread.GreenThread(current)
gt.main(function, args, kwargs)
return gt
else:
self.sem.acquire()
gt = greenthread.spawn(function, *args, **kwargs)
gt = eventlet.spawn(function, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.no_coros_running = eventlet.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done)
return gt
@ -89,7 +95,7 @@ class GreenPool(object):
if coro is None:
return
else:
coro = greenthread.getcurrent()
coro = eventlet.getcurrent()
self._spawn_done(coro)
def spawn_n(self, function, *args, **kwargs):
@ -99,21 +105,21 @@ class GreenPool(object):
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
current = eventlet.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
self._spawn_n_impl(function, args, kwargs, None)
else:
self.sem.acquire()
g = greenthread.spawn_n(
g = eventlet.spawn_n(
self._spawn_n_impl,
function, args, kwargs, True)
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.no_coros_running = eventlet.Event()
self.coroutines_running.add(g)
def waitall(self):
"""Waits until all greenthreads in the pool are finished working."""
assert greenthread.getcurrent() not in self.coroutines_running, \
assert eventlet.getcurrent() not in self.coroutines_running, \
"Calling waitall() from within one of the " \
"GreenPool's greenthreads will never terminate."
if self.running():
@ -151,7 +157,7 @@ class GreenPool(object):
if function is None:
function = lambda *a: a
gi = GreenMap(self.size)
greenthread.spawn_n(self._do_map, function, iterable, gi)
eventlet.spawn_n(self._do_map, function, iterable, gi)
return gi
def imap(self, function, *iterables):

View File

@ -1,10 +1,7 @@
from __future__ import with_statement
import collections
from eventlet import greenthread
import eventlet
from eventlet import hubs
from eventlet.timeout import Timeout
class Semaphore(object):
@ -34,10 +31,15 @@ class Semaphore(object):
"""
def __init__(self, value=1):
self.counter = value
try:
value = int(value)
except ValueError as e:
msg = 'Semaphore() expect value :: int, actual: {0} {1}'.format(type(value), str(e))
raise TypeError(msg)
if value < 0:
raise ValueError("Semaphore must be initialized with a positive "
"number, got %s" % value)
msg = 'Semaphore() expect value >= 0, actual: {0}'.format(repr(value))
raise ValueError(msg)
self.counter = value
self._waiters = collections.deque()
def __repr__(self):
@ -92,7 +94,7 @@ class Semaphore(object):
if not blocking and self.locked():
return False
current_thread = greenthread.getcurrent()
current_thread = eventlet.getcurrent()
if self.counter <= 0 or self._waiters:
if current_thread not in self._waiters:
@ -100,7 +102,7 @@ class Semaphore(object):
try:
if timeout is not None:
ok = False
with Timeout(timeout, False):
with eventlet.Timeout(timeout, False):
while self.counter <= 0:
hubs.get_hub().switch()
ok = True

View File

@ -1,9 +1,8 @@
import gc
import os
import random
import eventlet
from eventlet import hubs, greenpool, event, pools
from eventlet import hubs, pools
from eventlet.support import greenlets as greenlet, six
import tests
@ -24,7 +23,7 @@ def raiser(exc):
class GreenPool(tests.LimitedTestCase):
def test_spawn(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
waiters = []
for i in range(10):
waiters.append(p.spawn(passthru, i))
@ -32,7 +31,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(results, list(range(10)))
def test_spawn_n(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
results_closure = []
def do_something(a):
@ -45,8 +44,8 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(results_closure, list(range(10)))
def test_waiting(self):
pool = greenpool.GreenPool(1)
done = event.Event()
pool = eventlet.GreenPool(1)
done = eventlet.Event()
def consume():
done.wait()
@ -74,7 +73,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(pool.running(), 0)
def test_multiple_coros(self):
evt = event.Event()
evt = eventlet.Event()
results = []
def producer():
@ -86,7 +85,7 @@ class GreenPool(tests.LimitedTestCase):
evt.wait()
results.append('cons2')
pool = greenpool.GreenPool(2)
pool = eventlet.GreenPool(2)
done = pool.spawn(consumer)
pool.spawn_n(producer)
done.wait()
@ -103,7 +102,7 @@ class GreenPool(tests.LimitedTestCase):
def some_work():
hubs.get_hub().schedule_call_local(0, fire_timer)
pool = greenpool.GreenPool(2)
pool = eventlet.GreenPool(2)
worker = pool.spawn(some_work)
worker.wait()
eventlet.sleep(0)
@ -111,7 +110,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(timer_fired, [])
def test_reentrant(self):
pool = greenpool.GreenPool(1)
pool = eventlet.GreenPool(1)
def reenter():
waiter = pool.spawn(lambda a: a, 'reenter')
@ -120,7 +119,7 @@ class GreenPool(tests.LimitedTestCase):
outer_waiter = pool.spawn(reenter)
outer_waiter.wait()
evt = event.Event()
evt = eventlet.Event()
def reenter_async():
pool.spawn_n(lambda a: a, 'reenter')
@ -137,7 +136,7 @@ class GreenPool(tests.LimitedTestCase):
timer = eventlet.Timeout(1)
try:
evt = event.Event()
evt = eventlet.Event()
for x in six.moves.range(num_free):
pool.spawn(wait_long_time, evt)
# if the pool has fewer free than we expect,
@ -159,8 +158,8 @@ class GreenPool(tests.LimitedTestCase):
eventlet.sleep(0)
def test_resize(self):
pool = greenpool.GreenPool(2)
evt = event.Event()
pool = eventlet.GreenPool(2)
evt = eventlet.Event()
def wait_long_time(e):
e.wait()
@ -194,7 +193,7 @@ class GreenPool(tests.LimitedTestCase):
# The premise is that a coroutine in a Pool tries to get a token out
# of a token pool but times out before getting the token. We verify
# that neither pool is adversely affected by this situation.
pool = greenpool.GreenPool(1)
pool = eventlet.GreenPool(1)
tp = pools.TokenPool(max_size=1)
tp.get() # empty out the pool
@ -230,7 +229,7 @@ class GreenPool(tests.LimitedTestCase):
gt.wait()
def test_spawn_n_2(self):
p = greenpool.GreenPool(2)
p = eventlet.GreenPool(2)
self.assertEqual(p.free(), 2)
r = []
@ -259,7 +258,7 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(set(r), set([1, 2, 3, 4]))
def test_exceptions(self):
p = greenpool.GreenPool(2)
p = eventlet.GreenPool(2)
for m in (p.spawn, p.spawn_n):
self.assert_pool_has_free(p, 2)
m(raiser, RuntimeError())
@ -272,22 +271,22 @@ class GreenPool(tests.LimitedTestCase):
self.assert_pool_has_free(p, 2)
def test_imap(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
result_list = list(p.imap(passthru, range(10)))
self.assertEqual(result_list, list(range(10)))
def test_empty_imap(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
result_iter = p.imap(passthru, [])
self.assertRaises(StopIteration, result_iter.next)
def test_imap_nonefunc(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
result_list = list(p.imap(None, range(10)))
self.assertEqual(result_list, [(x,) for x in range(10)])
def test_imap_multi_args(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
result_list = list(p.imap(passthru2, range(10), range(10, 20)))
self.assertEqual(result_list, list(zip(range(10), range(10, 20))))
@ -295,7 +294,7 @@ class GreenPool(tests.LimitedTestCase):
# testing the case where the function raises an exception;
# both that the caller sees that exception, and that the iterator
# continues to be usable to get the rest of the items
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
def raiser(item):
if item == 1 or item == 7:
@ -315,30 +314,30 @@ class GreenPool(tests.LimitedTestCase):
self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9])
def test_starmap(self):
p = greenpool.GreenPool(4)
p = eventlet.GreenPool(4)
result_list = list(p.starmap(passthru, [(x,) for x in range(10)]))
self.assertEqual(result_list, list(range(10)))
def test_waitall_on_nothing(self):
p = greenpool.GreenPool()
p = eventlet.GreenPool()
p.waitall()
def test_recursive_waitall(self):
p = greenpool.GreenPool()
p = eventlet.GreenPool()
gt = p.spawn(p.waitall)
self.assertRaises(AssertionError, gt.wait)
class GreenPile(tests.LimitedTestCase):
def test_pile(self):
p = greenpool.GreenPile(4)
p = eventlet.GreenPile(4)
for i in range(10):
p.spawn(passthru, i)
result_list = list(p)
self.assertEqual(result_list, list(range(10)))
def test_pile_spawn_times_out(self):
p = greenpool.GreenPile(4)
p = eventlet.GreenPile(4)
for i in range(4):
p.spawn(passthru, i)
# now it should be full and this should time out
@ -351,9 +350,9 @@ class GreenPile(tests.LimitedTestCase):
self.assertEqual(list(p), list(range(10)))
def test_constructing_from_pool(self):
pool = greenpool.GreenPool(2)
pile1 = greenpool.GreenPile(pool)
pile2 = greenpool.GreenPile(pool)
pool = eventlet.GreenPool(2)
pile1 = eventlet.GreenPile(pool)
pile2 = eventlet.GreenPile(pool)
def bunch_of_work(pile, unique):
for i in range(10):
@ -366,6 +365,17 @@ class GreenPile(tests.LimitedTestCase):
self.assertEqual(list(pile1), list(range(10)))
def test_greenpool_type_check():
eventlet.GreenPool(0)
eventlet.GreenPool(1)
eventlet.GreenPool(1e3)
with tests.assert_raises(TypeError):
eventlet.GreenPool('foo')
with tests.assert_raises(ValueError):
eventlet.GreenPool(-1)
class StressException(Exception):
pass
@ -391,10 +401,9 @@ class Stress(tests.LimitedTestCase):
# tests will take extra-long
TEST_TIMEOUT = 60
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def spawn_order_check(self, concurrency):
# checks that piles are strictly ordered
p = greenpool.GreenPile(concurrency)
p = eventlet.GreenPile(concurrency)
def makework(count, unique):
for i in six.moves.range(count):
@ -425,18 +434,16 @@ class Stress(tests.LimitedTestCase):
for l in latest[1:]:
self.assertEqual(l, iters - 1)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_5(self):
self.spawn_order_check(5)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_ordering_50(self):
self.spawn_order_check(50)
def imap_memory_check(self, concurrency):
# checks that imap is strictly
# ordered and consumes a constant amount of memory
p = greenpool.GreenPool(concurrency)
p = eventlet.GreenPool(concurrency)
count = 1000
it = p.imap(passthru, six.moves.range(count))
latest = -1
@ -460,15 +467,12 @@ class Stress(tests.LimitedTestCase):
# make sure we got to the end
self.assertEqual(latest, count - 1)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_50(self):
self.imap_memory_check(50)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_imap_500(self):
self.imap_memory_check(500)
@tests.skip_unless(os.environ.get('RUN_STRESS_TESTS') == 'YES')
def test_with_intpool(self):
class IntPool(pools.Pool):
def create(self):
@ -483,7 +487,7 @@ class Stress(tests.LimitedTestCase):
return token
int_pool = IntPool(max_size=intpool_size)
pool = greenpool.GreenPool(pool_size)
pool = eventlet.GreenPool(pool_size)
for ix in six.moves.range(num_executes):
pool.spawn(run, int_pool)
pool.waitall()

View File

@ -1,14 +1,13 @@
import time
import eventlet
from eventlet import semaphore
from tests import LimitedTestCase
import tests
class TestSemaphore(LimitedTestCase):
class TestSemaphore(tests.LimitedTestCase):
def test_bounded(self):
sem = semaphore.CappedSemaphore(2, limit=3)
sem = eventlet.CappedSemaphore(2, limit=3)
self.assertEqual(sem.acquire(), True)
self.assertEqual(sem.acquire(), True)
gt1 = eventlet.spawn(sem.release)
@ -24,28 +23,28 @@ class TestSemaphore(LimitedTestCase):
gt2.wait()
def test_bounded_with_zero_limit(self):
sem = semaphore.CappedSemaphore(0, 0)
sem = eventlet.CappedSemaphore(0, 0)
gt = eventlet.spawn(sem.acquire)
sem.release()
gt.wait()
def test_non_blocking(self):
sem = semaphore.Semaphore(0)
sem = eventlet.Semaphore(0)
self.assertEqual(sem.acquire(blocking=False), False)
def test_timeout(self):
sem = semaphore.Semaphore(0)
sem = eventlet.Semaphore(0)
start = time.time()
self.assertEqual(sem.acquire(timeout=0.1), False)
self.assertTrue(time.time() - start >= 0.1)
def test_timeout_non_blocking(self):
sem = semaphore.Semaphore()
sem = eventlet.Semaphore()
self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
def test_semaphore_contention():
g_mutex = semaphore.Semaphore()
g_mutex = eventlet.Semaphore()
counts = [0, 0]
def worker(no):
@ -61,3 +60,14 @@ def test_semaphore_contention():
t2.kill()
assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
def test_semaphore_type_check():
eventlet.Semaphore(0)
eventlet.Semaphore(1)
eventlet.Semaphore(1e2)
with tests.assert_raises(TypeError):
eventlet.Semaphore('foo')
with tests.assert_raises(ValueError):
eventlet.Semaphore(-1)