Import rate limiting TaskManager from nodepool

There is logic in the caching and batching layer in shade that is in
service of the rate limiting TaskManager from nodepool- but the logic in
that TaskManager is nowhere to be found in the openstacksdk codebase.
This leads people to want to make changes without the whole picture.

Pull in the code from nodepool so that we can have a full discussion
about things like caching and retries ... and potentially even add some
tests that run things under it.

Change-Id: I0fcefb78cf66a60a4c08de8d39d44222a56a8725
This commit is contained in:
Monty Taylor 2018-06-11 09:52:52 -05:00
parent 9d72be5ac9
commit 348c9e8da3
3 changed files with 66 additions and 1 deletions

View File

@ -243,3 +243,7 @@ class NotSupported(SDKException):
class ValidationException(SDKException):
"""Validation failed for resource."""
class TaskManagerStopped(SDKException):
"""Operations were attempted on a stopped TaskManager."""

View File

@ -20,6 +20,7 @@ import time
import keystoneauth1.exceptions
import six
from six.moves import queue
import openstack._log
from openstack import exceptions
@ -116,6 +117,10 @@ class TaskManager(object):
""" This is a direct action passthrough TaskManager """
pass
def join(self):
""" This is a direct action passthrough TaskManager """
pass
def submit_task(self, task):
"""Submit and execute the given task.
@ -185,6 +190,61 @@ class TaskManager(object):
self.name, task.name, elapsed_time)
class RateLimitingTaskManager(TaskManager):
def __init__(self, name, rate, workers=5):
super(TaskManager, self).__init__(
name=name, workers=workers)
self.daemon = True
self.queue = queue.Queue()
self._running = True
self.rate = float(rate)
self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True
def start(self):
self._thread.start()
def stop(self):
self._running = False
self.queue.put(None)
def join(self):
self._thread.join()
def run(self):
last_ts = 0
try:
while True:
task = self.queue.get()
if not task:
if not self._running:
break
continue
while True:
delta = time.time() - last_ts
if delta >= self.rate:
break
time.sleep(self.rate - delta)
self._log.debug(
"TaskManager {name} queue size: {size})".format(
name=self.name,
size=self.queue.qsize()))
self.run_task(task)
self.queue.task_done()
except Exception:
self._log.exception("TaskManager died")
raise
def submit_task(self, task):
if not self._running:
raise exceptions.TaskManagerStopped(
"TaskManager {name} is no longer running".format(
name=self.name))
self.queue.put(task)
return task.wait()
def wait_for_futures(futures, raise_on_error=True, log=_log):
'''Collect results or failures from a list of running future tasks.'''

View File

@ -16,9 +16,10 @@
import concurrent.futures
import fixtures
import mock
import queue
import threading
from six.moves import queue
from openstack import task_manager
from openstack.tests.unit import base