Add support to task manager for async tasks

In order to support replacing SwiftService, we need to be able to spin
some tasks into async threads. We'll allow for calculation of rate
limiting to match with the start of the call, but will return a Future
to the call site. This patch is followed by a patch to Nodepool to have
Nodepool's TaskManager subclass shade's TaskManager.

Co-Authored-By: David Shrewsbury <shrewsbury.dave@gmail.com>

Change-Id: Ib473ceece7169d76dd37702d66b188a841ec3f59
This commit is contained in:
Monty Taylor 2016-12-25 08:41:30 -06:00
parent 197ca1b8c7
commit 0851770b0f
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
3 changed files with 53 additions and 6 deletions

View File

@ -140,7 +140,7 @@ class ShadeAdapter(adapter.Adapter):
return meta.obj_to_dict(result, request_id=request_id)
return result
def request(self, url, method, *args, **kwargs):
def request(self, url, method, run_async=False, *args, **kwargs):
name_parts = extract_name(url)
name = '.'.join([self.service_type, method] + name_parts)
class_name = "".join([
@ -155,6 +155,7 @@ class ShadeAdapter(adapter.Adapter):
super(RequestTask, self).__init__(**kw)
self.name = name
self.__class__.__name__ = str(class_name)
self.run_async = run_async
def main(self, client):
self.args.setdefault('raise_exc', False)

View File

@ -15,6 +15,7 @@
# limitations under the License.
import abc
import concurrent.futures
import sys
import threading
import time
@ -72,6 +73,7 @@ class BaseTask(object):
self._result = None
self._response = None
self._finished = threading.Event()
self.run_async = False
self.args = kw
self.name = type(self).__name__
@ -210,17 +212,23 @@ def generate_task_class(method, name, result_filter_cb):
class TaskManager(object):
log = _log.setup_logging(__name__)
def __init__(self, client, name, result_filter_cb=None):
def __init__(
self, client, name, result_filter_cb=None, workers=5, **kwargs):
self.name = name
self._client = client
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=workers)
if not result_filter_cb:
self._result_filter_cb = _result_filter_cb
else:
self._result_filter_cb = result_filter_cb
def set_client(self, client):
self._client = client
def stop(self):
""" This is a direct action passthrough TaskManager """
pass
self._executor.shutdown(wait=True)
def run(self):
""" This is a direct action passthrough TaskManager """
@ -233,15 +241,36 @@ class TaskManager(object):
:param bool raw: If True, return the raw result as received from the
underlying client call.
"""
return self.run_task(task=task, raw=raw)
def _run_task_async(self, task, raw=False):
self.log.debug(
"Manager %s submitting task %s", self.name, task.name)
return self._executor.submit(self._run_task, task, raw=raw)
def run_task(self, task, raw=False):
if hasattr(task, 'run_async') and task.run_async:
return self._run_task_async(task, raw=raw)
else:
return self._run_task(task, raw=raw)
def _run_task(self, task, raw=False):
self.log.debug(
"Manager %s running task %s", self.name, task.name)
start = time.time()
task.run(self._client)
end = time.time()
dt = end - start
self.log.debug(
"Manager %s ran task %s in %ss",
self.name, task.name, (end - start))
"Manager %s ran task %s in %ss", self.name, task.name, dt)
self.post_run_task(dt)
return task.wait(raw)
def post_run_task(self, elasped_time):
pass
# Backwards compatibility
submitTask = submit_task
@ -257,4 +286,4 @@ class TaskManager(object):
task_class = generate_task_class(method, name, result_filter_cb)
return self.manager.submit_task(task_class(**kwargs))
return self._executor.submit_task(task_class(**kwargs))

View File

@ -13,6 +13,9 @@
# limitations under the License.
import concurrent.futures
import mock
from shade import task_manager
from shade.tests.unit import base
@ -56,6 +59,15 @@ class TaskTestSet(task_manager.Task):
return set([1, 2])
class TaskTestAsync(task_manager.Task):
def __init__(self):
super(task_manager.Task, self).__init__()
self.run_async = True
def main(self, client):
pass
class TestTaskManager(base.TestCase):
def setUp(self):
@ -90,3 +102,8 @@ class TestTaskManager(base.TestCase):
def test_dont_munchify_set(self):
ret = self.manager.submit_task(TaskTestSet())
self.assertIsInstance(ret, set)
@mock.patch.object(concurrent.futures.ThreadPoolExecutor, 'submit')
def test_async(self, mock_submit):
self.manager.submit_task(TaskTestAsync())
self.assertTrue(mock_submit.called)