Merge "Introducing NeverAgain functionality for periodics"

This commit is contained in:
Jenkins 2017-03-08 06:22:17 +00:00 committed by Gerrit Code Review
commit 68557fec45
4 changed files with 294 additions and 40 deletions

View File

@ -256,3 +256,109 @@ Running a set of functions periodically (using an executor)
:hide:
...
--------------------------------------------------------------------
Stopping periodic function to run again (using NeverAgain exception)
--------------------------------------------------------------------
.. testcode::
import futurist
from futurist import periodics
import time
import threading
@periodics.periodic(1)
def run_only_once(started_at):
print("1: %s" % (time.time() - started_at))
raise periodics.NeverAgain("No need to run again after first run !!")
@periodics.periodic(1)
def keep_running(started_at):
print("2: %s" % (time.time() - started_at))
started_at = time.time()
callables = [
# The function to run + any automatically provided positional and
# keyword arguments to provide to it everytime it is activated.
(run_only_once, (started_at,), {}),
(keep_running, (started_at,), {}),
]
w = periodics.PeriodicWorker(callables)
# In this example we will run the periodic functions using a thread, it
# is also possible to just call the w.start() method directly if you do
# not mind blocking up the current program.
t = threading.Thread(target=w.start)
t.daemon = True
t.start()
# Run for 10 seconds and then stop.
while (time.time() - started_at) <= 10:
time.sleep(0.1)
w.stop()
w.wait()
t.join()
.. testoutput::
:hide:
...
-------------------------------------------------------------------
Auto stopping the periodic worker when no more periodic work exists
-------------------------------------------------------------------
.. testcode::
import futurist
from futurist import periodics
import time
import threading
@periodics.periodic(1)
def run_only_once(started_at):
print("1: %s" % (time.time() - started_at))
raise periodics.NeverAgain("No need to run again after first run !!")
@periodics.periodic(2)
def run_for_some_time(started_at):
print("2: %s" % (time.time() - started_at))
if (time.time() - started_at) > 5:
raise periodics.NeverAgain("No need to run again !!")
started_at = time.time()
callables = [
# The function to run + any automatically provided positional and
# keyword arguments to provide to it everytime it is activated.
(run_only_once, (started_at,), {}),
(run_for_some_time, (started_at,), {}),
]
w = periodics.PeriodicWorker(callables)
# In this example we will run the periodic functions using a thread, it
# is also possible to just call the w.start() method directly if you do
# not mind blocking up the current program.
t = threading.Thread(target=w.start, kwargs={'auto_stop_when_empty': True})
t.daemon = True
t.start()
# Run for 10 seconds and then check to find out that it had
# already stooped.
while (time.time() - started_at) <= 10:
time.sleep(0.1)
print(w.pformat())
t.join()
.. testoutput::
:hide:
...

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import inspect
import multiprocessing
import sys
@ -85,6 +86,14 @@ class Failure(object):
finally:
del exc_info
@property
def exc_type(self):
return self.exc_info[0]
@property
def exc_value(self):
return self.exc_info[1]
def get_callback_name(cb):
"""Tries to get a callbacks fully-qualified name.
@ -137,11 +146,24 @@ class Barrier(object):
self._active = 0
self._cond = cond_cls()
@property
def active(self):
return self._active
def incr(self):
with self._cond:
self._active += 1
self._cond.notify_all()
@contextlib.contextmanager
def decr_cm(self):
with self._cond:
self._active -= 1
try:
yield self._active
finally:
self._cond.notify_all()
def decr(self):
with self._cond:
self._active -= 1

View File

@ -33,11 +33,21 @@ from futurist import _utils as utils
LOG = logging.getLogger(__name__)
class NeverAgain(Exception):
"""Exception to raise to stop further periodic calls for a function.
When you want a function never run again you can throw this from
you periodic function and that will signify to the execution framework
to remove that function (and never run it again).
"""
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
'_periodic_run_immediately')
_DEFAULT_COLS = ('Name', 'Active', 'Periodicity', 'Runs in',
'Runs', 'Failures', 'Successes',
'Runs', 'Failures', 'Successes', 'Stop Requested',
'Average elapsed', 'Average elapsed waiting')
# Constants that are used to determine what 'kind' the current callback
@ -67,6 +77,11 @@ class Watcher(object):
work=self._work,
metrics=self._metrics)
@property
def requested_stop(self):
"""If the work unit being ran has requested to be stopped."""
return self._metrics['requested_stop']
@property
def work(self):
"""**Read-only** named work tuple this object watches."""
@ -343,6 +358,7 @@ class PeriodicWorker(object):
'elapsed_waiting': 0,
'failures': 0,
'successes': 0,
'requested_stop': False,
}
# When scheduling fails temporary, use a random delay between 0.9-1.1 sec.
@ -591,7 +607,7 @@ class PeriodicWorker(object):
"""How many callables/periodic work units are currently active."""
return len(self._works)
def _run(self, executor, runner):
def _run(self, executor, runner, auto_stop_when_empty):
"""Main worker run loop."""
barrier = utils.Barrier(cond_cls=self._cond_cls)
rnd = random.SystemRandom()
@ -639,7 +655,6 @@ class PeriodicWorker(object):
PERIODIC,
work, index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
else:
# Gotta wait...
self._schedule.push(next_run, index)
@ -647,56 +662,76 @@ class PeriodicWorker(object):
self._waiter.wait(when_next)
def _process_immediates():
try:
index = self._immediates.popleft()
except IndexError:
pass
else:
work = self._works[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate"
" callback '%s'", work.name)
with self._waiter:
try:
fut = executor.submit(runner.run, work)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
self._log.error("Failed to submit immediate callback "
"'%s', retrying. Error: %s", work.name,
exc)
# Restart as soon as possible
self._immediates.append(index)
index = self._immediates.popleft()
except IndexError:
pass
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
work, index,
submitted_at))
fut.add_done_callback(lambda _fut: barrier.decr())
work = self._works[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate"
" callback '%s'", work.name)
try:
fut = executor.submit(runner.run, work)
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
self._log.error("Failed to submit immediate callback "
"'%s', retrying. Error: %s", work.name,
exc)
# Restart as soon as possible
self._immediates.append(index)
else:
barrier.incr()
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
work, index,
submitted_at))
def _on_done(kind, work, index, submitted_at, fut):
cb = work.callback
started_at, finished_at, failure = fut.result()
cb_metrics, _watcher = self._watchers[index]
cb_metrics['runs'] += 1
schedule_again = True
if failure is not None:
cb_metrics['failures'] += 1
try:
self._on_failure(cb, kind, cb._periodic_spacing,
failure.exc_info,
traceback=failure.traceback)
except Exception as exc:
self._log.error("On failure callback %r raised an"
" unhandled exception. Error: %s",
self._on_failure, exc)
if not issubclass(failure.exc_type, NeverAgain):
cb_metrics['failures'] += 1
try:
self._on_failure(cb, kind, cb._periodic_spacing,
failure.exc_info,
traceback=failure.traceback)
except Exception as exc:
self._log.error("On failure callback %r raised an"
" unhandled exception. Error: %s",
self._on_failure, exc)
else:
cb_metrics['successes'] += 1
schedule_again = False
self._log.debug("Periodic callback '%s' raised "
"'NeverAgain' "
"exception, stopping any further "
"execution of it.", work.name)
else:
cb_metrics['successes'] += 1
elapsed = max(0, finished_at - started_at)
elapsed_waiting = max(0, started_at - submitted_at)
cb_metrics['elapsed'] += elapsed
cb_metrics['elapsed_waiting'] += elapsed_waiting
next_run = self._schedule_strategy(cb, started_at, finished_at,
cb_metrics)
with self._waiter:
self._schedule.push(next_run, index)
with barrier.decr_cm() as am_left:
if schedule_again:
next_run = self._schedule_strategy(cb, started_at,
finished_at,
cb_metrics)
self._schedule.push(next_run, index)
else:
cb_metrics['requested_stop'] = True
if (am_left <= 0 and
len(self._immediates) == 0 and
len(self._schedule) == 0 and
auto_stop_when_empty):
# Guess nothing left to do, goodbye...
self._tombstone.set()
self._waiter.notify_all()
try:
@ -733,7 +768,10 @@ class PeriodicWorker(object):
for index, work in enumerate(self._works):
_cb_metrics, watcher = self._watchers[index]
next_run = self._schedule.fetch_next_run(index)
if next_run is None:
if watcher.requested_stop:
active = False
runs_in = 'n/a'
elif next_run is None:
active = True
runs_in = 'n/a'
else:
@ -747,6 +785,7 @@ class PeriodicWorker(object):
'Runs in': runs_in,
'Failures': watcher.failures,
'Successes': watcher.successes,
'Stop Requested': watcher.requested_stop,
}
try:
cb_row_avgs = [
@ -803,7 +842,7 @@ class PeriodicWorker(object):
self._waiter.notify_all()
return watcher
def start(self, allow_empty=False):
def start(self, allow_empty=False, auto_stop_when_empty=False):
"""Starts running (will not return until :py:meth:`.stop` is called).
:param allow_empty: instead of running with no callbacks raise when
@ -814,6 +853,16 @@ class PeriodicWorker(object):
sleep (until either stopped or callbacks are
added)
:type allow_empty: bool
:param auto_stop_when_empty: when the provided periodic functions have
all exited and this is false then the
thread responsible for executing those
methods will just spin/idle waiting for
a new periodic function to be added;
switching it to true will make this
idling not happen (and instead when no
more periodic work exists then the
calling thread will just return).
:type auto_stop_when_empty: bool
"""
if not self._works and not allow_empty:
raise RuntimeError("A periodic worker can not start"
@ -836,7 +885,7 @@ class PeriodicWorker(object):
self._dead.clear()
self._active.set()
try:
self._run(executor, runner)
self._run(executor, runner, auto_stop_when_empty)
finally:
if getattr(self._executor_factory, 'shutdown', True):
executor.shutdown()

View File

@ -434,6 +434,83 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
m.assert_called_with('foo', bar='baz')
def test_never_again_exc(self):
m_1 = mock.MagicMock()
m_2 = mock.MagicMock()
@periodics.periodic(0.5)
def run_only_once():
m_1()
raise periodics.NeverAgain("No need to run again !!")
@periodics.periodic(0.5)
def keep_running():
m_2()
callables = [
(run_only_once, None, None),
(keep_running, None, None),
]
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start):
self.sleep(2.0)
w.stop()
for watcher in w.iter_watchers():
self.assertGreaterEqual(watcher.runs, 1)
self.assertGreaterEqual(watcher.successes, 1)
self.assertEqual(watcher.failures, 0)
self.assertEqual(m_1.call_count, 1)
self.assertGreaterEqual(m_2.call_count, 3)
def test_start_with_auto_stop_when_empty_set(self):
@periodics.periodic(0.5)
def run_only_once():
raise periodics.NeverAgain("No need to run again !!")
callables = [
(run_only_once, None, None),
(run_only_once, None, None),
]
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start, auto_stop_when_empty=True):
self.sleep(2.0)
for watcher in w.iter_watchers():
self.assertGreaterEqual(watcher.runs, 1)
self.assertGreaterEqual(watcher.successes, 1)
self.assertEqual(watcher.failures, 0)
self.assertEqual(watcher.requested_stop, True)
def test_add_with_auto_stop_when_empty_set(self):
m = mock.Mock()
@periodics.periodic(0.5)
def run_only_once():
raise periodics.NeverAgain("No need to run again !!")
callables = [
(run_only_once, None, None),
]
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
w = periodics.PeriodicWorker(callables,
executor_factory=executor_factory,
**self.worker_kwargs)
with self.create_destroy(w.start, auto_stop_when_empty=True):
self.sleep(2.0)
w.add(every_half_sec, m, None)
m.assert_not_called()
class RejectingExecutor(futurist.GreenThreadPoolExecutor):
MAX_REJECTIONS_COUNT = 2