Merge "Call pre/post run task calls from TaskManager.submit_task()"

This commit is contained in:
Zuul 2018-10-29 19:47:56 +00:00 committed by Gerrit Code Review
commit 2d199ff2c6
3 changed files with 54 additions and 6 deletions

View File

@ -183,8 +183,14 @@ class TaskManager(object):
raise exceptions.TaskManagerStopped(
"TaskManager {name} is no longer running".format(
name=self.name))
self.pre_run_task(task)
start = time.time()
self.queue.put(task)
return task.wait()
ret = task.wait()
end = time.time()
dt = end - start
self.post_run_task(dt, task)
return ret
def submit_function(
self, method, name=None, run_async=False, tag=None,
@ -224,6 +230,13 @@ class TaskManager(object):
method, name=name, run_async=True, *args, **kwargs)
def pre_run_task(self, task):
'''Callback when task enters the task queue
:param task: the task
Intended to be overridden by child classes to track task
progress.
'''
self._log.debug(
"Manager %s running task %s", self.name, task.name)
@ -233,14 +246,21 @@ class TaskManager(object):
# code is designed so that caller of submit_task (which may be
# in a different thread than this run_task) gets the
# exception.
self.pre_run_task(task)
start = time.time()
#
# Note all threads go through the threadpool, so this is an
# async call. submit_task will wait() for the final result.
task.run()
end = time.time()
dt = end - start
self.post_run_task(dt, task)
def post_run_task(self, elapsed_time, task):
'''Callback at task completion
:param float elapsed_time: time in seconds between task entering
queue and finishing
:param task: the task
This function is intended to be overridden by child classes to
monitor task runtimes.
'''
self._log.debug(
"Manager %s ran task %s in %ss",
self.name, task.name, elapsed_time)

View File

@ -17,6 +17,7 @@ import concurrent.futures
import fixtures
import mock
import threading
import time
from six.moves import queue
@ -124,6 +125,28 @@ class TestTaskManager(base.TestCase):
self.manager.submit_function(set, run_async=True)
self.assertTrue(mock_submit.called)
@mock.patch.object(task_manager.TaskManager, 'post_run_task')
@mock.patch.object(task_manager.TaskManager, 'pre_run_task')
def test_pre_post_calls(self, mock_pre, mock_post):
self.manager.submit_function(lambda: None)
mock_pre.assert_called_once()
mock_post.assert_called_once()
@mock.patch.object(task_manager.TaskManager, 'post_run_task')
@mock.patch.object(task_manager.TaskManager, 'pre_run_task')
def test_validate_timing(self, mock_pre, mock_post):
# Note the unit test setup has mocked out time.sleep() and
# done a * 0.0001, and the test should be under the 5
# second timeout. Thus with below, we should see at
# *least* a 1 second pause running the task.
self.manager.submit_function(lambda: time.sleep(10000))
mock_pre.assert_called_once()
mock_post.assert_called_once()
args, kwargs = mock_post.call_args_list[0]
self.assertTrue(args[0] > 1.0)
class ThreadingTaskManager(task_manager.TaskManager):
"""A subclass of TaskManager which exercises the thread-shifting

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fix a regression where the ``TaskManager.post_run_task`` ``elapsed_time``
argument was not reflecting the time taken by the actual task.