Merge "Introducing NeverAgain functionality for periodics"
This commit is contained in:
commit
68557fec45
|
@ -256,3 +256,109 @@ Running a set of functions periodically (using an executor)
|
|||
:hide:
|
||||
|
||||
...
|
||||
|
||||
--------------------------------------------------------------------
|
||||
Stopping periodic function to run again (using NeverAgain exception)
|
||||
--------------------------------------------------------------------
|
||||
|
||||
.. testcode::
|
||||
|
||||
import futurist
|
||||
from futurist import periodics
|
||||
|
||||
import time
|
||||
import threading
|
||||
|
||||
|
||||
@periodics.periodic(1)
|
||||
def run_only_once(started_at):
|
||||
print("1: %s" % (time.time() - started_at))
|
||||
raise periodics.NeverAgain("No need to run again after first run !!")
|
||||
|
||||
|
||||
@periodics.periodic(1)
|
||||
def keep_running(started_at):
|
||||
print("2: %s" % (time.time() - started_at))
|
||||
|
||||
|
||||
started_at = time.time()
|
||||
callables = [
|
||||
# The function to run + any automatically provided positional and
|
||||
# keyword arguments to provide to it everytime it is activated.
|
||||
(run_only_once, (started_at,), {}),
|
||||
(keep_running, (started_at,), {}),
|
||||
]
|
||||
w = periodics.PeriodicWorker(callables)
|
||||
|
||||
# In this example we will run the periodic functions using a thread, it
|
||||
# is also possible to just call the w.start() method directly if you do
|
||||
# not mind blocking up the current program.
|
||||
t = threading.Thread(target=w.start)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
# Run for 10 seconds and then stop.
|
||||
while (time.time() - started_at) <= 10:
|
||||
time.sleep(0.1)
|
||||
w.stop()
|
||||
w.wait()
|
||||
t.join()
|
||||
|
||||
.. testoutput::
|
||||
:hide:
|
||||
|
||||
...
|
||||
|
||||
-------------------------------------------------------------------
|
||||
Auto stopping the periodic worker when no more periodic work exists
|
||||
-------------------------------------------------------------------
|
||||
|
||||
.. testcode::
|
||||
|
||||
import futurist
|
||||
from futurist import periodics
|
||||
|
||||
import time
|
||||
import threading
|
||||
|
||||
|
||||
@periodics.periodic(1)
|
||||
def run_only_once(started_at):
|
||||
print("1: %s" % (time.time() - started_at))
|
||||
raise periodics.NeverAgain("No need to run again after first run !!")
|
||||
|
||||
|
||||
@periodics.periodic(2)
|
||||
def run_for_some_time(started_at):
|
||||
print("2: %s" % (time.time() - started_at))
|
||||
if (time.time() - started_at) > 5:
|
||||
raise periodics.NeverAgain("No need to run again !!")
|
||||
|
||||
|
||||
started_at = time.time()
|
||||
callables = [
|
||||
# The function to run + any automatically provided positional and
|
||||
# keyword arguments to provide to it everytime it is activated.
|
||||
(run_only_once, (started_at,), {}),
|
||||
(run_for_some_time, (started_at,), {}),
|
||||
]
|
||||
w = periodics.PeriodicWorker(callables)
|
||||
|
||||
# In this example we will run the periodic functions using a thread, it
|
||||
# is also possible to just call the w.start() method directly if you do
|
||||
# not mind blocking up the current program.
|
||||
t = threading.Thread(target=w.start, kwargs={'auto_stop_when_empty': True})
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
# Run for 10 seconds and then check to find out that it had
|
||||
# already stooped.
|
||||
while (time.time() - started_at) <= 10:
|
||||
time.sleep(0.1)
|
||||
print(w.pformat())
|
||||
t.join()
|
||||
|
||||
.. testoutput::
|
||||
:hide:
|
||||
|
||||
...
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import inspect
|
||||
import multiprocessing
|
||||
import sys
|
||||
|
@ -85,6 +86,14 @@ class Failure(object):
|
|||
finally:
|
||||
del exc_info
|
||||
|
||||
@property
|
||||
def exc_type(self):
|
||||
return self.exc_info[0]
|
||||
|
||||
@property
|
||||
def exc_value(self):
|
||||
return self.exc_info[1]
|
||||
|
||||
|
||||
def get_callback_name(cb):
|
||||
"""Tries to get a callbacks fully-qualified name.
|
||||
|
@ -137,11 +146,24 @@ class Barrier(object):
|
|||
self._active = 0
|
||||
self._cond = cond_cls()
|
||||
|
||||
@property
|
||||
def active(self):
|
||||
return self._active
|
||||
|
||||
def incr(self):
|
||||
with self._cond:
|
||||
self._active += 1
|
||||
self._cond.notify_all()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def decr_cm(self):
|
||||
with self._cond:
|
||||
self._active -= 1
|
||||
try:
|
||||
yield self._active
|
||||
finally:
|
||||
self._cond.notify_all()
|
||||
|
||||
def decr(self):
|
||||
with self._cond:
|
||||
self._active -= 1
|
||||
|
|
|
@ -33,11 +33,21 @@ from futurist import _utils as utils
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NeverAgain(Exception):
|
||||
"""Exception to raise to stop further periodic calls for a function.
|
||||
|
||||
When you want a function never run again you can throw this from
|
||||
you periodic function and that will signify to the execution framework
|
||||
to remove that function (and never run it again).
|
||||
"""
|
||||
|
||||
|
||||
_REQUIRED_ATTRS = ('_is_periodic', '_periodic_spacing',
|
||||
'_periodic_run_immediately')
|
||||
|
||||
_DEFAULT_COLS = ('Name', 'Active', 'Periodicity', 'Runs in',
|
||||
'Runs', 'Failures', 'Successes',
|
||||
'Runs', 'Failures', 'Successes', 'Stop Requested',
|
||||
'Average elapsed', 'Average elapsed waiting')
|
||||
|
||||
# Constants that are used to determine what 'kind' the current callback
|
||||
|
@ -67,6 +77,11 @@ class Watcher(object):
|
|||
work=self._work,
|
||||
metrics=self._metrics)
|
||||
|
||||
@property
|
||||
def requested_stop(self):
|
||||
"""If the work unit being ran has requested to be stopped."""
|
||||
return self._metrics['requested_stop']
|
||||
|
||||
@property
|
||||
def work(self):
|
||||
"""**Read-only** named work tuple this object watches."""
|
||||
|
@ -343,6 +358,7 @@ class PeriodicWorker(object):
|
|||
'elapsed_waiting': 0,
|
||||
'failures': 0,
|
||||
'successes': 0,
|
||||
'requested_stop': False,
|
||||
}
|
||||
|
||||
# When scheduling fails temporary, use a random delay between 0.9-1.1 sec.
|
||||
|
@ -591,7 +607,7 @@ class PeriodicWorker(object):
|
|||
"""How many callables/periodic work units are currently active."""
|
||||
return len(self._works)
|
||||
|
||||
def _run(self, executor, runner):
|
||||
def _run(self, executor, runner, auto_stop_when_empty):
|
||||
"""Main worker run loop."""
|
||||
barrier = utils.Barrier(cond_cls=self._cond_cls)
|
||||
rnd = random.SystemRandom()
|
||||
|
@ -639,7 +655,6 @@ class PeriodicWorker(object):
|
|||
PERIODIC,
|
||||
work, index,
|
||||
submitted_at))
|
||||
fut.add_done_callback(lambda _fut: barrier.decr())
|
||||
else:
|
||||
# Gotta wait...
|
||||
self._schedule.push(next_run, index)
|
||||
|
@ -647,56 +662,76 @@ class PeriodicWorker(object):
|
|||
self._waiter.wait(when_next)
|
||||
|
||||
def _process_immediates():
|
||||
try:
|
||||
index = self._immediates.popleft()
|
||||
except IndexError:
|
||||
pass
|
||||
else:
|
||||
work = self._works[index]
|
||||
submitted_at = self._now_func()
|
||||
self._log.debug("Submitting immediate"
|
||||
" callback '%s'", work.name)
|
||||
with self._waiter:
|
||||
try:
|
||||
fut = executor.submit(runner.run, work)
|
||||
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
|
||||
self._log.error("Failed to submit immediate callback "
|
||||
"'%s', retrying. Error: %s", work.name,
|
||||
exc)
|
||||
# Restart as soon as possible
|
||||
self._immediates.append(index)
|
||||
index = self._immediates.popleft()
|
||||
except IndexError:
|
||||
pass
|
||||
else:
|
||||
barrier.incr()
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
IMMEDIATE,
|
||||
work, index,
|
||||
submitted_at))
|
||||
fut.add_done_callback(lambda _fut: barrier.decr())
|
||||
work = self._works[index]
|
||||
submitted_at = self._now_func()
|
||||
self._log.debug("Submitting immediate"
|
||||
" callback '%s'", work.name)
|
||||
try:
|
||||
fut = executor.submit(runner.run, work)
|
||||
except _SCHEDULE_RETRY_EXCEPTIONS as exc:
|
||||
self._log.error("Failed to submit immediate callback "
|
||||
"'%s', retrying. Error: %s", work.name,
|
||||
exc)
|
||||
# Restart as soon as possible
|
||||
self._immediates.append(index)
|
||||
else:
|
||||
barrier.incr()
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
IMMEDIATE,
|
||||
work, index,
|
||||
submitted_at))
|
||||
|
||||
def _on_done(kind, work, index, submitted_at, fut):
|
||||
cb = work.callback
|
||||
started_at, finished_at, failure = fut.result()
|
||||
cb_metrics, _watcher = self._watchers[index]
|
||||
cb_metrics['runs'] += 1
|
||||
schedule_again = True
|
||||
if failure is not None:
|
||||
cb_metrics['failures'] += 1
|
||||
try:
|
||||
self._on_failure(cb, kind, cb._periodic_spacing,
|
||||
failure.exc_info,
|
||||
traceback=failure.traceback)
|
||||
except Exception as exc:
|
||||
self._log.error("On failure callback %r raised an"
|
||||
" unhandled exception. Error: %s",
|
||||
self._on_failure, exc)
|
||||
if not issubclass(failure.exc_type, NeverAgain):
|
||||
cb_metrics['failures'] += 1
|
||||
try:
|
||||
self._on_failure(cb, kind, cb._periodic_spacing,
|
||||
failure.exc_info,
|
||||
traceback=failure.traceback)
|
||||
except Exception as exc:
|
||||
self._log.error("On failure callback %r raised an"
|
||||
" unhandled exception. Error: %s",
|
||||
self._on_failure, exc)
|
||||
else:
|
||||
cb_metrics['successes'] += 1
|
||||
schedule_again = False
|
||||
self._log.debug("Periodic callback '%s' raised "
|
||||
"'NeverAgain' "
|
||||
"exception, stopping any further "
|
||||
"execution of it.", work.name)
|
||||
else:
|
||||
cb_metrics['successes'] += 1
|
||||
elapsed = max(0, finished_at - started_at)
|
||||
elapsed_waiting = max(0, started_at - submitted_at)
|
||||
cb_metrics['elapsed'] += elapsed
|
||||
cb_metrics['elapsed_waiting'] += elapsed_waiting
|
||||
next_run = self._schedule_strategy(cb, started_at, finished_at,
|
||||
cb_metrics)
|
||||
with self._waiter:
|
||||
self._schedule.push(next_run, index)
|
||||
with barrier.decr_cm() as am_left:
|
||||
if schedule_again:
|
||||
next_run = self._schedule_strategy(cb, started_at,
|
||||
finished_at,
|
||||
cb_metrics)
|
||||
self._schedule.push(next_run, index)
|
||||
else:
|
||||
cb_metrics['requested_stop'] = True
|
||||
if (am_left <= 0 and
|
||||
len(self._immediates) == 0 and
|
||||
len(self._schedule) == 0 and
|
||||
auto_stop_when_empty):
|
||||
# Guess nothing left to do, goodbye...
|
||||
self._tombstone.set()
|
||||
self._waiter.notify_all()
|
||||
|
||||
try:
|
||||
|
@ -733,7 +768,10 @@ class PeriodicWorker(object):
|
|||
for index, work in enumerate(self._works):
|
||||
_cb_metrics, watcher = self._watchers[index]
|
||||
next_run = self._schedule.fetch_next_run(index)
|
||||
if next_run is None:
|
||||
if watcher.requested_stop:
|
||||
active = False
|
||||
runs_in = 'n/a'
|
||||
elif next_run is None:
|
||||
active = True
|
||||
runs_in = 'n/a'
|
||||
else:
|
||||
|
@ -747,6 +785,7 @@ class PeriodicWorker(object):
|
|||
'Runs in': runs_in,
|
||||
'Failures': watcher.failures,
|
||||
'Successes': watcher.successes,
|
||||
'Stop Requested': watcher.requested_stop,
|
||||
}
|
||||
try:
|
||||
cb_row_avgs = [
|
||||
|
@ -803,7 +842,7 @@ class PeriodicWorker(object):
|
|||
self._waiter.notify_all()
|
||||
return watcher
|
||||
|
||||
def start(self, allow_empty=False):
|
||||
def start(self, allow_empty=False, auto_stop_when_empty=False):
|
||||
"""Starts running (will not return until :py:meth:`.stop` is called).
|
||||
|
||||
:param allow_empty: instead of running with no callbacks raise when
|
||||
|
@ -814,6 +853,16 @@ class PeriodicWorker(object):
|
|||
sleep (until either stopped or callbacks are
|
||||
added)
|
||||
:type allow_empty: bool
|
||||
:param auto_stop_when_empty: when the provided periodic functions have
|
||||
all exited and this is false then the
|
||||
thread responsible for executing those
|
||||
methods will just spin/idle waiting for
|
||||
a new periodic function to be added;
|
||||
switching it to true will make this
|
||||
idling not happen (and instead when no
|
||||
more periodic work exists then the
|
||||
calling thread will just return).
|
||||
:type auto_stop_when_empty: bool
|
||||
"""
|
||||
if not self._works and not allow_empty:
|
||||
raise RuntimeError("A periodic worker can not start"
|
||||
|
@ -836,7 +885,7 @@ class PeriodicWorker(object):
|
|||
self._dead.clear()
|
||||
self._active.set()
|
||||
try:
|
||||
self._run(executor, runner)
|
||||
self._run(executor, runner, auto_stop_when_empty)
|
||||
finally:
|
||||
if getattr(self._executor_factory, 'shutdown', True):
|
||||
executor.shutdown()
|
||||
|
|
|
@ -434,6 +434,83 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
|
|||
|
||||
m.assert_called_with('foo', bar='baz')
|
||||
|
||||
def test_never_again_exc(self):
|
||||
|
||||
m_1 = mock.MagicMock()
|
||||
m_2 = mock.MagicMock()
|
||||
|
||||
@periodics.periodic(0.5)
|
||||
def run_only_once():
|
||||
m_1()
|
||||
raise periodics.NeverAgain("No need to run again !!")
|
||||
|
||||
@periodics.periodic(0.5)
|
||||
def keep_running():
|
||||
m_2()
|
||||
|
||||
callables = [
|
||||
(run_only_once, None, None),
|
||||
(keep_running, None, None),
|
||||
]
|
||||
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
|
||||
w = periodics.PeriodicWorker(callables,
|
||||
executor_factory=executor_factory,
|
||||
**self.worker_kwargs)
|
||||
with self.create_destroy(w.start):
|
||||
self.sleep(2.0)
|
||||
w.stop()
|
||||
|
||||
for watcher in w.iter_watchers():
|
||||
self.assertGreaterEqual(watcher.runs, 1)
|
||||
self.assertGreaterEqual(watcher.successes, 1)
|
||||
self.assertEqual(watcher.failures, 0)
|
||||
|
||||
self.assertEqual(m_1.call_count, 1)
|
||||
self.assertGreaterEqual(m_2.call_count, 3)
|
||||
|
||||
def test_start_with_auto_stop_when_empty_set(self):
|
||||
|
||||
@periodics.periodic(0.5)
|
||||
def run_only_once():
|
||||
raise periodics.NeverAgain("No need to run again !!")
|
||||
|
||||
callables = [
|
||||
(run_only_once, None, None),
|
||||
(run_only_once, None, None),
|
||||
]
|
||||
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
|
||||
w = periodics.PeriodicWorker(callables,
|
||||
executor_factory=executor_factory,
|
||||
**self.worker_kwargs)
|
||||
with self.create_destroy(w.start, auto_stop_when_empty=True):
|
||||
self.sleep(2.0)
|
||||
|
||||
for watcher in w.iter_watchers():
|
||||
self.assertGreaterEqual(watcher.runs, 1)
|
||||
self.assertGreaterEqual(watcher.successes, 1)
|
||||
self.assertEqual(watcher.failures, 0)
|
||||
self.assertEqual(watcher.requested_stop, True)
|
||||
|
||||
def test_add_with_auto_stop_when_empty_set(self):
|
||||
m = mock.Mock()
|
||||
|
||||
@periodics.periodic(0.5)
|
||||
def run_only_once():
|
||||
raise periodics.NeverAgain("No need to run again !!")
|
||||
|
||||
callables = [
|
||||
(run_only_once, None, None),
|
||||
]
|
||||
executor_factory = lambda: self.executor_cls(**self.executor_kwargs)
|
||||
w = periodics.PeriodicWorker(callables,
|
||||
executor_factory=executor_factory,
|
||||
**self.worker_kwargs)
|
||||
with self.create_destroy(w.start, auto_stop_when_empty=True):
|
||||
self.sleep(2.0)
|
||||
w.add(every_half_sec, m, None)
|
||||
|
||||
m.assert_not_called()
|
||||
|
||||
|
||||
class RejectingExecutor(futurist.GreenThreadPoolExecutor):
|
||||
MAX_REJECTIONS_COUNT = 2
|
||||
|
|
Loading…
Reference in New Issue