Only send mqtt events for processed files

We were previously sending events for every file we attempted to
process, not just those that were actually processed, and also for
every single log line event. This effectively doubled the IO performed
by the logstash workers, which seemed to slow the whole pipeline down.
Only record events for log files that are processed, which should
significantly reduce the total number of events.

Change-Id: I0daf3eb2e2b3240e3efa4f2c7bac57de99505df0
Clark Boylan 2017-08-03 15:28:45 -07:00
parent becc05e0aa
commit 88e0d21347
1 changed file with 8 additions and 39 deletions
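For context, a rough sketch (not itself part of this diff) of how the tail of
LogRetriever._handle_event reads after this change. The guard and payload
fields come from the first hunks below; the publish_single() call and its
arguments are assumed to mirror the calls removed from the processor classes:

            job.sendWorkComplete()
            # Only send mqtt events for log files we processed.
            if self.mqtt and log_lines:
                msg = json.dumps({
                    'build_uuid': fields.get('build_uuid'),
                    'source_url': source_url,
                    'status': 'success',
                })
                # Assumed to match the processor-side publish_single() calls
                # removed below; the retriever-side call is not visible here.
                self.mqtt.publish_single(msg, fields.get('project'),
                                         fields.get('build_change'),
                                         'logs_to_logstash',
                                         fields.get('build_queue'))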


@@ -140,6 +140,7 @@ class LogRetriever(threading.Thread):
     def _handle_event(self):
         fields = {}
         log_lines = None
+        source_url = ''
         job = self.gearman_worker.getJob()
         try:
@@ -184,7 +185,8 @@ class LogRetriever(threading.Thread):
             for f in all_filters:
                 f.close()
             job.sendWorkComplete()
-            if self.mqtt:
+            # Only send mqtt events for log files we processed.
+            if self.mqtt and log_lines:
                 msg = json.dumps({
                     'build_uuid': fields.get('build_uuid'),
                     'source_url': source_url,
@@ -301,10 +303,9 @@ class LogRetriever(threading.Thread):


 class StdOutLogProcessor(object):
-    def __init__(self, logq, pretty_print=False, mqtt=None):
+    def __init__(self, logq, pretty_print=False):
         self.logq = logq
         self.pretty_print = pretty_print
-        self.mqtt = mqtt

     def handle_log_event(self):
         log = self.logq.get()
@@ -315,27 +316,16 @@ class StdOutLogProcessor(object):
             print(json.dumps(log))
         # Push each log event through to keep logstash up to date.
         sys.stdout.flush()
-        if self.mqtt:
-            msg = json.dumps({
-                'build_uuid': log.get('build_uuid'),
-                'source_url': log.get('log_url'),
-                'status': 'success',
-            })
-            self.mqtt.publish_single(msg, log.get('project'),
-                                     log.get('build_change'),
-                                     'logs_to_logstash',
-                                     log.get('build_queue'))


 class INETLogProcessor(object):
     socket_type = None

-    def __init__(self, logq, host, port, mqtt=None):
+    def __init__(self, logq, host, port):
         self.logq = logq
         self.host = host
         self.port = port
         self.socket = None
-        self.mqtt = mqtt

     def _connect_socket(self):
         logging.debug("Creating socket.")
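With the mqtt plumbing gone, the stdout processor reduces to roughly the
following sketch. Only the plain print path, the comment and the flush are
visible in the hunks above; the pretty_print branch and its formatting
options are assumptions:

    class StdOutLogProcessor(object):
        def __init__(self, logq, pretty_print=False):
            self.logq = logq
            self.pretty_print = pretty_print

        def handle_log_event(self):
            log = self.logq.get()
            if self.pretty_print:
                # Assumed formatting for the pretty-print path.
                print(json.dumps(log, sort_keys=True, indent=4))
            else:
                print(json.dumps(log))
            # Push each log event through to keep logstash up to date.
            sys.stdout.flush()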
@@ -348,16 +338,6 @@ class INETLogProcessor(object):
             if self.socket is None:
                 self._connect_socket()
             self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
-            if self.mqtt:
-                msg = json.dumps({
-                    'build_uuid': log.get('build_uuid'),
-                    'source_url': log.get('log_url'),
-                    'status': 'success',
-                })
-                self.mqtt.publish_single(msg, log.get('project'),
-                                         log.get('build_change'),
-                                         'logs_to_logstash',
-                                         log.get('build_queue'))
         except:
             logging.exception("Exception sending INET event.")
             # Logstash seems to take about a minute to start again. Wait 90
@@ -367,15 +347,6 @@ class INETLogProcessor(object):
             semi_busy_wait(90)
             self._connect_socket()
             self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
-            if self.mqtt:
-                msg = json.dumps({
-                    'build_uuid': log.get('build_uuid'),
-                    'status': 'success',
-                })
-                self.mqtt.publish_single(msg, log.get('project'),
-                                         log.get('build_change'),
-                                         'logs_to_logstash',
-                                         log.get('build_queue'))


 class UDPLogProcessor(INETLogProcessor):
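Similarly, after the two hunks above INETLogProcessor.handle_log_event is
left with just the send-and-retry logic. Reassembled roughly from the
context lines (the dequeue line and the tail of the wait comment are
assumptions):

    def handle_log_event(self):
        # Assumed dequeue, mirroring StdOutLogProcessor.handle_log_event.
        log = self.logq.get()
        try:
            if self.socket is None:
                self._connect_socket()
            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))
        except:
            logging.exception("Exception sending INET event.")
            # Logstash seems to take about a minute to start again. Wait 90
            # seconds, then reconnect and resend.
            semi_busy_wait(90)
            self._connect_socket()
            self.socket.sendall((json.dumps(log) + '\n').encode('utf-8'))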
@@ -496,17 +467,15 @@ class Server(object):
         if self.output_mode == "tcp":
             self.processor = TCPLogProcessor(self.logqueue,
                                              self.output_host,
-                                             self.output_port,
-                                             mqtt=self.mqtt)
+                                             self.output_port)
         elif self.output_mode == "udp":
             self.processor = UDPLogProcessor(self.logqueue,
                                              self.output_host,
-                                             self.output_port,
-                                             mqtt=self.mqtt)
+                                             self.output_port)
         else:
             # Note this processor will not work if the process is run as a
             # daemon. You must use the --foreground option.
-            self.processor = StdOutLogProcessor(self.logqueue, mqtt=self.mqtt)
+            self.processor = StdOutLogProcessor(self.logqueue)

     def main(self):
         self.setup_retriever()
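One piece implied but not shown in this diff: the mqtt client is presumably
still handed to the retriever in setup_retriever(), since
LogRetriever._handle_event is now the only publisher. A hypothetical sketch,
with constructor arguments that are assumptions rather than taken from this
change:

    def setup_retriever(self):
        # Hypothetical wiring; argument names are assumed.
        self.retriever = LogRetriever(self.gearman_worker,
                                      self.filter_factories,
                                      self.logqueue,
                                      mqtt=self.mqtt)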