From 991d41f2554c68bdb922db7c934d6f7928129fee Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Wed, 22 Jun 2016 20:40:46 +0200 Subject: [PATCH] Scheduler: Add a progress callback to TaskRunner This allows the caller to specify an arbitrary function to be called after each sleep() when running a task. Callers can use this to keep track of progress or keep track of external state. Any exception raised by the callback function will be thrown into the task. Change-Id: I637166f9448169eed8696e5b0fba106b8de1547a --- heat/engine/scheduler.py | 26 +-- heat/tests/engine/test_scheduler.py | 236 ++++++++++++++++++++++++++++ 2 files changed, 252 insertions(+), 10 deletions(-) diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py index 3ccc292b9f..281b17907f 100644 --- a/heat/engine/scheduler.py +++ b/heat/engine/scheduler.py @@ -168,18 +168,19 @@ class TaskRunner(object): LOG.debug('%s sleeping' % six.text_type(self)) eventlet.sleep(wait_time) - def __call__(self, wait_time=1, timeout=None): + def __call__(self, wait_time=1, timeout=None, progress_callback=None): """Start and run the task to completion. The task will first sleep for zero seconds, then sleep for `wait_time` seconds between steps. To avoid sleeping, pass `None` for `wait_time`. """ - self.start(timeout=timeout) - # ensure that zero second sleep is applied only if task - # has not completed. - if not self.done(): - self._sleep(0 if wait_time is not None else None) - self.run_to_completion(wait_time=wait_time) + assert self._runner is None, "Task already started" + + started = False + for step in self.as_task(timeout=timeout, + progress_callback=progress_callback): + self._sleep(wait_time if (started or wait_time is None) else 0) + started = True def start(self, timeout=None): """Initialise the task and run its first step. @@ -238,16 +239,18 @@ class TaskRunner(object): return self._done - def run_to_completion(self, wait_time=1): + def run_to_completion(self, wait_time=1, progress_callback=None): """Run the task to completion. The task will sleep for `wait_time` seconds between steps. To avoid sleeping, pass `None` for `wait_time`. """ - while not self.step(): + assert self._runner is not None, "Task not started" + + for step in self.as_task(progress_callback=progress_callback): self._sleep(wait_time) - def as_task(self, timeout=None): + def as_task(self, timeout=None, progress_callback=None): """Return a task that drives the TaskRunner.""" resuming = self.started() if not resuming: @@ -262,6 +265,9 @@ class TaskRunner(object): while not done: try: yield + + if progress_callback is not None: + progress_callback() except GeneratorExit: self.cancel() raise diff --git a/heat/tests/engine/test_scheduler.py b/heat/tests/engine/test_scheduler.py index 333f282fab..cb16a3976a 100644 --- a/heat/tests/engine/test_scheduler.py +++ b/heat/tests/engine/test_scheduler.py @@ -572,6 +572,242 @@ class TaskTest(common.HeatTestCase): runner.start() runner.run_to_completion(wait_time=24) + def test_run_progress(self): + progress_count = [] + + def progress(): + progress_count.append(None) + + task = DummyTask() + self.m.StubOutWithMock(task, 'do_step') + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + scheduler.TaskRunner._sleep(0).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(3).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + + self.m.ReplayAll() + + scheduler.TaskRunner(task)(progress_callback=progress) + self.assertEqual(task.num_steps, len(progress_count)) + + def test_start_run_progress(self): + progress_count = [] + + def progress(): + progress_count.append(None) + + task = DummyTask() + self.m.StubOutWithMock(task, 'do_step') + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(3).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + + self.m.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + runner.run_to_completion(progress_callback=progress) + self.assertEqual(task.num_steps - 1, len(progress_count)) + + def test_run_as_task_progress(self): + progress_count = [] + + def progress(): + progress_count.append(None) + + task = DummyTask() + self.m.StubOutWithMock(task, 'do_step') + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + task.do_step(2).AndReturn(None) + task.do_step(3).AndReturn(None) + + self.m.ReplayAll() + + tr = scheduler.TaskRunner(task) + rt = tr.as_task(progress_callback=progress) + for step in rt: + pass + self.assertEqual(task.num_steps, len(progress_count)) + + def test_run_progress_exception(self): + class TestException(Exception): + pass + + progress_count = [] + + def progress(): + if progress_count: + raise TestException + progress_count.append(None) + + task = DummyTask() + self.m.StubOutWithMock(task, 'do_step') + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + scheduler.TaskRunner._sleep(0).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + + self.m.ReplayAll() + + self.assertRaises(TestException, scheduler.TaskRunner(task), + progress_callback=progress) + self.assertEqual(1, len(progress_count)) + + def test_start_run_progress_exception(self): + class TestException(Exception): + pass + + progress_count = [] + + def progress(): + if progress_count: + raise TestException + progress_count.append(None) + + task = DummyTask() + self.m.StubOutWithMock(task, 'do_step') + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(3).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + + self.m.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + self.assertRaises(TestException, runner.run_to_completion, + progress_callback=progress) + self.assertEqual(1, len(progress_count)) + + def test_run_as_task_progress_exception(self): + class TestException(Exception): + pass + + progress_count = [] + + def progress(): + if progress_count: + raise TestException + progress_count.append(None) + + task = DummyTask() + self.m.StubOutWithMock(task, 'do_step') + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + task.do_step(2).AndReturn(None) + + self.m.ReplayAll() + + tr = scheduler.TaskRunner(task) + rt = tr.as_task(progress_callback=progress) + next(rt) + next(rt) + self.assertRaises(TestException, next, rt) + self.assertEqual(1, len(progress_count)) + + def test_run_progress_exception_swallow(self): + class TestException(Exception): + pass + + progress_count = [] + + def progress(): + try: + if not progress_count: + raise TestException + finally: + progress_count.append(None) + + def task(): + try: + yield + except TestException: + yield + + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + scheduler.TaskRunner._sleep(0).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + + self.m.ReplayAll() + + scheduler.TaskRunner(task)(progress_callback=progress) + self.assertEqual(2, len(progress_count)) + + def test_start_run_progress_exception_swallow(self): + class TestException(Exception): + pass + + progress_count = [] + + def progress(): + try: + if not progress_count: + raise TestException + finally: + progress_count.append(None) + + def task(): + yield + try: + yield + except TestException: + yield + + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + scheduler.TaskRunner._sleep(1).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + + self.m.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + runner.run_to_completion(progress_callback=progress) + self.assertEqual(2, len(progress_count)) + + def test_run_as_task_progress_exception_swallow(self): + class TestException(Exception): + pass + + progress_count = [] + + def progress(): + try: + if not progress_count: + raise TestException + finally: + progress_count.append(None) + + def task(): + try: + yield + except TestException: + yield + + tr = scheduler.TaskRunner(task) + rt = tr.as_task(progress_callback=progress) + next(rt) + next(rt) + self.assertRaises(StopIteration, next, rt) + self.assertEqual(2, len(progress_count)) + def test_sleep(self): sleep_time = 42 self.m.StubOutWithMock(eventlet, 'sleep')