From 89bfe00dda0b9761bd79b0aa1ac2092940f0f11d Mon Sep 17 00:00:00 2001
From: melanie witt
Date: Thu, 22 Oct 2020 17:18:32 +0000
Subject: [PATCH] Stream log files instead of loading full files into memory

For a while now, we have been seeing Elasticsearch indexing quickly
fall behind as some log files generated in the gate have become larger.

Currently, we download a full log file into memory and then emit it
line-by-line to be received by a logstash listener. When log files are
large (example: 40M), logstash gets bogged down processing them.

Instead of downloading full files into memory, we can stream the files
and emit their lines on-the-fly to try to alleviate load on the log
processor.

This:

* Replaces use of urllib2.urlopen with requests, using stream=True
* Removes manual decoding of gzip and deflate compression formats as
  these are decoded automatically by requests' Response.iter_lines
* Removes unrelated unused imports
* Removes an unused arg 'retry' from the log retrieval method

Change-Id: I6d32036566834da75f3a73f2d086475ef3431165
---
 files/log-gearman-worker.py | 83 +++++++++++++++----------------------
 manifests/init.pp           |  5 +++
 2 files changed, 39 insertions(+), 49 deletions(-)

diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py
index 40a2f01..0bc92c9 100644
--- a/files/log-gearman-worker.py
+++ b/files/log-gearman-worker.py
@@ -15,24 +15,21 @@
 # under the License.
 
 import argparse
-import cStringIO
 import daemon
 import gear
-import gzip
 import json
 import logging
 import os
 import Queue
 import re
+import requests
 import select
 import socket
 import subprocess
 import sys
 import threading
 import time
-import urllib2
 import yaml
-import zlib
 
 import paho.mqtt.publish as publish
 
@@ -215,13 +212,13 @@ class LogRetriever(threading.Thread):
 
     def _handle_event(self):
         fields = {}
-        log_lines = None
+        num_log_lines = 0
         source_url = ''
+        http_session = None
         job = self.gearman_worker.getJob()
         try:
             arguments = json.loads(job.arguments.decode('utf-8'))
             source_url = arguments['source_url']
-            retry = arguments['retry']
             event = arguments['event']
             logging.debug("Handling event: " + json.dumps(event))
             fields = event.get('fields') or event.get('@fields')
@@ -229,7 +226,7 @@ class LogRetriever(threading.Thread):
             if fields['build_status'] != 'ABORTED':
                 # Handle events ignoring aborted builds. These builds are
                 # discarded by zuul.
-                log_lines = self._retrieve_log(source_url, retry)
+                file_obj, http_session = self._open_log_file_url(source_url)
 
                 try:
                     all_filters = []
@@ -238,12 +235,10 @@ class LogRetriever(threading.Thread):
                         all_filters.append(f.create(fields))
                     filters = all_filters
 
-                    logging.debug("Pushing " + str(len(log_lines)) +
-                                  " log lines.")
                     base_event = {}
                     base_event.update(fields)
                     base_event["tags"] = tags
-                    for line in log_lines:
+                    for line in self._retrieve_log_line(file_obj):
                         keep_line = True
                         out_event = base_event.copy()
                         out_event["message"] = line
@@ -261,12 +256,18 @@ class LogRetriever(threading.Thread):
                             filters = new_filters
                         if keep_line:
                             self.logq.put(out_event)
+                        num_log_lines += 1
+
+                    logging.debug("Pushed " + str(num_log_lines) +
+                                  " log lines.")
                 finally:
                     for f in all_filters:
                         f.close()
+                    if http_session:
+                        http_session.close()
             job.sendWorkComplete()
             # Only send mqtt events for log files we processed.
-            if self.mqtt and log_lines:
+            if self.mqtt and num_log_lines:
                 msg = json.dumps({
                     'build_uuid': fields.get('build_uuid'),
                     'source_url': source_url,
@@ -290,13 +291,27 @@ class LogRetriever(threading.Thread):
                     'retrieve_logs',
                     fields.get('build_queue'))
 
-    def _retrieve_log(self, source_url, retry):
-        encoding = 'raw'
-        raw_buf = b''
+    def _retrieve_log_line(self, file_obj, chunk_size=4096):
+        if not file_obj:
+            return
+        # Response.iter_lines automatically decodes 'gzip' and 'deflate'
+        # encodings.
+        # https://requests.readthedocs.io/en/master/user/quickstart/#raw-response-content
+        for line in file_obj.iter_lines(chunk_size, decode_unicode=True):
+            yield line
+
+    def _open_log_file_url(self, source_url):
+        file_obj = None
         try:
-            encoding, raw_buf = self._get_log_data(source_url, retry)
-        except urllib2.HTTPError as e:
-            if e.code == 404:
+            logging.debug("Retrieving: " + source_url)
+            # Use a session to persist the HTTP connection across requests
+            # while downloading chunks of the log file.
+            session = requests.Session()
+            session.headers = {'Accept-encoding': 'deflate, gzip'}
+            file_obj = session.get(source_url, stream=True)
+            file_obj.raise_for_status()
+        except requests.HTTPError as e:
+            if e.response.status_code == 404:
                 logging.info("Unable to retrieve %s: HTTP error 404" %
                              source_url)
             else:
@@ -304,41 +319,11 @@
         except Exception:
             # Silently drop fatal errors when retrieving logs.
             # TODO (clarkb): Handle these errors.
-            # Perhaps simply add a log message to raw_buf?
-            logging.exception("Unable to get log data.")
-        if encoding == 'gzip':
-            logging.debug("Decompressing gzipped source file.")
-            raw_strIO = cStringIO.StringIO(raw_buf)
-            f = gzip.GzipFile(fileobj=raw_strIO)
-            buf = f.read().decode('utf-8')
-            raw_strIO.close()
-            f.close()
-        elif encoding == 'deflate':
-            logging.debug("Decompressing deflate compressed source file.")
-            buf = zlib.decompress(raw_buf).decode('utf-8')
-        else:
-            logging.debug("Decoding raw source file.")
-            buf = raw_buf.decode('utf-8')
-        return buf.splitlines()
-
-    def _get_log_data(self, source_url, retry):
-        encoding = 'raw'
-        try:
-            logging.debug("Retrieving: " + source_url)
-            req = urllib2.Request(source_url)
-            req.add_header('Accept-encoding', 'deflate, gzip')
-            r = urllib2.urlopen(req)
-        except:
+            # Perhaps simply add a log message to file_obj?
             logging.exception("Unable to retrieve source file.")
             raise
 
-        if ('gzip' in r.info().get('Content-Type', '') or
-            'gzip' in r.info().get('Content-Encoding', '')):
-            encoding = 'gzip'
-        elif 'deflate' in r.info().get('Content-Encoding', ''):
-            encoding = 'deflate'
-        raw_buf = r.read()
-        return encoding, raw_buf
+        return file_obj, session
 
 
 class StdOutLogProcessor(object):
diff --git a/manifests/init.pp b/manifests/init.pp
index 42f95ae..90227fb 100644
--- a/manifests/init.pp
+++ b/manifests/init.pp
@@ -70,6 +70,11 @@ class log_processor (
     provider => openstack_pip,
     require  => Class['pip'],
   }
+  package { 'requests':
+    ensure   => latest,
+    provider => openstack_pip,
+    require  => Class['pip'],
+  }
 
   if ! defined(Package['statsd']) {
     package { 'statsd':
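
For reference, the streaming pattern the patch adopts (a requests.Session
opened with stream=True, consumed through Response.iter_lines) can be
exercised on its own. Below is a minimal, self-contained sketch; it assumes
only that the requests library is installed, and the stream_log_lines helper
name and example URL are illustrative, not part of the patch:

    # Sketch: stream a remote log file and yield its lines one at a time,
    # mirroring the _open_log_file_url()/_retrieve_log_line() split above.
    import requests

    def stream_log_lines(source_url, chunk_size=4096):
        session = requests.Session()
        session.headers = {'Accept-encoding': 'deflate, gzip'}
        resp = session.get(source_url, stream=True)
        try:
            resp.raise_for_status()
            # iter_lines reads the body chunk_size bytes at a time and yields
            # decoded lines; gzip/deflate content encodings are handled
            # transparently by requests.
            for line in resp.iter_lines(chunk_size, decode_unicode=True):
                yield line
        finally:
            resp.close()
            session.close()

    if __name__ == '__main__':
        # Example URL is hypothetical.
        for line in stream_log_lines('https://example.com/job-output.txt'):
            print(line)

Because iter_lines pulls only chunk_size bytes off the socket at a time, peak
memory stays bounded by the chunk size rather than by the full size of the
log file, which is the point of the change.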