Reduce/remove duplication in run functions

Have a smaller helper class that unifies the retaining
or not retaining of failure tracebacks instead of having
two nearly equivalent functions to do the same.

Change-Id: I54bb087bbe35f5597c6658ba8aa823795f560045
This commit is contained in:
Joshua Harlow 2016-03-02 15:39:57 -08:00
parent 81991b6a09
commit 74a8c7ad72
1 changed files with 20 additions and 38 deletions

View File

@ -244,38 +244,23 @@ def _on_failure_log(log, cb, kind, spacing, exc_info, traceback=None):
" seconds):\n%s", kind, cb_name, spacing, traceback)
def _run_callback_retain(now_func, cb, *args, **kwargs):
# NOTE(harlowja): this needs to be a module level function so that the
# process pool execution can locate it (it can't be a lambda or method
# local function because it won't be able to find those).
failure = None
started_at = now_func()
try:
cb(*args, **kwargs)
except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable
# timing information.
failure = utils.Failure(True)
finished_at = now_func()
return (started_at, finished_at, failure)
class _Runner(object):
def __init__(self, now_func, retain_traceback=True):
self.now_func = now_func
self.retain_traceback = retain_traceback
def _run_callback_no_retain(now_func, cb, *args, **kwargs):
# NOTE(harlowja): this needs to be a module level function so that the
# process pool execution can locate it (it can't be a lambda or method
# local function because it won't be able to find those).
failure = None
started_at = now_func()
try:
cb(*args, **kwargs)
except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable
# timing information.
failure = utils.Failure(False)
finished_at = now_func()
return (started_at, finished_at, failure)
def run(self, cb, *args, **kwargs):
failure = None
started_at = self.now_func()
try:
cb(*args, **kwargs)
except Exception:
# Until https://bugs.python.org/issue24451 is merged we have to
# capture and return the failure, so that we can have reliable
# timing information.
failure = utils.Failure(self.retain_traceback)
finished_at = self.now_func()
return (started_at, finished_at, failure)
def _build(now_func, callables, next_run_scheduler):
@ -610,9 +595,7 @@ class PeriodicWorker(object):
self._log.debug("Submitting periodic function '%s'",
cb_name)
try:
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut = executor.submit(runner.run, cb, *args, **kwargs)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
# Restart after a short delay
delay = (self._RESCHEDULE_DELAY +
@ -647,8 +630,7 @@ class PeriodicWorker(object):
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'", cb_name)
try:
fut = executor.submit(runner, self._now_func,
cb, *args, **kwargs)
fut = executor.submit(runner.run, cb, *args, **kwargs)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
self._log.error("Failed to submit immediate function "
"'%s', retrying. Error: %s", cb_name, exc)
@ -773,9 +755,9 @@ class PeriodicWorker(object):
# Pickling a traceback will not work, so do not try to do it...
#
# Avoids 'TypeError: can't pickle traceback objects'
runner = _run_callback_no_retain
runner = _Runner(self._now_func, retain_traceback=False)
else:
runner = _run_callback_retain
runner = _Runner(self._now_func, retain_traceback=True)
self._dead.clear()
self._active.set()
try: