From 12d8a53fffea6e4bed8ba3d502ce625f5c6710b9 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 18 Jun 2015 12:58:03 -0700 Subject: [PATCH] Get better at closing WSGI iterables. PEP 333 (WSGI) says: "If the iterable returned by the application has a close() method, the server or gateway must call that method upon completion of the current request[.]" There's a bunch of places where we weren't doing that; some of them matter more than others. Calling .close() can prevent a connection leak in some cases. In others, it just provides a certain pedantic smugness. Either way, we should do what WSGI requires. Noteworthy goofs include: * If a client is downloading a large object and disconnects halfway through, a proxy -> obj connection may be leaked. In this case, the WSGI iterable is a SegmentedIterable, which lacked a close() method. Thus, when the WSGI server noticed the client disconnect, it had no way of telling the SegmentedIterable about it, and so the underlying iterable for the segment's data didn't get closed. Here, it seems likely (though unproven) that the object server would time out and kill the connection, or that a ChunkWriteTimeout would fire down in the proxy server, so the leaked connection would eventually go away. However, a flurry of client disconnects could leave a big pile of useless connections. * If a conditional request receives a 304 or 412, the underlying app_iter is not closed. This mostly affects conditional requests for large objects. The leaked connections were noticed by this patch's co-author, who made the changes to SegmentedIterable. Those changes helped, but did not completely fix, the issue. The rest of the patch is an attempt to plug the rest of the holes. 
Co-Authored-By: Romain LE DISEZ Change-Id: I168e147aae7c1728e7e3fdabb7fba6f2d747d937 Closes-Bug: #1466549 --- swift/common/middleware/dlo.py | 8 ++++-- swift/common/middleware/slo.py | 10 ++++--- swift/common/request_helpers.py | 35 +++++++++---------------- swift/common/swob.py | 9 ++++++- swift/common/utils.py | 22 ++++++++++++++++ swift/proxy/controllers/obj.py | 4 +-- test/unit/common/middleware/helpers.py | 32 +++++++++++++++++++++- test/unit/common/middleware/test_dlo.py | 10 +++++-- test/unit/common/middleware/test_slo.py | 13 ++++++--- 9 files changed, 105 insertions(+), 38 deletions(-) diff --git a/swift/common/middleware/dlo.py b/swift/common/middleware/dlo.py index d2761acb67..9330ccb8cb 100644 --- a/swift/common/middleware/dlo.py +++ b/swift/common/middleware/dlo.py @@ -22,7 +22,8 @@ from swift.common.http import is_success from swift.common.swob import Request, Response, \ HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict from swift.common.utils import get_logger, json, \ - RateLimitedIterator, read_conf_dir, quote + RateLimitedIterator, read_conf_dir, quote, close_if_possible, \ + closing_if_possible from swift.common.request_helpers import SegmentedIterable from swift.common.wsgi import WSGIContext, make_subrequest from urllib import unquote @@ -48,7 +49,8 @@ class GetContext(WSGIContext): con_resp = con_req.get_response(self.dlo.app) if not is_success(con_resp.status_int): return con_resp, None - return None, json.loads(''.join(con_resp.app_iter)) + with closing_if_possible(con_resp.app_iter): + return None, json.loads(''.join(con_resp.app_iter)) def _segment_listing_iterator(self, req, version, account, container, prefix, segments, first_byte=None, @@ -107,6 +109,7 @@ class GetContext(WSGIContext): # we've already started sending the response body to the # client, so all we can do is raise an exception to make the # WSGI server close the connection early + close_if_possible(error_response.app_iter) raise ListingIterError( "Got status 
%d listing container /%s/%s" % (error_response.status_int, account, container)) @@ -233,6 +236,7 @@ class GetContext(WSGIContext): # make sure this response is for a dynamic large object manifest for header, value in self._response_headers: if (header.lower() == 'x-object-manifest'): + close_if_possible(resp_iter) response = self.get_or_head_response(req, value) return response(req.environ, start_response) else: diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index 241210d6ae..4fce4f9d32 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -159,9 +159,9 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ Response from swift.common.utils import json, get_logger, config_true_value, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \ - register_swift_info, RateLimitedIterator, quote -from swift.common.request_helpers import SegmentedIterable, \ - closing_if_possible, close_if_possible + register_swift_info, RateLimitedIterator, quote, close_if_possible, \ + closing_if_possible +from swift.common.request_helpers import SegmentedIterable from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success from swift.common.wsgi import WSGIContext, make_subrequest @@ -239,6 +239,7 @@ class SloGetContext(WSGIContext): sub_resp = sub_req.get_response(self.slo.app) if not is_success(sub_resp.status_int): + close_if_possible(sub_resp.app_iter) raise ListingIterError( 'ERROR: while fetching %s, GET of submanifest %s ' 'failed with status %d' % (req.path, sub_req.path, @@ -412,7 +413,8 @@ class SloGetContext(WSGIContext): return response(req.environ, start_response) def get_or_head_response(self, req, resp_headers, resp_iter): - resp_body = ''.join(resp_iter) + with closing_if_possible(resp_iter): + resp_body = ''.join(resp_iter) try: segments = json.loads(resp_body) except ValueError: diff 
--git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index c9da1cb754..c7d551c307 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -23,7 +23,6 @@ from swob in here without creating circular imports. import hashlib import itertools import time -from contextlib import contextmanager from urllib import unquote from swift import gettext_ as _ from swift.common.storage_policy import POLICIES @@ -32,7 +31,8 @@ from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable, HTTPServiceUnavailable) -from swift.common.utils import split_path, validate_device_partition +from swift.common.utils import split_path, validate_device_partition, \ + close_if_possible from swift.common.wsgi import make_subrequest @@ -249,26 +249,6 @@ def copy_header_subset(from_r, to_r, condition): to_r.headers[k] = v -def close_if_possible(maybe_closable): - close_method = getattr(maybe_closable, 'close', None) - if callable(close_method): - return close_method() - - -@contextmanager -def closing_if_possible(maybe_closable): - """ - Like contextlib.closing(), but doesn't crash if the object lacks a close() - method. - - PEP 333 (WSGI) says: "If the iterable returned by the application has a - close() method, the server or gateway must call that method upon - completion of the current request[.]" This function makes that easier. - """ - yield maybe_closable - close_if_possible(maybe_closable) - - class SegmentedIterable(object): """ Iterable that returns the object contents for a large object. 
@@ -304,6 +284,7 @@ class SegmentedIterable(object): self.peeked_chunk = None self.app_iter = self._internal_iter() self.validated_first_segment = False + self.current_resp = None def _internal_iter(self): start_time = time.time() @@ -360,6 +341,8 @@ class SegmentedIterable(object): 'r_size': seg_resp.content_length, 's_etag': seg_etag, 's_size': seg_size}) + else: + self.current_resp = seg_resp seg_hash = hashlib.md5() for chunk in seg_resp.app_iter: @@ -431,3 +414,11 @@ class SegmentedIterable(object): return itertools.chain([pc], self.app_iter) else: return self.app_iter + + def close(self): + """ + Called when the client disconnects. Ensure that the connection to the + backend server is closed. + """ + if self.current_resp: + close_if_possible(self.current_resp.app_iter) diff --git a/swift/common/swob.py b/swift/common/swob.py index 39f0c0e3cb..b35be6849f 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -49,7 +49,8 @@ import random import functools import inspect -from swift.common.utils import reiterate, split_path, Timestamp, pairs +from swift.common.utils import reiterate, split_path, Timestamp, pairs, \ + close_if_possible from swift.common.exceptions import InvalidTimestamp @@ -1220,12 +1221,14 @@ class Response(object): etag in self.request.if_none_match: self.status = 304 self.content_length = 0 + close_if_possible(app_iter) return [''] if etag and self.request.if_match and \ etag not in self.request.if_match: self.status = 412 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.status_int == 404 and self.request.if_match \ @@ -1236,18 +1239,21 @@ class Response(object): # Failed) response. 
[RFC 2616 section 14.24] self.status = 412 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.last_modified and self.request.if_modified_since \ and self.last_modified <= self.request.if_modified_since: self.status = 304 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.last_modified and self.request.if_unmodified_since \ and self.last_modified > self.request.if_unmodified_since: self.status = 412 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.request and self.request.method == 'HEAD': @@ -1261,6 +1267,7 @@ class Response(object): if ranges == []: self.status = 416 self.content_length = 0 + close_if_possible(app_iter) return [''] elif ranges: range_size = len(ranges) diff --git a/swift/common/utils.py b/swift/common/utils.py index d470fb9970..47aff54e9a 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3144,6 +3144,28 @@ def ismount_raw(path): return False +def close_if_possible(maybe_closable): + close_method = getattr(maybe_closable, 'close', None) + if callable(close_method): + return close_method() + + +@contextmanager +def closing_if_possible(maybe_closable): + """ + Like contextlib.closing(), but doesn't crash if the object lacks a close() + method. + + PEP 333 (WSGI) says: "If the iterable returned by the application has a + close() method, the server or gateway must call that method upon + completion of the current request[.]" This function makes that easier. 
+ """ + try: + yield maybe_closable + finally: + close_if_possible(maybe_closable) + + _rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+' _rfc_extension_pattern = re.compile( r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token + diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 10e83bcad7..609f21b5d9 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -44,7 +44,7 @@ from swift.common.utils import ( GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp, normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, - quorum_size, reiterate) + quorum_size, reiterate, close_if_possible) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_copy_from_header, check_destination_header, \ @@ -70,7 +70,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \ HTTPRequestedRangeNotSatisfiable, Range from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ - remove_items, copy_header_subset, close_if_possible + remove_items, copy_header_subset def copy_headers_into(from_r, to_r): diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index 68a4bfee3d..7c1b45571e 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -15,6 +15,7 @@ # This stuff can't live in test/unit/__init__.py due to its swob dependency. 
+from collections import defaultdict from copy import deepcopy from hashlib import md5 from swift.common import swob @@ -23,6 +24,20 @@ from swift.common.utils import split_path from test.unit import FakeLogger, FakeRing +class LeakTrackingIter(object): + def __init__(self, inner_iter, fake_swift, path): + self.inner_iter = inner_iter + self.fake_swift = fake_swift + self.path = path + + def __iter__(self): + for x in self.inner_iter: + yield x + + def close(self): + self.fake_swift.mark_closed(self.path) + + class FakeSwift(object): """ A good-enough fake Swift proxy server to use in testing middleware. @@ -30,6 +45,7 @@ class FakeSwift(object): def __init__(self): self._calls = [] + self._unclosed_req_paths = defaultdict(int) self.req_method_paths = [] self.swift_sources = [] self.uploaded = {} @@ -105,7 +121,21 @@ class FakeSwift(object): req = swob.Request(env) resp = resp_class(req=req, headers=headers, body=body, conditional_response=True) - return resp(env, start_response) + wsgi_iter = resp(env, start_response) + self.mark_opened(path) + return LeakTrackingIter(wsgi_iter, self, path) + + def mark_opened(self, path): + self._unclosed_req_paths[path] += 1 + + def mark_closed(self, path): + self._unclosed_req_paths[path] -= 1 + + @property + def unclosed_requests(self): + return {path: count + for path, count in self._unclosed_req_paths.items() + if count > 0} @property def calls(self): diff --git a/test/unit/common/middleware/test_dlo.py b/test/unit/common/middleware/test_dlo.py index 16237eb1d1..119e4aba55 100644 --- a/test/unit/common/middleware/test_dlo.py +++ b/test/unit/common/middleware/test_dlo.py @@ -26,6 +26,7 @@ import unittest from swift.common import exceptions, swob from swift.common.middleware import dlo +from swift.common.utils import closing_if_possible from test.unit.common.middleware.helpers import FakeSwift @@ -54,8 +55,10 @@ class DloTestCase(unittest.TestCase): body = '' caught_exc = None try: - for chunk in body_iter: - body += chunk + # 
appease the close-checker + with closing_if_possible(body_iter): + for chunk in body_iter: + body += chunk except Exception as exc: if expect_exception: caught_exc = exc @@ -279,6 +282,9 @@ class TestDloHeadManifest(DloTestCase): class TestDloGetManifest(DloTestCase): + def tearDown(self): + self.assertEqual(self.app.unclosed_requests, {}) + def test_get_manifest(self): expected_etag = '"%s"' % md5hex( md5hex("aaaaa") + md5hex("bbbbb") + md5hex("ccccc") + diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index 86a11734d3..d5129da4aa 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -24,7 +24,7 @@ from swift.common import swob, utils from swift.common.exceptions import ListingIterError, SegmentError from swift.common.middleware import slo from swift.common.swob import Request, Response, HTTPException -from swift.common.utils import quote, json +from swift.common.utils import quote, json, closing_if_possible from test.unit.common.middleware.helpers import FakeSwift @@ -74,8 +74,10 @@ class SloTestCase(unittest.TestCase): body = '' caught_exc = None try: - for chunk in body_iter: - body += chunk + # appease the close-checker + with closing_if_possible(body_iter): + for chunk in body_iter: + body += chunk except Exception as exc: if expect_exception: caught_exc = exc @@ -232,7 +234,7 @@ class TestSloPutManifest(SloTestCase): '/?multipart-manifest=put', environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data) self.assertEquals( - self.slo.handle_multipart_put(req, fake_start_response), + list(self.slo.handle_multipart_put(req, fake_start_response)), ['passed']) def test_handle_multipart_put_success(self): @@ -949,6 +951,9 @@ class TestSloGetManifest(SloTestCase): 'X-Object-Meta-Fish': 'Bass'}, "[not {json (at ++++all") + def tearDown(self): + self.assertEqual(self.app.unclosed_requests, {}) + def test_get_manifest_passthrough(self): req = Request.blank( 
'/v1/AUTH_test/gettest/manifest-bc?multipart-manifest=get',