Merge "Some minor refactoring and comment/note addition"

This commit is contained in:
Jenkins 2015-10-09 13:53:02 +00:00 committed by Gerrit Code Review
commit 24166d23da
1 changed files with 60 additions and 55 deletions

View File

@ -542,6 +542,61 @@ class PeriodicWorker(object):
def _run(self, executor, runner):
"""Main worker run loop."""
def _process_scheduled():
# Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest
# next desired run time).
with self._waiter:
while (not self._schedule and
not self._tombstone.is_set() and
not self._immediates):
self._waiter.wait(self.MAX_LOOP_IDLE)
if self._tombstone.is_set():
# We were requested to stop, so stop.
return
if self._immediates:
# This will get processed in _process_immediates()
# in the next loop call.
return
submitted_at = now = self._now_func()
next_run, index = self._schedule.pop()
when_next = next_run - now
if when_next <= 0:
# Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index]
self._log.debug("Submitting periodic function '%s'",
cb_name)
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
submitted_at))
else:
# Gotta wait...
self._schedule.push(next_run, index)
when_next = min(when_next, self.MAX_LOOP_IDLE)
self._waiter.wait(when_next)
def _process_immediates():
try:
index = self._immediates.popleft()
except IndexError:
pass
else:
cb, cb_name, args, kwargs = self._callables[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'", cb_name)
fut = executor.submit(runner, self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index, submitted_at))
def _on_done(kind, cb, cb_name, index, submitted_at, fut):
started_at, finished_at, failure = fut.result()
cb_metrics, _watcher = self._watchers[index]
@ -564,61 +619,8 @@ class PeriodicWorker(object):
self._waiter.notify_all()
while not self._tombstone.is_set():
if self._immediates:
# Run & schedule its next execution.
try:
index = self._immediates.popleft()
except IndexError:
pass
else:
cb, cb_name, args, kwargs = self._callables[index]
submitted_at = self._now_func()
self._log.debug("Submitting immediate function '%s'",
cb_name)
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
IMMEDIATE,
cb, cb_name,
index,
submitted_at))
else:
# Figure out when we should run next (by selecting the
# minimum item from the heap, where the minimum should be
# the callable that needs to run next and has the lowest
# next desired run time).
with self._waiter:
while (not self._schedule and
not self._tombstone.is_set() and
not self._immediates):
self._waiter.wait(self.MAX_LOOP_IDLE)
if self._tombstone.is_set():
break
if self._immediates:
# This will get popped in the prior condition...
continue
submitted_at = now = self._now_func()
next_run, index = self._schedule.pop()
when_next = next_run - now
if when_next <= 0:
# Run & schedule its next execution.
cb, cb_name, args, kwargs = self._callables[index]
self._log.debug("Submitting periodic function '%s'",
cb_name)
fut = executor.submit(runner,
self._now_func,
cb, *args, **kwargs)
fut.add_done_callback(functools.partial(_on_done,
PERIODIC,
cb, cb_name,
index,
submitted_at))
else:
# Gotta wait...
self._schedule.push(next_run, index)
when_next = min(when_next, self.MAX_LOOP_IDLE)
self._waiter.wait(when_next)
_process_immediates()
_process_scheduled()
def _on_finish(self):
# TODO(harlowja): this may be to verbose for people?
@ -733,6 +735,9 @@ class PeriodicWorker(object):
self._dead.clear()
for cb_metrics, _watcher in self._watchers:
for k in list(six.iterkeys(cb_metrics)):
# NOTE(harlowja): mutate the original dictionaries keys
# so that the watcher (which references the same dictionary
# keys) is able to see those changes.
cb_metrics[k] = 0
self._immediates, self._schedule = _build(
self._now_func, self._callables, self._initial_schedule_strategy)