Restore old SIGPIPE handler in a proxy server test

Not restoring it left the SIGPIPE handler set to signal.SIG_DFL instead of the
Python default of signal.SIG_IGN. That could cause other tests, in which a
client stops reading before all data "chunks" are read, to fail harder than
they should; the SIGPIPE in those tests is benign and even expected, since the
other side of the socket really did get closed early.

Fixed leak on 499s

This fixes an issue where Request objects (and related objects) were not
garbage collected when a 499 (client disconnect) occurred on a response that
still had more than the proxy server's client chunk size left to send.

Fixed bug #1055834

Change-Id: I40266a0874cd2142c90f26b9f030d534286fc6da
parent 50d72a1a80
commit 7368af5475
@@ -1,3 +1,7 @@
+swift (1.7.4)
+
+    * Fix issue where early client disconnects may have caused a memory leak
+
 swift (1.7.2)
 
     * Fix issue where memcache serialization was not properly loading
@@ -14,7 +14,7 @@ class Version(object):
             return '%s-dev' % (self.canonical_version,)
 
 
-_version = Version('1.7.2', True)
+_version = Version('1.7.4', True)
 __version__ = _version.pretty_version
 __canonical_version__ = _version.canonical_version
 
@@ -505,45 +505,40 @@ class Controller(object):
             if getattr(source, 'swift_conn', None):
                 self.close_swift_conn(source)
 
-    def _make_app_iter(self, node, source, response):
+    def _make_app_iter(self, node, source):
         """
         Returns an iterator over the contents of the source (via its read
         func). There is also quite a bit of cleanup to ensure garbage
         collection works and the underlying socket of the source is closed.
 
-        :param response: The webob.Response object this iterator should be
-                         assigned to via response.app_iter.
         :param source: The httplib.Response object this iterator should read
                        from.
         :param node: The node the source is reading from, for logging purposes.
         """
         try:
-            try:
-                # Spawn reader to read from the source and place in the queue.
-                # We then drop any reference to the source or node, for garbage
-                # collection purposes.
-                queue = Queue(1)
-                spawn_n(self._make_app_iter_reader, node, source, queue,
-                        self.app.logger.thread_locals)
-                source = node = None
-                while True:
-                    chunk = queue.get(timeout=self.app.node_timeout)
-                    if isinstance(chunk, bool):  # terminator
-                        success = chunk
-                        if not success:
-                            raise Exception(_('Failed to read all data'
-                                              ' from the source'))
-                        break
-                    yield chunk
-            except Empty:
-                raise ChunkReadTimeout()
-            except (GeneratorExit, Timeout):
-                self.app.logger.warn(_('Client disconnected on read'))
-            except Exception:
-                self.app.logger.exception(_('Trying to send to client'))
-                raise
-        finally:
-            response.app_iter = None
+            # Spawn reader to read from the source and place in the queue.
+            # We then drop any reference to the source or node, for garbage
+            # collection purposes.
+            queue = Queue(1)
+            spawn_n(self._make_app_iter_reader, node, source, queue,
+                    self.app.logger.thread_locals)
+            source = node = None
+            while True:
+                chunk = queue.get(timeout=self.app.node_timeout)
+                if isinstance(chunk, bool):  # terminator
+                    success = chunk
+                    if not success:
+                        raise Exception(_('Failed to read all data'
+                                          ' from the source'))
+                    break
+                yield chunk
+        except Empty:
+            raise ChunkReadTimeout()
+        except (GeneratorExit, Timeout):
+            self.app.logger.warn(_('Client disconnected on read'))
+        except Exception:
+            self.app.logger.exception(_('Trying to send to client'))
+            raise
 
     def close_swift_conn(self, src):
         try:
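For illustration only (the names below are hypothetical, not Swift's), the removed
"finally: response.app_iter = None" is the kind of construct that ties a response
and its iterator into a reference cycle: the generator's frame holds the response,
and the response's app_iter holds the generator, so neither can be freed until the
cycle collector happens to run. Dropping the response argument, as this hunk does,
removes that cycle. A minimal standalone sketch:

class FakeResponse(object):
    # Stand-in for the real Response object; app_iter will point at a generator.
    app_iter = None


def make_app_iter(response):
    def app_iter():
        try:
            yield 'chunk'
        finally:
            # The generator's frame keeps a reference to `response`...
            response.app_iter = None
    return app_iter()


resp = FakeResponse()
# ...and the response keeps a reference to the generator: a cycle.
resp.app_iter = make_app_iter(resp)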
@@ -656,7 +651,7 @@ class Controller(object):
                     self.close_swift_conn(src)
 
         res = Response(request=req, conditional_response=True)
-        res.app_iter = self._make_app_iter(node, source, res)
+        res.app_iter = self._make_app_iter(node, source)
         # See NOTE: swift_conn at top of file about this.
         res.swift_conn = source.swift_conn
         update_headers(res, source.getheaders())
@@ -58,9 +58,29 @@ import swift.proxy.controllers
 logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
 
 
+_request_instances = 0
+
+
+def request_init(self, *args, **kwargs):
+    global _request_instances
+    self._orig_init(*args, **kwargs)
+    _request_instances += 1
+
+
+def request_del(self):
+    global _request_instances
+    if self._orig_del:
+        self._orig_del()
+    _request_instances -= 1
+
+
 def setup():
     global _testdir, _test_servers, _test_sockets, \
         _orig_container_listing_limit, _test_coros
+    Request._orig_init = Request.__init__
+    Request.__init__ = request_init
+    Request._orig_del = getattr(Request, '__del__', None)
+    Request.__del__ = request_del
     monkey_patch_mimetools()
     # Since we're starting up a lot here, we're going to test more than
     # just chunked puts; we're also going to test parts of
@@ -152,6 +172,9 @@ def teardown():
     swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
         _orig_container_listing_limit
     rmtree(os.path.dirname(_testdir))
+    Request.__init__ = Request._orig_init
+    if Request._orig_del:
+        Request.__del__ = Request._orig_del
 
 
 def fake_http_connect(*code_iter, **kwargs):
@@ -711,8 +734,8 @@ class TestObjectController(unittest.TestCase):
         def handler(_junk1, _junk2):
             calls[0] += 1
 
+        old_handler = signal.signal(signal.SIGPIPE, handler)
         try:
-            signal.signal(signal.SIGPIPE, handler)
             prolis = _test_sockets[0]
             prosrv = _test_servers[0]
             sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@@ -739,7 +762,7 @@ class TestObjectController(unittest.TestCase):
             self.assertEqual(res.body, obj)
             self.assertEqual(calls[0], 0)
         finally:
-            signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+            signal.signal(signal.SIGPIPE, old_handler)
 
     def test_PUT_auto_content_type(self):
         with save_globals():
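For context, a minimal standalone sketch of the save-and-restore pattern this hunk
switches to (not Swift's test code; the handler body is only a placeholder).
CPython installs SIG_IGN for SIGPIPE at interpreter startup, so restoring whatever
signal.signal() returned, rather than forcing SIG_DFL, keeps that default intact
for tests that run later:

import signal


def handler(_signum, _frame):
    pass  # a real test would count or log the SIGPIPE here


# signal.signal() returns the previously installed handler.
old_handler = signal.signal(signal.SIGPIPE, handler)
try:
    pass  # exercise the code that may trigger SIGPIPE
finally:
    # Restore the original handler (normally SIG_IGN), not SIG_DFL.
    signal.signal(signal.SIGPIPE, old_handler)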
@@ -3250,6 +3273,43 @@ class TestObjectController(unittest.TestCase):
         self.assertEquals(resp.status_int, 400)
         self.assertTrue('X-Delete-At in past' in resp.body)
 
+    def test_leak_1(self):
+        global _request_instances
+        prolis = _test_sockets[0]
+        prosrv = _test_servers[0]
+        obj_len = prosrv.client_chunk_size * 2
+        # PUT test file
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('PUT /v1/a/c/test_leak_1 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Auth-Token: t\r\n'
+                 'Content-Length: %s\r\n'
+                 'Content-Type: application/octet-stream\r\n'
+                 '\r\n%s' % (obj_len, 'a' * obj_len))
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 201'
+        self.assertEqual(headers[:len(exp)], exp)
+        # Remember Request instance count
+        before_request_instances = _request_instances
+        # GET test file, but disconnect early
+        sock = connect_tcp(('localhost', prolis.getsockname()[1]))
+        fd = sock.makefile()
+        fd.write('GET /v1/a/c/test_leak_1 HTTP/1.1\r\n'
+                 'Host: localhost\r\n'
+                 'Connection: close\r\n'
+                 'X-Auth-Token: t\r\n'
+                 '\r\n')
+        fd.flush()
+        headers = readuntil2crlfs(fd)
+        exp = 'HTTP/1.1 200'
+        self.assertEqual(headers[:len(exp)], exp)
+        fd.read(1)
+        fd.close()
+        sock.close()
+        self.assertEquals(before_request_instances, _request_instances)
+
 
 class TestContainerController(unittest.TestCase):
     "Test swift.proxy_server.ContainerController"