# deb-python-eventlet/tests/tpool_test.py
#
# 370 lines
# 11 KiB
# Python

# Copyright (c) 2007, Linden Research, Inc.
# Copyright (c) 2007, IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import gc
import random
import re
import time
import eventlet
from eventlet import tpool
from eventlet.support import six
import tests
# Module-level attributes that the tests in this file read back through
# tpool.Proxy(tpool_test) (as prox.one, prox.two, prox.three and x.none).
# Three of them are ints and one is None, which the autowrap tests rely on
# when they check which attribute values come back wrapped in a Proxy.
one = 1
two = 2
three = 3
none = None
def noop():
    # Intentionally empty callable; handed to tpool.execute in this file
    # when a test only needs some call to go through the pool.
    return None
def raise_exception():
    # Unconditionally fails with RuntimeError("hi"); the tests in this file
    # call it through tpool and expect that RuntimeError to reach the caller.
    error = RuntimeError("hi")
    raise error
class TestTpool(tests.LimitedTestCase):
    """Tests for tpool.Proxy and tpool.execute.

    tearDown() calls tpool.killall() after every test, so each test starts
    from a pool that has been shut down by the previous one.
    """

    def setUp(self):
        super(TestTpool, self).setUp()

    def tearDown(self):
        # Shut the pool down before handing over to the base-class teardown.
        tpool.killall()
        super(TestTpool, self).tearDown()

    @tests.skip_with_pyevent
    def test_wrap_tuple(self):
        # Indexing and len() are both checked on the proxy itself rather
        # than on the tuple it wraps.
        my_tuple = (1, 2)
        prox = tpool.Proxy(my_tuple)
        self.assertEqual(prox[0], 1)
        self.assertEqual(prox[1], 2)
        self.assertEqual(len(prox), 2)

    @tests.skip_with_pyevent
    def test_wrap_string(self):
        # str(), len() and a method call must agree with the wrapped string.
        my_object = "whatever"
        prox = tpool.Proxy(my_object)
        self.assertEqual(str(my_object), str(prox))
        self.assertEqual(len(my_object), len(prox))
        self.assertEqual(my_object.join(['a', 'b']), prox.join(['a', 'b']))

    @tests.skip_with_pyevent
    def test_wrap_uniterable(self):
        # Errors raised by the wrapped (empty) list must surface unchanged.
        prox = tpool.Proxy([])

        def index():
            prox[0]

        def key():
            prox['a']

        self.assertRaises(IndexError, index)
        self.assertRaises(TypeError, key)

    @tests.skip_with_pyevent
    def test_wrap_dict(self):
        # Key access, str() and repr() must match the wrapped dict.
        my_object = {'a': 1}
        prox = tpool.Proxy(my_object)
        self.assertEqual('a', list(prox.keys())[0])
        self.assertEqual(1, prox['a'])
        self.assertEqual(str(my_object), str(prox))
        self.assertEqual(repr(my_object), repr(prox))

    @tests.skip_with_pyevent
    def test_wrap_module_class(self):
        # A module can be proxied and its functions called through the proxy.
        prox = tpool.Proxy(re)
        self.assertEqual(tpool.Proxy, type(prox))
        exp = prox.compile('(.)(.)(.)')
        self.assertEqual(exp.groups, 3)
        assert repr(prox.compile)

    @tests.skip_with_pyevent
    def test_wrap_eq(self):
        # Equality and inequality of objects obtained through the proxy.
        prox = tpool.Proxy(re)
        exp1 = prox.compile('.')
        exp2 = prox.compile(exp1.pattern)
        self.assertEqual(exp1, exp2)
        exp3 = prox.compile('/')
        assert exp1 != exp3

    @tests.skip_with_pyevent
    def test_wrap_ints(self):
        p = tpool.Proxy(4)
        assert p == 4

    @tests.skip_with_pyevent
    def test_wrap_hash(self):
        # Proxies of equal strings hash equally; a proxied list is unhashable.
        prox1 = tpool.Proxy('' + 'A')
        prox2 = tpool.Proxy('A' + '')
        assert prox1 == 'A'
        assert 'A' == prox2
        # assert prox1 == prox2 FIXME - could __eq__ unwrap rhs if it is other proxy?
        self.assertEqual(hash(prox1), hash(prox2))
        proxList = tpool.Proxy([])
        self.assertRaises(TypeError, hash, proxList)

    @tests.skip_with_pyevent
    def test_wrap_nonzero(self):
        # Truthiness of a compiled pattern and of a proxied non-empty list.
        prox = tpool.Proxy(re)
        exp1 = prox.compile('.')
        assert bool(exp1)
        prox2 = tpool.Proxy([1, 2, 3])
        assert bool(prox2)

    @tests.skip_with_pyevent
    def test_multiple_wraps(self):
        # Two proxies of the same module are used in turn, with one result
        # deleted in between; the test passes if nothing raises.
        prox1 = tpool.Proxy(re)
        prox2 = tpool.Proxy(re)
        prox1.compile('.')
        x2 = prox1.compile('.')
        del x2
        prox2.compile('.')

    @tests.skip_with_pyevent
    def test_wrap_getitem(self):
        prox = tpool.Proxy([0, 1, 2])
        self.assertEqual(prox[0], 0)

    @tests.skip_with_pyevent
    def test_wrap_setitem(self):
        prox = tpool.Proxy([0, 1, 2])
        prox[1] = 2
        self.assertEqual(prox[1], 2)

    @tests.skip_with_pyevent
    def test_wrap_iterator(self):
        # Iterating the proxy yields the same items as the wrapped range.
        self.reset_timeout(2)
        prox = tpool.Proxy(range(10))
        result = []
        for i in prox:
            result.append(i)
        self.assertEqual(list(range(10)), result)

    @tests.skip_with_pyevent
    def test_wrap_iterator2(self):
        self.reset_timeout(5)  # might take a while due to imprecise sleeping

        def foo():
            import time
            for x in range(2):
                yield x
                time.sleep(0.001)

        counter = [0]

        def tick():
            for i in six.moves.range(20000):
                counter[0] += 1
                if counter[0] % 20 == 0:
                    eventlet.sleep(0.0001)
                else:
                    eventlet.sleep()

        gt = eventlet.spawn(tick)
        try:
            previtem = 0
            for item in tpool.Proxy(foo()):
                assert item >= previtem
                # remember the last item so the next comparison checks
                # ordering against it rather than against the initial 0
                previtem = item
            # make sure the tick happened at least a few times so that we know
            # that our iterations in foo() were actually tpooled
            assert counter[0] > 10, counter[0]
        finally:
            # stop the ticker even when one of the assertions above fails
            gt.kill()

    @tests.skip_with_pyevent
    def test_raising_exceptions(self):
        # Exceptions raised behind the proxy must reach the caller.
        prox = tpool.Proxy(re)

        def nofunc():
            prox.never_name_a_function_like_this()

        self.assertRaises(AttributeError, nofunc)

        from tests import tpool_test
        prox = tpool.Proxy(tpool_test)
        self.assertRaises(RuntimeError, prox.raise_exception)

    @tests.skip_with_pyevent
    def test_variable_and_keyword_arguments_with_function_calls(self):
        # Positional and keyword arguments are forwarded through the proxy.
        import optparse
        parser = tpool.Proxy(optparse.OptionParser())
        parser.add_option('-n', action='store', type='string', dest='n')
        opts, args = parser.parse_args(["-nfoo"])
        self.assertEqual(opts.n, 'foo')

    @tests.skip_with_pyevent
    def test_contention(self):
        # Three greenthreads read attributes through one shared proxy.
        from tests import tpool_test
        prox = tpool.Proxy(tpool_test)

        pile = eventlet.GreenPile(4)
        pile.spawn(lambda: self.assertEqual(prox.one, 1))
        pile.spawn(lambda: self.assertEqual(prox.two, 2))
        pile.spawn(lambda: self.assertEqual(prox.three, 3))
        results = list(pile)
        self.assertEqual(len(results), 3)

    @tests.skip_with_pyevent
    def test_timeout(self):
        # A 0.1s Timeout must interrupt a 0.3s blocking sleep run via tpool.
        blocking = eventlet.patcher.original('time')
        timer = eventlet.Timeout(0.1, eventlet.Timeout())
        try:
            tpool.execute(blocking.sleep, 0.3)
            assert False, 'Expected Timeout'
        except eventlet.Timeout:
            pass
        finally:
            # never leave the timer pending once this test is over
            timer.cancel()

    @tests.skip_with_pyevent
    def test_killall(self):
        # killall() followed by setup() must not raise.
        tpool.killall()
        tpool.setup()

    @tests.skip_with_pyevent
    def test_killall_remaining_results(self):
        # killall() is called while a tpool.execute call is still sleeping.
        semaphore = eventlet.Event()

        def native_fun():
            time.sleep(.5)

        def gt_fun():
            semaphore.send(None)
            tpool.execute(native_fun)

        gt = eventlet.spawn(gt_fun)
        semaphore.wait()
        tpool.killall()
        gt.wait()

    @tests.skip_with_pyevent
    def test_autowrap(self):
        # With autowrap=(int,), int results are wrapped and others are not.
        x = tpool.Proxy({'a': 1, 'b': 2}, autowrap=(int,))
        assert isinstance(x.get('a'), tpool.Proxy)
        assert not isinstance(x.items(), tpool.Proxy)
        # attributes as well as callables
        from tests import tpool_test
        x = tpool.Proxy(tpool_test, autowrap=(int,))
        assert isinstance(x.one, tpool.Proxy)
        assert not isinstance(x.none, tpool.Proxy)

    @tests.skip_with_pyevent
    def test_autowrap_names(self):
        # With autowrap_names, only the listed names return wrapped results.
        x = tpool.Proxy({'a': 1, 'b': 2}, autowrap_names=('get',))
        assert isinstance(x.get('a'), tpool.Proxy)
        assert not isinstance(x.items(), tpool.Proxy)
        from tests import tpool_test
        x = tpool.Proxy(tpool_test, autowrap_names=('one',))
        assert isinstance(x.one, tpool.Proxy)
        assert not isinstance(x.two, tpool.Proxy)

    @tests.skip_with_pyevent
    def test_autowrap_both(self):
        from tests import tpool_test
        x = tpool.Proxy(tpool_test, autowrap=(int,), autowrap_names=('one',))
        assert isinstance(x.one, tpool.Proxy)
        # violating the abstraction to check that we didn't double-wrap
        assert not isinstance(x._obj, tpool.Proxy)

    @tests.skip_with_pyevent
    def test_callable(self):
        def wrapped(arg):
            return arg

        x = tpool.Proxy(wrapped)
        self.assertEqual(4, x(4))
        # verify that it wraps return values if specified
        x = tpool.Proxy(wrapped, autowrap_names=('__call__',))
        assert isinstance(x(4), tpool.Proxy)
        self.assertEqual("4", str(x(4)))

    @tests.skip_with_pyevent
    def test_callable_iterator(self):
        # A generator returned by a proxied callable can be iterated.
        def wrapped(arg):
            yield arg
            yield arg
            yield arg

        x = tpool.Proxy(wrapped, autowrap_names=('__call__',))
        for r in x(3):
            self.assertEqual(3, r)

    @tests.skip_with_pyevent
    def test_eventlet_timeout(self):
        # An eventlet.Timeout raised inside the executed function propagates.
        def raise_timeout():
            raise eventlet.Timeout()

        self.assertRaises(eventlet.Timeout, tpool.execute, raise_timeout)

    @tests.skip_with_pyevent
    def test_tpool_set_num_threads(self):
        # Put the module-level thread count back afterwards so this test
        # does not change the pool size seen by whatever runs next.
        previous = tpool._nthreads
        try:
            tpool.set_num_threads(5)
            self.assertEqual(5, tpool._nthreads)
        finally:
            tpool.set_num_threads(previous)
