Merge "Allow for providing custom 'on_failure' callbacks"
This commit is contained in:
commit
d03cc4f05f
|
@ -36,6 +36,11 @@ LOG = logging.getLogger(__name__)
|
|||
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
|
||||
'_periodic_run_immediately')
|
||||
|
||||
# Constants that are used to determine what 'kind' the current callback
|
||||
# is being ran as.
|
||||
PERIODIC = 'periodic'
|
||||
IMMEDIATE = 'immediate'
|
||||
|
||||
|
||||
class Watcher(object):
|
||||
"""A **read-only** object representing a periodics callbacks activities."""
|
||||
|
@ -205,6 +210,16 @@ class _Schedule(object):
|
|||
return heapq.heappop(self._ordering)
|
||||
|
||||
|
||||
def _on_failure_log(log, cb, kind, spacing, exc_info, traceback=None):
|
||||
cb_name = utils.get_callback_name(cb)
|
||||
if all(exc_info) or not traceback:
|
||||
log.error("Failed to call %s '%s' (it runs every %0.2f"
|
||||
" seconds)", kind, cb_name, spacing, exc_info=exc_info)
|
||||
else:
|
||||
log.error("Failed to call %s '%s' (it runs every %0.2f"
|
||||
" seconds):\n%s", kind, cb_name, spacing, traceback)
|
||||
|
||||
|
||||
def _run_callback_retain(now_func, cb, *args, **kwargs):
|
||||
# NOTE(harlowja): this needs to be a module level function so that the
|
||||
# process pool execution can locate it (it can't be a lambda or method
|
||||
|
@ -328,7 +343,8 @@ class PeriodicWorker(object):
|
|||
def create(cls, objects, exclude_hidden=True,
|
||||
log=None, executor_factory=None,
|
||||
cond_cls=threading.Condition, event_cls=threading.Event,
|
||||
schedule_strategy='last_started', now_func=utils.now):
|
||||
schedule_strategy='last_started', now_func=utils.now,
|
||||
on_failure=None):
|
||||
"""Automatically creates a worker by analyzing object(s) methods.
|
||||
|
||||
Only picks up methods that have been tagged/decorated with
|
||||
|
@ -367,6 +383,24 @@ class PeriodicWorker(object):
|
|||
and next times to run); preferably this is
|
||||
monotonically increasing
|
||||
:type now_func: callable
|
||||
:param on_failure: callable that will be called whenever a periodic
|
||||
function fails with an error, it will be provided
|
||||
four positional arguments and one keyword
|
||||
argument, the first positional argument being the
|
||||
callable that failed, the second being the type
|
||||
of activity under which it failed (IMMEDIATE or
|
||||
PERIODIC), the third being the spacing that the
|
||||
callable runs at and the fourth `exc_info` tuple
|
||||
of the failure. The keyword argument 'traceback'
|
||||
will also be provided that may be be a string
|
||||
that caused the failure (this is required for
|
||||
executors which run out of process, as those can not
|
||||
transfer stack frames across process boundaries); if
|
||||
no callable is provided then a default failure
|
||||
logging function will be used instead, do note that
|
||||
any user provided callable should not raise
|
||||
exceptions on being called
|
||||
:type on_failure: callable
|
||||
"""
|
||||
callables = []
|
||||
for obj in objects:
|
||||
|
@ -382,11 +416,13 @@ class PeriodicWorker(object):
|
|||
cls._NO_OP_KWARGS))
|
||||
return cls(callables, log=log, executor_factory=executor_factory,
|
||||
cond_cls=cond_cls, event_cls=event_cls,
|
||||
schedule_strategy=schedule_strategy, now_func=now_func)
|
||||
schedule_strategy=schedule_strategy, now_func=now_func,
|
||||
on_failure=on_failure)
|
||||
|
||||
def __init__(self, callables, log=None, executor_factory=None,
|
||||
cond_cls=threading.Condition, event_cls=threading.Event,
|
||||
schedule_strategy='last_started', now_func=utils.now):
|
||||
schedule_strategy='last_started', now_func=utils.now,
|
||||
on_failure=None):
|
||||
"""Creates a new worker using the given periodic callables.
|
||||
|
||||
:param callables: a iterable of tuple objects previously decorated
|
||||
|
@ -426,6 +462,24 @@ class PeriodicWorker(object):
|
|||
and next times to run); preferably this is
|
||||
monotonically increasing
|
||||
:type now_func: callable
|
||||
:param on_failure: callable that will be called whenever a periodic
|
||||
function fails with an error, it will be provided
|
||||
four positional arguments and one keyword
|
||||
argument, the first positional argument being the
|
||||
callable that failed, the second being the type
|
||||
of activity under which it failed (IMMEDIATE or
|
||||
PERIODIC), the third being the spacing that the
|
||||
callable runs at and the fourth `exc_info` tuple
|
||||
of the failure. The keyword argument 'traceback'
|
||||
will also be provided that may be be a string
|
||||
that caused the failure (this is required for
|
||||
executors which run out of process, as those can not
|
||||
transfer stack frames across process boundaries); if
|
||||
no callable is provided then a default failure
|
||||
logging function will be used instead, do note that
|
||||
any user provided callable should not raise
|
||||
exceptions on being called
|
||||
:type on_failure: callable
|
||||
"""
|
||||
self._tombstone = event_cls()
|
||||
self._waiter = cond_cls()
|
||||
|
@ -466,6 +520,7 @@ class PeriodicWorker(object):
|
|||
self._log = log or LOG
|
||||
if executor_factory is None:
|
||||
executor_factory = lambda: futurist.SynchronousExecutor()
|
||||
self._on_failure = functools.partial(_on_failure_log, self._log)
|
||||
self._executor_factory = executor_factory
|
||||
self._now_func = now_func
|
||||
|
||||
|
@ -481,16 +536,9 @@ class PeriodicWorker(object):
|
|||
cb_metrics, _watcher = self._watchers[index]
|
||||
cb_metrics['runs'] += 1
|
||||
if failure is not None:
|
||||
how_often = cb._periodic_spacing
|
||||
if all(failure.exc_info):
|
||||
self._log.error("Failed to call %s '%s' (it runs every"
|
||||
" %0.2f seconds)", kind, cb_name,
|
||||
how_often, exc_info=failure.exc_info)
|
||||
else:
|
||||
self._log.error("Failed to call %s '%s' (it runs every"
|
||||
" %0.2f seconds):\n%s", kind, cb_name,
|
||||
how_often, failure.traceback)
|
||||
cb_metrics['failures'] += 1
|
||||
self._on_failure(cb, kind, cb._periodic_spacing,
|
||||
failure.exc_info, traceback=failure.traceback)
|
||||
else:
|
||||
cb_metrics['successes'] += 1
|
||||
elapsed = max(0, finished_at - started_at)
|
||||
|
@ -520,7 +568,7 @@ class PeriodicWorker(object):
|
|||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
'immediate',
|
||||
IMMEDIATE,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
|
@ -547,7 +595,7 @@ class PeriodicWorker(object):
|
|||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
'periodic',
|
||||
PERIODIC,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
|
|
Loading…
Reference in New Issue