Fix multiprocessing issues

This commit is contained in:
Stanislav Kudriashev 2015-07-08 19:30:48 +03:00
parent 72a3d5af71
commit dd1b941851
1 changed files with 26 additions and 12 deletions

View File

@ -6,8 +6,14 @@ import termcolor
import timeit
# Windows and Python 2.7 multiprocessing don't marry well.
_results_queue = None
if os.name != 'nt':
import multiprocessing
try:
import Queue
except ImportError:
import queue as Queue
_results_queue = multiprocessing.Queue()
from nose.plugins import Plugin
@ -21,14 +27,7 @@ class TimerPlugin(Plugin):
score = 1
time_format = re.compile(r'^(?P<time>\d+)(?P<units>s|ms)?$')
def __init__(self):
super(TimerPlugin, self).__init__()
if os.name != 'nt':
self._timed_tests = multiprocessing.Manager().dict()
else:
self._timed_tests = {}
_timed_tests = {}
def _time_taken(self):
if hasattr(self, '_timer'):
@ -73,13 +72,16 @@ class TimerPlugin(Plugin):
self.timer_ok = self._parse_time(options.timer_ok)
self.timer_warning = self._parse_time(options.timer_warning)
self.timer_filter = self._parse_filter(options.timer_filter)
self.timer_no_color = True
# Windows + nosetests does not support colors (even with colorama).
if os.name == 'nt':
self.timer_no_color = True
else:
if os.name != 'nt':
self.timer_no_color = options.timer_no_color
# determine if multiprocessing plugin enabled
self.multiprocessing_enabled = \
bool(getattr(options, 'multiprocess_workers', False))
def startTest(self, test):
"""Initializes a timer before starting a test."""
self._timer = timeit.default_timer()
@ -89,6 +91,15 @@ class TimerPlugin(Plugin):
if not self.enabled:
return
# if multiprocessing plugin enabled - get items from results queue
if self.multiprocessing_enabled:
for i in range(_results_queue.qsize()):
try:
k, v = _results_queue.get_nowait()
self._timed_tests[k] = v
except Queue.Empty:
pass
d = sorted(self._timed_tests.items(),
key=operator.itemgetter(1),
reverse=True)
@ -133,7 +144,10 @@ class TimerPlugin(Plugin):
return "{0}: {1}".format(test, self._colored_time(time_taken, color))
def _register_time(self, test):
self._timed_tests[test.id()] = self._time_taken()
if self.multiprocessing_enabled:
_results_queue.put((test.id(), self._time_taken()))
else:
self._timed_tests[test.id()] = self._time_taken()
def addError(self, test, err, capt=None):
"""Called when a test raises an uncaught exception."""