Avoid circular refs more aggressively in DependencyTaskGroup

Be ultra-careful to make sure that we can't end up with a local reference
to a traceback that contains the current function, thus causing a reference
loop where everything on the call stack has to be garbage collected.

Also, ignore exceptions from cancelling threads in _cancel_recursively(),
just as we do in cancel_all() since
2ffbd913a6.

Change-Id: I635c41faab4b54e95132a93f8046d0a63053485d
Related-Bug: #1626675
This commit is contained in:
Zane Bitter 2016-09-22 18:24:05 -04:00
parent 82b8fd8c17
commit 38483c56fc
1 changed files with 40 additions and 35 deletions

View File

@ -20,7 +20,6 @@ from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
import six
from six import reraise as raise_
from heat.common.i18n import _
from heat.common.i18n import _LI
@ -421,47 +420,50 @@ class DependencyTaskGroup(object):
raised_exceptions = []
thrown_exceptions = []
while any(six.itervalues(self._runners)):
try:
for k, r in self._ready():
r.start()
if not r:
del self._graph[k]
try:
while any(six.itervalues(self._runners)):
try:
for k, r in self._ready():
r.start()
if not r:
del self._graph[k]
if self._graph:
if self._graph:
try:
yield
except Exception:
thrown_exceptions.append(sys.exc_info())
raise
for k, r in self._running():
if r.step():
del self._graph[k]
except Exception:
exc_info = None
try:
yield
except Exception:
thrown_exceptions.append(sys.exc_info())
raise
exc_info = sys.exc_info()
if self.aggregate_exceptions:
self._cancel_recursively(k, r)
else:
self.cancel_all(grace_period=self.error_wait_time)
raised_exceptions.append(exc_info)
finally:
del exc_info
except: # noqa
with excutils.save_and_reraise_exception():
self.cancel_all()
for k, r in self._running():
if r.step():
del self._graph[k]
except Exception:
exc_info = sys.exc_info()
if self.aggregate_exceptions:
self._cancel_recursively(k, r)
else:
self.cancel_all(grace_period=self.error_wait_time)
raised_exceptions.append(exc_info)
del exc_info
except: # noqa
with excutils.save_and_reraise_exception():
self.cancel_all()
if raised_exceptions:
try:
if raised_exceptions:
if self.aggregate_exceptions:
raise ExceptionGroup(v for t, v, tb in raised_exceptions)
else:
if thrown_exceptions:
raise_(*thrown_exceptions[-1])
six.reraise(*thrown_exceptions[-1])
else:
raise_(*raised_exceptions[0])
finally:
del raised_exceptions
del thrown_exceptions
six.reraise(*raised_exceptions[0])
finally:
del raised_exceptions
del thrown_exceptions
def cancel_all(self, grace_period=None):
if callable(grace_period):
@ -478,7 +480,10 @@ class DependencyTaskGroup(object):
LOG.debug('Exception cancelling task: %s' % six.text_type(ex))
def _cancel_recursively(self, key, runner):
runner.cancel()
try:
runner.cancel()
except Exception as ex:
LOG.debug('Exception cancelling task: %s' % six.text_type(ex))
node = self._graph[key]
for dependent_node in node.required_by():
node_runner = self._runners[dependent_node]