class TpoolLongTests(tests.LimitedTestCase):
    """Heavier tpool tests; the class sets TEST_TIMEOUT to 60.

    tearDown() calls tpool.killall() so the pool is shut down after every
    test, including tests that fail before reaching their last statement.
    """
    TEST_TIMEOUT = 60

    def tearDown(self):
        # Shut the pool down before handing over to the base-class teardown.
        tpool.killall()
        super(TpoolLongTests, self).tearDown()

    @tests.skip_with_pyevent
    def test_a_buncha_stuff(self):
        # Ten greenthreads each make 100 proxied calls and check that the
        # token they passed in is the one that comes back.
        # assertTrue replaces the deprecated TestCase.assert_ alias.
        assert_true = self.assertTrue

        class Dummy(object):
            def foo(self, when, token=None):
                assert_true(token is not None)
                time.sleep(random.random() / 200.0)
                return token

        def sender_loop(loopnum):
            obj = tpool.Proxy(Dummy())
            count = 100
            for n in six.moves.range(count):
                eventlet.sleep(random.random() / 200.0)
                now = time.time()
                # unique per (loop, iteration) so a mixed-up result is caught
                token = loopnum * count + n
                rv = obj.foo(now, token=token)
                self.assertEqual(token, rv)
                eventlet.sleep(random.random() / 200.0)

        cnt = 10
        pile = eventlet.GreenPile(cnt)
        for i in six.moves.range(cnt):
            pile.spawn(sender_loop, i)

        results = list(pile)
        self.assertEqual(len(results), cnt)

    @tests.skip_with_pyevent
    def test_leakage_from_tracebacks(self):
        # Compares gc object counts after 10 and after 100 failing calls.
        tpool.execute(noop)  # get it started
        gc.collect()
        initial_objs = len(gc.get_objects())
        for i in range(10):
            self.assertRaises(RuntimeError, tpool.execute, raise_exception)
        gc.collect()
        middle_objs = len(gc.get_objects())
        # some objects will inevitably be created by the previous loop
        # now we test to ensure that running the loop an order of
        # magnitude more doesn't generate additional objects
        for i in six.moves.range(100):
            self.assertRaises(RuntimeError, tpool.execute, raise_exception)
        first_created = middle_objs - initial_objs
        gc.collect()
        second_created = len(gc.get_objects()) - middle_objs
        self.assertTrue(second_created - first_created < 10,
                        "first loop: %s, second loop: %s" % (first_created,
                                                             second_created))
def test_isolate_from_socket_default_timeout():
    # Hands the named script to tests.run_isolated with a timeout of 1.
    script_name = 'tpool_isolate_socket_default_timeout.py'
    tests.run_isolated(script_name, timeout=1)