refactored child worker error handling to fix hangs

This commit is contained in:
John Dickinson 2012-03-26 06:48:45 -05:00
parent 856ecacc7c
commit ae0e7e0d1f
1 changed files with 17 additions and 4 deletions

View File

@ -23,6 +23,8 @@ from contextlib import contextmanager
import os
import errno
import fcntl
import sys
import traceback
from eventlet import sleep
@ -37,6 +39,12 @@ class BadFileDownload(Exception):
self.status_code = status_code
class WorkerError(Exception):
def __init__(self):
self.tb_str = '' # ensure that there is always something here
class LogProcessorCommon(object):
def __init__(self, conf, logger, log_route='log-processor'):
@ -188,9 +196,9 @@ def multiprocess_collate(processor_klass, processor_args, processor_method,
if logger:
logger.exception('error reading from out queue')
else:
if isinstance(data, Exception):
if isinstance(data, WorkerError):
if logger:
logger.exception(data)
logger.error(data.tb_str)
else:
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
@ -214,8 +222,13 @@ def collate_worker(processor_klass, processor_args, processor_method, in_queue,
return
try:
ret = method(*item)
except Exception, err:
ret = err
except Exception:
err_type, err, tb = sys.exc_info()
# Use err_type since unplickling err in the parent process
# will fail if it has a custom constructor with required
# parameters.
ret = WorkerError()
ret.tb_str = ''.join(traceback.format_tb(tb))
out_queue.put((item, ret))
except Exception, err:
print '****ERROR in worker****\n%r\n********' % err