Merge "New proxy logging field for wire status"

Zuul 2020-10-14 04:39:13 +00:00 committed by Gerrit Code Review
commit 127bf9707c
8 changed files with 136 additions and 58 deletions

View File

@ -93,6 +93,9 @@ container The container part extracted from the path of the request.
object The object part extracted from the path of the request.
(anonymizable)
pid PID of the process emitting the log line.
wire_status_int The status sent to the client, which may differ from the
logged response code if there was an error while sending
the response body or the client disconnected.
=================== ==========================================================
In one log line, all of the above fields are space-separated and url-encoded.
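
A minimal sketch of how the new field renders alongside the existing status_int, using plain str.format rather than the middleware's LogStringFormatter; the method, path, and values are made up for illustration. When the headers already went out as 200 but the request later failed mid-body, the logged status becomes 500 while the wire status stays 200:

template = '{method} {path} {status_int} {wire_status_int}'
print(template.format(method='GET', path='/v1/AUTH_test/c/o',
                      status_int=500, wire_status_int=200))
# -> GET /v1/AUTH_test/c/o 500 200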

View File

@ -74,9 +74,10 @@ bandwidth usage will want to only sum up logs with no swift.source.
import os
import time
from swift.common.middleware.catch_errors import enforce_byte_count
from swift.common.swob import Request
from swift.common.utils import (get_logger, get_remote_client,
config_true_value,
config_true_value, reiterate,
InputProxy, list_from_csv, get_policy_index,
split_path, StrAnonymizer, StrFormatTime,
LogStringFormatter)
@ -176,7 +177,8 @@ class ProxyLoggingMiddleware(object):
'log_info': '',
'policy_index': '',
'ttfb': '0.05',
'pid': '42'
'pid': '42',
'wire_status_int': '200',
}
try:
self.log_formatter.format(self.log_msg_template, **replacements)
@ -198,7 +200,8 @@ class ProxyLoggingMiddleware(object):
return value
def log_request(self, req, status_int, bytes_received, bytes_sent,
start_time, end_time, resp_headers=None, ttfb=0):
start_time, end_time, resp_headers=None, ttfb=0,
wire_status_int=None):
"""
Log a request.
@ -209,6 +212,7 @@ class ProxyLoggingMiddleware(object):
:param start_time: timestamp request started
:param end_time: timestamp request completed
:param resp_headers: dict of the response headers
:param wire_status_int: the on-the-wire status int
"""
resp_headers = resp_headers or {}
logged_headers = None
@ -277,6 +281,7 @@ class ProxyLoggingMiddleware(object):
'policy_index': policy_index,
'ttfb': ttfb,
'pid': self.pid,
'wire_status_int': wire_status_int or status_int,
}
self.access_logger.info(
self.log_formatter.format(self.log_msg_template,
@ -352,47 +357,46 @@ class ProxyLoggingMiddleware(object):
def my_start_response(status, headers, exc_info=None):
start_response_args[0] = (status, list(headers), exc_info)
def status_int_for_logging(client_disconnect=False, start_status=None):
def status_int_for_logging(start_status, client_disconnect=False):
# log disconnected clients as '499' status code
if client_disconnect or input_proxy.client_disconnect:
ret_status_int = 499
elif start_status is None:
ret_status_int = int(
start_response_args[0][0].split(' ', 1)[0])
else:
ret_status_int = start_status
return ret_status_int
return 499
return start_status
def iter_response(iterable):
iterator = iter(iterable)
try:
chunk = next(iterator)
while not chunk:
chunk = next(iterator)
except StopIteration:
chunk = b''
iterator = reiterate(iterable)
content_length = None
for h, v in start_response_args[0][1]:
if h.lower() in ('content-length', 'transfer-encoding'):
if h.lower() == 'content-length':
content_length = int(v)
break
elif h.lower() == 'transfer-encoding':
break
else:
if not chunk:
start_response_args[0][1].append(('Content-Length', '0'))
elif isinstance(iterable, list):
if isinstance(iterator, list):
content_length = sum(len(i) for i in iterator)
start_response_args[0][1].append(
('Content-Length', str(sum(len(i) for i in iterable))))
('Content-Length', str(content_length)))
req = Request(env)
method = self.method_from_req(req)
if method == 'HEAD':
content_length = 0
if content_length is not None:
iterator = enforce_byte_count(iterator, content_length)
wire_status_int = int(start_response_args[0][0].split(' ', 1)[0])
resp_headers = dict(start_response_args[0][1])
start_response(*start_response_args[0])
req = Request(env)
# Log timing information for time-to-first-byte (GET requests only)
method = self.method_from_req(req)
ttfb = 0.0
if method == 'GET':
status_int = status_int_for_logging()
policy_index = get_policy_index(req.headers, resp_headers)
metric_name = self.statsd_metric_name(req, status_int, method)
metric_name = self.statsd_metric_name(
req, wire_status_int, method)
metric_name_policy = self.statsd_metric_name_policy(
req, status_int, method, policy_index)
req, wire_status_int, method, policy_index)
ttfb = time.time() - start_time
if metric_name:
self.access_logger.timing(
@ -403,31 +407,33 @@ class ProxyLoggingMiddleware(object):
bytes_sent = 0
client_disconnect = False
start_status = wire_status_int
try:
while chunk:
for chunk in iterator:
bytes_sent += len(chunk)
yield chunk
chunk = next(iterator)
except StopIteration: # iterator was depleted
return
except GeneratorExit: # generator was closed before we finished
client_disconnect = True
raise
except Exception:
start_status = 500
raise
finally:
status_int = status_int_for_logging(client_disconnect)
status_int = status_int_for_logging(
start_status, client_disconnect)
self.log_request(
req, status_int, input_proxy.bytes_received, bytes_sent,
start_time, time.time(), resp_headers=resp_headers,
ttfb=ttfb)
close_method = getattr(iterable, 'close', None)
if callable(close_method):
close_method()
ttfb=ttfb, wire_status_int=wire_status_int)
iterator.close()
try:
iterable = self.app(env, my_start_response)
except Exception:
req = Request(env)
status_int = status_int_for_logging(start_status=500)
status_int = status_int_for_logging(500)
self.log_request(
req, status_int, input_proxy.bytes_received, 0, start_time,
time.time())
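
Taken together, the changes above track two codes per request: wire_status_int, captured from the status line the moment start_response fires, and the status_int that is ultimately logged, which is downgraded to 499 on a client disconnect or 500 if the body iterator raises after the headers were sent. A stripped-down sketch of that bookkeeping (watch_body and log are made-up names, not the middleware's API):

def watch_body(body_iter, wire_status_int, log):
    # wire_status_int came from the start_response status line; the logged
    # status only diverges if something goes wrong while streaming the body.
    logged_status = wire_status_int
    try:
        for chunk in body_iter:
            yield chunk
    except GeneratorExit:    # client went away mid-response
        logged_status = 499
        raise
    except Exception:        # app blew up after committing to the headers
        logged_status = 500
        raise
    finally:
        log(status_int=logged_status, wire_status_int=wire_status_int)

The test_exploding_body test added later in this change exercises exactly that second case: the access line reads 500 for status_int and 200 for wire_status_int.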

View File

@ -329,16 +329,17 @@ class ObjectContext(ObjectVersioningContext):
# do the write
put_resp = put_req.get_response(self.app)
drain_and_close(put_resp)
close_if_possible(put_req.environ['wsgi.input'])
if put_resp.status_int == HTTP_NOT_FOUND:
drain_and_close(put_resp)
raise HTTPInternalServerError(
request=req, content_type='text/plain',
body=b'The versions container does not exist. You may '
b'want to re-enable object versioning.')
self._check_response_error(req, put_resp)
drain_and_close(put_resp)
put_bytes = byte_counter.bytes_read
# N.B. this is essentially the same hack that symlink does in
# _validate_etag_and_update_sysmeta to deal with SLO
@ -392,12 +393,13 @@ class ObjectContext(ObjectVersioningContext):
"""
if is_success(resp.status_int):
return
body = resp.body
drain_and_close(resp)
if is_client_error(resp.status_int):
# missing container or bad permissions
if resp.status_int == 404:
raise HTTPPreconditionFailed(request=req)
raise HTTPException(body=resp.body, status=resp.status,
raise HTTPException(body=body, status=resp.status,
headers=resp.headers)
# could not version the data, bail
raise HTTPServiceUnavailable(request=req)
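
The reordering above is deliberate: the error body is read before drain_and_close() gets a chance to exhaust and close the underlying app_iter. A small illustration with a plain swob Response rather than the versioning code path:

from swift.common.swob import Response
from swift.common.utils import drain_and_close

resp = Response(status=503, app_iter=iter([b'try ', b'again later']))
body = resp.body       # materialize the payload while the iterator is live;
                       # this also sets app_iter to None
drain_and_close(resp)  # now a no-op, thanks to the None guard added below
print(body)            # b'try again later'

Draining first would leave .body to join an already-spent iterator and come back empty.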

View File

@ -3981,23 +3981,26 @@ class CloseableChain(object):
"""
def __init__(self, *iterables):
self.iterables = iterables
self.chained_iter = itertools.chain(*self.iterables)
def __iter__(self):
return iter(itertools.chain(*(self.iterables)))
return self
def __next__(self):
return next(self.chained_iter)
next = __next__ # py2
def close(self):
for it in self.iterables:
close_method = getattr(it, 'close', None)
if close_method:
close_method()
close_if_possible(it)
def reiterate(iterable):
"""
Consume the first item from an iterator, then re-chain it to the rest of
the iterator. This is useful when you want to make sure the prologue to
downstream generators have been executed before continuing.
Consume the first truthy item from an iterator, then re-chain it to the
rest of the iterator. This is useful when you want to make sure the
prologue to downstream generators has been executed before continuing.
:param iterable: an iterable object
"""
if isinstance(iterable, (list, tuple)):
@ -4005,12 +4008,13 @@ def reiterate(iterable):
else:
iterator = iter(iterable)
try:
chunk = ''
chunk = next(iterator)
while not chunk:
chunk = next(iterator)
return CloseableChain([chunk], iterator)
except StopIteration:
return []
close_if_possible(iterable)
return iter([])
class InputProxy(object):
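
With the rework above, reiterate() is what iter_response in the proxy-logging hunk now leans on: it pulls chunks until the first truthy one, which forces any prologue side effects in the wrapped generator to run, then hands back a closeable chain that replays that chunk ahead of the rest. A quick sketch (the generator is made up):

from swift.common.utils import reiterate

def body():
    print('prologue runs on the first next()')
    yield b''                  # falsy chunks are skipped
    yield b'first real chunk'
    yield b'the rest'

it = reiterate(body())         # prints the prologue message immediately
print(list(it))                # [b'first real chunk', b'the rest']

If the iterator only ever yields empty chunks, the new code closes it and returns iter([]) rather than a bare list, so callers always get an iterator back.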
@ -4311,6 +4315,8 @@ def drain_and_close(response_or_app_iter):
don't care about the body of an error.
"""
app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
if app_iter is None: # for example, if we used the Response.body property
return
for _chunk in app_iter:
pass
close_if_possible(app_iter)

View File

@ -1063,7 +1063,7 @@ class ECAppIter(object):
# cleanup the frag queue feeding coros that may be currently
# executing the internal_parts_iters.
if self.stashed_iter:
self.stashed_iter.close()
close_if_possible(self.stashed_iter)
sleep() # Give the per-frag threads a chance to clean up
for it in self.internal_parts_iters:
close_if_possible(it)
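
close_if_possible(), already in swift.common.utils, is what makes the swap above safe: unlike calling .close() directly, it only closes objects that actually have a close() method, which matters now that reiterate() can hand back a bare iter([]). In isolation:

from swift.common.utils import close_if_possible

close_if_possible(iter([]))          # plain iterators have no close(): a no-op
gen = (chunk for chunk in (b'x',))
close_if_possible(gen)               # generators do, so close() gets called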
@ -1200,10 +1200,15 @@ class ECAppIter(object):
def __iter__(self):
if self.stashed_iter is not None:
return iter(self.stashed_iter)
return self
else:
raise ValueError("Failed to call kickoff() before __iter__()")
def __next__(self):
return next(self.stashed_iter)
next = __next__ # py2
def _real_iter(self, req, resp_headers):
if not self.range_specs:
client_asked_for_range = False

View File

@ -32,15 +32,22 @@ class LeakTrackingIter(object):
def __init__(self, inner_iter, mark_closed, mark_read, key):
if isinstance(inner_iter, bytes):
inner_iter = (inner_iter, )
self.inner_iter = inner_iter
self.inner_iter = iter(inner_iter)
self.mark_closed = mark_closed
self.mark_read = mark_read
self.key = key
def __iter__(self):
for x in self.inner_iter:
yield x
self.mark_read(self.key)
return self
def __next__(self):
try:
return next(self.inner_iter)
except StopIteration:
self.mark_read(self.key)
raise
next = __next__ # for py2
def close(self):
self.mark_closed(self.key)

View File

@ -51,8 +51,10 @@ class FakeApp(object):
except ValueError:
is_container_or_object_req = False
headers = [('Content-Type', 'text/plain'),
('Content-Length', str(sum(map(len, self.body))))]
headers = [('Content-Type', 'text/plain')]
if not hasattr(self.body, 'close'):
content_length = sum(map(len, self.body))
headers.append(('Content-Length', str(content_length)))
if is_container_or_object_req and self.policy_idx is not None:
headers.append(('X-Backend-Storage-Policy-Index',
str(self.policy_idx)))
@ -612,13 +614,22 @@ class TestProxyLogging(unittest.TestCase):
class CloseableBody(object):
def __init__(self):
self.msg = b"CloseableBody"
self.closed = False
def close(self):
self.closed = True
def __iter__(self):
return iter(["CloseableBody"])
return self
def __next__(self):
if not self.msg:
raise StopIteration
result, self.msg = self.msg, b''
return result
next = __next__ # py2
body = CloseableBody()
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(body), {})
@ -682,6 +693,27 @@ class TestProxyLogging(unittest.TestCase):
self.assertEqual(log_parts[6], '499')
self.assertEqual(log_parts[11], '4') # write length
def test_exploding_body(self):
def exploding_body():
yield 'some'
yield 'stuff'
raise Exception('kaboom!')
app = proxy_logging.ProxyLoggingMiddleware(
FakeApp(exploding_body()), {
'log_msg_template': '{method} {path} '
'{status_int} {wire_status_int}',
})
app.access_logger = FakeLogger()
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(app)
with self.assertRaises(Exception) as ctx:
resp.body
self.assertEqual('kaboom!', str(ctx.exception))
log_parts = self._log_parts(app)
self.assertEqual(log_parts, ['GET', '/', '500', '200'])
def test_disconnect_on_readline(self):
app = proxy_logging.ProxyLoggingMiddleware(FakeAppReadline(), {})
app.access_logger = FakeLogger()
@ -748,7 +780,7 @@ class TestProxyLogging(unittest.TestCase):
app = proxy_logging.ProxyLoggingMiddleware(
FakeAppNoContentLengthNoTransferEncoding(
# test the "while not chunk: chunk = next(iterator)"
body=['', '', ''],
body=[b'', b'', b''],
), {})
app.access_logger = FakeLogger()
req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'})

View File

@ -1247,6 +1247,23 @@ class TestUtils(unittest.TestCase):
else:
os.environ.pop('TZ')
def test_drain_and_close(self):
utils.drain_and_close([])
utils.drain_and_close(iter([]))
drained = [False]
def gen():
yield 'x'
yield 'y'
drained[0] = True
utils.drain_and_close(gen())
self.assertTrue(drained[0])
utils.drain_and_close(Response(status=200, body=b'Some body'))
drained = [False]
utils.drain_and_close(Response(status=200, app_iter=gen()))
self.assertTrue(drained[0])
def test_backwards(self):
# Test swift.common.utils.backward