Make subunit2sql population single-threaded

This commit switches the gearman worker to retrieve the subunit file
and populate the DB with that data in a single thread, instead of
pushing the stream onto an in-memory queue and processing it in a
separate thread. This should have two major benefits: first, the
behavior becomes much more consistent, since we only send
WORK_COMPLETE to the gearman server after the data is already in the
DB; second, it should significantly reduce our memory consumption,
since streams no longer accumulate in a queue.

Change-Id: Ifaac2f150e26158b0a22e1c33e0c5d39c10182a4
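
To illustrate the control-flow change, here is a minimal sketch, not the
worker's actual code: fetch_subunit() and write_to_db() are hypothetical
stand-ins for the retrieval and subunit2sql steps, and Python 3 module
names are used. The point is where the ack lands relative to the DB
write, and how many streams can sit in memory at once.

    # Sketch contrasting the queued two-thread design with the
    # single-threaded design this commit moves to. All names here are
    # illustrative, not the worker's real API.
    import queue
    import threading


    def fetch_subunit(job):
        # Stand-in for downloading the subunit stream for a job.
        return 'subunit stream for %s' % job


    def write_to_db(stream):
        # Stand-in for parsing the stream and storing results in the DB.
        print('stored: %s' % stream)


    def queued_two_threads(jobs):
        # Old design: the retriever parks each stream on an in-memory
        # queue and acks immediately; a second thread drains the queue.
        # Streams can pile up in memory, and the ack happens before the
        # data is actually in the DB.
        q = queue.Queue()

        def processor():
            while True:
                stream = q.get()
                if stream is None:
                    break
                write_to_db(stream)

        worker = threading.Thread(target=processor)
        worker.start()
        for job in jobs:
            q.put(fetch_subunit(job))
            # job.sendWorkComplete() happened here, before the DB write
        q.put(None)  # sentinel to stop the processor thread
        worker.join()


    def single_thread(jobs):
        # New design: one thread retrieves and writes synchronously,
        # holding at most one stream in memory at a time.
        for job in jobs:
            write_to_db(fetch_subunit(job))
            # job.sendWorkComplete() happens here, after the DB write


    if __name__ == '__main__':
        single_thread(['job-1', 'job-2'])

In the queued version a stream can sit in memory, already acked, long
before it reaches the DB; in the single-threaded version the ack only
ever follows a completed DB write.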
Matthew Treinish 2016-04-08 14:59:07 -04:00
parent 224ed54b1e
commit d31b5d9710
1 changed file with 44 additions and 66 deletions


@@ -22,7 +22,6 @@ import io
 import json
 import logging
 import os
-import Queue
 import socket
 import threading
 import time
@@ -56,11 +55,50 @@ class FilterException(Exception):


 class SubunitRetriever(threading.Thread):
-    def __init__(self, gearman_worker, filters, subunitq):
+    def __init__(self, gearman_worker, filters, subunit2sql_conf):
         threading.Thread.__init__(self)
         self.gearman_worker = gearman_worker
         self.filters = filters
-        self.subunitq = subunitq
+        # Initialize subunit2sql settings
+        self.config = subunit2sql_conf
+        shell.cli_opts()
+        extensions = shell.get_extensions()
+        shell.parse_args([], [self.config])
+        self.extra_targets = shell.get_targets(extensions)
+
+    def _write_to_db(self, subunit):
+        subunit_v2 = subunit.pop('subunit')
+
+        # Set run metadata from gearman
+        log_url = subunit.pop('log_url', None)
+        if log_url:
+            log_dir = os.path.dirname(log_url)
+            # log_dir should be the top-level directory containing a job run,
+            # but the subunit file may be nested in 0 - 2 subdirectories (top,
+            # logs/, or logs/old/), so we need to safely correct the path here
+            log_base = os.path.basename(log_dir)
+            if log_base == 'logs':
+                log_dir = os.path.dirname(log_dir)
+            elif log_base == 'old':
+                log_dir = os.path.dirname(os.path.dirname(log_dir))
+            shell.CONF.set_override('artifacts', log_dir)
+        shell.CONF.set_override('run_meta', subunit)
+
+        # Parse subunit stream and store in DB
+        if subunit_v2.closed:
+            logging.debug('Trying to convert closed subunit v2 stream: %s to '
+                          'SQL' % subunit_v2)
+        else:
+            logging.debug('Converting Subunit V2 stream: %s to SQL' %
+                          subunit_v2)
+        stream = read_subunit.ReadSubunit(subunit_v2,
+                                          targets=self.extra_targets)
+        results = stream.get_results()
+        start_time = sorted(
+            [results[x]['start_time'] for x in results if x != 'run_time'])[0]
+        shell.CONF.set_override('run_at', start_time.isoformat())
+        shell.process_results(results)
+        subunit_v2.close()

     def run(self):
         while True:
@@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread):
             logging.debug("Pushing subunit file: %s" % subunit_io)
             out_event = fields.copy()
             out_event["subunit"] = subunit_io
-            self.subunitq.put(out_event)
+            self._write_to_db(out_event)
             job.sendWorkComplete()
         except Exception as e:
             logging.exception("Exception handling log event.")
@@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread):
         return gzipped, raw_buf


-class Subunit2SQLProcessor(object):
-    def __init__(self, subunitq, subunit2sql_conf):
-        self.subunitq = subunitq
-        self.config = subunit2sql_conf
-        # Initialize subunit2sql settings
-        shell.cli_opts()
-        extensions = shell.get_extensions()
-        shell.parse_args([], [self.config])
-        self.extra_targets = shell.get_targets(extensions)
-
-    def handle_subunit_event(self):
-        # Pull subunit event from queue and separate stream from metadata
-        subunit = self.subunitq.get()
-        subunit_v2 = subunit.pop('subunit')
-
-        # Set run metadata from gearman
-        log_url = subunit.pop('log_url', None)
-        if log_url:
-            log_dir = os.path.dirname(log_url)
-            # log_dir should be the top-level directory containing a job run,
-            # but the subunit file may be nested in 0 - 2 subdirectories (top,
-            # logs/, or logs/old/), so we need to safely correct the path here
-            log_base = os.path.basename(log_dir)
-            if log_base == 'logs':
-                log_dir = os.path.dirname(log_dir)
-            elif log_base == 'old':
-                log_dir = os.path.dirname(os.path.dirname(log_dir))
-            shell.CONF.set_override('artifacts', log_dir)
-        shell.CONF.set_override('run_meta', subunit)
-
-        # Parse subunit stream and store in DB
-        if subunit_v2.closed:
-            logging.debug('Trying to convert closed subunit v2 stream: %s to '
-                          'SQL' % subunit_v2)
-        else:
-            logging.debug('Converting Subunit V2 stream: %s to SQL' %
-                          subunit_v2)
-        stream = read_subunit.ReadSubunit(subunit_v2,
-                                          targets=self.extra_targets)
-        results = stream.get_results()
-        start_time = sorted(
-            [results[x]['start_time'] for x in results if x != 'run_time'])[0]
-        shell.CONF.set_override('run_at', start_time.isoformat())
-        shell.process_results(results)
-        subunit_v2.close()
-
-
 class Server(object):
     def __init__(self, config, debuglog):
         # Config init.
@@ -219,8 +210,6 @@ class Server(object):
         # Python logging output file.
         self.debuglog = debuglog
         self.retriever = None
-        self.subunitqueue = Queue.Queue(131072)
-        self.processor = None
         self.filter_factories = []

     def setup_logging(self):
@@ -238,28 +227,17 @@ class Server(object):
         gearman_worker.addServer(self.gearman_host,
                                  self.gearman_port)
         gearman_worker.registerFunction(b'push-subunit')
+        subunit2sql_conf = self.config['config']
         self.retriever = SubunitRetriever(gearman_worker,
                                           self.filter_factories,
-                                          self.subunitqueue)
-
-    def setup_processor(self):
-        subunit2sql_config = self.config['config']
-        self.processor = Subunit2SQLProcessor(self.subunitqueue,
-                                              subunit2sql_config)
+                                          subunit2sql_conf)

     def main(self):
         self.setup_retriever()
-        self.setup_processor()
         self.retriever.daemon = True
         self.retriever.start()
-
-        while True:
-            try:
-                self.processor.handle_subunit_event()
-            except:
-                logging.exception("Exception processing log event.")


 def main():
     parser = argparse.ArgumentParser()