Don't wait for task in submit_task

The exception shifting performed by the Task class is meant to
protect the TaskManager run method from encountering any exceptions
done in the course of running a task.  This is especially important
for subclasses of TaskManager which implement alternate run
strategies.  The exception is shifted so that it is raised by the
Task.wait method.  In a multi-thread environment, it is the caller
of the submit_task method which should receive the exception, and
therefore it is submit_task which should call Task.wait.  Currently
it is the _run_task method which calls Task.wait, which means the
TaskManager itself receives the exception.

Change-Id: I3a6e61601164811fdd255ae54470011768c99a7d
This commit is contained in:
James E. Blair 2018-08-03 13:50:55 -07:00 committed by Monty Taylor
parent 004f35ffd1
commit 3ad9dabe55
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
3 changed files with 115 additions and 2 deletions

View File

@ -121,8 +121,21 @@ class TaskManager(object):
:param task: The task to execute.
:param bool raw: If True, return the raw result as received from the
underlying client call.
This method calls task.wait() so that it only returns when the
task is complete.
"""
return self.run_task(task=task)
if task.run_async:
# Async tasks run the wait lower in the stack because the wait
# is just returning the concurrent Future object. That future
# object handles the exception shifting across threads.
return self.run_task(task=task)
else:
# It's important that we call task.wait() here, rather than in
# the run_task call stack below here, since subclasses may
# cause run_task to be called from a different thread.
self.run_task(task=task)
return task.wait()
def submit_function(self, method, name=None, *args, **kwargs):
""" Allows submitting an arbitrary method for work.
@ -164,9 +177,14 @@ class TaskManager(object):
def _run_task_async(self, task):
self._log.debug(
"Manager %s submitting task %s", self.name, task.name)
return self.executor.submit(self._run_task, task)
return self.executor.submit(self._run_task_wait, task)
def _run_task(self, task):
# Never call task.wait() in the run_task call stack because we
# might be running in another thread. The exception-shifting
# code is designed so that caller of submit_task (which may be
# in a different thread than this run_task) gets the
# exception.
self.pre_run_task(task)
start = time.time()
task.run()
@ -174,6 +192,12 @@ class TaskManager(object):
dt = end - start
self.post_run_task(dt, task)
def _run_task_wait(self, task):
# For async tasks, the action being performed is getting a
# future back from concurrent.futures.ThreadPoolExecutor.
# We do need to run the wait because the Future object is
# handling the exception shifting for us.
self._run_task(task)
return task.wait()

View File

@ -14,7 +14,10 @@
import concurrent.futures
import fixtures
import mock
import queue
import threading
from openstack import task_manager
from openstack.tests.unit import base
@ -106,3 +109,82 @@ class TestTaskManager(base.TestCase):
def test_async(self, mock_submit):
self.manager.submit_task(TaskTestAsync())
self.assertTrue(mock_submit.called)
class ThreadingTaskManager(task_manager.TaskManager):
"""A subclass of TaskManager which exercises the thread-shifting
exception handling behavior."""
def __init__(self, *args, **kw):
super(ThreadingTaskManager, self).__init__(
*args, **kw)
self.queue = queue.Queue()
self._running = True
self._thread = threading.Thread(name=self.name, target=self.run)
self._thread.daemon = True
self.failed = False
def start(self):
self._thread.start()
def stop(self):
self._running = False
self.queue.put(None)
def join(self):
self._thread.join()
def run(self):
# No exception should ever cause this method to hit its
# exception handler.
try:
while True:
task = self.queue.get()
if not task:
if not self._running:
break
continue
self.run_task(task)
self.queue.task_done()
except Exception:
self.failed = True
raise
def submit_task(self, task, raw=False):
# An important part of the exception-shifting feature is that
# this method should raise the exception.
self.queue.put(task)
return task.wait()
class ThreadingTaskManagerFixture(fixtures.Fixture):
def _setUp(self):
self.manager = ThreadingTaskManager(name='threading test')
self.manager.start()
self.addCleanup(self._cleanup)
def _cleanup(self):
self.manager.stop()
self.manager.join()
class TestThreadingTaskManager(base.TestCase):
def setUp(self):
super(TestThreadingTaskManager, self).setUp()
f = self.useFixture(ThreadingTaskManagerFixture())
self.manager = f.manager
def test_wait_re_raise(self):
"""Test that Exceptions thrown in a Task is reraised correctly
This test is aimed to six.reraise(), called in Task::wait().
Specifically, we test if we get the same behaviour with all the
configured interpreters (e.g. py27, p35, ...)
"""
self.assertRaises(TestException, self.manager.submit_task, TaskTest())
# Stop the manager and join the run thread to ensure the
# exception handler has run.
self.manager.stop()
self.manager.join()
self.assertFalse(self.manager.failed)

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Fixed an issue with exceptions raised in TaskManager being raised at
the wrong part of the process causing queue oriented task managers to
abort their processing loop instead of reporting the exception to the
caller.