Merge "Ensure new entry in immediates gets processed during wait(s)"
This commit is contained in:
commit
2ada7fba27
|
@ -579,10 +579,14 @@ class PeriodicWorker(object):
|
|||
# next desired run time).
|
||||
with self._waiter:
|
||||
while (not self._schedule and
|
||||
not self._tombstone.is_set()):
|
||||
not self._tombstone.is_set() and
|
||||
not self._immediates):
|
||||
self._waiter.wait(self.MAX_LOOP_IDLE)
|
||||
if self._tombstone.is_set():
|
||||
break
|
||||
if self._immediates:
|
||||
# This will get popped in the prior condition...
|
||||
continue
|
||||
submitted_at = now = self._now_func()
|
||||
next_run, index = self._schedule.pop()
|
||||
when_next = next_run - now
|
||||
|
|
|
@ -133,6 +133,22 @@ class TestPeriodics(testscenarios.TestWithScenarios, base.TestCase):
|
|||
nows = list(reversed(nows))
|
||||
self._test_strategy('last_finished', nows, last_now, 5.0)
|
||||
|
||||
def test_waiting_immediate_add_processed(self):
|
||||
ran_at = []
|
||||
|
||||
@periodics.periodic(0.1, run_immediately=True)
|
||||
def activated_periodic():
|
||||
ran_at.append(time.time())
|
||||
|
||||
w = periodics.PeriodicWorker([], **self.worker_kwargs)
|
||||
with self.create_destroy(w.start, allow_empty=True):
|
||||
# Give some time for the thread to start...
|
||||
self.sleep(0.5)
|
||||
w.add(activated_periodic)
|
||||
while len(ran_at) == 0:
|
||||
self.sleep(0.1)
|
||||
w.stop()
|
||||
|
||||
def test_double_start_fail(self):
|
||||
w = periodics.PeriodicWorker([], **self.worker_kwargs)
|
||||
with self.create_destroy(w.start, allow_empty=True):
|
||||
|
|
Loading…
Reference in New Issue