Merge "Call pre/post run task calls from TaskManager.submit_task()"
This commit is contained in:
commit
2d199ff2c6
|
@ -183,8 +183,14 @@ class TaskManager(object):
|
|||
raise exceptions.TaskManagerStopped(
|
||||
"TaskManager {name} is no longer running".format(
|
||||
name=self.name))
|
||||
self.pre_run_task(task)
|
||||
start = time.time()
|
||||
self.queue.put(task)
|
||||
return task.wait()
|
||||
ret = task.wait()
|
||||
end = time.time()
|
||||
dt = end - start
|
||||
self.post_run_task(dt, task)
|
||||
return ret
|
||||
|
||||
def submit_function(
|
||||
self, method, name=None, run_async=False, tag=None,
|
||||
|
@ -224,6 +230,13 @@ class TaskManager(object):
|
|||
method, name=name, run_async=True, *args, **kwargs)
|
||||
|
||||
def pre_run_task(self, task):
|
||||
'''Callback when task enters the task queue
|
||||
|
||||
:param task: the task
|
||||
|
||||
Intended to be overridden by child classes to track task
|
||||
progress.
|
||||
'''
|
||||
self._log.debug(
|
||||
"Manager %s running task %s", self.name, task.name)
|
||||
|
||||
|
@ -233,14 +246,21 @@ class TaskManager(object):
|
|||
# code is designed so that caller of submit_task (which may be
|
||||
# in a different thread than this run_task) gets the
|
||||
# exception.
|
||||
self.pre_run_task(task)
|
||||
start = time.time()
|
||||
#
|
||||
# Note all threads go through the threadpool, so this is an
|
||||
# async call. submit_task will wait() for the final result.
|
||||
task.run()
|
||||
end = time.time()
|
||||
dt = end - start
|
||||
self.post_run_task(dt, task)
|
||||
|
||||
def post_run_task(self, elapsed_time, task):
|
||||
'''Callback at task completion
|
||||
|
||||
:param float elapsed_time: time in seconds between task entering
|
||||
queue and finishing
|
||||
:param task: the task
|
||||
|
||||
This function is intended to be overridden by child classes to
|
||||
monitor task runtimes.
|
||||
'''
|
||||
self._log.debug(
|
||||
"Manager %s ran task %s in %ss",
|
||||
self.name, task.name, elapsed_time)
|
||||
|
|
|
@ -17,6 +17,7 @@ import concurrent.futures
|
|||
import fixtures
|
||||
import mock
|
||||
import threading
|
||||
import time
|
||||
|
||||
from six.moves import queue
|
||||
|
||||
|
@ -124,6 +125,28 @@ class TestTaskManager(base.TestCase):
|
|||
self.manager.submit_function(set, run_async=True)
|
||||
self.assertTrue(mock_submit.called)
|
||||
|
||||
@mock.patch.object(task_manager.TaskManager, 'post_run_task')
|
||||
@mock.patch.object(task_manager.TaskManager, 'pre_run_task')
|
||||
def test_pre_post_calls(self, mock_pre, mock_post):
|
||||
self.manager.submit_function(lambda: None)
|
||||
mock_pre.assert_called_once()
|
||||
mock_post.assert_called_once()
|
||||
|
||||
@mock.patch.object(task_manager.TaskManager, 'post_run_task')
|
||||
@mock.patch.object(task_manager.TaskManager, 'pre_run_task')
|
||||
def test_validate_timing(self, mock_pre, mock_post):
|
||||
# Note the unit test setup has mocked out time.sleep() and
|
||||
# done a * 0.0001, and the test should be under the 5
|
||||
# second timeout. Thus with below, we should see at
|
||||
# *least* a 1 second pause running the task.
|
||||
self.manager.submit_function(lambda: time.sleep(10000))
|
||||
|
||||
mock_pre.assert_called_once()
|
||||
mock_post.assert_called_once()
|
||||
|
||||
args, kwargs = mock_post.call_args_list[0]
|
||||
self.assertTrue(args[0] > 1.0)
|
||||
|
||||
|
||||
class ThreadingTaskManager(task_manager.TaskManager):
|
||||
"""A subclass of TaskManager which exercises the thread-shifting
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
fixes:
|
||||
- |
|
||||
Fix a regression where the ``TaskManager.post_run_task`` ``elapsed_time``
|
||||
argument was not reflecting the time taken by the actual task.
|
Loading…
Reference in New Issue