Merge "Handle exceptions from executor.submit in PeriodicWorker"
This commit is contained in:
commit
79eeb23680
|
@ -294,6 +294,9 @@ def _build(now_func, callables, next_run_scheduler):
|
|||
return immediates, schedule
|
||||
|
||||
|
||||
_SCHEDULE_RETRY_EXCEPTIONS = (RuntimeError, futurist.RejectedSubmission)
|
||||
|
||||
|
||||
class PeriodicWorker(object):
|
||||
"""Calls a collection of callables periodically (sleeping as needed...).
|
||||
|
||||
|
@ -577,14 +580,22 @@ class PeriodicWorker(object):
|
|||
cb, cb_name, args, kwargs = self._callables[index]
|
||||
self._log.debug("Submitting periodic function '%s'",
|
||||
cb_name)
|
||||
fut = executor.submit(runner,
|
||||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
PERIODIC,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
try:
|
||||
fut = executor.submit(runner,
|
||||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
|
||||
self._log.error("Failed to submit periodic function "
|
||||
"%s, retrying. Error: %s",
|
||||
cb_name, exc)
|
||||
# Restart as soon as possible
|
||||
self._schedule.push(now, index)
|
||||
else:
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
PERIODIC,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
else:
|
||||
# Gotta wait...
|
||||
self._schedule.push(next_run, index)
|
||||
|
@ -600,12 +611,20 @@ class PeriodicWorker(object):
|
|||
cb, cb_name, args, kwargs = self._callables[index]
|
||||
submitted_at = self._now_func()
|
||||
self._log.debug("Submitting immediate function '%s'", cb_name)
|
||||
fut = executor.submit(runner, self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
IMMEDIATE,
|
||||
cb, cb_name,
|
||||
index, submitted_at))
|
||||
try:
|
||||
fut = executor.submit(runner, self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
|
||||
self._log.error("Failed to submit immediate function "
|
||||
"%s, retrying. Error: %s", cb_name, exc)
|
||||
# Restart as soon as possible
|
||||
self._immediates.append(index)
|
||||
else:
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
IMMEDIATE,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
|
||||
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
|
||||
started_at, finished_at, failure = fut.result()
|
||||
|
|
|
@ -297,3 +297,39 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
|
|||
|
||||
am_called = sum(called)
|
||||
self.assertGreaterEqual(am_called, 4)
|
||||
|
||||
|
||||
class RejectingExecutor(futurist.GreenThreadPoolExecutor):
|
||||
MAX_REJECTIONS_COUNT = 2
|
||||
|
||||
def _reject(self, *args):
|
||||
if self._rejections_count < self.MAX_REJECTIONS_COUNT:
|
||||
self._rejections_count += 1
|
||||
raise futurist.RejectedSubmission()
|
||||
|
||||
def __init__(self):
|
||||
self._rejections_count = 0
|
||||
super(RejectingExecutor, self).__init__(check_and_reject=self._reject)
|
||||
|
||||
|
||||
class TestRetrySubmission(base.TestCase):
|
||||
def test_retry_submission(self):
|
||||
called = []
|
||||
|
||||
def cb():
|
||||
called.append(1)
|
||||
|
||||
callables = [
|
||||
(every_one_sec, (cb,), None),
|
||||
(every_half_sec, (cb,), None),
|
||||
]
|
||||
w = periodics.PeriodicWorker(callables,
|
||||
executor_factory=RejectingExecutor,
|
||||
cond_cls=green_threading.Condition,
|
||||
event_cls=green_threading.Event)
|
||||
with create_destroy_green_thread(w.start):
|
||||
eventlet.sleep(2.0)
|
||||
w.stop()
|
||||
|
||||
am_called = sum(called)
|
||||
self.assertGreaterEqual(am_called, 4)
|
||||
|
|
Loading…
Reference in New Issue