Make subunit2sql population single threaded
This commit switches the gearman worker to handle both retrieving the subunit file and populating the DB with that data in a single thread, as opposed to pushing it onto an in-memory queue and processing the streams in a separate thread. This should have two major benefits. First, it should make the behavior much more consistent: we will only send work-completed to the gearman server after the data is already in the DB. Second, it should significantly reduce our memory consumption. Change-Id: Ifaac2f150e26158b0a22e1c33e0c5d39c10182a4
This commit is contained in:
parent
224ed54b1e
commit
d31b5d9710
|
@ -22,7 +22,6 @@ import io
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import Queue
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
@ -56,11 +55,50 @@ class FilterException(Exception):
|
|||
|
||||
|
||||
class SubunitRetriever(threading.Thread):
|
||||
def __init__(self, gearman_worker, filters, subunitq):
|
||||
def __init__(self, gearman_worker, filters, subunit2sql_conf):
|
||||
threading.Thread.__init__(self)
|
||||
self.gearman_worker = gearman_worker
|
||||
self.filters = filters
|
||||
self.subunitq = subunitq
|
||||
# Initialize subunit2sql settings
|
||||
self.config = subunit2sql_conf
|
||||
shell.cli_opts()
|
||||
extensions = shell.get_extensions()
|
||||
shell.parse_args([], [self.config])
|
||||
self.extra_targets = shell.get_targets(extensions)
|
||||
|
||||
def _write_to_db(self, subunit):
    """Convert one subunit v2 stream to SQL rows and store them.

    :param subunit: dict popped off the gearman job. The ``subunit`` key
        holds a file-like subunit v2 stream, the optional ``log_url`` key
        points at the job's log location, and every remaining key/value
        pair is stored as run metadata via subunit2sql.
    """
    subunit_v2 = subunit.pop('subunit')
    # Set run metadata from gearman
    log_url = subunit.pop('log_url', None)
    if log_url:
        log_dir = os.path.dirname(log_url)

        # log_dir should be the top-level directory containing a job run,
        # but the subunit file may be nested in 0 - 2 subdirectories (top,
        # logs/, or logs/old/), so we need to safely correct the path here
        log_base = os.path.basename(log_dir)
        if log_base == 'logs':
            log_dir = os.path.dirname(log_dir)
        elif log_base == 'old':
            log_dir = os.path.dirname(os.path.dirname(log_dir))

        shell.CONF.set_override('artifacts', log_dir)
    shell.CONF.set_override('run_meta', subunit)
    # Parse subunit stream and store in DB
    if subunit_v2.closed:
        # Lazy %-args: the message is only formatted if debug is enabled.
        logging.debug('Trying to convert closed subunit v2 stream: %s to '
                      'SQL', subunit_v2)
    else:
        logging.debug('Converting Subunit V2 stream: %s to SQL', subunit_v2)
        stream = read_subunit.ReadSubunit(subunit_v2,
                                          targets=self.extra_targets)
        results = stream.get_results()
        # min() finds the earliest start in O(n); no need to sort the
        # whole list just to take its first element.
        start_time = min(
            results[x]['start_time'] for x in results if x != 'run_time')
        shell.CONF.set_override('run_at', start_time.isoformat())
        shell.process_results(results)
        subunit_v2.close()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread):
|
|||
logging.debug("Pushing subunit file: %s" % subunit_io)
|
||||
out_event = fields.copy()
|
||||
out_event["subunit"] = subunit_io
|
||||
self.subunitq.put(out_event)
|
||||
self._write_to_db(out_event)
|
||||
job.sendWorkComplete()
|
||||
except Exception as e:
|
||||
logging.exception("Exception handling log event.")
|
||||
|
@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread):
|
|||
return gzipped, raw_buf
|
||||
|
||||
|
||||
class Subunit2SQLProcessor(object):
    """Drain subunit events from an in-memory queue and store them in SQL.

    Consumes the event dicts that the gearman retriever thread pushes onto
    ``subunitq`` and feeds each subunit v2 stream through subunit2sql.
    """

    def __init__(self, subunitq, subunit2sql_conf):
        # Queue.Queue shared with the retriever thread; each item is a
        # dict carrying a 'subunit' stream plus run metadata.
        self.subunitq = subunitq
        # Path to the subunit2sql config file (holds the DB connection).
        self.config = subunit2sql_conf
        # Initialize subunit2sql settings
        shell.cli_opts()
        extensions = shell.get_extensions()
        shell.parse_args([], [self.config])
        self.extra_targets = shell.get_targets(extensions)

    def handle_subunit_event(self):
        """Block for one queued event and persist it via subunit2sql."""
        # Pull subunit event from queue and separate stream from metadata
        subunit = self.subunitq.get()
        subunit_v2 = subunit.pop('subunit')
        # Set run metadata from gearman
        log_url = subunit.pop('log_url', None)
        if log_url:
            log_dir = os.path.dirname(log_url)

            # log_dir should be the top-level directory containing a job run,
            # but the subunit file may be nested in 0 - 2 subdirectories (top,
            # logs/, or logs/old/), so we need to safely correct the path here
            log_base = os.path.basename(log_dir)
            if log_base == 'logs':
                log_dir = os.path.dirname(log_dir)
            elif log_base == 'old':
                log_dir = os.path.dirname(os.path.dirname(log_dir))

            shell.CONF.set_override('artifacts', log_dir)
        shell.CONF.set_override('run_meta', subunit)
        # Parse subunit stream and store in DB
        if subunit_v2.closed:
            logging.debug('Trying to convert closed subunit v2 stream: %s to '
                          'SQL' % subunit_v2)
        else:
            logging.debug('Converting Subunit V2 stream: %s to SQL' %
                          subunit_v2)
            stream = read_subunit.ReadSubunit(subunit_v2,
                                              targets=self.extra_targets)
            results = stream.get_results()
            # Earliest test start time becomes the run's 'run_at' timestamp;
            # the synthetic 'run_time' key carries no start_time.
            start_time = sorted(
                [results[x]['start_time'] for x in results if x != 'run_time'])[0]
            shell.CONF.set_override('run_at', start_time.isoformat())
            shell.process_results(results)
            subunit_v2.close()
|
||||
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, config, debuglog):
|
||||
# Config init.
|
||||
|
@ -219,8 +210,6 @@ class Server(object):
|
|||
# Python logging output file.
|
||||
self.debuglog = debuglog
|
||||
self.retriever = None
|
||||
self.subunitqueue = Queue.Queue(131072)
|
||||
self.processor = None
|
||||
self.filter_factories = []
|
||||
|
||||
def setup_logging(self):
|
||||
|
@ -238,28 +227,17 @@ class Server(object):
|
|||
gearman_worker.addServer(self.gearman_host,
|
||||
self.gearman_port)
|
||||
gearman_worker.registerFunction(b'push-subunit')
|
||||
subunit2sql_conf = self.config['config']
|
||||
self.retriever = SubunitRetriever(gearman_worker,
|
||||
self.filter_factories,
|
||||
self.subunitqueue)
|
||||
|
||||
def setup_processor(self):
|
||||
subunit2sql_config = self.config['config']
|
||||
self.processor = Subunit2SQLProcessor(self.subunitqueue,
|
||||
subunit2sql_config)
|
||||
subunit2sql_conf)
|
||||
|
||||
def main(self):
|
||||
self.setup_retriever()
|
||||
self.setup_processor()
|
||||
|
||||
self.retriever.daemon = True
|
||||
self.retriever.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.processor.handle_subunit_event()
|
||||
except:
|
||||
logging.exception("Exception processing log event.")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
|
Loading…
Reference in New Issue