proxy-server: add replicated GET path tests

Improve test coverage for the resuming multipart replicated GET path.

Change-Id: I7de34f443399f645f5021ed392e515f795ed7249
This commit is contained in:
Alistair Coles 2023-09-19 11:44:29 +01:00
parent 01c7ade798
commit f8c94d6bbc
2 changed files with 167 additions and 4 deletions

View File

@ -1099,7 +1099,7 @@ def requires_o_tmpfile_support_in_tmp(func):
class StubResponse(object):
def __init__(self, status, body=b'', headers=None, frag_index=None,
slowdown=None):
slowdown=None, slowdown_after=0):
self.status = status
self.body = body
self.readable = BytesIO(body)
@ -1107,13 +1107,17 @@ class StubResponse(object):
self._slowdown = iter(slowdown)
except TypeError:
self._slowdown = iter([slowdown])
self.slowdown_after = slowdown_after
self.headers = HeaderKeyDict(headers)
if frag_index is not None:
self.headers['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
fake_reason = ('Fake', 'This response is a lie.')
self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
self.bytes_read = 0
def slowdown(self):
if self.bytes_read < self.slowdown_after:
return
try:
wait = next(self._slowdown)
except StopIteration:
@ -1135,11 +1139,15 @@ class StubResponse(object):
def read(self, amt=0):
# Stub of a file-like read(): calls self.slowdown() first, then reads up to
# ``amt`` bytes from self.readable (the BytesIO wrapping the stub body).
# NOTE(review): this listing looks like a rendered diff whose +/- markers
# and indentation were stripped; the first ``return`` below appears to be
# the pre-change line and the three lines after it its replacement, which
# adds bytes_read accounting (the counter that slowdown() compares with
# slowdown_after) -- confirm against the real file.
self.slowdown()
return self.readable.read(amt)
res = self.readable.read(amt)
self.bytes_read += len(res)
return res
def readline(self, size=-1):
# Stub of a file-like readline(): calls self.slowdown() first, then reads
# one line (bounded by ``size``) from self.readable.
# NOTE(review): as rendered here the first ``return`` appears to be the
# removed pre-change line and the following three lines its replacement,
# which adds the length of the line to self.bytes_read -- confirm against
# the real file.
self.slowdown()
return self.readable.readline(size)
res = self.readable.readline(size)
self.bytes_read += len(res)
return res
def __repr__(self):
info = ['Status: %s' % self.status]

View File

@ -1595,7 +1595,8 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
headers=headers, slow=read_sleeps) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
_ = resp.body
body = resp.body
self.assertEqual(b'test', body)
self.assertEqual(len(log.requests), 2)
def make_key(r):
@ -1623,6 +1624,159 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
'Expected 1 ERROR lines, got %r' % (
self.logger.logger.records['ERROR'], ))
def _do_test_GET_with_multirange_slow_body_resumes(
self, slowdown_after=0, resume_bytes=0):
# Shared driver for the multi-range "slow body resumes" tests.
#
# Issues a client GET for 'bytes=0-49,100-104' against two stubbed backend
# responses: the first is built with slowdown=0.1 (and the given
# slowdown_after), the second has no slowdown. Asserts a 206 whose body
# equals the first backend body, exactly two captured backend requests and
# exactly one logged error line.
#
# :param slowdown_after: forwarded to the first StubResponse; its
#     slowdown() returns early while bytes_read < slowdown_after.
# :param resume_bytes: first byte offset of the first part served by the
#     second backend response (used in its Content-Range and body slice).
# :returns: list of the Range header values seen on each backend request,
#     in request order.
self.app.logger.clear()
# presumably the 0.1 slowdown on the first response is meant to exceed this
# 0.01 timeout so that the proxy gives up on it -- units not shown here.
self.app.recoverable_node_timeout = 0.01
self.app.object_chunk_size = 10
# 100 chunks of 10 bytes each (b'testing' + 3 digits) => 1000 bytes.
# NOTE(review): the Content-Range headers below advertise a total of 700,
# which does not match this length; the assertions here never check it.
obj_data = b''.join([b'testing%03d' % i for i in range(100)])
etag = md5(obj_data, usedforsecurity=False).hexdigest()
# First backend response: both requested ranges, in full.
boundary1 = b'81eb9c110b32ced5fe'
resp_body1 = b'\r\n'.join([
b'--' + boundary1,
b'Content-Type: application/octet-stream',
b'Content-Range: bytes 0-49/700',
b'',
obj_data[0:50],
b'--' + boundary1,
b'Content-Type: application/octet-stream',
b'Content-Range: bytes 100-104/700',
b'',
obj_data[100:105],
b'--' + boundary1 + b'--',
])
# Second backend response: uses a different boundary and starts its first
# part at resume_bytes instead of 0.
boundary2 = b'aaeb9c110b32ced5fe'
resp_body2 = b'\r\n'.join([
b'--' + boundary2,
b'Content-Type: application/octet-stream',
b'Content-Range: bytes %d-49/700' % resume_bytes,
b'',
obj_data[resume_bytes:50],
b'--' + boundary2,
b'Content-Type: application/octet-stream',
b'Content-Range: bytes 100-104/700',
b'',
obj_data[100:105],
b'--' + boundary2 + b'--',
])
# Both responses carry the same Etag; only boundary and length differ.
headers1 = {
'Etag': etag,
'Content-Type': b'multipart/byteranges;boundary=' + boundary1,
'Content-Length': len(resp_body1),
'X-Timestamp': Timestamp(self.ts()).normal,
}
headers2 = {
'Etag': etag,
'Content-Type': b'multipart/byteranges;boundary=' + boundary2,
'Content-Length': len(resp_body2),
'X-Timestamp': Timestamp(self.ts()).normal,
}
responses = [
StubResponse(206, resp_body1, headers1, slowdown=0.1,
slowdown_after=slowdown_after),
StubResponse(206, resp_body2, headers2)
]
req_range_hdrs = []
# Backend stub: records the Range header of each backend request and hands
# out the canned responses in order, then 404s once they are exhausted.
def get_response(req):
req_range_hdrs.append(req['headers'].get('Range'))
return responses.pop(0) if responses else StubResponse(404)
req = swob.Request.blank('/v1/a/c/o', headers={
'Range': 'bytes=0-49,100-104'})
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 206)
actual_body = resp.body
# status is asserted again after the body has been read
self.assertEqual(resp.status_int, 206)
self.assertEqual(2, len(log))
# note: client response uses boundary from first backend response
self.assertEqual(resp_body1, actual_body)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_lines))
self.assertIn('Trying to read object during GET ', error_lines[0])
return req_range_hdrs
def test_GET_with_multirange_slow_body_resumes(self):
# With slowdown_after=0 the first stub response's slowdown() never returns
# early on the bytes_read check. Both backend requests are expected to
# carry the original, unmodified Range header.
req_range_hdrs = self._do_test_GET_with_multirange_slow_body_resumes(
slowdown_after=0)
self.assertEqual(['bytes=0-49,100-104'] * 2, req_range_hdrs)
def test_GET_with_multirange_slow_body_resumes_before_body_started(self):
# First response times out while first part boundary/headers are being
# read. No part body has been yielded to the client so range header is
# not adjusted for the second backend request.
# slowdown_after=40 is forwarded to the first StubResponse; the second
# backend response serves the first range from offset 0 (resume_bytes=0).
req_range_hdrs = self._do_test_GET_with_multirange_slow_body_resumes(
slowdown_after=40, resume_bytes=0)
# Both backend requests carry the same Range header as the client request.
self.assertEqual(['bytes=0-49,100-104'] * 2, req_range_hdrs)
def test_GET_with_multirange_slow_body_resumes_after_body_started(self):
# First response times out after first part boundary/headers have been
# read. Some part body has been yielded to the client so range header
# is adjusted for the second backend request.
# 140 bytes before timeout is sufficient for the part boundary, headers
# and approx 50 body bytes to be read, but _MultipartMimeFileLikeObject
# buffers bytes from the backend response such that only 20 bytes are
# actually yielded to the client.
# resume_bytes=20 makes the second backend response serve bytes 20-49 for
# the first part, matching the adjusted Range asserted below.
req_range_hdrs = self._do_test_GET_with_multirange_slow_body_resumes(
slowdown_after=140, resume_bytes=20)
# First request uses the client's Range; the second starts at byte 20.
self.assertEqual(['bytes=0-49,100-104', 'bytes=20-49,100-104'],
req_range_hdrs)
def test_GET_with_multirange_slow_body_unable_to_resume(self):
# Multi-range GET where every usable backend response is slow: three
# identical 206 stubs built with slowdown=0.1, after which the stub backend
# answers 404. The client is expected to still see a 206, but with a body
# consisting only of the closing multipart boundary, and three error lines
# are expected in the log.
self.app.recoverable_node_timeout = 0.01
self.app.object_chunk_size = 10
# 7 bytes * 100 => 700 bytes, matching the /700 in the Content-Range lines.
obj_data = b'testing' * 100
etag = md5(obj_data, usedforsecurity=False).hexdigest()
boundary = b'81eb9c110b32ced5fe'
resp_body = b'\r\n'.join([
b'--' + boundary,
b'Content-Type: application/octet-stream',
b'Content-Range: bytes 0-49/700',
b'',
obj_data[0:50],
b'--' + boundary,
b'Content-Type: application/octet-stream',
b'Content-Range: bytes 100-104/700',
b'',
obj_data[100:105],
b'--' + boundary + b'--',
])
headers = {
'Etag': etag,
'Content-Type': b'multipart/byteranges;boundary=' + boundary,
'Content-Length': len(resp_body),
'X-Timestamp': Timestamp(self.ts()).normal,
}
responses = [
StubResponse(206, resp_body, headers, slowdown=0.1),
StubResponse(206, resp_body, headers, slowdown=0.1),
StubResponse(206, resp_body, headers, slowdown=0.1),
]
# Backend stub: serve the canned slow responses in order, then 404.
def get_response(req):
return responses.pop(0) if responses else StubResponse(404)
req = swob.Request.blank('/v1/a/c/o', headers={
'Range': 'bytes=0-49,100-104'})
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 206)
actual_body = resp.body
# status is asserted again after the body has been read
self.assertEqual(resp.status_int, 206)
# six backend requests: the three slow 206s plus three more, which the
# stub above answers with 404
self.assertEqual(6, len(log))
# The boundary is taken from the client response's Content-Type header;
# the body must be just the terminating '--<boundary>--' marker.
resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode()
self.assertEqual(b'--%s--' % resp_boundary, actual_body)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertEqual(3, len(error_lines))
for line in error_lines:
self.assertIn('Trying to read object during GET ', line)
def test_GET_resuming_ignores_416(self):
# verify that a resuming getter will not try to use the content of a
# 416 response (because its etag will mismatch that from the first
@ -4535,6 +4689,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
'Range': 'bytes=1000-2000,14000-15000'})
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
_ = resp.body
self.assertEqual(resp.status_int, 500)
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
log_lines = self.app.logger.get_lines_for_level('error')