deb-python-eventlet/tests/greenthread_test.py

165 lines
4.4 KiB
Python

from tests import LimitedTestCase
from eventlet import greenthread
from eventlet.support import greenlets as greenlet
_g_results = []
def passthru(*args, **kw):
_g_results.append((args, kw))
return args, kw
def waiter(a):
greenthread.sleep(0.1)
return a
class Asserts(object):
def assert_dead(self, gt):
if hasattr(gt, 'wait'):
self.assertRaises(greenlet.GreenletExit, gt.wait)
assert gt.dead
assert not gt
class Spawn(LimitedTestCase, Asserts):
def tearDown(self):
global _g_results
super(Spawn, self).tearDown()
_g_results = []
def test_simple(self):
gt = greenthread.spawn(passthru, 1, b=2)
self.assertEqual(gt.wait(), ((1,), {'b': 2}))
self.assertEqual(_g_results, [((1,), {'b': 2})])
def test_n(self):
gt = greenthread.spawn_n(passthru, 2, b=3)
assert not gt.dead
greenthread.sleep(0)
assert gt.dead
self.assertEqual(_g_results, [((2,), {'b': 3})])
def test_kill(self):
gt = greenthread.spawn(passthru, 6)
greenthread.kill(gt)
self.assert_dead(gt)
greenthread.sleep(0.001)
self.assertEqual(_g_results, [])
greenthread.kill(gt)
self.assert_dead(gt)
def test_kill_meth(self):
gt = greenthread.spawn(passthru, 6)
gt.kill()
self.assert_dead(gt)
greenthread.sleep(0.001)
self.assertEqual(_g_results, [])
gt.kill()
self.assert_dead(gt)
def test_kill_n(self):
gt = greenthread.spawn_n(passthru, 7)
greenthread.kill(gt)
self.assert_dead(gt)
greenthread.sleep(0.001)
self.assertEqual(_g_results, [])
greenthread.kill(gt)
self.assert_dead(gt)
def test_link(self):
results = []
def link_func(g, *a, **kw):
results.append(g)
results.append(a)
results.append(kw)
gt = greenthread.spawn(passthru, 5)
gt.link(link_func, 4, b=5)
self.assertEqual(gt.wait(), ((5,), {}))
self.assertEqual(results, [gt, (4,), {'b': 5}])
def test_link_after_exited(self):
results = []
def link_func(g, *a, **kw):
results.append(g)
results.append(a)
results.append(kw)
gt = greenthread.spawn(passthru, 5)
self.assertEqual(gt.wait(), ((5,), {}))
gt.link(link_func, 4, b=5)
self.assertEqual(results, [gt, (4,), {'b': 5}])
def test_link_relinks(self):
# test that linking in a linked func doesn't cause infinite recursion.
called = []
def link_func(g):
g.link(link_func_pass)
def link_func_pass(g):
called.append(True)
gt = greenthread.spawn(passthru)
gt.link(link_func)
gt.wait()
self.assertEqual(called, [True])
class SpawnAfter(Spawn):
def test_basic(self):
gt = greenthread.spawn_after(0.1, passthru, 20)
self.assertEqual(gt.wait(), ((20,), {}))
def test_cancel(self):
gt = greenthread.spawn_after(0.1, passthru, 21)
gt.cancel()
self.assert_dead(gt)
def test_cancel_already_started(self):
gt = greenthread.spawn_after(0, waiter, 22)
greenthread.sleep(0)
gt.cancel()
self.assertEqual(gt.wait(), 22)
def test_kill_already_started(self):
gt = greenthread.spawn_after(0, waiter, 22)
greenthread.sleep(0)
gt.kill()
self.assert_dead(gt)
class SpawnAfterLocal(LimitedTestCase, Asserts):
def setUp(self):
super(SpawnAfterLocal, self).setUp()
self.lst = [1]
def test_timer_fired(self):
def func():
greenthread.spawn_after_local(0.1, self.lst.pop)
greenthread.sleep(0.2)
greenthread.spawn(func)
assert self.lst == [1], self.lst
greenthread.sleep(0.3)
assert self.lst == [], self.lst
def test_timer_cancelled_upon_greenlet_exit(self):
def func():
greenthread.spawn_after_local(0.1, self.lst.pop)
greenthread.spawn(func)
assert self.lst == [1], self.lst
greenthread.sleep(0.2)
assert self.lst == [1], self.lst
def test_spawn_is_not_cancelled(self):
def func():
greenthread.spawn(self.lst.pop)
# exiting immediatelly, but self.lst.pop must be called
greenthread.spawn(func)
greenthread.sleep(0.1)
assert self.lst == [], self.lst