deb-python-eventlet/tests/hub_test.py

403 lines
13 KiB
Python

from __future__ import with_statement
import sys
import time
import tests
from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless
from tests.patcher_test import ProcessBase
import eventlet
from eventlet import hubs
from eventlet.support import greenlets, six
DELAY = 0.001
def noop():
pass
class TestTimerCleanup(tests.LimitedTestCase):
TEST_TIMEOUT = 2
@skip_with_pyevent
def test_cancel_immediate(self):
hub = hubs.get_hub()
stimers = hub.get_timers_count()
scanceled = hub.timers_canceled
for i in six.moves.range(2000):
t = hubs.get_hub().schedule_call_global(60, noop)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1)
# there should be fewer than 1000 new timers and canceled
self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
self.assert_less_than_equal(hub.timers_canceled, 1000)
@skip_with_pyevent
def test_cancel_accumulated(self):
hub = hubs.get_hub()
stimers = hub.get_timers_count()
scanceled = hub.timers_canceled
for i in six.moves.range(2000):
t = hubs.get_hub().schedule_call_global(60, noop)
eventlet.sleep()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1, hub.timers)
# there should be fewer than 1000 new timers and canceled
self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
self.assert_less_than_equal(hub.timers_canceled, 1000)
@skip_with_pyevent
def test_cancel_proportion(self):
# if fewer than half the pending timers are canceled, it should
# not clean them out
hub = hubs.get_hub()
uncanceled_timers = []
stimers = hub.get_timers_count()
scanceled = hub.timers_canceled
for i in six.moves.range(1000):
# 2/3rds of new timers are uncanceled
t = hubs.get_hub().schedule_call_global(60, noop)
t2 = hubs.get_hub().schedule_call_global(60, noop)
t3 = hubs.get_hub().schedule_call_global(60, noop)
eventlet.sleep()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count() + 1)
uncanceled_timers.append(t2)
uncanceled_timers.append(t3)
# 3000 new timers, plus a few extras
self.assert_less_than_equal(stimers + 3000,
stimers + hub.get_timers_count())
self.assertEqual(hub.timers_canceled, 1000)
for t in uncanceled_timers:
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count())
eventlet.sleep()
class TestScheduleCall(tests.LimitedTestCase):
def test_local(self):
lst = [1]
eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
eventlet.sleep(0)
eventlet.sleep(DELAY * 2)
assert lst == [1], lst
def test_global(self):
lst = [1]
eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
eventlet.sleep(0)
eventlet.sleep(DELAY * 2)
assert lst == [], lst
def test_ordering(self):
lst = []
hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
while len(lst) < 3:
eventlet.sleep(DELAY)
self.assertEqual(lst, [1, 2, 3])
class TestDebug(tests.LimitedTestCase):
def test_debug_listeners(self):
hubs.get_hub().set_debug_listeners(True)
hubs.get_hub().set_debug_listeners(False)
def test_timer_exceptions(self):
hubs.get_hub().set_timer_exceptions(True)
hubs.get_hub().set_timer_exceptions(False)
class TestExceptionInMainloop(tests.LimitedTestCase):
def test_sleep(self):
# even if there was an error in the mainloop, the hub should continue
# to work
start = time.time()
eventlet.sleep(DELAY)
delay = time.time() - start
assert delay >= DELAY * \
0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
delay, DELAY)
def fail():
1 // 0
hubs.get_hub().schedule_call_global(0, fail)
start = time.time()
eventlet.sleep(DELAY)
delay = time.time() - start
assert delay >= DELAY * \
0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
delay, DELAY)
class TestExceptionInGreenthread(tests.LimitedTestCase):
@skip_unless(greenlets.preserves_excinfo)
def test_exceptionpreservation(self):
# events for controlling execution order
gt1event = eventlet.Event()
gt2event = eventlet.Event()
def test_gt1():
try:
raise KeyError()
except KeyError:
gt1event.send('exception')
gt2event.wait()
assert sys.exc_info()[0] is KeyError
gt1event.send('test passed')
def test_gt2():
gt1event.wait()
gt1event.reset()
assert sys.exc_info()[0] is None
try:
raise ValueError()
except ValueError:
gt2event.send('exception')
gt1event.wait()
assert sys.exc_info()[0] is ValueError
g1 = eventlet.spawn(test_gt1)
g2 = eventlet.spawn(test_gt2)
try:
g1.wait()
g2.wait()
finally:
g1.kill()
g2.kill()
def test_exceptionleaks(self):
# tests expected behaviour with all versions of greenlet
def test_gt(sem):
try:
raise KeyError()
except KeyError:
sem.release()
hubs.get_hub().switch()
# semaphores for controlling execution order
sem = eventlet.Semaphore()
sem.acquire()
g = eventlet.spawn(test_gt, sem)
try:
sem.acquire()
assert sys.exc_info()[0] is None
finally:
g.kill()
class TestHubSelection(tests.LimitedTestCase):
def test_explicit_hub(self):
oldhub = hubs.get_hub()
try:
hubs.use_hub(Foo)
assert isinstance(hubs.get_hub(), Foo), hubs.get_hub()
finally:
hubs._threadlocal.hub = oldhub
class TestHubBlockingDetector(tests.LimitedTestCase):
TEST_TIMEOUT = 10
@skip_with_pyevent
def test_block_detect(self):
def look_im_blocking():
import time
time.sleep(2)
from eventlet import debug
debug.hub_blocking_detection(True)
gt = eventlet.spawn(look_im_blocking)
self.assertRaises(RuntimeError, gt.wait)
debug.hub_blocking_detection(False)
@skip_with_pyevent
@skip_if_no_itimer
def test_block_detect_with_itimer(self):
def look_im_blocking():
import time
time.sleep(0.5)
from eventlet import debug
debug.hub_blocking_detection(True, resolution=0.1)
gt = eventlet.spawn(look_im_blocking)
self.assertRaises(RuntimeError, gt.wait)
debug.hub_blocking_detection(False)
class TestSuspend(tests.LimitedTestCase):
TEST_TIMEOUT = 4
longMessage = True
maxDiff = None
def test_suspend_doesnt_crash(self):
import os
import shutil
import signal
import subprocess
import sys
import tempfile
self.tempdir = tempfile.mkdtemp('test_suspend')
filename = os.path.join(self.tempdir, 'test_suspend.py')
fd = open(filename, "w")
fd.write("""import eventlet
eventlet.Timeout(0.5)
try:
eventlet.listen(("127.0.0.1", 0)).accept()
except eventlet.Timeout:
print("exited correctly")
""")
fd.close()
python_path = os.pathsep.join(sys.path + [self.tempdir])
new_env = os.environ.copy()
new_env['PYTHONPATH'] = python_path
p = subprocess.Popen([sys.executable,
os.path.join(self.tempdir, filename)],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
eventlet.sleep(0.4) # wait for process to hit accept
os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
os.kill(p.pid, signal.SIGCONT)
output, _ = p.communicate()
lines = output.decode('utf-8', 'replace').splitlines()
assert "exited correctly" in lines[-1], output
shutil.rmtree(self.tempdir)
def test_repeated_select_bad_fd():
from eventlet.green import select
def once():
try:
select.select([-1], [], [])
assert False, 'Expected ValueError'
except ValueError:
pass
once()
once()
@skip_with_pyevent
def test_fork():
tests.run_isolated('hub_fork.py')
def test_fork_simple():
tests.run_isolated('hub_fork_simple.py')
class TestDeadRunLoop(tests.LimitedTestCase):
TEST_TIMEOUT = 2
class CustomException(Exception):
pass
def test_kill(self):
""" Checks that killing a process after the hub runloop dies does
not immediately return to hub greenlet's parent and schedule a
redundant timer. """
hub = hubs.get_hub()
def dummyproc():
hub.switch()
g = eventlet.spawn(dummyproc)
eventlet.sleep(0) # let dummyproc run
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
KeyboardInterrupt())
# kill dummyproc, this schedules a timer to return execution to
# this greenlet before throwing an exception in dummyproc.
# it is from this timer that execution should be returned to this
# greenlet, and not by propogating of the terminating greenlet.
g.kill()
with eventlet.Timeout(0.5, self.CustomException()):
# we now switch to the hub, there should be no existing timers
# that switch back to this greenlet and so this hub.switch()
# call should block indefinitely.
self.assertRaises(self.CustomException, hub.switch)
def test_parent(self):
""" Checks that a terminating greenthread whose parent
was a previous, now-defunct hub greenlet returns execution to
the hub runloop and not the hub greenlet's parent. """
hub = hubs.get_hub()
def dummyproc():
pass
g = eventlet.spawn(dummyproc)
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
KeyboardInterrupt())
assert not g.dead # check dummyproc hasn't completed
with eventlet.Timeout(0.5, self.CustomException()):
# we now switch to the hub which will allow
# completion of dummyproc.
# this should return execution back to the runloop and not
# this greenlet so that hub.switch() would block indefinitely.
self.assertRaises(self.CustomException, hub.switch)
assert g.dead # sanity check that dummyproc has completed
class Foo(object):
pass
class TestDefaultHub(ProcessBase):
def test_kqueue_unsupported(self):
# https://github.com/eventlet/eventlet/issues/38
# get_hub on windows broken by kqueue
module_source = r'''
from __future__ import print_function
# Simulate absence of kqueue even on platforms that support it.
import select
try:
del select.kqueue
except AttributeError:
pass
from eventlet.support.six.moves import builtins
original_import = builtins.__import__
def fail_import(name, *args, **kwargs):
if 'epoll' in name:
raise ImportError('disabled for test')
if 'kqueue' in name:
print('kqueue tried')
return original_import(name, *args, **kwargs)
builtins.__import__ = fail_import
import eventlet.hubs
eventlet.hubs.get_default_hub()
print('ok')
'''
self.write_to_tempfile('newmod', module_source)
output, _ = self.launch_subprocess('newmod.py')
self.assertEqual(output, 'kqueue tried\nok\n')