Merge "Make subunit2sql population single threaded"
This commit is contained in:
commit
c078843f31
|
@ -22,7 +22,6 @@ import io
|
|||
import json
|
||||
import logging
|
||||
import os
|
||||
import Queue
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
@ -56,11 +55,50 @@ class FilterException(Exception):
|
|||
|
||||
|
||||
class SubunitRetriever(threading.Thread):
|
||||
def __init__(self, gearman_worker, filters, subunitq):
|
||||
def __init__(self, gearman_worker, filters, subunit2sql_conf):
|
||||
threading.Thread.__init__(self)
|
||||
self.gearman_worker = gearman_worker
|
||||
self.filters = filters
|
||||
self.subunitq = subunitq
|
||||
# Initialize subunit2sql settings
|
||||
self.config = subunit2sql_conf
|
||||
shell.cli_opts()
|
||||
extensions = shell.get_extensions()
|
||||
shell.parse_args([], [self.config])
|
||||
self.extra_targets = shell.get_targets(extensions)
|
||||
|
||||
def _write_to_db(self, subunit):
|
||||
subunit_v2 = subunit.pop('subunit')
|
||||
# Set run metadata from gearman
|
||||
log_url = subunit.pop('log_url', None)
|
||||
if log_url:
|
||||
log_dir = os.path.dirname(log_url)
|
||||
|
||||
# log_dir should be the top-level directory containing a job run,
|
||||
# but the subunit file may be nested in 0 - 2 subdirectories (top,
|
||||
# logs/, or logs/old/), so we need to safely correct the path here
|
||||
log_base = os.path.basename(log_dir)
|
||||
if log_base == 'logs':
|
||||
log_dir = os.path.dirname(log_dir)
|
||||
elif log_base == 'old':
|
||||
log_dir = os.path.dirname(os.path.dirname(log_dir))
|
||||
|
||||
shell.CONF.set_override('artifacts', log_dir)
|
||||
shell.CONF.set_override('run_meta', subunit)
|
||||
# Parse subunit stream and store in DB
|
||||
if subunit_v2.closed:
|
||||
logging.debug('Trying to convert closed subunit v2 stream: %s to '
|
||||
'SQL' % subunit_v2)
|
||||
else:
|
||||
logging.debug('Converting Subunit V2 stream: %s to SQL' %
|
||||
subunit_v2)
|
||||
stream = read_subunit.ReadSubunit(subunit_v2,
|
||||
targets=self.extra_targets)
|
||||
results = stream.get_results()
|
||||
start_time = sorted(
|
||||
[results[x]['start_time'] for x in results if x != 'run_time'])[0]
|
||||
shell.CONF.set_override('run_at', start_time.isoformat())
|
||||
shell.process_results(results)
|
||||
subunit_v2.close()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread):
|
|||
logging.debug("Pushing subunit file: %s" % subunit_io)
|
||||
out_event = fields.copy()
|
||||
out_event["subunit"] = subunit_io
|
||||
self.subunitq.put(out_event)
|
||||
self._write_to_db(out_event)
|
||||
job.sendWorkComplete()
|
||||
except Exception as e:
|
||||
logging.exception("Exception handling log event.")
|
||||
|
@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread):
|
|||
return gzipped, raw_buf
|
||||
|
||||
|
||||
class Subunit2SQLProcessor(object):
|
||||
def __init__(self, subunitq, subunit2sql_conf):
|
||||
self.subunitq = subunitq
|
||||
self.config = subunit2sql_conf
|
||||
# Initialize subunit2sql settings
|
||||
shell.cli_opts()
|
||||
extensions = shell.get_extensions()
|
||||
shell.parse_args([], [self.config])
|
||||
self.extra_targets = shell.get_targets(extensions)
|
||||
|
||||
def handle_subunit_event(self):
|
||||
# Pull subunit event from queue and separate stream from metadata
|
||||
subunit = self.subunitq.get()
|
||||
subunit_v2 = subunit.pop('subunit')
|
||||
# Set run metadata from gearman
|
||||
log_url = subunit.pop('log_url', None)
|
||||
if log_url:
|
||||
log_dir = os.path.dirname(log_url)
|
||||
|
||||
# log_dir should be the top-level directory containing a job run,
|
||||
# but the subunit file may be nested in 0 - 2 subdirectories (top,
|
||||
# logs/, or logs/old/), so we need to safely correct the path here
|
||||
log_base = os.path.basename(log_dir)
|
||||
if log_base == 'logs':
|
||||
log_dir = os.path.dirname(log_dir)
|
||||
elif log_base == 'old':
|
||||
log_dir = os.path.dirname(os.path.dirname(log_dir))
|
||||
|
||||
shell.CONF.set_override('artifacts', log_dir)
|
||||
shell.CONF.set_override('run_meta', subunit)
|
||||
# Parse subunit stream and store in DB
|
||||
if subunit_v2.closed:
|
||||
logging.debug('Trying to convert closed subunit v2 stream: %s to '
|
||||
'SQL' % subunit_v2)
|
||||
else:
|
||||
logging.debug('Converting Subunit V2 stream: %s to SQL' %
|
||||
subunit_v2)
|
||||
stream = read_subunit.ReadSubunit(subunit_v2,
|
||||
targets=self.extra_targets)
|
||||
results = stream.get_results()
|
||||
start_time = sorted(
|
||||
[results[x]['start_time'] for x in results if x != 'run_time'])[0]
|
||||
shell.CONF.set_override('run_at', start_time.isoformat())
|
||||
shell.process_results(results)
|
||||
subunit_v2.close()
|
||||
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, config, debuglog):
|
||||
# Config init.
|
||||
|
@ -219,8 +210,6 @@ class Server(object):
|
|||
# Pythong logging output file.
|
||||
self.debuglog = debuglog
|
||||
self.retriever = None
|
||||
self.subunitqueue = Queue.Queue(131072)
|
||||
self.processor = None
|
||||
self.filter_factories = []
|
||||
|
||||
def setup_logging(self):
|
||||
|
@ -238,28 +227,17 @@ class Server(object):
|
|||
gearman_worker.addServer(self.gearman_host,
|
||||
self.gearman_port)
|
||||
gearman_worker.registerFunction(b'push-subunit')
|
||||
subunit2sql_conf = self.config['config']
|
||||
self.retriever = SubunitRetriever(gearman_worker,
|
||||
self.filter_factories,
|
||||
self.subunitqueue)
|
||||
|
||||
def setup_processor(self):
|
||||
subunit2sql_config = self.config['config']
|
||||
self.processor = Subunit2SQLProcessor(self.subunitqueue,
|
||||
subunit2sql_config)
|
||||
subunit2sql_conf)
|
||||
|
||||
def main(self):
|
||||
self.setup_retriever()
|
||||
self.setup_processor()
|
||||
|
||||
self.retriever.daemon = True
|
||||
self.retriever.start()
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.processor.handle_subunit_event()
|
||||
except:
|
||||
logging.exception("Exception processing log event.")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
|
Loading…
Reference in New Issue