Add re-raising SystemExit exception
SystemExit exception is usually raised by signal handlers and then caught in main thread to handle signal. Signal handler executes in main threads but if we use eventlet we have ony one thread and signal handler can interrupt any greenlet. Therefore we need to re-raise this exception to catch it in main greenlet at last. Also it is necessary to keep executor working and continue processing scheduled tasks because signal handling logic may wait for some job results Change-Id: Ia920922d8444c8216b442577c37710538ed8e919
This commit is contained in:
parent
c23d013fe4
commit
ee42ab588a
|
@ -414,8 +414,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
|
|||
else:
|
||||
shutoff = False
|
||||
if wait and shutoff:
|
||||
self._pool.waitall()
|
||||
self._delayed_work.join()
|
||||
self._pool.waitall()
|
||||
|
||||
|
||||
class ExecutorStatistics(object):
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import sys
|
||||
|
||||
from futurist import _utils
|
||||
|
||||
|
@ -68,17 +69,33 @@ class GreenWorker(object):
|
|||
# Run our main piece of work.
|
||||
try:
|
||||
self.work.run()
|
||||
finally:
|
||||
# Consume any delayed work before finishing (this is how we finish
|
||||
# work that was to big for the pool size, but needs to be finished
|
||||
# no matter).
|
||||
while True:
|
||||
try:
|
||||
w = self.work_queue.get_nowait()
|
||||
except greenqueue.Empty:
|
||||
break
|
||||
else:
|
||||
except SystemExit as e:
|
||||
exc_info = sys.exc_info()
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
w.run()
|
||||
w = self.work_queue.get_nowait()
|
||||
except greenqueue.Empty:
|
||||
break
|
||||
|
||||
try:
|
||||
w.fail(exc_info)
|
||||
finally:
|
||||
self.work_queue.task_done()
|
||||
finally:
|
||||
del exc_info
|
||||
raise e
|
||||
|
||||
# Consume any delayed work before finishing (this is how we finish
|
||||
# work that was to big for the pool size, but needs to be finished
|
||||
# no matter).
|
||||
while True:
|
||||
try:
|
||||
w = self.work_queue.get_nowait()
|
||||
except greenqueue.Empty:
|
||||
break
|
||||
else:
|
||||
try:
|
||||
w.run()
|
||||
finally:
|
||||
self.work_queue.task_done()
|
||||
|
|
|
@ -44,18 +44,27 @@ class WorkItem(object):
|
|||
return
|
||||
try:
|
||||
result = self.fn(*self.args, **self.kwargs)
|
||||
except BaseException:
|
||||
exc_type, exc_value, exc_tb = sys.exc_info()
|
||||
except SystemExit as e:
|
||||
try:
|
||||
if six.PY2:
|
||||
self.future.set_exception_info(exc_value, exc_tb)
|
||||
else:
|
||||
self.future.set_exception(exc_value)
|
||||
self.fail()
|
||||
finally:
|
||||
del(exc_type, exc_value, exc_tb)
|
||||
raise e
|
||||
except BaseException:
|
||||
self.fail()
|
||||
else:
|
||||
self.future.set_result(result)
|
||||
|
||||
def fail(self, exc_info=None):
|
||||
exc_type, exc_value, exc_tb = exc_info or sys.exc_info()
|
||||
try:
|
||||
if six.PY2:
|
||||
self.future.set_exception_info(exc_value, exc_tb)
|
||||
else:
|
||||
self.future.set_exception(exc_value)
|
||||
finally:
|
||||
if exc_info is None:
|
||||
del exc_type, exc_value, exc_tb
|
||||
|
||||
|
||||
class Failure(object):
|
||||
"""Object that captures a exception (and its associated information)."""
|
||||
|
|
Loading…
Reference in New Issue