From 88e0d21347b5492dba8c6c619f1d53cdbc9cb6db Mon Sep 17 00:00:00 2001
From: Clark Boylan
Date: Thu, 3 Aug 2017 15:28:45 -0700
Subject: [PATCH] Only send mqtt events for processed files

We were previously sending events for every file we attempted to
process, not just those that were actually processed, and also for
every single log line event. This effectively doubled the IO performed
by the logstash workers, which seemed to slow the whole pipeline down.
Trim this down to only recording events for log files that are
processed, which should significantly reduce the total number of
events.

Change-Id: I0daf3eb2e2b3240e3efa4f2c7bac57de99505df0
---
 files/log-gearman-worker.py | 47 +++++++------------------------------
 1 file changed, 8 insertions(+), 39 deletions(-)

diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py
index 6d4466e..40d23b8 100644
--- a/files/log-gearman-worker.py
+++ b/files/log-gearman-worker.py
@@ -140,6 +140,7 @@ class LogRetriever(threading.Thread):
 
     def _handle_event(self):
         fields = {}
+        log_lines = None
         source_url = ''
         job = self.gearman_worker.getJob()
         try:
@@ -184,7 +185,8 @@ class LogRetriever(threading.Thread):
                 for f in all_filters:
                     f.close()
             job.sendWorkComplete()
-            if self.mqtt:
+            # Only send mqtt events for log files we processed.
+            if self.mqtt and log_lines:
                 msg = json.dumps({
                     'build_uuid': fields.get('build_uuid'),
                     'source_url': source_url,
@@ -301,10 +303,9 @@ class LogRetriever(threading.Thread):
 
 
 class StdOutLogProcessor(object):
-    def __init__(self, logq, pretty_print=False, mqtt=None):
+    def __init__(self, logq, pretty_print=False):
         self.logq = logq
         self.pretty_print = pretty_print
-        self.mqtt = mqtt
 
     def handle_log_event(self):
         log = self.logq.get()
@@ -315,27 +316,16 @@ class StdOutLogProcessor(object):
             print(json.dumps(log))
         # Push each log event through to keep logstash up to date.
         sys.stdout.flush()
-        if self.mqtt:
-            msg = json.dumps({
-                'build_uuid': log.get('build_uuid'),
-                'source_url': log.get('log_url'),
-                'status': 'success',
-            })
-            self.mqtt.publish_single(msg, log.get('project'),
-                                     log.get('build_change'),
-                                     'logs_to_logstash',
-                                     log.get('build_queue'))
 
 
 class INETLogProcessor(object):
     socket_type = None
 
-    def __init__(self, logq, host, port, mqtt=None):
+    def __init__(self, logq, host, port):
         self.logq = logq
         self.host = host
         self.port = port
         self.socket = None
-        self.mqtt = mqtt
 
     def _connect_socket(self):
         logging.debug("Creating socket.")
@@ -348,16 +338,6 @@ class INETLogProcessor(object):
             if self.socket is None:
                 self._connect_socket()
             self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
-            if self.mqtt:
-                msg = json.dumps({
-                    'build_uuid': log.get('build_uuid'),
-                    'source_url': log.get('log_url'),
-                    'status': 'success',
-                })
-                self.mqtt.publish_single(msg, log.get('project'),
-                                         log.get('build_change'),
-                                         'logs_to_logstash',
-                                         log.get('build_queue'))
         except:
             logging.exception("Exception sending INET event.")
             # Logstash seems to take about a minute to start again. Wait 90
@@ -367,15 +347,6 @@ class INETLogProcessor(object):
             semi_busy_wait(90)
             self._connect_socket()
             self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
-            if self.mqtt:
-                msg = json.dumps({
-                    'build_uuid': log.get('build_uuid'),
-                    'status': 'success',
-                })
-                self.mqtt.publish_single(msg, log.get('project'),
-                                         log.get('build_change'),
-                                         'logs_to_logstash',
-                                         log.get('build_queue'))
 
 
 class UDPLogProcessor(INETLogProcessor):
@@ -496,17 +467,15 @@ class Server(object):
         if self.output_mode == "tcp":
             self.processor = TCPLogProcessor(self.logqueue,
                                              self.output_host,
-                                             self.output_port,
-                                             mqtt=self.mqtt)
+                                             self.output_port)
         elif self.output_mode == "udp":
             self.processor = UDPLogProcessor(self.logqueue,
                                              self.output_host,
-                                             self.output_port,
-                                             mqtt=self.mqtt)
+                                             self.output_port)
        else:
             # Note this processor will not work if the process is run as a
             # daemon. You must use the --foreground option.
-            self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt)
+            self.processor = StdOutLogProcessor(self.logqueue)
 
     def main(self):
         self.setup_retriever()
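
Below is a minimal illustrative sketch of the behavior this patch is after, assuming a hypothetical mqtt object with a publish(topic, payload) method, a hypothetical retrieve_log callable, and a queue-like logq (none of these are the repository's real PushMQTT or retrieval APIs): at most one MQTT event is published per log file, and only when log lines were actually retrieved, instead of one event per attempted file plus one per log line.

# Illustrative sketch only, not part of the patch. The mqtt, retrieve_log,
# and logq parameters are hypothetical stand-ins for the worker's real
# MQTT publisher, log retrieval code, and output queue.
import json


def handle_log_file(source_url, fields, retrieve_log, logq, mqtt=None):
    """Process one log file, emitting at most one MQTT event for it."""
    log_lines = None
    if fields.get('build_status') != 'ABORTED':
        # May return None or an empty list if the file could not be fetched.
        log_lines = retrieve_log(source_url)
        for line in log_lines or []:
            # Each line still goes to logstash, but no per-line MQTT event.
            logq.put({'message': line})
    # Only send an mqtt event for log files we actually processed.
    if mqtt and log_lines:
        mqtt.publish('logs_to_logstash', json.dumps({
            'build_uuid': fields.get('build_uuid'),
            'source_url': source_url,
            'status': 'success',
        }))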