Merge "Scheduler: Add a progress callback to TaskRunner"

This commit is contained in:
Jenkins 2016-07-20 14:17:09 +00:00 committed by Gerrit Code Review
commit 94a8d440c6
2 changed files with 252 additions and 10 deletions

View File

@ -157,18 +157,19 @@ class TaskRunner(object):
LOG.debug('%s sleeping' % six.text_type(self))
eventlet.sleep(wait_time)
def __call__(self, wait_time=1, timeout=None):
def __call__(self, wait_time=1, timeout=None, progress_callback=None):
"""Start and run the task to completion.
The task will first sleep for zero seconds, then sleep for `wait_time`
seconds between steps. To avoid sleeping, pass `None` for `wait_time`.
"""
self.start(timeout=timeout)
# ensure that zero second sleep is applied only if task
# has not completed.
if not self.done():
self._sleep(0 if wait_time is not None else None)
self.run_to_completion(wait_time=wait_time)
assert self._runner is None, "Task already started"
started = False
for step in self.as_task(timeout=timeout,
progress_callback=progress_callback):
self._sleep(wait_time if (started or wait_time is None) else 0)
started = True
def start(self, timeout=None):
"""Initialise the task and run its first step.
@ -227,16 +228,18 @@ class TaskRunner(object):
return self._done
def run_to_completion(self, wait_time=1):
def run_to_completion(self, wait_time=1, progress_callback=None):
"""Run the task to completion.
The task will sleep for `wait_time` seconds between steps. To avoid
sleeping, pass `None` for `wait_time`.
"""
while not self.step():
assert self._runner is not None, "Task not started"
for step in self.as_task(progress_callback=progress_callback):
self._sleep(wait_time)
def as_task(self, timeout=None):
def as_task(self, timeout=None, progress_callback=None):
"""Return a task that drives the TaskRunner."""
resuming = self.started()
if not resuming:
@ -251,6 +254,9 @@ class TaskRunner(object):
while not done:
try:
yield
if progress_callback is not None:
progress_callback()
except GeneratorExit:
self.cancel()
raise

View File

@ -572,6 +572,242 @@ class TaskTest(common.HeatTestCase):
runner.start()
runner.run_to_completion(wait_time=24)
def test_run_progress(self):
progress_count = []
def progress():
progress_count.append(None)
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
scheduler.TaskRunner._sleep(0).AndReturn(None)
task.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(3).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(task)(progress_callback=progress)
self.assertEqual(task.num_steps, len(progress_count))
def test_start_run_progress(self):
progress_count = []
def progress():
progress_count.append(None)
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
task.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(3).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start()
runner.run_to_completion(progress_callback=progress)
self.assertEqual(task.num_steps - 1, len(progress_count))
def test_run_as_task_progress(self):
progress_count = []
def progress():
progress_count.append(None)
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
task.do_step(2).AndReturn(None)
task.do_step(3).AndReturn(None)
self.m.ReplayAll()
tr = scheduler.TaskRunner(task)
rt = tr.as_task(progress_callback=progress)
for step in rt:
pass
self.assertEqual(task.num_steps, len(progress_count))
def test_run_progress_exception(self):
class TestException(Exception):
pass
progress_count = []
def progress():
if progress_count:
raise TestException
progress_count.append(None)
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
scheduler.TaskRunner._sleep(0).AndReturn(None)
task.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
self.assertRaises(TestException, scheduler.TaskRunner(task),
progress_callback=progress)
self.assertEqual(1, len(progress_count))
def test_start_run_progress_exception(self):
class TestException(Exception):
pass
progress_count = []
def progress():
if progress_count:
raise TestException
progress_count.append(None)
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
task.do_step(2).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
task.do_step(3).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start()
self.assertRaises(TestException, runner.run_to_completion,
progress_callback=progress)
self.assertEqual(1, len(progress_count))
def test_run_as_task_progress_exception(self):
class TestException(Exception):
pass
progress_count = []
def progress():
if progress_count:
raise TestException
progress_count.append(None)
task = DummyTask()
self.m.StubOutWithMock(task, 'do_step')
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
task.do_step(1).AndReturn(None)
task.do_step(2).AndReturn(None)
self.m.ReplayAll()
tr = scheduler.TaskRunner(task)
rt = tr.as_task(progress_callback=progress)
next(rt)
next(rt)
self.assertRaises(TestException, next, rt)
self.assertEqual(1, len(progress_count))
def test_run_progress_exception_swallow(self):
class TestException(Exception):
pass
progress_count = []
def progress():
try:
if not progress_count:
raise TestException
finally:
progress_count.append(None)
def task():
try:
yield
except TestException:
yield
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
scheduler.TaskRunner._sleep(0).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
scheduler.TaskRunner(task)(progress_callback=progress)
self.assertEqual(2, len(progress_count))
def test_start_run_progress_exception_swallow(self):
class TestException(Exception):
pass
progress_count = []
def progress():
try:
if not progress_count:
raise TestException
finally:
progress_count.append(None)
def task():
yield
try:
yield
except TestException:
yield
self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
scheduler.TaskRunner._sleep(1).AndReturn(None)
scheduler.TaskRunner._sleep(1).AndReturn(None)
self.m.ReplayAll()
runner = scheduler.TaskRunner(task)
runner.start()
runner.run_to_completion(progress_callback=progress)
self.assertEqual(2, len(progress_count))
def test_run_as_task_progress_exception_swallow(self):
class TestException(Exception):
pass
progress_count = []
def progress():
try:
if not progress_count:
raise TestException
finally:
progress_count.append(None)
def task():
try:
yield
except TestException:
yield
tr = scheduler.TaskRunner(task)
rt = tr.as_task(progress_callback=progress)
next(rt)
next(rt)
self.assertRaises(StopIteration, next, rt)
self.assertEqual(2, len(progress_count))
def test_sleep(self):
sleep_time = 42
self.m.StubOutWithMock(eventlet, 'sleep')