Add re-raising SystemExit exception

SystemExit exception is usually raised by signal handlers
and then caught in main thread to handle signal. Signal
handler executes in main threads but if we use eventlet
we have ony one thread and signal handler can interrupt
any greenlet. Therefore we need to re-raise this exception
to catch it in main greenlet at last.

Also it is necessary to keep executor working and continue
processing scheduled tasks because signal handling logic may
wait for some job results

Change-Id: Ia920922d8444c8216b442577c37710538ed8e919
This commit is contained in:
Dmitriy Ukhlov 2016-05-20 11:15:09 +03:00 committed by ChangBo Guo(gcb)
parent c23d013fe4
commit ee42ab588a
3 changed files with 45 additions and 19 deletions

View File

@ -414,8 +414,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
else:
shutoff = False
if wait and shutoff:
self._pool.waitall()
self._delayed_work.join()
self._pool.waitall()
class ExecutorStatistics(object):

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from futurist import _utils
@ -68,17 +69,33 @@ class GreenWorker(object):
# Run our main piece of work.
try:
self.work.run()
finally:
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
except SystemExit as e:
exc_info = sys.exc_info()
try:
while True:
try:
w.run()
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
try:
w.fail(exc_info)
finally:
self.work_queue.task_done()
finally:
del exc_info
raise e
# Consume any delayed work before finishing (this is how we finish
# work that was to big for the pool size, but needs to be finished
# no matter).
while True:
try:
w = self.work_queue.get_nowait()
except greenqueue.Empty:
break
else:
try:
w.run()
finally:
self.work_queue.task_done()

View File

@ -44,18 +44,27 @@ class WorkItem(object):
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
exc_type, exc_value, exc_tb = sys.exc_info()
except SystemExit as e:
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
self.fail()
finally:
del(exc_type, exc_value, exc_tb)
raise e
except BaseException:
self.fail()
else:
self.future.set_result(result)
def fail(self, exc_info=None):
exc_type, exc_value, exc_tb = exc_info or sys.exc_info()
try:
if six.PY2:
self.future.set_exception_info(exc_value, exc_tb)
else:
self.future.set_exception(exc_value)
finally:
if exc_info is None:
del exc_type, exc_value, exc_tb
class Failure(object):
"""Object that captures a exception (and its associated information)."""