Merge "Allow adding periodic callables at runtime"
This commit is contained in:
commit
b05816e749
|
@ -222,6 +222,13 @@ class PeriodicWorker(object):
|
|||
|
||||
_NO_OP_ARGS = ()
|
||||
_NO_OP_KWARGS = {}
|
||||
_INITIAL_METRICS = {
|
||||
'runs': 0,
|
||||
'elapsed': 0,
|
||||
'elapsed_waiting': 0,
|
||||
'failures': 0,
|
||||
'successes': 0,
|
||||
}
|
||||
|
||||
DEFAULT_JITTER = fractions.Fraction(5, 100)
|
||||
"""
|
||||
|
@ -373,13 +380,7 @@ class PeriodicWorker(object):
|
|||
kwargs = self._NO_OP_KWARGS
|
||||
cb_name = _get_callback_name(cb)
|
||||
self._callables.append((cb, cb_name, args, kwargs))
|
||||
self._metrics.append({
|
||||
'runs': 0,
|
||||
'elapsed': 0,
|
||||
'elapsed_waiting': 0,
|
||||
'failures': 0,
|
||||
'successes': 0,
|
||||
})
|
||||
self._metrics.append(self._INITIAL_METRICS.copy())
|
||||
try:
|
||||
strategy = self.BUILT_IN_STRATEGIES[schedule_strategy]
|
||||
self._schedule_strategy = strategy[0]
|
||||
|
@ -496,14 +497,45 @@ class PeriodicWorker(object):
|
|||
self._log.debug(" Average elapsed waiting = %0.4fs",
|
||||
avg_elapsed_waiting)
|
||||
|
||||
def start(self):
|
||||
def add(self, cb, *args, **kwargs):
|
||||
"""Adds a new periodic callback to the current worker.
|
||||
|
||||
:param cb: a callable object/method/function previously decorated
|
||||
with the :py:func:`.periodic` decorator
|
||||
:type cb: callable
|
||||
"""
|
||||
if not six.callable(cb):
|
||||
raise ValueError("Periodic callback %r must be callable" % cb)
|
||||
missing_attrs = _check_attrs(cb)
|
||||
if missing_attrs:
|
||||
raise ValueError("Periodic callback %r missing required"
|
||||
" attributes %s" % (cb, missing_attrs))
|
||||
now = _utils.now()
|
||||
with self._waiter:
|
||||
index = len(self._callables)
|
||||
cb_name = _get_callback_name(cb)
|
||||
self._callables.append((cb, cb_name, args, kwargs))
|
||||
self._metrics.append(self._INITIAL_METRICS.copy())
|
||||
if cb._periodic_run_immediately:
|
||||
self._immediates.append(index)
|
||||
else:
|
||||
next_run = self._initial_schedule_strategy(cb, now)
|
||||
self._schedule.push(next_run, index)
|
||||
self._waiter.notify_all()
|
||||
|
||||
def start(self, allow_empty=False):
|
||||
"""Starts running (will not return until :py:meth:`.stop` is called).
|
||||
|
||||
NOTE(harlowja): If this worker has no contained callables this raises
|
||||
a runtime error and does not run since it is impossible to periodically
|
||||
run nothing.
|
||||
:param allow_empty: instead of running with no callbacks raise when
|
||||
this worker has no contained callables (this can be
|
||||
set to true and :py:meth:`.add` can be used to add
|
||||
new callables on demand), note that when enabled
|
||||
and no callbacks exist this will block and
|
||||
sleep (until either stopped or callbacks are
|
||||
added)
|
||||
:type allow_empty: bool
|
||||
"""
|
||||
if not self._callables:
|
||||
if not self._callables and not allow_empty:
|
||||
raise RuntimeError("A periodic worker can not start"
|
||||
" without any callables")
|
||||
executor = self._executor_factory()
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
@ -82,6 +83,30 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
|
|||
'event_cls': green_threading.Event}}),
|
||||
]
|
||||
|
||||
def test_add_on_demand(self):
|
||||
called = set()
|
||||
|
||||
def cb(name):
|
||||
called.add(name)
|
||||
|
||||
callables = []
|
||||
for i in range(0, 10):
|
||||
i_cb = functools.partial(cb, '%s_has_called' % i)
|
||||
callables.append((every_half_sec, (i_cb,), {}))
|
||||
|
||||
leftover_callables = list(callables)
|
||||
w = periodics.PeriodicWorker([], **self.worker_kwargs)
|
||||
with self.create_destroy(w.start, allow_empty=True):
|
||||
# NOTE(harlowja): if this never happens, the test will fail
|
||||
# eventually, with a timeout error..., probably can make it fail
|
||||
# slightly faster in the future...
|
||||
while len(called) != len(callables):
|
||||
if leftover_callables:
|
||||
cb, args, kwargs = leftover_callables.pop()
|
||||
w.add(cb, *args, **kwargs)
|
||||
self.sleep(0.1)
|
||||
w.stop()
|
||||
|
||||
def test_worker(self):
|
||||
called = []
|
||||
|
||||
|
|
Loading…
Reference in New Issue