Avoid circular refs more aggressively in DependencyTaskGroup
Be ultra-careful to make sure that we can't end up with a local reference
to a traceback that contains the current function, thus causing a reference
loop where everything on the call stack has to be garbage collected.
Also, ignore exceptions from cancelling threads in _cancel_recursively(),
just as we do in cancel_all() since
2ffbd913a6
.
Change-Id: I635c41faab4b54e95132a93f8046d0a63053485d
Related-Bug: #1626675
This commit is contained in:
parent
82b8fd8c17
commit
38483c56fc
|
@ -20,7 +20,6 @@ from oslo_log import log as logging
|
|||
from oslo_utils import encodeutils
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
from six import reraise as raise_
|
||||
|
||||
from heat.common.i18n import _
|
||||
from heat.common.i18n import _LI
|
||||
|
@ -421,47 +420,50 @@ class DependencyTaskGroup(object):
|
|||
raised_exceptions = []
|
||||
thrown_exceptions = []
|
||||
|
||||
while any(six.itervalues(self._runners)):
|
||||
try:
|
||||
for k, r in self._ready():
|
||||
r.start()
|
||||
if not r:
|
||||
del self._graph[k]
|
||||
try:
|
||||
while any(six.itervalues(self._runners)):
|
||||
try:
|
||||
for k, r in self._ready():
|
||||
r.start()
|
||||
if not r:
|
||||
del self._graph[k]
|
||||
|
||||
if self._graph:
|
||||
if self._graph:
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
thrown_exceptions.append(sys.exc_info())
|
||||
raise
|
||||
|
||||
for k, r in self._running():
|
||||
if r.step():
|
||||
del self._graph[k]
|
||||
except Exception:
|
||||
exc_info = None
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
thrown_exceptions.append(sys.exc_info())
|
||||
raise
|
||||
exc_info = sys.exc_info()
|
||||
if self.aggregate_exceptions:
|
||||
self._cancel_recursively(k, r)
|
||||
else:
|
||||
self.cancel_all(grace_period=self.error_wait_time)
|
||||
raised_exceptions.append(exc_info)
|
||||
finally:
|
||||
del exc_info
|
||||
except: # noqa
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.cancel_all()
|
||||
|
||||
for k, r in self._running():
|
||||
if r.step():
|
||||
del self._graph[k]
|
||||
except Exception:
|
||||
exc_info = sys.exc_info()
|
||||
if self.aggregate_exceptions:
|
||||
self._cancel_recursively(k, r)
|
||||
else:
|
||||
self.cancel_all(grace_period=self.error_wait_time)
|
||||
raised_exceptions.append(exc_info)
|
||||
del exc_info
|
||||
except: # noqa
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.cancel_all()
|
||||
|
||||
if raised_exceptions:
|
||||
try:
|
||||
if raised_exceptions:
|
||||
if self.aggregate_exceptions:
|
||||
raise ExceptionGroup(v for t, v, tb in raised_exceptions)
|
||||
else:
|
||||
if thrown_exceptions:
|
||||
raise_(*thrown_exceptions[-1])
|
||||
six.reraise(*thrown_exceptions[-1])
|
||||
else:
|
||||
raise_(*raised_exceptions[0])
|
||||
finally:
|
||||
del raised_exceptions
|
||||
del thrown_exceptions
|
||||
six.reraise(*raised_exceptions[0])
|
||||
finally:
|
||||
del raised_exceptions
|
||||
del thrown_exceptions
|
||||
|
||||
def cancel_all(self, grace_period=None):
|
||||
if callable(grace_period):
|
||||
|
@ -478,7 +480,10 @@ class DependencyTaskGroup(object):
|
|||
LOG.debug('Exception cancelling task: %s' % six.text_type(ex))
|
||||
|
||||
def _cancel_recursively(self, key, runner):
|
||||
runner.cancel()
|
||||
try:
|
||||
runner.cancel()
|
||||
except Exception as ex:
|
||||
LOG.debug('Exception cancelling task: %s' % six.text_type(ex))
|
||||
node = self._graph[key]
|
||||
for dependent_node in node.required_by():
|
||||
node_runner = self._runners[dependent_node]
|
||||
|
|
Loading…
Reference in New Issue