Merge "Some minor refactoring and comment/note addition"
This commit is contained in:
commit
24166d23da
|
@ -542,6 +542,61 @@ class PeriodicWorker(object):
|
|||
def _run(self, executor, runner):
|
||||
"""Main worker run loop."""
|
||||
|
||||
def _process_scheduled():
|
||||
# Figure out when we should run next (by selecting the
|
||||
# minimum item from the heap, where the minimum should be
|
||||
# the callable that needs to run next and has the lowest
|
||||
# next desired run time).
|
||||
with self._waiter:
|
||||
while (not self._schedule and
|
||||
not self._tombstone.is_set() and
|
||||
not self._immediates):
|
||||
self._waiter.wait(self.MAX_LOOP_IDLE)
|
||||
if self._tombstone.is_set():
|
||||
# We were requested to stop, so stop.
|
||||
return
|
||||
if self._immediates:
|
||||
# This will get processed in _process_immediates()
|
||||
# in the next loop call.
|
||||
return
|
||||
submitted_at = now = self._now_func()
|
||||
next_run, index = self._schedule.pop()
|
||||
when_next = next_run - now
|
||||
if when_next <= 0:
|
||||
# Run & schedule its next execution.
|
||||
cb, cb_name, args, kwargs = self._callables[index]
|
||||
self._log.debug("Submitting periodic function '%s'",
|
||||
cb_name)
|
||||
fut = executor.submit(runner,
|
||||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
PERIODIC,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
else:
|
||||
# Gotta wait...
|
||||
self._schedule.push(next_run, index)
|
||||
when_next = min(when_next, self.MAX_LOOP_IDLE)
|
||||
self._waiter.wait(when_next)
|
||||
|
||||
def _process_immediates():
|
||||
try:
|
||||
index = self._immediates.popleft()
|
||||
except IndexError:
|
||||
pass
|
||||
else:
|
||||
cb, cb_name, args, kwargs = self._callables[index]
|
||||
submitted_at = self._now_func()
|
||||
self._log.debug("Submitting immediate function '%s'", cb_name)
|
||||
fut = executor.submit(runner, self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
IMMEDIATE,
|
||||
cb, cb_name,
|
||||
index, submitted_at))
|
||||
|
||||
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
|
||||
started_at, finished_at, failure = fut.result()
|
||||
cb_metrics, _watcher = self._watchers[index]
|
||||
|
@ -564,61 +619,8 @@ class PeriodicWorker(object):
|
|||
self._waiter.notify_all()
|
||||
|
||||
while not self._tombstone.is_set():
|
||||
if self._immediates:
|
||||
# Run & schedule its next execution.
|
||||
try:
|
||||
index = self._immediates.popleft()
|
||||
except IndexError:
|
||||
pass
|
||||
else:
|
||||
cb, cb_name, args, kwargs = self._callables[index]
|
||||
submitted_at = self._now_func()
|
||||
self._log.debug("Submitting immediate function '%s'",
|
||||
cb_name)
|
||||
fut = executor.submit(runner,
|
||||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
IMMEDIATE,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
else:
|
||||
# Figure out when we should run next (by selecting the
|
||||
# minimum item from the heap, where the minimum should be
|
||||
# the callable that needs to run next and has the lowest
|
||||
# next desired run time).
|
||||
with self._waiter:
|
||||
while (not self._schedule and
|
||||
not self._tombstone.is_set() and
|
||||
not self._immediates):
|
||||
self._waiter.wait(self.MAX_LOOP_IDLE)
|
||||
if self._tombstone.is_set():
|
||||
break
|
||||
if self._immediates:
|
||||
# This will get popped in the prior condition...
|
||||
continue
|
||||
submitted_at = now = self._now_func()
|
||||
next_run, index = self._schedule.pop()
|
||||
when_next = next_run - now
|
||||
if when_next <= 0:
|
||||
# Run & schedule its next execution.
|
||||
cb, cb_name, args, kwargs = self._callables[index]
|
||||
self._log.debug("Submitting periodic function '%s'",
|
||||
cb_name)
|
||||
fut = executor.submit(runner,
|
||||
self._now_func,
|
||||
cb, *args, **kwargs)
|
||||
fut.add_done_callback(functools.partial(_on_done,
|
||||
PERIODIC,
|
||||
cb, cb_name,
|
||||
index,
|
||||
submitted_at))
|
||||
else:
|
||||
# Gotta wait...
|
||||
self._schedule.push(next_run, index)
|
||||
when_next = min(when_next, self.MAX_LOOP_IDLE)
|
||||
self._waiter.wait(when_next)
|
||||
_process_immediates()
|
||||
_process_scheduled()
|
||||
|
||||
def _on_finish(self):
|
||||
# TODO(harlowja): this may be to verbose for people?
|
||||
|
@ -733,6 +735,9 @@ class PeriodicWorker(object):
|
|||
self._dead.clear()
|
||||
for cb_metrics, _watcher in self._watchers:
|
||||
for k in list(six.iterkeys(cb_metrics)):
|
||||
# NOTE(harlowja): mutate the original dictionaries keys
|
||||
# so that the watcher (which references the same dictionary
|
||||
# keys) is able to see those changes.
|
||||
cb_metrics[k] = 0
|
||||
self._immediates, self._schedule = _build(
|
||||
self._now_func, self._callables, self._initial_schedule_strategy)
|
||||
|
|
Loading…
Reference in New Issue