Handle log processing subprocess cleanup better

We are leaking file descriptors in our log worker processes because we
are are not catch all possible errors leaving some actions left behind
to do. More aggressively catch errors so that all cleanup happens

Change-Id: I7a73a36c6fc42d4eba636cf36c8cfffcea48a318
This commit is contained in:
Clark Boylan 2014-09-03 16:02:42 -07:00
parent 837d4f8265
commit 30dcf06946
1 changed files with 39 additions and 28 deletions

View File

@ -92,13 +92,21 @@ class CRM114Filter(object):
r = r.strip()
data['error_pr'] = float(r)
def _catchOSError(self, method):
try:
method()
except OSError:
logging.exception("Subprocess cleanup failed.")
def close(self):
if not self.p:
return
self.p.stdin.close()
self.p.stdout.read()
self.p.stderr.read()
self.p.wait()
# CRM114 should die when its stdinput is closed. Close that
# fd along with stdout and stderr then return.
self._catchOSError(self.p.stdin.close)
self._catchOSError(self.p.stdout.close)
self._catchOSError(self.p.stderr.close)
self._catchOSError(self.p.wait)
class CRM114FilterFactory(object):
@ -143,31 +151,34 @@ class LogRetriever(threading.Thread):
# discarded by zuul.
log_lines = self._retrieve_log(source_url, retry)
filters = []
for f in self.filters:
logging.debug("Adding filter: %s" % f.name)
filters.append(f.create(fields))
all_filters = filters
try:
all_filters = []
for f in self.filters:
logging.debug("Adding filter: %s" % f.name)
all_filters.append(f.create(fields))
filters = all_filters
logging.debug("Pushing " + str(len(log_lines)) + " log lines.")
base_event = {}
base_event.update(fields)
base_event["tags"] = tags
for line in log_lines:
out_event = base_event.copy()
out_event["message"] = line
new_filters = []
for f in filters:
try:
f.process(out_event)
new_filters.append(f)
except FilterException:
logging.exception("Exception filtering event: "
"%s" % line.encode("utf-8"))
filters = new_filters
self.logq.put(out_event)
for f in all_filters:
f.close()
logging.debug("Pushing " + str(len(log_lines)) +
" log lines.")
base_event = {}
base_event.update(fields)
base_event["tags"] = tags
for line in log_lines:
out_event = base_event.copy()
out_event["message"] = line
new_filters = []
for f in filters:
try:
f.process(out_event)
new_filters.append(f)
except FilterException:
logging.exception("Exception filtering event: "
"%s" % line.encode("utf-8"))
filters = new_filters
self.logq.put(out_event)
finally:
for f in all_filters:
f.close()
job.sendWorkComplete()
except Exception as e:
logging.exception("Exception handling log event.")