diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py index 6d4466e..40d23b8 100644 --- a/files/log-gearman-worker.py +++ b/files/log-gearman-worker.py @@ -140,6 +140,7 @@ class LogRetriever(threading.Thread): def _handle_event(self): fields = {} + log_lines = None source_url = '' job = self.gearman_worker.getJob() try: @@ -184,7 +185,8 @@ class LogRetriever(threading.Thread): for f in all_filters: f.close() job.sendWorkComplete() - if self.mqtt: + # Only send mqtt events for log files we processed. + if self.mqtt and log_lines: msg = json.dumps({ 'build_uuid': fields.get('build_uuid'), 'source_url': source_url, @@ -301,10 +303,9 @@ class LogRetriever(threading.Thread): class StdOutLogProcessor(object): - def __init__(self, logq, pretty_print=False, mqtt=None): + def __init__(self, logq, pretty_print=False): self.logq = logq self.pretty_print = pretty_print - self.mqtt = mqtt def handle_log_event(self): log = self.logq.get() @@ -315,27 +316,16 @@ class StdOutLogProcessor(object): print(json.dumps(log)) # Push each log event through to keep logstash up to date. sys.stdout.flush() - if self.mqtt: - msg = json.dumps({ - 'build_uuid': log.get('build_uuid'), - 'source_url': log.get('log_url'), - 'status': 'success', - }) - self.mqtt.publish_single(msg, log.get('project'), - log.get('build_change'), - 'logs_to_logstash', - log.get('build_queue')) class INETLogProcessor(object): socket_type = None - def __init__(self, logq, host, port, mqtt=None): + def __init__(self, logq, host, port): self.logq = logq self.host = host self.port = port self.socket = None - self.mqtt = mqtt def _connect_socket(self): logging.debug("Creating socket.") @@ -348,16 +338,6 @@ class INETLogProcessor(object): if self.socket is None: self._connect_socket() self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) - if self.mqtt: - msg = json.dumps({ - 'build_uuid': log.get('build_uuid'), - 'source_url': log.get('log_url'), - 'status': 'success', - }) - self.mqtt.publish_single(msg, log.get('project'), - log.get('build_change'), - 'logs_to_logstash', - log.get('build_queue')) except: logging.exception("Exception sending INET event.") # Logstash seems to take about a minute to start again. Wait 90 @@ -367,15 +347,6 @@ class INETLogProcessor(object): semi_busy_wait(90) self._connect_socket() self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) - if self.mqtt: - msg = json.dumps({ - 'build_uuid': log.get('build_uuid'), - 'status': 'success', - }) - self.mqtt.publish_single(msg, log.get('project'), - log.get('build_change'), - 'logs_to_logstash', - log.get('build_queue')) class UDPLogProcessor(INETLogProcessor): @@ -496,17 +467,15 @@ class Server(object): if self.output_mode == "tcp": self.processor = TCPLogProcessor(self.logqueue, self.output_host, - self.output_port, - mqtt=self.mqtt) + self.output_port) elif self.output_mode == "udp": self.processor = UDPLogProcessor(self.logqueue, self.output_host, - self.output_port, - mqtt=self.mqtt) + self.output_port) else: # Note this processor will not work if the process is run as a # daemon. You must use the --foreground option. - self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt) + self.processor = StdOutLogProcessor(self.logqueue) def main(self): self.setup_retriever()