wsgi: close idle connections (also applies to websockets)

https://github.com/eventlet/eventlet/issues/188
This commit is contained in:
Sergey Shepelev 2017-04-28 23:58:32 +03:00
parent ef83055441
commit 7f53465578
3 changed files with 120 additions and 34 deletions

View File

@ -24,8 +24,15 @@ MINIMUM_CHUNK_SIZE = 4096
# %(client_port)s is also available
DEFAULT_LOG_FORMAT = ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"'
' %(status_code)s %(body_length)s %(wall_seconds).6f')
RESPONSE_414 = b'''HTTP/1.0 414 Request URI Too Long\r\n\
Connection: close\r\n\
Content-Length: 0\r\n\r\n'''
is_accepting = True
STATE_IDLE = 'idle'
STATE_REQUEST = 'request'
STATE_CLOSE = 'close'
__all__ = ['server', 'format_date_time']
# Weekday and month names for HTTP date/time formatting; always English!
@ -318,6 +325,17 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
# so before going back to unbuffered, remove any usage of `writelines`.
wbufsize = 16 << 10
def __init__(self, conn_state, server):
self.request = conn_state[1]
self.client_address = conn_state[0]
self.conn_state = conn_state
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
# overriding SocketServer.setup to correctly handle SSL.Connection objects
conn = self.connection = self.request
@ -343,30 +361,40 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
raise NotImplementedError(
'''eventlet.wsgi doesn't support sockets of type {0}'''.format(type(conn)))
def handle(self):
self.close_connection = True
while True:
self.handle_one_request()
if self.conn_state[2] == STATE_CLOSE:
self.close_connection = 1
if self.close_connection:
break
def _read_request_line(self):
if self.rfile.closed:
self.close_connection = 1
return ''
try:
return self.rfile.readline(self.server.url_length_limit)
except greenio.SSL.ZeroReturnError:
pass
except socket.error as e:
if support.get_errno(e) not in BAD_SOCK:
raise
return ''
def handle_one_request(self):
if self.server.max_http_version:
self.protocol_version = self.server.max_http_version
if self.rfile.closed:
self.raw_requestline = self._read_request_line()
if not self.raw_requestline:
self.close_connection = 1
return
try:
self.raw_requestline = self.rfile.readline(self.server.url_length_limit)
if len(self.raw_requestline) >= self.server.url_length_limit:
self.wfile.write(
b"HTTP/1.0 414 Request URI Too Long\r\n"
b"Connection: close\r\nContent-length: 0\r\n\r\n")
self.close_connection = 1
return
except greenio.SSL.ZeroReturnError:
self.raw_requestline = ''
except socket.error as e:
if support.get_errno(e) not in BAD_SOCK:
raise
self.raw_requestline = ''
if not self.raw_requestline:
if len(self.raw_requestline) >= self.server.url_length_limit:
self.wfile.write(RESPONSE_414)
self.close_connection = 1
return
@ -736,22 +764,21 @@ class Server(BaseHTTPServer.HTTPServer):
d.update(self.environ)
return d
def process_request(self, sock_params):
def process_request(self, conn_state):
# The actual request handling takes place in __init__, so we need to
# set minimum_chunk_size before __init__ executes and we don't want to modify
# class variable
sock, address = sock_params[:2]
proto = new(self.protocol)
if self.minimum_chunk_size is not None:
proto.minimum_chunk_size = self.minimum_chunk_size
proto.capitalize_response_headers = self.capitalize_response_headers
try:
proto.__init__(sock, address, self)
proto.__init__(conn_state, self)
except socket.timeout:
# Expected exceptions are not exceptional
sock.close()
conn_state[1].close()
# similar to logging "accepted" in server()
self.log.debug('({0}) timed out {1!r}'.format(self.pid, address))
self.log.debug('({0}) timed out {1!r}'.format(self.pid, conn_state[0]))
def log_message(self, message):
raise AttributeError('''\
@ -893,19 +920,30 @@ def server(sock, site,
else:
pool = eventlet.GreenPool(max_size)
if not (hasattr(pool, 'spawn_n') and hasattr(pool, 'waitall')):
if not (hasattr(pool, 'spawn') and hasattr(pool, 'waitall')):
raise AttributeError('''\
eventlet.wsgi.Server pool must provide methods: `spawn_n`, `waitall`.
eventlet.wsgi.Server pool must provide methods: `spawn`, `waitall`.
If unsure, use eventlet.GreenPool.''')
# [addr, socket, state]
connections = {}
def _clean_connection(_, conn):
connections.pop(conn[0], None)
conn[2] = STATE_CLOSE
greenio.shutdown_safe(conn[1])
conn[1].close()
try:
serv.log.info('({0}) wsgi starting up on {1}'.format(serv.pid, socket_repr(sock)))
while is_accepting:
try:
client_socket = sock.accept()
client_socket[0].settimeout(serv.socket_timeout)
serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_socket[1]))
pool.spawn_n(serv.process_request, client_socket)
client_socket, client_addr = sock.accept()
client_socket.settimeout(serv.socket_timeout)
serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_addr))
connections[client_addr] = connection = [client_addr, client_socket, STATE_IDLE]
(pool.spawn(serv.process_request, connection)
.link(_clean_connection, connection))
except ACCEPT_EXCEPTIONS as e:
if support.get_errno(e) not in ACCEPT_ERRNO:
raise
@ -913,6 +951,11 @@ If unsure, use eventlet.GreenPool.''')
serv.log.info('wsgi exiting')
break
finally:
for cs in six.itervalues(connections):
prev_state = cs[2]
cs[2] = STATE_CLOSE
if prev_state == STATE_IDLE:
greenio.shutdown_safe(cs[1])
pool.waitall()
serv.log.info('({0}) wsgi exited, is_accepting={1}'.format(serv.pid, is_accepting))
try:

