diff --git a/files/subunit-gearman-worker.py b/files/subunit-gearman-worker.py index 36a11b2..5c64833 100644 --- a/files/subunit-gearman-worker.py +++ b/files/subunit-gearman-worker.py @@ -22,7 +22,6 @@ import io import json import logging import os -import Queue import socket import threading import time @@ -56,11 +55,50 @@ class FilterException(Exception): class SubunitRetriever(threading.Thread): - def __init__(self, gearman_worker, filters, subunitq): + def __init__(self, gearman_worker, filters, subunit2sql_conf): threading.Thread.__init__(self) self.gearman_worker = gearman_worker self.filters = filters - self.subunitq = subunitq + # Initialize subunit2sql settings + self.config = subunit2sql_conf + shell.cli_opts() + extensions = shell.get_extensions() + shell.parse_args([], [self.config]) + self.extra_targets = shell.get_targets(extensions) + + def _write_to_db(self, subunit): + subunit_v2 = subunit.pop('subunit') + # Set run metadata from gearman + log_url = subunit.pop('log_url', None) + if log_url: + log_dir = os.path.dirname(log_url) + + # log_dir should be the top-level directory containing a job run, + # but the subunit file may be nested in 0 - 2 subdirectories (top, + # logs/, or logs/old/), so we need to safely correct the path here + log_base = os.path.basename(log_dir) + if log_base == 'logs': + log_dir = os.path.dirname(log_dir) + elif log_base == 'old': + log_dir = os.path.dirname(os.path.dirname(log_dir)) + + shell.CONF.set_override('artifacts', log_dir) + shell.CONF.set_override('run_meta', subunit) + # Parse subunit stream and store in DB + if subunit_v2.closed: + logging.debug('Trying to convert closed subunit v2 stream: %s to ' + 'SQL' % subunit_v2) + else: + logging.debug('Converting Subunit V2 stream: %s to SQL' % + subunit_v2) + stream = read_subunit.ReadSubunit(subunit_v2, + targets=self.extra_targets) + results = stream.get_results() + start_time = sorted( + [results[x]['start_time'] for x in results if x != 'run_time'])[0] + shell.CONF.set_override('run_at', start_time.isoformat()) + shell.process_results(results) + subunit_v2.close() def run(self): while True: @@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread): logging.debug("Pushing subunit file: %s" % subunit_io) out_event = fields.copy() out_event["subunit"] = subunit_io - self.subunitq.put(out_event) + self._write_to_db(out_event) job.sendWorkComplete() except Exception as e: logging.exception("Exception handling log event.") @@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread): return gzipped, raw_buf -class Subunit2SQLProcessor(object): - def __init__(self, subunitq, subunit2sql_conf): - self.subunitq = subunitq - self.config = subunit2sql_conf - # Initialize subunit2sql settings - shell.cli_opts() - extensions = shell.get_extensions() - shell.parse_args([], [self.config]) - self.extra_targets = shell.get_targets(extensions) - - def handle_subunit_event(self): - # Pull subunit event from queue and separate stream from metadata - subunit = self.subunitq.get() - subunit_v2 = subunit.pop('subunit') - # Set run metadata from gearman - log_url = subunit.pop('log_url', None) - if log_url: - log_dir = os.path.dirname(log_url) - - # log_dir should be the top-level directory containing a job run, - # but the subunit file may be nested in 0 - 2 subdirectories (top, - # logs/, or logs/old/), so we need to safely correct the path here - log_base = os.path.basename(log_dir) - if log_base == 'logs': - log_dir = os.path.dirname(log_dir) - elif log_base == 'old': - log_dir = os.path.dirname(os.path.dirname(log_dir)) - - shell.CONF.set_override('artifacts', log_dir) - shell.CONF.set_override('run_meta', subunit) - # Parse subunit stream and store in DB - if subunit_v2.closed: - logging.debug('Trying to convert closed subunit v2 stream: %s to ' - 'SQL' % subunit_v2) - else: - logging.debug('Converting Subunit V2 stream: %s to SQL' % - subunit_v2) - stream = read_subunit.ReadSubunit(subunit_v2, - targets=self.extra_targets) - results = stream.get_results() - start_time = sorted( - [results[x]['start_time'] for x in results if x != 'run_time'])[0] - shell.CONF.set_override('run_at', start_time.isoformat()) - shell.process_results(results) - subunit_v2.close() - - class Server(object): def __init__(self, config, debuglog): # Config init. @@ -219,8 +210,6 @@ class Server(object): # Pythong logging output file. self.debuglog = debuglog self.retriever = None - self.subunitqueue = Queue.Queue(131072) - self.processor = None self.filter_factories = [] def setup_logging(self): @@ -238,28 +227,17 @@ class Server(object): gearman_worker.addServer(self.gearman_host, self.gearman_port) gearman_worker.registerFunction(b'push-subunit') + subunit2sql_conf = self.config['config'] self.retriever = SubunitRetriever(gearman_worker, self.filter_factories, - self.subunitqueue) - - def setup_processor(self): - subunit2sql_config = self.config['config'] - self.processor = Subunit2SQLProcessor(self.subunitqueue, - subunit2sql_config) + subunit2sql_conf) def main(self): self.setup_retriever() - self.setup_processor() self.retriever.daemon = True self.retriever.start() - while True: - try: - self.processor.handle_subunit_event() - except: - logging.exception("Exception processing log event.") - def main(): parser = argparse.ArgumentParser()