Merge "Allow for providing custom 'on_failure' callbacks"

This commit is contained in:
Jenkins 2015-07-30 20:56:33 +00:00 committed by Gerrit Code Review
commit d03cc4f05f
1 changed files with 62 additions and 14 deletions

View File

@ -36,6 +36,11 @@ LOG = logging.getLogger(__name__)
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
'_periodic_run_immediately')
# Constants that are used to determine what 'kind' the current callback
# is being ran as.
PERIODIC = 'periodic'
IMMEDIATE = 'immediate'
class Watcher(object):
"""A **read-only** object representing a periodics callbacks activities."""
@ -205,6 +210,16 @@ class _Schedule(object):
return heapq.heappop(self._ordering)
def _on_failure_log(log, cb, kind, spacing, exc_info, traceback=None):
cb_name = utils.get_callback_name(cb)
if all(exc_info) or not traceback:
log.error("Failed to call %s '%s' (it runs every %0.2f"
" seconds)", kind, cb_name, spacing, exc_info=exc_info)
else:
log.error("Failed to call %s '%s' (it runs every %0.2f"
" seconds):\n%s", kind, cb_name, spacing, traceback)
def _run_callback_retain(now_func, cb, *args, **kwargs):
# NOTE(harlowja): this needs to be a module level function so that the
# process pool execution can locate it (it can't be a lambda or method
@ -328,7 +343,8 @@ class PeriodicWorker(object):
def create(cls, objects, exclude_hidden=True,
log=None, executor_factory=None,
cond_cls=threading.Condition, event_cls=threading.Event,
schedule_strategy='last_started', now_func=utils.now):
schedule_strategy='last_started', now_func=utils.now,
on_failure=None):
"""Automatically creates a worker by analyzing object(s) methods.
Only picks up methods that have been tagged/decorated with
@ -367,6 +383,24 @@ class PeriodicWorker(object):
and next times to run); preferably this is
monotonically increasing
:type now_func: callable
:param on_failure: callable that will be called whenever a periodic
function fails with an error, it will be provided
four positional arguments and one keyword
argument, the first positional argument being the
callable that failed, the second being the type
of activity under which it failed (IMMEDIATE or
PERIODIC), the third being the spacing that the
callable runs at and the fourth `exc_info` tuple
of the failure. The keyword argument 'traceback'
will also be provided that may be be a string
that caused the failure (this is required for
executors which run out of process, as those can not
transfer stack frames across process boundaries); if
no callable is provided then a default failure
logging function will be used instead, do note that
any user provided callable should not raise
exceptions on being called
:type on_failure: callable
"""
callables = []
for obj in objects:
@ -382,11 +416,13 @@ class PeriodicWorker(object):
cls._NO_OP_KWARGS))
return cls(callables, log=log, executor_factory=executor_factory,
cond_cls=cond_cls, event_cls=event_cls,
schedule_strategy=schedule_strategy, now_func=now_func)
schedule_strategy=schedule_strategy, now_func=now_func,
on_failure=on_failure)
def __init__(self, callables, log=None, executor_factory=None,
cond_cls=threading.Condition, event_cls=threading.Event,
schedule_strategy='last_started', now_func=utils.now):
schedule_strategy='last_started', now_func=utils.now,
on_failure=None):
"""Creates a new worker using the given periodic callables.
:param callables: a iterable of tuple objects previously decorated
@ -426,6 +462,24 @@ class PeriodicWorker(object):
and next times to run); preferably this is
monotonically increasing
:type now_func: callable
:param on_failure: callable that will be called whenever a periodic
function fails with an error, it will be provided
four positional arguments and one keyword
argument, the first positional argument being the
callable that failed, the second being the type
of activity under which it failed (IMMEDIATE or
PERIODIC), the third being the spacing that the
callable runs at and the fourth `exc_info` tuple
of the failure. The keyword argument 'traceback'
will also be provided that may be be a string
that caused the failure (this is required for
executors which run out of process, as those can not
transfer stack frames across process boundaries); if
no callable is provided then a default failure
logging function will be used instead, do note that
any user provided callable should not raise
exceptions on being called
:type on_failure: callable
"""
self._tombstone = event_cls()
self._waiter = cond_cls()
@ -466,6 +520,7 @@ class PeriodicWorker(object):
self._log = log or LOG
if executor_factory is None:
executor_factory = lambda: futurist.SynchronousExecutor()
self._on_failure = functools.partial(_on_failure_log, self._log)
self._executor_factory = executor_factory
self._now_func = now_func
@ -481,16 +536,9 @@ class PeriodicWorker(object):
cb_metrics, _watcher = self._watchers[index]
cb_metrics['runs'] += 1
if failure is not None:
how_often = cb._periodic_spacing
if all(failure.exc_info):
self._log.error("Failed to call %s '%s' (it runs every"
" %0.2f seconds)", kind, cb_name,
how_often, exc_info=failure.exc_info)
else:
self._log.error("Failed to call %s '%s' (it runs every"
" %0.2f seconds):\n%s", kind, cb_name,
how_often, failure.traceback)
cb_metrics['failures'] += 1
self._on_failure(cb, kind, cb._periodic_spacing,
failure.exc_info, traceback=failure.traceback)
else:
cb_metrics['successes'] += 1
elapsed = max(0, finished_at - started_at)
@ -520,7 +568,7 @@ class PeriodicWorker(object):
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
'immediate',
IMMEDIATE,
cb, cb_name,
index,
submitted_at))
@ -547,7 +595,7 @@ class PeriodicWorker(object):
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
'periodic',
PERIODIC,
cb, cb_name,
index,
submitted_at))