From d31b5d971076b2586be827ce80b6d30043974fc9 Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Fri, 8 Apr 2016 14:59:07 -0400 Subject: [PATCH] Make subunit2sql population single threaded This commit switches the gearman worker to handle both retrieving the subunit file and populating the DB with that data in a single thread. This is as opposed to pushing it on an in memory queue and processing the streams in a separate thread. This should have 2 major benefits. First, it should make the behavior much more consistent: we'll only send work completed to the gearman server after data is already in the db. It should also significantly improve our memory consumption. Change-Id: Ifaac2f150e26158b0a22e1c33e0c5d39c10182a4 --- files/subunit-gearman-worker.py | 110 +++++++++++++------------------- 1 file changed, 44 insertions(+), 66 deletions(-) diff --git a/files/subunit-gearman-worker.py b/files/subunit-gearman-worker.py index 36a11b2..5c64833 100644 --- a/files/subunit-gearman-worker.py +++ b/files/subunit-gearman-worker.py @@ -22,7 +22,6 @@ import io import json import logging import os -import Queue import socket import threading import time @@ -56,11 +55,50 @@ class FilterException(Exception): class SubunitRetriever(threading.Thread): - def __init__(self, gearman_worker, filters, subunitq): + def __init__(self, gearman_worker, filters, subunit2sql_conf): threading.Thread.__init__(self) self.gearman_worker = gearman_worker self.filters = filters - self.subunitq = subunitq + # Initialize subunit2sql settings + self.config = subunit2sql_conf + shell.cli_opts() + extensions = shell.get_extensions() + shell.parse_args([], [self.config]) + self.extra_targets = shell.get_targets(extensions) + + def _write_to_db(self, subunit): + subunit_v2 = subunit.pop('subunit') + # Set run metadata from gearman + log_url = subunit.pop('log_url', None) + if log_url: + log_dir = os.path.dirname(log_url) + + # log_dir should be the top-level directory containing a job run, + # but the 
subunit file may be nested in 0 - 2 subdirectories (top, + # logs/, or logs/old/), so we need to safely correct the path here + log_base = os.path.basename(log_dir) + if log_base == 'logs': + log_dir = os.path.dirname(log_dir) + elif log_base == 'old': + log_dir = os.path.dirname(os.path.dirname(log_dir)) + + shell.CONF.set_override('artifacts', log_dir) + shell.CONF.set_override('run_meta', subunit) + # Parse subunit stream and store in DB + if subunit_v2.closed: + logging.debug('Trying to convert closed subunit v2 stream: %s to ' + 'SQL' % subunit_v2) + else: + logging.debug('Converting Subunit V2 stream: %s to SQL' % + subunit_v2) + stream = read_subunit.ReadSubunit(subunit_v2, + targets=self.extra_targets) + results = stream.get_results() + start_time = sorted( + [results[x]['start_time'] for x in results if x != 'run_time'])[0] + shell.CONF.set_override('run_at', start_time.isoformat()) + shell.process_results(results) + subunit_v2.close() def run(self): while True: @@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread): logging.debug("Pushing subunit file: %s" % subunit_io) out_event = fields.copy() out_event["subunit"] = subunit_io - self.subunitq.put(out_event) + self._write_to_db(out_event) job.sendWorkComplete() except Exception as e: logging.exception("Exception handling log event.") @@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread): return gzipped, raw_buf -class Subunit2SQLProcessor(object): - def __init__(self, subunitq, subunit2sql_conf): - self.subunitq = subunitq - self.config = subunit2sql_conf - # Initialize subunit2sql settings - shell.cli_opts() - extensions = shell.get_extensions() - shell.parse_args([], [self.config]) - self.extra_targets = shell.get_targets(extensions) - - def handle_subunit_event(self): - # Pull subunit event from queue and separate stream from metadata - subunit = self.subunitq.get() - subunit_v2 = subunit.pop('subunit') - # Set run metadata from gearman - log_url = subunit.pop('log_url', None) - if 
log_url: - log_dir = os.path.dirname(log_url) - - # log_dir should be the top-level directory containing a job run, - # but the subunit file may be nested in 0 - 2 subdirectories (top, - # logs/, or logs/old/), so we need to safely correct the path here - log_base = os.path.basename(log_dir) - if log_base == 'logs': - log_dir = os.path.dirname(log_dir) - elif log_base == 'old': - log_dir = os.path.dirname(os.path.dirname(log_dir)) - - shell.CONF.set_override('artifacts', log_dir) - shell.CONF.set_override('run_meta', subunit) - # Parse subunit stream and store in DB - if subunit_v2.closed: - logging.debug('Trying to convert closed subunit v2 stream: %s to ' - 'SQL' % subunit_v2) - else: - logging.debug('Converting Subunit V2 stream: %s to SQL' % - subunit_v2) - stream = read_subunit.ReadSubunit(subunit_v2, - targets=self.extra_targets) - results = stream.get_results() - start_time = sorted( - [results[x]['start_time'] for x in results if x != 'run_time'])[0] - shell.CONF.set_override('run_at', start_time.isoformat()) - shell.process_results(results) - subunit_v2.close() - - class Server(object): def __init__(self, config, debuglog): # Config init. @@ -219,8 +210,6 @@ class Server(object): # Pythong logging output file. 
self.debuglog = debuglog self.retriever = None - self.subunitqueue = Queue.Queue(131072) - self.processor = None self.filter_factories = [] def setup_logging(self): @@ -238,28 +227,17 @@ class Server(object): gearman_worker.addServer(self.gearman_host, self.gearman_port) gearman_worker.registerFunction(b'push-subunit') + subunit2sql_conf = self.config['config'] self.retriever = SubunitRetriever(gearman_worker, self.filter_factories, - self.subunitqueue) - - def setup_processor(self): - subunit2sql_config = self.config['config'] - self.processor = Subunit2SQLProcessor(self.subunitqueue, - subunit2sql_config) + subunit2sql_conf) def main(self): self.setup_retriever() - self.setup_processor() self.retriever.daemon = True self.retriever.start() - while True: - try: - self.processor.handle_subunit_event() - except: - logging.exception("Exception processing log event.") - def main(): parser = argparse.ArgumentParser()