Fix 'on_failure' param not be used

Ensures it is used, and exceptions from it are
handled, and that it is type checked correctly before
being used.

Closes-Bug: #1531530

Change-Id: I0843f363ae70d3b6475cddeebb7564204a10a04d
This commit is contained in:
Joshua Harlow 2016-03-07 10:38:42 -08:00
parent 74a8c7ad72
commit 29c5d14d4a
2 changed files with 64 additions and 19 deletions

View File

@ -422,18 +422,19 @@ class PeriodicWorker(object):
four positional arguments and one keyword
argument, the first positional argument being the
callable that failed, the second being the type
of activity under which it failed (IMMEDIATE or
PERIODIC), the third being the spacing that the
callable runs at and the fourth `exc_info` tuple
of the failure. The keyword argument 'traceback'
of activity under which it failed (``IMMEDIATE`` or
``PERIODIC``), the third being the spacing that the
callable runs at and the fourth ``exc_info`` tuple
of the failure. The keyword argument ``traceback``
will also be provided that may be be a string
that caused the failure (this is required for
executors which run out of process, as those can not
transfer stack frames across process boundaries); if
no callable is provided then a default failure
logging function will be used instead, do note that
*currently* transfer stack frames across process
boundaries); if no callable is provided then a
default failure logging function will be used
instead (do note that
any user provided callable should not raise
exceptions on being called
exceptions on being called)
:type on_failure: callable
:param args: positional arguments to be passed to all callables
:type args: tuple
@ -502,20 +503,24 @@ class PeriodicWorker(object):
four positional arguments and one keyword
argument, the first positional argument being the
callable that failed, the second being the type
of activity under which it failed (IMMEDIATE or
PERIODIC), the third being the spacing that the
callable runs at and the fourth `exc_info` tuple
of the failure. The keyword argument 'traceback'
of activity under which it failed (``IMMEDIATE`` or
``PERIODIC``), the third being the spacing that the
callable runs at and the fourth ``exc_info`` tuple
of the failure. The keyword argument ``traceback``
will also be provided that may be be a string
that caused the failure (this is required for
executors which run out of process, as those can not
transfer stack frames across process boundaries); if
no callable is provided then a default failure
logging function will be used instead, do note that
*currently* transfer stack frames across process
boundaries); if no callable is provided then a
default failure logging function will be used
instead (do note that
any user provided callable should not raise
exceptions on being called
exceptions on being called)
:type on_failure: callable
"""
if on_failure is not None and not six.callable(on_failure):
raise ValueError("On failure callback %r must be"
" callable" % on_failure)
self._tombstone = event_cls()
self._waiter = cond_cls()
self._dead = event_cls()
@ -556,7 +561,9 @@ class PeriodicWorker(object):
self._log = log or LOG
if executor_factory is None:
executor_factory = lambda: futurist.SynchronousExecutor()
self._on_failure = functools.partial(_on_failure_log, self._log)
if on_failure is None:
on_failure = functools.partial(_on_failure_log, self._log)
self._on_failure = on_failure
self._executor_factory = executor_factory
self._now_func = now_func
@ -651,8 +658,14 @@ class PeriodicWorker(object):
cb_metrics['runs'] += 1
if failure is not None:
cb_metrics['failures'] += 1
self._on_failure(cb, kind, cb._periodic_spacing,
failure.exc_info, traceback=failure.traceback)
try:
self._on_failure(cb, kind, cb._periodic_spacing,
failure.exc_info,
traceback=failure.traceback)
except Exception as exc:
self._log.error("On failure callback %r raised an"
" unhandled exception. Error: %s",
self._on_failure, exc)
else:
cb_metrics['successes'] += 1
elapsed = max(0, finished_at - started_at)

View File

@ -200,6 +200,38 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
nows = list(reversed(nows))
self._test_strategy('last_started', nows, last_now, 4.0)
def test_failure_callback_fail(self):
worker_kwargs = self.worker_kwargs.copy()
worker_kwargs['on_failure'] = 'not-a-func'
self.assertRaises(ValueError,
periodics.PeriodicWorker, [], **worker_kwargs)
def test_failure_callback(self):
captures = []
ev = self.event_cls()
@periodics.periodic(0.1)
def failing_always():
raise RuntimeError("Broke!")
def trap_failures(cb, kind, periodic_spacing,
exc_info, traceback=None):
captures.append([cb, kind, periodic_spacing, traceback])
ev.set()
callables = [
(failing_always, None, None),
]
worker_kwargs = self.worker_kwargs.copy()
worker_kwargs['on_failure'] = trap_failures
w = periodics.PeriodicWorker(callables, **worker_kwargs)
with self.create_destroy(w.start, allow_empty=True):
ev.wait()
w.stop()
self.assertEqual(captures[0], [failing_always, periodics.PERIODIC,
0.1, mock.ANY])
def test_aligned_strategy(self):
last_now = 5.5
nows = [