Make RateLimitingTaskManager the TaskManager

There isn't really a reason to not run the multi-threaded rate-limiting
task manager all the time.

Modify it slightly so that rate=None means "don't rate limit", which
should keep the existing behavior.

Change-Id: I3ede4fd0b12a65effade238c4ea967aca51869ba
This commit is contained in:
Monty Taylor 2018-09-22 06:46:37 -05:00
parent 348c9e8da3
commit 9db8bae0a1
4 changed files with 48 additions and 64 deletions

View File

@ -121,6 +121,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
session=session, *args, **kwargs)
if not task_manager:
task_manager = _task_manager.TaskManager(name=self.service_type)
task_manager.start()
self.task_manager = task_manager

View File

@ -290,6 +290,7 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
self.task_manager = task_manager or _task_manager.TaskManager(
self.config.full_name)
self.task_manager.start()
self._session = None
self._proxies = {}

View File

@ -95,11 +95,19 @@ class Task(object):
class TaskManager(object):
def __init__(self, name, log=_log, workers=5, **kwargs):
def __init__(self, name, rate=None, log=_log, workers=5, **kwargs):
self.name = name
self._executor = None
self._log = log
self._workers = workers
self.daemon = True
self.queue = queue.Queue()
self._running = True
if rate is not None:
rate = float(rate)
self.rate = rate
self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True
@property
def executor(self):
@ -108,18 +116,42 @@ class TaskManager(object):
max_workers=self._workers)
return self._executor
def start(self):
self._thread.start()
def stop(self):
""" This is a direct action passthrough TaskManager """
self._running = False
self.queue.put(None)
if self._executor:
self._executor.shutdown()
def run(self):
""" This is a direct action passthrough TaskManager """
pass
def join(self):
""" This is a direct action passthrough TaskManager """
pass
self._thread.join()
def run(self):
last_ts = 0
try:
while True:
task = self.queue.get()
if not task:
if not self._running:
break
continue
if self.rate:
while True:
delta = time.time() - last_ts
if delta >= self.rate:
break
time.sleep(self.rate - delta)
self._log.debug(
"TaskManager {name} queue size: {size})".format(
name=self.name,
size=self.queue.qsize()))
self.run_task(task)
self.queue.task_done()
except Exception:
self._log.exception("TaskManager died")
raise
def submit_task(self, task):
"""Submit and execute the given task.
@ -131,7 +163,11 @@ class TaskManager(object):
This method calls task.wait() so that it only returns when the
task is complete.
"""
self.run_task(task=task)
if not self._running:
raise exceptions.TaskManagerStopped(
"TaskManager {name} is no longer running".format(
name=self.name))
self.queue.put(task)
return task.wait()
def submit_function(
@ -190,61 +226,6 @@ class TaskManager(object):
self.name, task.name, elapsed_time)
class RateLimitingTaskManager(TaskManager):
def __init__(self, name, rate, workers=5):
super(TaskManager, self).__init__(
name=name, workers=workers)
self.daemon = True
self.queue = queue.Queue()
self._running = True
self.rate = float(rate)
self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True
def start(self):
self._thread.start()
def stop(self):
self._running = False
self.queue.put(None)
def join(self):
self._thread.join()
def run(self):
last_ts = 0
try:
while True:
task = self.queue.get()
if not task:
if not self._running:
break
continue
while True:
delta = time.time() - last_ts
if delta >= self.rate:
break
time.sleep(self.rate - delta)
self._log.debug(
"TaskManager {name} queue size: {size})".format(
name=self.name,
size=self.queue.qsize()))
self.run_task(task)
self.queue.task_done()
except Exception:
self._log.exception("TaskManager died")
raise
def submit_task(self, task):
if not self._running:
raise exceptions.TaskManagerStopped(
"TaskManager {name} is no longer running".format(
name=self.name))
self.queue.put(task)
return task.wait()
def wait_for_futures(futures, raise_on_error=True, log=_log):
'''Collect results or failures from a list of running future tasks.'''

View File

@ -68,6 +68,7 @@ class TestTaskManager(base.TestCase):
def setUp(self):
super(TestTaskManager, self).setUp()
self.manager = task_manager.TaskManager(name='test')
self.manager.start()
def test_wait_re_raise(self):
"""Test that Exceptions thrown in a Task is reraised correctly