deb-python-eventlet/tests/convenience_test.py

197 lines
6.2 KiB
Python

import os
import warnings
import eventlet
from eventlet import convenience, debug
from eventlet.green import socket
from eventlet.support import six
import tests
import tests.mock
certificate_file = os.path.join(os.path.dirname(__file__), 'test_server.crt')
private_key_file = os.path.join(os.path.dirname(__file__), 'test_server.key')
class TestServe(tests.LimitedTestCase):
def setUp(self):
super(TestServe, self).setUp()
debug.hub_exceptions(False)
def tearDown(self):
super(TestServe, self).tearDown()
debug.hub_exceptions(True)
def test_exiting_server(self):
# tests that the server closes the client sock on handle() exit
def closer(sock, addr):
pass
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, closer)
client = eventlet.connect(('localhost', l.getsockname()[1]))
client.sendall(b'a')
self.assertFalse(client.recv(100))
gt.kill()
def test_excepting_server(self):
# tests that the server closes the client sock on handle() exception
def crasher(sock, addr):
sock.recv(1024)
0 // 0
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, crasher)
client = eventlet.connect(('localhost', l.getsockname()[1]))
client.sendall(b'a')
self.assertRaises(ZeroDivisionError, gt.wait)
self.assertFalse(client.recv(100))
def test_excepting_server_already_closed(self):
# same as above but with explicit clsoe before crash
def crasher(sock, addr):
sock.recv(1024)
sock.close()
0 // 0
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, crasher)
client = eventlet.connect(('localhost', l.getsockname()[1]))
client.sendall(b'a')
self.assertRaises(ZeroDivisionError, gt.wait)
self.assertFalse(client.recv(100))
def test_called_for_each_connection(self):
hits = [0]
def counter(sock, addr):
hits[0] += 1
l = eventlet.listen(('localhost', 0))
gt = eventlet.spawn(eventlet.serve, l, counter)
for i in six.moves.range(100):
client = eventlet.connect(('localhost', l.getsockname()[1]))
self.assertFalse(client.recv(100))
gt.kill()
self.assertEqual(100, hits[0])
def test_blocking(self):
l = eventlet.listen(('localhost', 0))
x = eventlet.with_timeout(
0.01,
eventlet.serve, l, lambda c, a: None,
timeout_value="timeout")
self.assertEqual(x, "timeout")
def test_raising_stopserve(self):
def stopit(conn, addr):
raise eventlet.StopServe()
l = eventlet.listen(('localhost', 0))
# connect to trigger a call to stopit
gt = eventlet.spawn(eventlet.connect, ('localhost', l.getsockname()[1]))
eventlet.serve(l, stopit)
gt.wait()
def test_concurrency(self):
evt = eventlet.Event()
def waiter(sock, addr):
sock.sendall(b'hi')
evt.wait()
l = eventlet.listen(('localhost', 0))
eventlet.spawn(eventlet.serve, l, waiter, 5)
def test_client():
c = eventlet.connect(('localhost', l.getsockname()[1]))
# verify the client is connected by getting data
self.assertEqual(b'hi', c.recv(2))
return c
[test_client() for i in range(5)]
# very next client should not get anything
x = eventlet.with_timeout(
0.01,
test_client,
timeout_value="timed out")
self.assertEqual(x, "timed out")
@tests.skip_if_no_ssl
def test_wrap_ssl(self):
server = eventlet.wrap_ssl(
eventlet.listen(('localhost', 0)),
certfile=certificate_file, keyfile=private_key_file,
server_side=True)
port = server.getsockname()[1]
def handle(sock, addr):
sock.sendall(sock.recv(1024))
raise eventlet.StopServe()
eventlet.spawn(eventlet.serve, server, handle)
client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
client.sendall(b"echo")
self.assertEqual(b"echo", client.recv(1024))
def test_socket_reuse():
# pick a free port with bind to 0 - without SO_REUSEPORT
# then close it and try to bind to same port with SO_REUSEPORT
# loop helps in case something else used the chosen port before second bind
addr = None
errors = []
for _ in range(5):
lsock1 = eventlet.listen(('localhost', 0))
addr = lsock1.getsockname()
lsock1.close()
try:
lsock1 = eventlet.listen(addr)
except socket.error as e:
errors.append(e)
continue
break
else:
assert False, errors
if hasattr(socket, 'SO_REUSEPORT'):
lsock2 = eventlet.listen(addr)
else:
try:
lsock2 = eventlet.listen(addr)
assert lsock2
lsock2.close()
except socket.error:
pass
lsock1.close()
def test_reuse_random_port_warning():
with warnings.catch_warnings(record=True) as w:
eventlet.listen(('localhost', 0), reuse_port=True).close()
assert len(w) == 1
assert issubclass(w[0].category, convenience.ReuseRandomPortWarning)
@tests.skip_unless(hasattr(socket, 'SO_REUSEPORT'))
def test_reuseport_oserror():
# https://github.com/eventlet/eventlet/issues/380
# https://github.com/eventlet/eventlet/issues/418
err22 = OSError(22, 'Invalid argument')
sock1 = eventlet.listen(('localhost', 0))
addr = sock1.getsockname()
sock1.close()
original_socket_init = socket.socket.__init__
def patched(self, *a, **kw):
original_socket_init(self, *a, **kw)
self.setsockopt = tests.mock.Mock(side_effect=err22)
with warnings.catch_warnings(record=True) as w:
try:
socket.socket.__init__ = patched
eventlet.listen(addr, reuse_addr=False, reuse_port=True).close()
finally:
socket.socket.__init__ = original_socket_init
assert len(w) == 1
assert issubclass(w[0].category, convenience.ReusePortUnavailableWarning)