View File

@ -1,5 +1,6 @@
import errno
import socket
import sys
import eventlet
from eventlet import event
@ -503,6 +504,30 @@ class TestWebSocket(tests.wsgi_test._TestBase):
done_with_request.wait()
assert error_detected[0]
def test_close_idle(self):
pool = eventlet.GreenPool()
# use log=stderr when test runner can capture it
self.spawn_server(custom_pool=pool, log=sys.stdout)
connect = (
'GET /echo HTTP/1.1',
'Upgrade: WebSocket',
'Connection: Upgrade',
'Host: %s:%s' % self.server_addr,
'Origin: http://%s:%s' % self.server_addr,
'Sec-WebSocket-Protocol: ws',
'Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5',
'Sec-WebSocket-Key2: 12998 5 Y3 1 .P00',
)
sock = eventlet.connect(self.server_addr)
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
sock.recv(1024)
sock.sendall(b'\x00hello\xff')
result = sock.recv(1024)
assert result, b'\x00hello\xff'
self.killer.kill(KeyboardInterrupt)
with eventlet.Timeout(1):
pool.waitall()
class TestWebSocketSSL(tests.wsgi_test._TestBase):
def set_site(self):

View File

@ -255,7 +255,7 @@ class _TestBase(tests.LimitedTestCase):
if self.killer:
greenthread.kill(self.killer)
self.killer = eventlet.spawn_n(target, **kwargs)
self.killer = eventlet.spawn(target, **kwargs)
def set_site(self):
raise NotImplementedError
@ -557,8 +557,8 @@ class TestHttpd(_TestBase):
def server(sock, site, log):
try:
serv = wsgi.Server(sock, sock.getsockname(), site, log)
client_socket = sock.accept()
serv.process_request(client_socket)
client_socket, addr = sock.accept()
serv.process_request([addr, client_socket, wsgi.STATE_IDLE])
return True
except Exception:
traceback.print_exc()
@ -1471,13 +1471,14 @@ class TestHttpd(_TestBase):
sock.close()
request_thread = eventlet.spawn(make_request)
server_conn = server_sock.accept()
client_sock, addr = server_sock.accept()
# Next line must not raise IOError -32 Broken pipe
server.process_request(server_conn)
server.process_request([addr, client_sock, wsgi.STATE_IDLE])
request_thread.wait()
server_sock.close()
def test_server_connection_timeout_exception(self):
self.reset_timeout(5)
# Handle connection socket timeouts
# https://bitbucket.org/eventlet/eventlet/issue/143/
# Runs tests.wsgi_test_conntimeout in a separate process.
@ -1582,6 +1583,23 @@ class TestHttpd(_TestBase):
log_content = self.logfile.getvalue()
assert log_content == ''
def test_close_idle_connections(self):
self.reset_timeout(2)
pool = eventlet.GreenPool()
self.spawn_server(custom_pool=pool)
# https://github.com/eventlet/eventlet/issues/188
sock = eventlet.connect(self.server_addr)
sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
result = read_http(sock)
assert result.status == 'HTTP/1.1 200 OK', 'Received status {0!r}'.format(result.status)
self.killer.kill(KeyboardInterrupt)
try:
with eventlet.Timeout(1):
pool.waitall()
except Exception:
assert False, self.logfile.getvalue()
def read_headers(sock):
fd = sock.makefile('rb')