From 30dcf069467e387c77cbc82300a2abfe7a93e2cc Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Wed, 3 Sep 2014 16:02:42 -0700 Subject: [PATCH] Handle log processing subprocess cleanup better We are leaking file descriptors in our log worker processes because we are not catching all possible errors, leaving some cleanup actions undone. More aggressively catch errors so that all cleanup happens. Change-Id: I7a73a36c6fc42d4eba636cf36c8cfffcea48a318 --- log_processor/worker.py | 67 ++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/log_processor/worker.py b/log_processor/worker.py index 0a418d7..66007cb 100644 --- a/log_processor/worker.py +++ b/log_processor/worker.py @@ -92,13 +92,21 @@ class CRM114Filter(object): r = r.strip() data['error_pr'] = float(r) + def _catchOSError(self, method): + try: + method() + except OSError: + logging.exception("Subprocess cleanup failed.") + def close(self): if not self.p: return - self.p.stdin.close() - self.p.stdout.read() - self.p.stderr.read() - self.p.wait() + # CRM114 should die when its stdinput is closed. Close that + # fd along with stdout and stderr then return. + self._catchOSError(self.p.stdin.close) + self._catchOSError(self.p.stdout.close) + self._catchOSError(self.p.stderr.close) + self._catchOSError(self.p.wait) class CRM114FilterFactory(object): @@ -143,31 +151,34 @@ class LogRetriever(threading.Thread): # discarded by zuul. 
log_lines = self._retrieve_log(source_url, retry) - filters = [] - for f in self.filters: - logging.debug("Adding filter: %s" % f.name) - filters.append(f.create(fields)) - all_filters = filters + try: + all_filters = [] + for f in self.filters: + logging.debug("Adding filter: %s" % f.name) + all_filters.append(f.create(fields)) + filters = all_filters - logging.debug("Pushing " + str(len(log_lines)) + " log lines.") - base_event = {} - base_event.update(fields) - base_event["tags"] = tags - for line in log_lines: - out_event = base_event.copy() - out_event["message"] = line - new_filters = [] - for f in filters: - try: - f.process(out_event) - new_filters.append(f) - except FilterException: - logging.exception("Exception filtering event: " - "%s" % line.encode("utf-8")) - filters = new_filters - self.logq.put(out_event) - for f in all_filters: - f.close() + logging.debug("Pushing " + str(len(log_lines)) + + " log lines.") + base_event = {} + base_event.update(fields) + base_event["tags"] = tags + for line in log_lines: + out_event = base_event.copy() + out_event["message"] = line + new_filters = [] + for f in filters: + try: + f.process(out_event) + new_filters.append(f) + except FilterException: + logging.exception("Exception filtering event: " + "%s" % line.encode("utf-8")) + filters = new_filters + self.logq.put(out_event) + finally: + for f in all_filters: + f.close() job.sendWorkComplete() except Exception as e: logging.exception("Exception handling log event.")