diff --git a/futurist/_futures.py b/futurist/_futures.py index d3dd105..5dfe1d3 100644 --- a/futurist/_futures.py +++ b/futurist/_futures.py @@ -414,8 +414,8 @@ class GreenThreadPoolExecutor(_futures.Executor): else: shutoff = False if wait and shutoff: - self._pool.waitall() self._delayed_work.join() + self._pool.waitall() class ExecutorStatistics(object): diff --git a/futurist/_green.py b/futurist/_green.py index ef2c2d5..daf937e 100644 --- a/futurist/_green.py +++ b/futurist/_green.py @@ -13,6 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import sys from futurist import _utils @@ -68,17 +69,33 @@ class GreenWorker(object): # Run our main piece of work. try: self.work.run() - finally: - # Consume any delayed work before finishing (this is how we finish - # work that was to big for the pool size, but needs to be finished - # no matter). - while True: - try: - w = self.work_queue.get_nowait() - except greenqueue.Empty: - break - else: + except SystemExit as e: + exc_info = sys.exc_info() + try: + while True: try: - w.run() + w = self.work_queue.get_nowait() + except greenqueue.Empty: + break + + try: + w.fail(exc_info) finally: self.work_queue.task_done() + finally: + del exc_info + raise e + + # Consume any delayed work before finishing (this is how we finish + # work that was to big for the pool size, but needs to be finished + # no matter). + while True: + try: + w = self.work_queue.get_nowait() + except greenqueue.Empty: + break + else: + try: + w.run() + finally: + self.work_queue.task_done() diff --git a/futurist/_utils.py b/futurist/_utils.py index 26f7155..11648bc 100644 --- a/futurist/_utils.py +++ b/futurist/_utils.py @@ -44,18 +44,27 @@ class WorkItem(object): return try: result = self.fn(*self.args, **self.kwargs) - except BaseException: - exc_type, exc_value, exc_tb = sys.exc_info() + except SystemExit as e: try: - if six.PY2: - self.future.set_exception_info(exc_value, exc_tb) - else: - self.future.set_exception(exc_value) + self.fail() finally: - del(exc_type, exc_value, exc_tb) + raise e + except BaseException: + self.fail() else: self.future.set_result(result) + def fail(self, exc_info=None): + exc_type, exc_value, exc_tb = exc_info or sys.exc_info() + try: + if six.PY2: + self.future.set_exception_info(exc_value, exc_tb) + else: + self.future.set_exception(exc_value) + finally: + if exc_info is None: + del exc_type, exc_value, exc_tb + class Failure(object): """Object that captures a exception (and its associated information)."""