Allow scheduler tasks to request longer wait periods

This change allows a scheduler co-routine to yield an integer n, such
that only every nth call to TaskRunner.step() results in the co-routine
being advanced. We'll be able to use this to selectively slow down
polling of resources where and when we know it doesn't make sense to
poll every second.

Change-Id: Ifc6303524be8e477905546760279027886833617
Related-Bug: #1549219
This commit is contained in:
Zane Bitter 2016-05-17 12:29:12 -04:00
parent e0d92587f8
commit 8f8330573b
2 changed files with 93 additions and 5 deletions

View File

@ -140,6 +140,11 @@ class TaskRunner(object):
The task function may be a co-routine that yields control flow between
steps.
If the task co-routine wishes to be advanced only on every nth step of
the TaskRunner, it may yield an integer which is the period of the
task. e.g. "yield 2" will result in the task being advanced on every
second step.
"""
assert callable(task), "Task is not callable"
@ -149,6 +154,7 @@ class TaskRunner(object):
self._runner = None
self._done = False
self._timeout = None
self._poll_period = 1
self.name = task_description(task)
def __str__(self):
@ -207,6 +213,10 @@ class TaskRunner(object):
if not self.done():
assert self._runner is not None, "Task not started"
if self._poll_period > 1:
self._poll_period -= 1
return False
if self._timeout is not None and self._timeout.expired():
LOG.info(_LI('%s timed out'), six.text_type(self))
self._done = True
@ -216,10 +226,15 @@ class TaskRunner(object):
LOG.debug('%s running' % six.text_type(self))
try:
next(self._runner)
poll_period = next(self._runner)
except StopIteration:
self._done = True
LOG.debug('%s complete' % six.text_type(self))
else:
if isinstance(poll_period, six.integer_types):
self._poll_period = max(poll_period, 1)
else:
self._poll_period = 1
return self._done
@ -291,7 +306,7 @@ def wrappertask(task):
while True:
try:
if subtask is not None:
if isinstance(subtask, types.GeneratorType):
subtask_running = True
try:
step = next(subtask)
@ -315,7 +330,7 @@ def wrappertask(task):
except StopIteration:
subtask_running = False
else:
yield
yield subtask
except GeneratorExit:
parent.close()
raise

View File

@ -12,6 +12,7 @@
# under the License.
import contextlib
import itertools
import eventlet
import six
@ -24,13 +25,17 @@ from heat.tests import common
class DummyTask(object):
def __init__(self, num_steps=3):
def __init__(self, num_steps=3, delays=None):
self.num_steps = num_steps
if delays is not None:
self.delays = iter(delays)
else:
self.delays = itertools.repeat(None)
def __call__(self, *args, **kwargs):
for i in range(1, self.num_steps + 1):
self.do_step(i, *args, **kwargs)
yield
yield next(self.delays)
def do_step(self, step_num, *args, **kwargs):
pass
@ -337,6 +342,45 @@ class TaskTest(common.HeatTestCase):
scheduler.TaskRunner(task)()
def test_run_delays(self):
task = DummyTask(delays=itertools.repeat(2))
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
scheduler.TaskRunner._sleep(0).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(3).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(task)()
def test_run_delays_dynamic(self):
task = DummyTask(delays=[2, 4, 1])
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
scheduler.TaskRunner._sleep(0).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(3).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(task)()
def test_run_wait_time(self):
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
@ -879,6 +923,35 @@ class WrapperTaskTest(common.HeatTestCase):
scheduler.TaskRunner(task)()
def test_parent_yield_value(self):
@scheduler.wrappertask
def parent_task():
yield
yield 3
yield iter([1, 2, 4])
task = parent_task()
self.assertIsNone(next(task))
self.assertEqual(3, next(task))
self.assertEqual([1, 2, 4], list(next(task)))
def test_child_yield_value(self):
def child_task():
yield
yield 3
yield iter([1, 2, 4])
@scheduler.wrappertask
def parent_task():
yield child_task()
task = parent_task()
self.assertIsNone(next(task))
self.assertEqual(3, next(task))
self.assertEqual([1, 2, 4], list(next(task)))
def test_child_exception(self):
class MyException(Exception):
pass