wsgi: close idle connections (also applies to websockets)
https://github.com/eventlet/eventlet/issues/188
This commit is contained in:
parent
ef83055441
commit
7f53465578
101
eventlet/wsgi.py
101
eventlet/wsgi.py
|
@ -24,8 +24,15 @@ MINIMUM_CHUNK_SIZE = 4096
|
|||
# %(client_port)s is also available
|
||||
DEFAULT_LOG_FORMAT = ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"'
|
||||
' %(status_code)s %(body_length)s %(wall_seconds).6f')
|
||||
RESPONSE_414 = b'''HTTP/1.0 414 Request URI Too Long\r\n\
|
||||
Connection: close\r\n\
|
||||
Content-Length: 0\r\n\r\n'''
|
||||
is_accepting = True
|
||||
|
||||
STATE_IDLE = 'idle'
|
||||
STATE_REQUEST = 'request'
|
||||
STATE_CLOSE = 'close'
|
||||
|
||||
__all__ = ['server', 'format_date_time']
|
||||
|
||||
# Weekday and month names for HTTP date/time formatting; always English!
|
||||
|
@ -318,6 +325,17 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
|
|||
# so before going back to unbuffered, remove any usage of `writelines`.
|
||||
wbufsize = 16 << 10
|
||||
|
||||
def __init__(self, conn_state, server):
|
||||
self.request = conn_state[1]
|
||||
self.client_address = conn_state[0]
|
||||
self.conn_state = conn_state
|
||||
self.server = server
|
||||
self.setup()
|
||||
try:
|
||||
self.handle()
|
||||
finally:
|
||||
self.finish()
|
||||
|
||||
def setup(self):
|
||||
# overriding SocketServer.setup to correctly handle SSL.Connection objects
|
||||
conn = self.connection = self.request
|
||||
|
@ -343,30 +361,40 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
|
|||
raise NotImplementedError(
|
||||
'''eventlet.wsgi doesn't support sockets of type {0}'''.format(type(conn)))
|
||||
|
||||
def handle(self):
|
||||
self.close_connection = True
|
||||
|
||||
while True:
|
||||
self.handle_one_request()
|
||||
if self.conn_state[2] == STATE_CLOSE:
|
||||
self.close_connection = 1
|
||||
if self.close_connection:
|
||||
break
|
||||
|
||||
def _read_request_line(self):
|
||||
if self.rfile.closed:
|
||||
self.close_connection = 1
|
||||
return ''
|
||||
|
||||
try:
|
||||
return self.rfile.readline(self.server.url_length_limit)
|
||||
except greenio.SSL.ZeroReturnError:
|
||||
pass
|
||||
except socket.error as e:
|
||||
if support.get_errno(e) not in BAD_SOCK:
|
||||
raise
|
||||
return ''
|
||||
|
||||
def handle_one_request(self):
|
||||
if self.server.max_http_version:
|
||||
self.protocol_version = self.server.max_http_version
|
||||
|
||||
if self.rfile.closed:
|
||||
self.raw_requestline = self._read_request_line()
|
||||
if not self.raw_requestline:
|
||||
self.close_connection = 1
|
||||
return
|
||||
|
||||
try:
|
||||
self.raw_requestline = self.rfile.readline(self.server.url_length_limit)
|
||||
if len(self.raw_requestline) >= self.server.url_length_limit:
|
||||
self.wfile.write(
|
||||
b"HTTP/1.0 414 Request URI Too Long\r\n"
|
||||
b"Connection: close\r\nContent-length: 0\r\n\r\n")
|
||||
self.close_connection = 1
|
||||
return
|
||||
except greenio.SSL.ZeroReturnError:
|
||||
self.raw_requestline = ''
|
||||
except socket.error as e:
|
||||
if support.get_errno(e) not in BAD_SOCK:
|
||||
raise
|
||||
self.raw_requestline = ''
|
||||
|
||||
if not self.raw_requestline:
|
||||
if len(self.raw_requestline) >= self.server.url_length_limit:
|
||||
self.wfile.write(RESPONSE_414)
|
||||
self.close_connection = 1
|
||||
return
|
||||
|
||||
|
@ -736,22 +764,21 @@ class Server(BaseHTTPServer.HTTPServer):
|
|||
d.update(self.environ)
|
||||
return d
|
||||
|
||||
def process_request(self, sock_params):
|
||||
def process_request(self, conn_state):
|
||||
# The actual request handling takes place in __init__, so we need to
|
||||
# set minimum_chunk_size before __init__ executes and we don't want to modify
|
||||
# class variable
|
||||
sock, address = sock_params[:2]
|
||||
proto = new(self.protocol)
|
||||
if self.minimum_chunk_size is not None:
|
||||
proto.minimum_chunk_size = self.minimum_chunk_size
|
||||
proto.capitalize_response_headers = self.capitalize_response_headers
|
||||
try:
|
||||
proto.__init__(sock, address, self)
|
||||
proto.__init__(conn_state, self)
|
||||
except socket.timeout:
|
||||
# Expected exceptions are not exceptional
|
||||
sock.close()
|
||||
conn_state[1].close()
|
||||
# similar to logging "accepted" in server()
|
||||
self.log.debug('({0}) timed out {1!r}'.format(self.pid, address))
|
||||
self.log.debug('({0}) timed out {1!r}'.format(self.pid, conn_state[0]))
|
||||
|
||||
def log_message(self, message):
|
||||
raise AttributeError('''\
|
||||
|
@ -893,19 +920,30 @@ def server(sock, site,
|
|||
else:
|
||||
pool = eventlet.GreenPool(max_size)
|
||||
|
||||
if not (hasattr(pool, 'spawn_n') and hasattr(pool, 'waitall')):
|
||||
if not (hasattr(pool, 'spawn') and hasattr(pool, 'waitall')):
|
||||
raise AttributeError('''\
|
||||
eventlet.wsgi.Server pool must provide methods: `spawn_n`, `waitall`.
|
||||
eventlet.wsgi.Server pool must provide methods: `spawn`, `waitall`.
|
||||
If unsure, use eventlet.GreenPool.''')
|
||||
|
||||
# [addr, socket, state]
|
||||
connections = {}
|
||||
|
||||
def _clean_connection(_, conn):
|
||||
connections.pop(conn[0], None)
|
||||
conn[2] = STATE_CLOSE
|
||||
greenio.shutdown_safe(conn[1])
|
||||
conn[1].close()
|
||||
|
||||
try:
|
||||
serv.log.info('({0}) wsgi starting up on {1}'.format(serv.pid, socket_repr(sock)))
|
||||
while is_accepting:
|
||||
try:
|
||||
client_socket = sock.accept()
|
||||
client_socket[0].settimeout(serv.socket_timeout)
|
||||
serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_socket[1]))
|
||||
pool.spawn_n(serv.process_request, client_socket)
|
||||
client_socket, client_addr = sock.accept()
|
||||
client_socket.settimeout(serv.socket_timeout)
|
||||
serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_addr))
|
||||
connections[client_addr] = connection = [client_addr, client_socket, STATE_IDLE]
|
||||
(pool.spawn(serv.process_request, connection)
|
||||
.link(_clean_connection, connection))
|
||||
except ACCEPT_EXCEPTIONS as e:
|
||||
if support.get_errno(e) not in ACCEPT_ERRNO:
|
||||
raise
|
||||
|
@ -913,6 +951,11 @@ If unsure, use eventlet.GreenPool.''')
|
|||
serv.log.info('wsgi exiting')
|
||||
break
|
||||
finally:
|
||||
for cs in six.itervalues(connections):
|
||||
prev_state = cs[2]
|
||||
cs[2] = STATE_CLOSE
|
||||
if prev_state == STATE_IDLE:
|
||||
greenio.shutdown_safe(cs[1])
|
||||
pool.waitall()
|
||||
serv.log.info('({0}) wsgi exited, is_accepting={1}'.format(serv.pid, is_accepting))
|
||||
try:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import errno
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import eventlet
|
||||
from eventlet import event
|
||||
|
@ -503,6 +504,30 @@ class TestWebSocket(tests.wsgi_test._TestBase):
|
|||
done_with_request.wait()
|
||||
assert error_detected[0]
|
||||
|
||||
def test_close_idle(self):
|
||||
pool = eventlet.GreenPool()
|
||||
# use log=stderr when test runner can capture it
|
||||
self.spawn_server(custom_pool=pool, log=sys.stdout)
|
||||
connect = (
|
||||
'GET /echo HTTP/1.1',
|
||||
'Upgrade: WebSocket',
|
||||
'Connection: Upgrade',
|
||||
'Host: %s:%s' % self.server_addr,
|
||||
'Origin: http://%s:%s' % self.server_addr,
|
||||
'Sec-WebSocket-Protocol: ws',
|
||||
'Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5',
|
||||
'Sec-WebSocket-Key2: 12998 5 Y3 1 .P00',
|
||||
)
|
||||
sock = eventlet.connect(self.server_addr)
|
||||
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
|
||||
sock.recv(1024)
|
||||
sock.sendall(b'\x00hello\xff')
|
||||
result = sock.recv(1024)
|
||||
assert result, b'\x00hello\xff'
|
||||
self.killer.kill(KeyboardInterrupt)
|
||||
with eventlet.Timeout(1):
|
||||
pool.waitall()
|
||||
|
||||
|
||||
class TestWebSocketSSL(tests.wsgi_test._TestBase):
|
||||
def set_site(self):
|
||||
|
|
|
@ -255,7 +255,7 @@ class _TestBase(tests.LimitedTestCase):
|
|||
if self.killer:
|
||||
greenthread.kill(self.killer)
|
||||
|
||||
self.killer = eventlet.spawn_n(target, **kwargs)
|
||||
self.killer = eventlet.spawn(target, **kwargs)
|
||||
|
||||
def set_site(self):
|
||||
raise NotImplementedError
|
||||
|
@ -557,8 +557,8 @@ class TestHttpd(_TestBase):
|
|||
def server(sock, site, log):
|
||||
try:
|
||||
serv = wsgi.Server(sock, sock.getsockname(), site, log)
|
||||
client_socket = sock.accept()
|
||||
serv.process_request(client_socket)
|
||||
client_socket, addr = sock.accept()
|
||||
serv.process_request([addr, client_socket, wsgi.STATE_IDLE])
|
||||
return True
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
|
@ -1471,13 +1471,14 @@ class TestHttpd(_TestBase):
|
|||
sock.close()
|
||||
|
||||
request_thread = eventlet.spawn(make_request)
|
||||
server_conn = server_sock.accept()
|
||||
client_sock, addr = server_sock.accept()
|
||||
# Next line must not raise IOError -32 Broken pipe
|
||||
server.process_request(server_conn)
|
||||
server.process_request([addr, client_sock, wsgi.STATE_IDLE])
|
||||
request_thread.wait()
|
||||
server_sock.close()
|
||||
|
||||
def test_server_connection_timeout_exception(self):
|
||||
self.reset_timeout(5)
|
||||
# Handle connection socket timeouts
|
||||
# https://bitbucket.org/eventlet/eventlet/issue/143/
|
||||
# Runs tests.wsgi_test_conntimeout in a separate process.
|
||||
|
@ -1582,6 +1583,23 @@ class TestHttpd(_TestBase):
|
|||
log_content = self.logfile.getvalue()
|
||||
assert log_content == ''
|
||||
|
||||
def test_close_idle_connections(self):
|
||||
self.reset_timeout(2)
|
||||
pool = eventlet.GreenPool()
|
||||
self.spawn_server(custom_pool=pool)
|
||||
# https://github.com/eventlet/eventlet/issues/188
|
||||
sock = eventlet.connect(self.server_addr)
|
||||
|
||||
sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
|
||||
result = read_http(sock)
|
||||
assert result.status == 'HTTP/1.1 200 OK', 'Received status {0!r}'.format(result.status)
|
||||
self.killer.kill(KeyboardInterrupt)
|
||||
try:
|
||||
with eventlet.Timeout(1):
|
||||
pool.waitall()
|
||||
except Exception:
|
||||
assert False, self.logfile.getvalue()
|
||||
|
||||
|
||||
def read_headers(sock):
|
||||
fd = sock.makefile('rb')
|
||||
|
|
Loading…
Reference in New Issue