diff --git a/openstack/task_manager.py b/openstack/task_manager.py index 0b05e7107..0f47da0bc 100644 --- a/openstack/task_manager.py +++ b/openstack/task_manager.py @@ -121,8 +121,21 @@ class TaskManager(object): :param task: The task to execute. :param bool raw: If True, return the raw result as received from the underlying client call. + + This method calls task.wait() so that it only returns when the + task is complete. """ - return self.run_task(task=task) + if task.run_async: + # Async tasks run the wait lower in the stack because the wait + # is just returning the concurrent Future object. That future + # object handles the exception shifting across threads. + return self.run_task(task=task) + else: + # It's important that we call task.wait() here, rather than in + # the run_task call stack below here, since subclasses may + # cause run_task to be called from a different thread. + self.run_task(task=task) + return task.wait() def submit_function(self, method, name=None, *args, **kwargs): """ Allows submitting an arbitrary method for work. @@ -164,9 +177,14 @@ class TaskManager(object): def _run_task_async(self, task): self._log.debug( "Manager %s submitting task %s", self.name, task.name) - return self.executor.submit(self._run_task, task) + return self.executor.submit(self._run_task_wait, task) def _run_task(self, task): + # Never call task.wait() in the run_task call stack because we + # might be running in another thread. The exception-shifting + # code is designed so that caller of submit_task (which may be + # in a different thread than this run_task) gets the + # exception. self.pre_run_task(task) start = time.time() task.run() @@ -174,6 +192,12 @@ class TaskManager(object): dt = end - start self.post_run_task(dt, task) + def _run_task_wait(self, task): + # For async tasks, the action being performed is getting a + # future back from concurrent.futures.ThreadPoolExecutor. + # We do need to run the wait because the Future object is + # handling the exception shifting for us. + self._run_task(task) return task.wait() diff --git a/openstack/tests/unit/cloud/test_task_manager.py b/openstack/tests/unit/cloud/test_task_manager.py index a4c7d2694..6403d947b 100644 --- a/openstack/tests/unit/cloud/test_task_manager.py +++ b/openstack/tests/unit/cloud/test_task_manager.py @@ -14,7 +14,10 @@ import concurrent.futures +import fixtures import mock +import queue +import threading from openstack import task_manager from openstack.tests.unit import base @@ -106,3 +109,82 @@ class TestTaskManager(base.TestCase): def test_async(self, mock_submit): self.manager.submit_task(TaskTestAsync()) self.assertTrue(mock_submit.called) + + +class ThreadingTaskManager(task_manager.TaskManager): + """A subclass of TaskManager which exercises the thread-shifting + exception handling behavior.""" + + def __init__(self, *args, **kw): + super(ThreadingTaskManager, self).__init__( + *args, **kw) + self.queue = queue.Queue() + self._running = True + self._thread = threading.Thread(name=self.name, target=self.run) + self._thread.daemon = True + self.failed = False + + def start(self): + self._thread.start() + + def stop(self): + self._running = False + self.queue.put(None) + + def join(self): + self._thread.join() + + def run(self): + # No exception should ever cause this method to hit its + # exception handler. + try: + while True: + task = self.queue.get() + if not task: + if not self._running: + break + continue + self.run_task(task) + self.queue.task_done() + except Exception: + self.failed = True + raise + + def submit_task(self, task, raw=False): + # An important part of the exception-shifting feature is that + # this method should raise the exception. + self.queue.put(task) + return task.wait() + + +class ThreadingTaskManagerFixture(fixtures.Fixture): + def _setUp(self): + self.manager = ThreadingTaskManager(name='threading test') + self.manager.start() + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.manager.stop() + self.manager.join() + + +class TestThreadingTaskManager(base.TestCase): + + def setUp(self): + super(TestThreadingTaskManager, self).setUp() + f = self.useFixture(ThreadingTaskManagerFixture()) + self.manager = f.manager + + def test_wait_re_raise(self): + """Test that Exceptions thrown in a Task is reraised correctly + + This test is aimed to six.reraise(), called in Task::wait(). + Specifically, we test if we get the same behaviour with all the + configured interpreters (e.g. py27, p35, ...) + """ + self.assertRaises(TestException, self.manager.submit_task, TaskTest()) + # Stop the manager and join the run thread to ensure the + # exception handler has run. + self.manager.stop() + self.manager.join() + self.assertFalse(self.manager.failed) diff --git a/releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml b/releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml new file mode 100644 index 000000000..05045d29d --- /dev/null +++ b/releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Fixed an issue with exceptions raised in TaskManager being raised at + the wrong part of the process causing queue oriented task managers to + abort their processing loop instead of reporting the exception to the + caller.