From 3ad9dabe5593c02a435fb6e505826af0b874751e Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 3 Aug 2018 13:50:55 -0700 Subject: [PATCH] Don't wait for task in submit_task The exception shifting performed by the Task class is meant to protect the TaskManager run method from encountering any exceptions done in the course of running a task. This is especially important for subclasses of TaskManager which implement alternate run strategies. The exception is shifted so that it is raised by the Task.wait method. In a multi-thread environment, it is the caller of the submit_task method which should receive the exception, and therefore it is submit_task which should call Task.wait. Currently it is the _run_task method which calls Task.wait, which means the TaskManager itself receives the exception. Change-Id: I3a6e61601164811fdd255ae54470011768c99a7d --- openstack/task_manager.py | 28 ++++++- .../tests/unit/cloud/test_task_manager.py | 82 +++++++++++++++++++ .../wait-submit-task-ff7c47d6620954a5.yaml | 7 ++ 3 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml diff --git a/openstack/task_manager.py b/openstack/task_manager.py index 0b05e7107..0f47da0bc 100644 --- a/openstack/task_manager.py +++ b/openstack/task_manager.py @@ -121,8 +121,21 @@ class TaskManager(object): :param task: The task to execute. :param bool raw: If True, return the raw result as received from the underlying client call. + + This method calls task.wait() so that it only returns when the + task is complete. """ - return self.run_task(task=task) + if task.run_async: + # Async tasks run the wait lower in the stack because the wait + # is just returning the concurrent Future object. That future + # object handles the exception shifting across threads. + return self.run_task(task=task) + else: + # It's important that we call task.wait() here, rather than in + # the run_task call stack below here, since subclasses may + # cause run_task to be called from a different thread. + self.run_task(task=task) + return task.wait() def submit_function(self, method, name=None, *args, **kwargs): """ Allows submitting an arbitrary method for work. @@ -164,9 +177,14 @@ class TaskManager(object): def _run_task_async(self, task): self._log.debug( "Manager %s submitting task %s", self.name, task.name) - return self.executor.submit(self._run_task, task) + return self.executor.submit(self._run_task_wait, task) def _run_task(self, task): + # Never call task.wait() in the run_task call stack because we + # might be running in another thread. The exception-shifting + # code is designed so that caller of submit_task (which may be + # in a different thread than this run_task) gets the + # exception. self.pre_run_task(task) start = time.time() task.run() @@ -174,6 +192,12 @@ class TaskManager(object): dt = end - start self.post_run_task(dt, task) + def _run_task_wait(self, task): + # For async tasks, the action being performed is getting a + # future back from concurrent.futures.ThreadPoolExecutor. + # We do need to run the wait because the Future object is + # handling the exception shifting for us. + self._run_task(task) return task.wait() diff --git a/openstack/tests/unit/cloud/test_task_manager.py b/openstack/tests/unit/cloud/test_task_manager.py index a4c7d2694..6403d947b 100644 --- a/openstack/tests/unit/cloud/test_task_manager.py +++ b/openstack/tests/unit/cloud/test_task_manager.py @@ -14,7 +14,10 @@ import concurrent.futures +import fixtures import mock +import queue +import threading from openstack import task_manager from openstack.tests.unit import base @@ -106,3 +109,82 @@ class TestTaskManager(base.TestCase): def test_async(self, mock_submit): self.manager.submit_task(TaskTestAsync()) self.assertTrue(mock_submit.called) + + +class ThreadingTaskManager(task_manager.TaskManager): + """A subclass of TaskManager which exercises the thread-shifting + exception handling behavior.""" + + def __init__(self, *args, **kw): + super(ThreadingTaskManager, self).__init__( + *args, **kw) + self.queue = queue.Queue() + self._running = True + self._thread = threading.Thread(name=self.name, target=self.run) + self._thread.daemon = True + self.failed = False + + def start(self): + self._thread.start() + + def stop(self): + self._running = False + self.queue.put(None) + + def join(self): + self._thread.join() + + def run(self): + # No exception should ever cause this method to hit its + # exception handler. + try: + while True: + task = self.queue.get() + if not task: + if not self._running: + break + continue + self.run_task(task) + self.queue.task_done() + except Exception: + self.failed = True + raise + + def submit_task(self, task, raw=False): + # An important part of the exception-shifting feature is that + # this method should raise the exception. + self.queue.put(task) + return task.wait() + + +class ThreadingTaskManagerFixture(fixtures.Fixture): + def _setUp(self): + self.manager = ThreadingTaskManager(name='threading test') + self.manager.start() + self.addCleanup(self._cleanup) + + def _cleanup(self): + self.manager.stop() + self.manager.join() + + +class TestThreadingTaskManager(base.TestCase): + + def setUp(self): + super(TestThreadingTaskManager, self).setUp() + f = self.useFixture(ThreadingTaskManagerFixture()) + self.manager = f.manager + + def test_wait_re_raise(self): + """Test that Exceptions thrown in a Task is reraised correctly + + This test is aimed to six.reraise(), called in Task::wait(). + Specifically, we test if we get the same behaviour with all the + configured interpreters (e.g. py27, p35, ...) + """ + self.assertRaises(TestException, self.manager.submit_task, TaskTest()) + # Stop the manager and join the run thread to ensure the + # exception handler has run. + self.manager.stop() + self.manager.join() + self.assertFalse(self.manager.failed) diff --git a/releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml b/releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml new file mode 100644 index 000000000..05045d29d --- /dev/null +++ b/releasenotes/notes/wait-submit-task-ff7c47d6620954a5.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Fixed an issue with exceptions raised in TaskManager being raised at + the wrong part of the process causing queue oriented task managers to + abort their processing loop instead of reporting the exception to the + caller.