Better error handling for EC PUT path when client goes away

There are a few places in the PUT path where the object server is
reading WSGI input and can find that there's nothing there.  e.g. in the
middle of a 2 phase commit and the proxy goes away for whatever reason,
like maybe it timed out because things are really busy.  Anyway, this
results in the ugly ValueError coming out of eventlet.wsgi about a
zillion levels away from the PUT path.

Expanding on the test cases from lp bug #1496205 and lp bug #1469094
this change carefully narrows into our read/readline calls to
wsgi_input and makes sure to tranlsate the ValueError to a
ChunkReadError - which the object.server can handle along with
ChunkReadTimeout.  When it made sense, this change attempts to stay
consistent throughout the code path in logging/raising client disconnect
instead of timeout.

It's unfortunate the error coming out of eventlet is so generic, but
that will be improved in future versions [1].

1. c3ce3eef0b

Related-Bug: #1469094
Related-Bug: #1496205
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I9e4dbf26623c0c6fc5c87afd14349466aa157385
This commit is contained in:
paul luse 2015-08-10 14:37:10 -07:00
parent 9046676968
commit 696186c680
3 changed files with 176 additions and 22 deletions

View File

@ -3330,7 +3330,10 @@ class _MultipartMimeFileLikeObject(object):
if len(self.input_buffer) < length + len(self.boundary) + 2:
to_read = length + len(self.boundary) + 2
while to_read > 0:
chunk = self.wsgi_input.read(to_read)
try:
chunk = self.wsgi_input.read(to_read)
except (IOError, ValueError) as e:
raise swift.common.exceptions.ChunkReadError(str(e))
to_read -= len(chunk)
self.input_buffer += chunk
if not chunk:
@ -3400,9 +3403,12 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
"""
boundary = '--' + boundary
blen = len(boundary) + 2 # \r\n
got = wsgi_input.readline(blen)
while got == '\r\n':
try:
got = wsgi_input.readline(blen)
while got == '\r\n':
got = wsgi_input.readline(blen)
except (IOError, ValueError) as e:
raise swift.common.exceptions.ChunkReadError(str(e))
if got.strip() != boundary:
raise swift.common.exceptions.MimeInvalid(

View File

@ -405,8 +405,10 @@ class ObjectController(BaseStorageServer):
if commit_hdrs.get('X-Document', None) == "put commit":
rcvd_commit = True
drain(commit_iter, self.network_chunk_size, self.client_timeout)
except (ChunkReadTimeout, ChunkReadError):
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
raise HTTPRequestTimeout()
except StopIteration:
raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
return rcvd_commit
@ -415,16 +417,20 @@ class ObjectController(BaseStorageServer):
try:
with ChunkReadTimeout(self.client_timeout):
footer_hdrs, footer_iter = next(mime_documents_iter)
except ChunkReadTimeout:
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
raise HTTPRequestTimeout()
except StopIteration:
raise HTTPBadRequest(body="couldn't find footer MIME doc")
timeout_reader = self._make_timeout_reader(footer_iter)
try:
footer_body = ''.join(iter(timeout_reader, ''))
except ChunkReadTimeout:
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
raise HTTPRequestTimeout()
footer_md5 = footer_hdrs.get('Content-MD5')
if not footer_md5:
@ -609,6 +615,8 @@ class ObjectController(BaseStorageServer):
request.environ['wsgi.input'],
mime_boundary, self.network_chunk_size)
_junk_hdrs, obj_input = next(mime_documents_iter)
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
@ -622,6 +630,8 @@ class ObjectController(BaseStorageServer):
etag.update(chunk)
upload_size = writer.write(chunk)
elapsed_time += time.time() - start_time
except ChunkReadError:
return HTTPClientDisconnect(request=request)
except ChunkReadTimeout:
return HTTPRequestTimeout(request=request)
if upload_size:
@ -682,8 +692,10 @@ class ObjectController(BaseStorageServer):
_junk_hdrs, _junk_body = next(mime_documents_iter)
drain(_junk_body, self.network_chunk_size,
self.client_timeout)
except ChunkReadTimeout:
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
raise HTTPRequestTimeout()
except StopIteration:
pass

View File

@ -5271,9 +5271,10 @@ class TestObjectServer(unittest.TestCase):
'mount_check': 'false',
}
self.logger = debug_logger('test-object-server')
app = object_server.ObjectController(self.conf, logger=self.logger)
self.app = object_server.ObjectController(
self.conf, logger=self.logger)
sock = listen(('127.0.0.1', 0))
self.server = spawn(wsgi.server, sock, app, utils.NullLogger())
self.server = spawn(wsgi.server, sock, self.app, utils.NullLogger())
self.port = sock.getsockname()[1]
def tearDown(self):
@ -5367,6 +5368,34 @@ class TestObjectServer(unittest.TestCase):
resp.read()
resp.close()
def test_expect_on_multiphase_put_diconnect(self):
put_timestamp = utils.Timestamp(time()).internal
headers = {
'Content-Type': 'text/plain',
'X-Timestamp': put_timestamp,
'Transfer-Encoding': 'chunked',
'Expect': '100-continue',
'X-Backend-Obj-Content-Length': 0,
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
'X-Backend-Obj-Multiphase-Commit': 'yes',
}
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
'PUT', '/a/c/o', headers=headers)
resp = conn.getexpect()
self.assertEqual(resp.status, 100)
headers = HeaderKeyDict(resp.getheaders())
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
conn.send('c\r\n--boundary123\r\n')
# disconnect client
conn.sock.fd._sock.close()
for i in range(2):
sleep(0)
self.assertFalse(self.logger.get_lines_for_level('error'))
for line in self.logger.get_lines_for_level('info'):
self.assertIn(' 499 ', line)
def find_files(self):
found_files = defaultdict(list)
for root, dirs, files in os.walk(self.devices):
@ -5377,8 +5406,10 @@ class TestObjectServer(unittest.TestCase):
return found_files
@contextmanager
def _check_multiphase_put_commit_handling(self, test_doc=None,
headers=None):
def _check_multiphase_put_commit_handling(self,
test_doc=None,
headers=None,
finish_body=True):
"""
This helper will setup a multiphase chunked PUT request and yield at
the context at the commit phase (after getting the second expect-100
@ -5393,6 +5424,8 @@ class TestObjectServer(unittest.TestCase):
:param headers: headers to send along with the initial request; some
object-metadata (e.g. X-Backend-Obj-Content-Length)
is generally expected tomatch the test_doc)
:param finish_body: boolean, if true send "0\r\n\r\n" after test_doc
and wait for 100-continue before yeilding context
"""
test_data = 'obj data'
footer_meta = {
@ -5439,11 +5472,13 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual(resp.status, 100)
expect_headers = HeaderKeyDict(resp.getheaders())
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
to_send = "%x\r\n%s\r\n" % (len(test_doc), test_doc)
conn.send(to_send)
# verify 100-continue response to mark end of phase1
resp = conn.getexpect()
self.assertEqual(resp.status, 100)
if finish_body:
conn.send("0\r\n\r\n")
# verify 100-continue response to mark end of phase1
resp = conn.getexpect()
self.assertEqual(resp.status, 100)
# yield relevant context for test
yield {
@ -5453,10 +5488,9 @@ class TestObjectServer(unittest.TestCase):
'mock_container_update': _container_update,
}
for i in range(3):
# give the object server a few trampolines to recognize request
# has finished, or socket has closed or whatever
sleep(0)
# give the object server a few trampolines to recognize request
# has finished, or socket has closed or whatever
sleep(0.1)
def test_multiphase_put_client_disconnect_right_before_commit(self):
with self._check_multiphase_put_commit_handling() as context:
@ -5480,7 +5514,7 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
self.assertEqual(found_files['.druable'], [])
self.assertEqual(found_files['.durable'], [])
# And no continer update
self.assertFalse(_container_update.called)
@ -5518,7 +5552,7 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
self.assertEqual(found_files['.druable'], [])
self.assertEqual(found_files['.durable'], [])
# And no continer update
self.assertFalse(_container_update.called)
@ -5620,6 +5654,58 @@ class TestObjectServer(unittest.TestCase):
# And continer update was called
self.assertTrue(context['mock_container_update'].called)
def test_multiphase_put_metadata_footer_disconnect(self):
test_data = 'obj data'
test_doc = "\r\n".join((
"--boundary123",
"X-Document: object body",
"",
test_data,
"--boundary123",
))
# eventlet.wsgi won't return < network_chunk_size from a chunked read
self.app.network_chunk_size = 16
with self._check_multiphase_put_commit_handling(
test_doc=test_doc, finish_body=False) as context:
conn = context['conn']
# make footer doc
footer_meta = {
"X-Object-Sysmeta-Ec-Frag-Index": "2",
"Etag": md5(test_data).hexdigest(),
}
footer_json = json.dumps(footer_meta)
footer_meta_cksum = md5(footer_json).hexdigest()
# send most of the footer doc
footer_doc = "\r\n".join((
"X-Document: object metadata",
"Content-MD5: " + footer_meta_cksum,
"",
footer_json,
))
# but don't send final boundry or last chunk
to_send = "%x\r\n%s\r\n" % \
(len(footer_doc), footer_doc)
conn.send(to_send)
# and then bail out
conn.sock.fd._sock.close()
# and make sure it demonstrates the client disconnect
log_lines = self.logger.get_lines_for_level('info')
self.assertEqual(len(log_lines), 1)
self.assertIn(' 499 ', log_lines[0])
# no artifacts left on disk
found_files = self.find_files()
self.assertEqual(len(found_files['.data']), 0)
self.assertEqual(len(found_files['.durable']), 0)
# ... and no continer update
_container_update = context['mock_container_update']
self.assertFalse(_container_update.called)
def test_multiphase_put_ec_fragment_in_headers_no_footers(self):
test_data = 'obj data'
test_doc = "\r\n".join((
@ -5714,7 +5800,7 @@ class TestObjectServer(unittest.TestCase):
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# but .durable isn't
self.assertEqual(found_files['.druable'], [])
self.assertEqual(found_files['.durable'], [])
# And no continer update
self.assertFalse(_container_update.called)
@ -5773,6 +5859,56 @@ class TestObjectServer(unittest.TestCase):
# And continer update was called
self.assertTrue(context['mock_container_update'].called)
def test_multiphase_put_drains_extra_commit_junk_disconnect(self):
commit_confirmation_doc = "\r\n".join((
"X-Document: put commit",
"",
"commit_confirmation",
"--boundary123",
"X-Document: we got cleverer",
"",
"stuff stuff meaningless stuuuuuuuuuuff",
"--boundary123",
"X-Document: we got even cleverer; can you believe it?",
"Waneshaft: ambifacient lunar",
"Casing: malleable logarithmic",
"",
"potato potato potato potato potato potato potato",
))
# eventlet.wsgi won't return < network_chunk_size from a chunked read
self.app.network_chunk_size = 16
with self._check_multiphase_put_commit_handling() as context:
conn = context['conn']
# send commit confirmation and some other stuff
# but don't send final boundry or last chunk
to_send = "%x\r\n%s\r\n" % \
(len(commit_confirmation_doc), commit_confirmation_doc)
conn.send(to_send)
# and then bail out
conn.sock.fd._sock.close()
# and make sure it demonstrates the client disconnect
log_lines = self.logger.get_lines_for_level('info')
self.assertEqual(len(log_lines), 1)
self.assertIn(' 499 ', log_lines[0])
# verify successful object data and durable state file write
put_timestamp = context['put_timestamp']
found_files = self.find_files()
# .data file is there
self.assertEqual(len(found_files['.data']), 1)
obj_datafile = found_files['.data'][0]
self.assertEqual("%s#2.data" % put_timestamp.internal,
os.path.basename(obj_datafile))
# ... and .durable is there
self.assertEqual(len(found_files['.durable']), 1)
durable_file = found_files['.durable'][0]
self.assertEqual("%s.durable" % put_timestamp.internal,
os.path.basename(durable_file))
# but no continer update
self.assertFalse(context['mock_container_update'].called)
@patch_policies
class TestZeroCopy(unittest.TestCase):