Merge "Make subunit2sql population single threaded"

This commit is contained in:
Jenkins 2016-04-15 23:45:28 +00:00 committed by Gerrit Code Review
commit c078843f31
1 changed file with 44 additions and 66 deletions

View File

@ -22,7 +22,6 @@ import io
import json
import logging
import os
import Queue
import socket
import threading
import time
@ -56,11 +55,50 @@ class FilterException(Exception):
class SubunitRetriever(threading.Thread):
def __init__(self, gearman_worker, filters, subunitq):
def __init__(self, gearman_worker, filters, subunit2sql_conf):
threading.Thread.__init__(self)
self.gearman_worker = gearman_worker
self.filters = filters
self.subunitq = subunitq
# Initialize subunit2sql settings
self.config = subunit2sql_conf
shell.cli_opts()
extensions = shell.get_extensions()
shell.parse_args([], [self.config])
self.extra_targets = shell.get_targets(extensions)
def _write_to_db(self, subunit):
    """Store one retrieved subunit stream in the database via subunit2sql.

    Args:
        subunit: Event dict. Its ``'subunit'`` entry (the subunit v2
            stream object) and its optional ``'log_url'`` entry are popped
            off; whatever remains is handed to subunit2sql as the run
            metadata. The dict is mutated in place.

    Raises:
        KeyError: If ``subunit`` has no ``'subunit'`` entry.
        ValueError: If the parsed results hold no entry other than
            ``'run_time'``, so no start time can be derived.
    """
    subunit_v2 = subunit.pop('subunit')
    try:
        # Set run metadata from gearman
        log_url = subunit.pop('log_url', None)
        if log_url:
            log_dir = os.path.dirname(log_url)

            # log_dir should be the top-level directory containing a job
            # run, but the subunit file may be nested in 0 - 2
            # subdirectories (top, logs/, or logs/old/), so we need to
            # safely correct the path here
            log_base = os.path.basename(log_dir)
            if log_base == 'logs':
                log_dir = os.path.dirname(log_dir)
            elif log_base == 'old':
                log_dir = os.path.dirname(os.path.dirname(log_dir))

            shell.CONF.set_override('artifacts', log_dir)
        shell.CONF.set_override('run_meta', subunit)

        # Parse subunit stream and store in DB
        # NOTE(review): a closed stream is only logged, not skipped; the
        # read below is still attempted. This reading assumes the parse
        # steps are not nested under the else branch - confirm.
        if subunit_v2.closed:
            logging.debug('Trying to convert closed subunit v2 stream: %s to '
                          'SQL', subunit_v2)
        else:
            logging.debug('Converting Subunit V2 stream: %s to SQL',
                          subunit_v2)

        stream = read_subunit.ReadSubunit(subunit_v2,
                                          targets=self.extra_targets)
        results = stream.get_results()

        # The run is stamped with the earliest start time found in the
        # results; the 'run_time' key is skipped because it is not looked
        # up for a 'start_time'.
        start_time = min(
            results[x]['start_time'] for x in results if x != 'run_time')
        shell.CONF.set_override('run_at', start_time.isoformat())
        shell.process_results(results)
    finally:
        # Release the stream even when parsing or the DB write fails.
        subunit_v2.close()
def run(self):
while True:
@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread):
logging.debug("Pushing subunit file: %s" % subunit_io)
out_event = fields.copy()
out_event["subunit"] = subunit_io
self.subunitq.put(out_event)
self._write_to_db(out_event)
job.sendWorkComplete()
except Exception as e:
logging.exception("Exception handling log event.")
@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread):
return gzipped, raw_buf
class Subunit2SQLProcessor(object):
    """Pull subunit events off a queue and store them with subunit2sql."""

    def __init__(self, subunitq, subunit2sql_conf):
        """Remember the event queue and initialize subunit2sql.

        Args:
            subunitq: Queue object whose ``get()`` yields subunit event
                dicts.
            subunit2sql_conf: Value passed to ``shell.parse_args`` as the
                single config-file entry (presumably a path - confirm
                against the caller).
        """
        self.subunitq = subunitq
        self.config = subunit2sql_conf
        # Initialize subunit2sql settings
        shell.cli_opts()
        extensions = shell.get_extensions()
        shell.parse_args([], [self.config])
        self.extra_targets = shell.get_targets(extensions)

    def handle_subunit_event(self):
        """Take one event from the queue and write its results to the DB.

        The event's ``'subunit'`` entry (the subunit v2 stream) and its
        optional ``'log_url'`` entry are popped off; the remaining entries
        become the run metadata.

        Raises:
            KeyError: If the event has no ``'subunit'`` entry.
            ValueError: If the parsed results hold no entry other than
                ``'run_time'``, so no start time can be derived.
        """
        # Pull subunit event from queue and separate stream from metadata
        subunit = self.subunitq.get()
        subunit_v2 = subunit.pop('subunit')
        try:
            # Set run metadata from gearman
            log_url = subunit.pop('log_url', None)
            if log_url:
                log_dir = os.path.dirname(log_url)

                # log_dir should be the top-level directory containing a
                # job run, but the subunit file may be nested in 0 - 2
                # subdirectories (top, logs/, or logs/old/), so we need to
                # safely correct the path here
                log_base = os.path.basename(log_dir)
                if log_base == 'logs':
                    log_dir = os.path.dirname(log_dir)
                elif log_base == 'old':
                    log_dir = os.path.dirname(os.path.dirname(log_dir))

                shell.CONF.set_override('artifacts', log_dir)
            shell.CONF.set_override('run_meta', subunit)

            # Parse subunit stream and store in DB
            # NOTE(review): a closed stream is only logged, not skipped;
            # the read below is still attempted. This reading assumes the
            # parse steps are not nested under the else branch - confirm.
            if subunit_v2.closed:
                logging.debug('Trying to convert closed subunit v2 stream: '
                              '%s to SQL', subunit_v2)
            else:
                logging.debug('Converting Subunit V2 stream: %s to SQL',
                              subunit_v2)

            stream = read_subunit.ReadSubunit(subunit_v2,
                                              targets=self.extra_targets)
            results = stream.get_results()

            # The run is stamped with the earliest start time found in the
            # results; the 'run_time' key is skipped because it is not
            # looked up for a 'start_time'.
            start_time = min(
                results[x]['start_time'] for x in results if x != 'run_time')
            shell.CONF.set_override('run_at', start_time.isoformat())
            shell.process_results(results)
        finally:
            # Release the stream even when parsing or the DB write fails.
            subunit_v2.close()
class Server(object):
def __init__(self, config, debuglog):
# Config init.
@ -219,8 +210,6 @@ class Server(object):
Python logging output file.
self.debuglog = debuglog
self.retriever = None
self.subunitqueue = Queue.Queue(131072)
self.processor = None
self.filter_factories = []
def setup_logging(self):
@ -238,28 +227,17 @@ class Server(object):
gearman_worker.addServer(self.gearman_host,
self.gearman_port)
gearman_worker.registerFunction(b'push-subunit')
subunit2sql_conf = self.config['config']
self.retriever = SubunitRetriever(gearman_worker,
self.filter_factories,
self.subunitqueue)
def setup_processor(self):
subunit2sql_config = self.config['config']
self.processor = Subunit2SQLProcessor(self.subunitqueue,
subunit2sql_config)
subunit2sql_conf)
def main(self):
self.setup_retriever()
self.setup_processor()
self.retriever.daemon = True
self.retriever.start()
while True:
try:
self.processor.handle_subunit_event()
except:
logging.exception("Exception processing log event.")
def main():
parser = argparse.ArgumentParser()