Allow scheduler tasks to request longer wait periods
This change allows a scheduler co-routine to yield an integer n, such that only every nth call to TaskRunner.step() results in the co-routine being advanced. We'll be able to use this to selectively slow down polling of resources where and when we know it doesn't make sense to poll every second. Change-Id: Ifc6303524be8e477905546760279027886833617 Related-Bug: #1549219
This commit is contained in:
parent
e0d92587f8
commit
8f8330573b
|
@ -140,6 +140,11 @@ class TaskRunner(object):
|
|||
|
||||
The task function may be a co-routine that yields control flow between
|
||||
steps.
|
||||
|
||||
If the task co-routine wishes to be advanced only on every nth step of
|
||||
the TaskRunner, it may yield an integer which is the period of the
|
||||
task. e.g. "yield 2" will result in the task being advanced on every
|
||||
second step.
|
||||
"""
|
||||
assert callable(task), "Task is not callable"
|
||||
|
||||
|
@ -149,6 +154,7 @@ class TaskRunner(object):
|
|||
self._runner = None
|
||||
self._done = False
|
||||
self._timeout = None
|
||||
self._poll_period = 1
|
||||
self.name = task_description(task)
|
||||
|
||||
def __str__(self):
|
||||
|
@ -207,6 +213,10 @@ class TaskRunner(object):
|
|||
if not self.done():
|
||||
assert self._runner is not None, "Task not started"
|
||||
|
||||
if self._poll_period > 1:
|
||||
self._poll_period -= 1
|
||||
return False
|
||||
|
||||
if self._timeout is not None and self._timeout.expired():
|
||||
LOG.info(_LI('%s timed out'), six.text_type(self))
|
||||
self._done = True
|
||||
|
@ -216,10 +226,15 @@ class TaskRunner(object):
|
|||
LOG.debug('%s running' % six.text_type(self))
|
||||
|
||||
try:
|
||||
next(self._runner)
|
||||
poll_period = next(self._runner)
|
||||
except StopIteration:
|
||||
self._done = True
|
||||
LOG.debug('%s complete' % six.text_type(self))
|
||||
else:
|
||||
if isinstance(poll_period, six.integer_types):
|
||||
self._poll_period = max(poll_period, 1)
|
||||
else:
|
||||
self._poll_period = 1
|
||||
|
||||
return self._done
|
||||
|
||||
|
@ -291,7 +306,7 @@ def wrappertask(task):
|
|||
|
||||
while True:
|
||||
try:
|
||||
if subtask is not None:
|
||||
if isinstance(subtask, types.GeneratorType):
|
||||
subtask_running = True
|
||||
try:
|
||||
step = next(subtask)
|
||||
|
@ -315,7 +330,7 @@ def wrappertask(task):
|
|||
except StopIteration:
|
||||
subtask_running = False
|
||||
else:
|
||||
yield
|
||||
yield subtask
|
||||
except GeneratorExit:
|
||||
parent.close()
|
||||
raise
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
|
||||
import eventlet
|
||||
import six
|
||||
|
@ -24,13 +25,17 @@ from heat.tests import common
|
|||
|
||||
|
||||
class DummyTask(object):
|
||||
def __init__(self, num_steps=3):
|
||||
def __init__(self, num_steps=3, delays=None):
|
||||
self.num_steps = num_steps
|
||||
if delays is not None:
|
||||
self.delays = iter(delays)
|
||||
else:
|
||||
self.delays = itertools.repeat(None)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
for i in range(1, self.num_steps + 1):
|
||||
self.do_step(i, *args, **kwargs)
|
||||
yield
|
||||
yield next(self.delays)
|
||||
|
||||
def do_step(self, step_num, *args, **kwargs):
|
||||
pass
|
||||
|
@ -337,6 +342,45 @@ class TaskTest(common.HeatTestCase):
|
|||
|
||||
scheduler.TaskRunner(task)()
|
||||
|
||||
def test_run_delays(self):
|
||||
task = DummyTask(delays=itertools.repeat(2))
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(0).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(3).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
scheduler.TaskRunner(task)()
|
||||
|
||||
def test_run_delays_dynamic(self):
|
||||
task = DummyTask(delays=[2, 4, 1])
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
|
||||
|
||||
task.do_step(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(0).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(2).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
task.do_step(3).AndReturn(None)
|
||||
scheduler.TaskRunner._sleep(1).AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
scheduler.TaskRunner(task)()
|
||||
|
||||
def test_run_wait_time(self):
|
||||
task = DummyTask()
|
||||
self.m.StubOutWithMock(task, 'do_step')
|
||||
|
@ -879,6 +923,35 @@ class WrapperTaskTest(common.HeatTestCase):
|
|||
|
||||
scheduler.TaskRunner(task)()
|
||||
|
||||
def test_parent_yield_value(self):
|
||||
@scheduler.wrappertask
|
||||
def parent_task():
|
||||
yield
|
||||
yield 3
|
||||
yield iter([1, 2, 4])
|
||||
|
||||
task = parent_task()
|
||||
|
||||
self.assertIsNone(next(task))
|
||||
self.assertEqual(3, next(task))
|
||||
self.assertEqual([1, 2, 4], list(next(task)))
|
||||
|
||||
def test_child_yield_value(self):
|
||||
def child_task():
|
||||
yield
|
||||
yield 3
|
||||
yield iter([1, 2, 4])
|
||||
|
||||
@scheduler.wrappertask
|
||||
def parent_task():
|
||||
yield child_task()
|
||||
|
||||
task = parent_task()
|
||||
|
||||
self.assertIsNone(next(task))
|
||||
self.assertEqual(3, next(task))
|
||||
self.assertEqual([1, 2, 4], list(next(task)))
|
||||
|
||||
def test_child_exception(self):
|
||||
class MyException(Exception):
|
||||
pass
|
||||
|
|
Loading…
Reference in New Issue