Scheduler: Allow TaskRunner to be used as a task

This will be useful in numerous places where we use a TaskRunner to provide
a timeout when we are already inside a wrappertask. It eliminates the
need to manually advance the TaskRunner, and ensures correct exception
handling.

Change-Id: I9be0a361e1d1856adb6eee2ef0da8d5aa23795be
This commit is contained in:
Zane Bitter 2016-06-22 20:40:46 +02:00
parent cac2bf7b6f
commit ecdaa8e183
2 changed files with 184 additions and 0 deletions

View File

@ -247,6 +247,35 @@ class TaskRunner(object):
while not self.step():
self._sleep(wait_time)
def as_task(self, timeout=None):
"""Return a task that drives the TaskRunner."""
resuming = self.started()
if not resuming:
self.start(timeout=timeout)
else:
if timeout is not None:
new_timeout = Timeout(self, timeout)
if self._timeout is None or new_timeout < self._timeout:
self._timeout = new_timeout
done = self.step() if resuming else self.done()
while not done:
try:
yield
except GeneratorExit:
self.cancel()
raise
except: # noqa
self._done = True
try:
self._runner.throw(*sys.exc_info())
except StopIteration:
return
else:
self._done = False
else:
done = self.step()
def cancel(self, grace_period=None):
"""Cancel the task and mark it as done."""
if self.done():

View File

@ -395,6 +395,94 @@ class TaskTest(common.HeatTestCase):
scheduler.TaskRunner(task)()
def test_run_as_task(self):
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
task.do_step(2).AndReturn(None)
task.do_step(3).AndReturn(None)
self.m.ReplayAll()
tr = scheduler.TaskRunner(task)
rt = tr.as_task()
for step in rt:
pass
self.assertTrue(tr.done())
def test_run_as_task_started(self):
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
task.do_step(2).AndReturn(None)
task.do_step(3).AndReturn(None)
self.m.ReplayAll()
tr = scheduler.TaskRunner(task)
tr.start()
for step in tr.as_task():
pass
self.assertTrue(tr.done())
def test_run_as_task_cancel(self):
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
self.m.ReplayAll()
tr = scheduler.TaskRunner(task)
rt = tr.as_task()
next(rt)
rt.close()
self.assertTrue(tr.done())
def test_run_as_task_exception(self):
class TestException(Exception):
pass
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
self.m.ReplayAll()
tr = scheduler.TaskRunner(task)
rt = tr.as_task()
next(rt)
self.assertRaises(TestException, rt.throw, TestException)
self.assertTrue(tr.done())
def test_run_as_task_swallow_exception(self):
class TestException(Exception):
pass
def task():
try:
yield
except TestException:
yield
tr = scheduler.TaskRunner(task)
rt = tr.as_task()
next(rt)
rt.throw(TestException)
self.assertFalse(tr.done())
self.assertRaises(StopIteration, next, rt)
self.assertTrue(tr.done())
def test_run_delays(self):
task = DummyTask(delays=itertools.repeat(2))
self.m.StubOutWithMock(task, 'do_step')
@ -688,6 +776,73 @@ class TaskTest(common.HeatTestCase):
self.assertFalse(runner)
self.assertTrue(runner.step())
def test_as_task_timeout(self):
st = timeutils.wallclock()
def task():
while True:
yield
self.m.StubOutWithMock(timeutils, 'wallclock')
timeutils.wallclock().AndReturn(st)
timeutils.wallclock().AndReturn(st + 0.5)
timeutils.wallclock().AndReturn(st + 1.5)
self.m.ReplayAll()
runner = scheduler.TaskRunner(task)
rt = runner.as_task(timeout=1)
next(rt)
self.assertTrue(runner)
self.assertRaises(scheduler.Timeout, next, rt)
def test_as_task_timeout_shorter(self):
st = timeutils.wallclock()
def task():
while True:
yield
self.m.StubOutWithMock(timeutils, 'wallclock')
timeutils.wallclock().AndReturn(st)
timeutils.wallclock().AndReturn(st + 0.5)
timeutils.wallclock().AndReturn(st + 0.7)
timeutils.wallclock().AndReturn(st + 1.6)
timeutils.wallclock().AndReturn(st + 2.6)
self.m.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start(timeout=10)
self.assertTrue(runner)
rt = runner.as_task(timeout=1)
next(rt)
self.assertRaises(scheduler.Timeout, next, rt)
def test_as_task_timeout_longer(self):
st = timeutils.wallclock()
def task():
while True:
yield
self.m.StubOutWithMock(timeutils, 'wallclock')
timeutils.wallclock().AndReturn(st)
timeutils.wallclock().AndReturn(st + 0.5)
timeutils.wallclock().AndReturn(st + 0.6)
timeutils.wallclock().AndReturn(st + 1.5)
self.m.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start(timeout=1)
self.assertTrue(runner)
rt = runner.as_task(timeout=10)
self.assertRaises(scheduler.Timeout, next, rt)
def test_cancel_not_started(self):
task = DummyTask(1)