Stream log files instead of loading full files into memory

For a while now, we have been seeing Elasticsearch indexing quickly
fall behind as some log files generated in the gate have grown larger.
Currently, we download a full log file into memory and then emit it
line-by-line to be received by a logstash listener. When log files are
large (for example, 40M), logstash gets bogged down processing them.

Instead of downloading full files into memory, we can stream the files
and emit their lines on-the-fly to try to alleviate load on the log
processor.
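
At a high level, the streaming retrieval looks roughly like the sketch
below. This is a minimal illustration only: the helper name and error
handling are made up for the example, and the real change lives in the
_open_log_file_url/_retrieve_log_line methods in the diff below.

    import requests

    def emit_log_lines(source_url):
        # Hypothetical helper: stream the response instead of reading
        # the whole body into memory.
        session = requests.Session()
        session.headers = {'Accept-encoding': 'deflate, gzip'}
        response = session.get(source_url, stream=True)
        response.raise_for_status()
        try:
            # iter_lines() pulls the body down in chunks and yields text
            # lines; requests decodes gzip/deflate Content-Encoding itself.
            for line in response.iter_lines(chunk_size=4096,
                                            decode_unicode=True):
                yield line
        finally:
            session.close()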

This:

  * Replaces use of urllib2.urlopen with requests using stream=True
  * Removes manual decoding of the gzip and deflate compression
    formats, as these are decoded automatically by
    requests.Response.iter_lines (see the note after this list)
  * Removes unrelated unused imports
  * Removes an unused arg 'retry' from the log retrieval method
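
A quick way to see the automatic decompression behind the second bullet
(sketch only; the URL below is made up, and any server that responds
with Content-Encoding: gzip or deflate behaves the same way):

    import requests

    resp = requests.get('https://logs.example.org/job-output.txt',
                        stream=True)
    resp.raise_for_status()
    print(resp.headers.get('Content-Encoding'))  # e.g. 'gzip'
    # Lines arrive already decompressed and decoded; no gzip, zlib, or
    # cStringIO handling is needed on our side.
    for line in resp.iter_lines(decode_unicode=True):
        print(line)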

Change-Id: I6d32036566834da75f3a73f2d086475ef3431165
melanie witt, 2020-10-22 17:18:32 +00:00 (committed by Clark Boylan)
parent a9da328436, commit 89bfe00dda
2 changed files with 39 additions and 49 deletions

@@ -15,24 +15,21 @@
 # under the License.
 
 import argparse
-import cStringIO
 import daemon
 import gear
-import gzip
 import json
 import logging
 import os
 import Queue
 import re
+import requests
 import select
 import socket
 import subprocess
 import sys
 import threading
 import time
-import urllib2
 import yaml
-import zlib
 
 import paho.mqtt.publish as publish
@@ -215,13 +212,13 @@ class LogRetriever(threading.Thread):
     def _handle_event(self):
         fields = {}
-        log_lines = None
+        num_log_lines = 0
         source_url = ''
+        http_session = None
         job = self.gearman_worker.getJob()
         try:
             arguments = json.loads(job.arguments.decode('utf-8'))
             source_url = arguments['source_url']
-            retry = arguments['retry']
             event = arguments['event']
             logging.debug("Handling event: " + json.dumps(event))
             fields = event.get('fields') or event.get('@fields')
@@ -229,7 +226,7 @@ class LogRetriever(threading.Thread):
             if fields['build_status'] != 'ABORTED':
                 # Handle events ignoring aborted builds. These builds are
                 # discarded by zuul.
-                log_lines = self._retrieve_log(source_url, retry)
+                file_obj, http_session = self._open_log_file_url(source_url)
 
                 try:
                     all_filters = []
@@ -238,12 +235,10 @@
                         all_filters.append(f.create(fields))
                     filters = all_filters
 
-                    logging.debug("Pushing " + str(len(log_lines)) +
-                                  " log lines.")
                     base_event = {}
                     base_event.update(fields)
                     base_event["tags"] = tags
-                    for line in log_lines:
+                    for line in self._retrieve_log_line(file_obj):
                         keep_line = True
                         out_event = base_event.copy()
                         out_event["message"] = line
@@ -261,12 +256,18 @@
                         filters = new_filters
                         if keep_line:
                             self.logq.put(out_event)
+                        num_log_lines += 1
+
+                    logging.debug("Pushed " + str(num_log_lines) +
+                                  " log lines.")
                 finally:
                     for f in all_filters:
                         f.close()
+                    if http_session:
+                        http_session.close()
             job.sendWorkComplete()
             # Only send mqtt events for log files we processed.
-            if self.mqtt and log_lines:
+            if self.mqtt and num_log_lines:
                 msg = json.dumps({
                     'build_uuid': fields.get('build_uuid'),
                     'source_url': source_url,
@@ -290,13 +291,27 @@
                                         'retrieve_logs',
                                         fields.get('build_queue'))
 
-    def _retrieve_log(self, source_url, retry):
-        encoding = 'raw'
-        raw_buf = b''
+    def _retrieve_log_line(self, file_obj, chunk_size=4096):
+        if not file_obj:
+            return
+        # Response.iter_lines automatically decodes 'gzip' and 'deflate'
+        # encodings.
+        # https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
+        for line in file_obj.iter_lines(chunk_size, decode_unicode=True):
+            yield line
+
+    def _open_log_file_url(self, source_url):
+        file_obj = None
         try:
-            encoding, raw_buf = self._get_log_data(source_url, retry)
-        except urllib2.HTTPError as e:
-            if e.code == 404:
+            logging.debug("Retrieving: " + source_url)
+            # Use a session to persist the HTTP connection across requests
+            # while downloading chunks of the log file.
+            session = requests.Session()
+            session.headers = {'Accept-encoding': 'deflate, gzip'}
+            file_obj = session.get(source_url, stream=True)
+            file_obj.raise_for_status()
+        except requests.HTTPError as e:
+            if e.response.status_code == 404:
                 logging.info("Unable to retrieve %s: HTTP error 404" %
                              source_url)
             else:
@@ -304,41 +319,11 @@
         except Exception:
             # Silently drop fatal errors when retrieving logs.
             # TODO (clarkb): Handle these errors.
-            # Perhaps simply add a log message to raw_buf?
-            logging.exception("Unable to get log data.")
-
-        if encoding == 'gzip':
-            logging.debug("Decompressing gzipped source file.")
-            raw_strIO = cStringIO.StringIO(raw_buf)
-            f = gzip.GzipFile(fileobj=raw_strIO)
-            buf = f.read().decode('utf-8')
-            raw_strIO.close()
-            f.close()
-        elif encoding == 'deflate':
-            logging.debug("Decompressing deflate compressed source file.")
-            buf = zlib.decompress(raw_buf).decode('utf-8')
-        else:
-            logging.debug("Decoding raw source file.")
-            buf = raw_buf.decode('utf-8')
-
-        return buf.splitlines()
-
-    def _get_log_data(self, source_url, retry):
-        encoding = 'raw'
-        try:
-            logging.debug("Retrieving: " + source_url)
-            req = urllib2.Request(source_url)
-            req.add_header('Accept-encoding', 'deflate, gzip')
-            r = urllib2.urlopen(req)
-        except:
+            # Perhaps simply add a log message to file_obj?
             logging.exception("Unable to retrieve source file.")
             raise
 
-        if ('gzip' in r.info().get('Content-Type', '') or
-            'gzip' in r.info().get('Content-Encoding', '')):
-            encoding = 'gzip'
-        elif 'deflate' in r.info().get('Content-Encoding', ''):
-            encoding = 'deflate'
-
-        raw_buf = r.read()
-        return encoding, raw_buf
+        return file_obj, session
 
 
 class StdOutLogProcessor(object):

@@ -70,6 +70,11 @@ class log_processor (
     provider => openstack_pip,
     require  => Class['pip'],
   }
+  package { 'requests':
+    ensure   => latest,
+    provider => openstack_pip,
+    require  => Class['pip'],
+  }
 
   if ! defined(Package['statsd']) {
     package { 'statsd':