Merge "Add support for per-service rate limits"

This commit is contained in:
Zuul 2018-10-28 23:42:05 +00:00 committed by Gerrit Code Review
commit a30cc754f2
8 changed files with 118 additions and 15 deletions

View File

@ -116,11 +116,18 @@ class OpenStackSDKAdapter(adapter.Adapter):
This allows using the nodepool MultiThreaded Rate Limiting TaskManager.
"""
def __init__(self, session=None, task_manager=None, *args, **kwargs):
def __init__(
self, session=None,
task_manager=None,
rate_limit=None, concurrency=None,
*args, **kwargs):
super(OpenStackSDKAdapter, self).__init__(
session=session, *args, **kwargs)
if not task_manager:
task_manager = _task_manager.TaskManager(name=self.service_type)
task_manager = _task_manager.TaskManager(
name=self.service_type,
rate=rate_limit,
workers=concurrency)
task_manager.start()
self.task_manager = task_manager
@ -143,6 +150,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
ret = self.task_manager.submit_function(
request_method, run_async=True, name=name,
connect_retries=connect_retries, raise_exc=raise_exc,
tag=self.service_type,
**kwargs)
if run_async:
return ret

View File

@ -15,7 +15,6 @@
import copy
import warnings
from keystoneauth1 import adapter
from keystoneauth1 import discover
import keystoneauth1.exceptions.catalog
from keystoneauth1 import session as ks_session
@ -23,6 +22,7 @@ import os_service_types
import requestsexceptions
from six.moves import urllib
from openstack import _adapter
from openstack import version as openstack_version
from openstack import _log
from openstack.config import _util
@ -247,6 +247,17 @@ class CloudRegion(object):
value = converter(value)
return value
def _get_service_config(self, key, service_type):
config_dict = self.config.get(key)
if not config_dict:
return None
if not isinstance(config_dict, dict):
return config_dict
for st in self._service_type_manager.get_all_types(service_type):
if st in config_dict:
return config_dict[st]
def get_interface(self, service_type=None):
return self._get_config(
'interface', service_type, fallback_to_unprefixed=True)
@ -438,7 +449,8 @@ class CloudRegion(object):
return interface_versions.get(service_type, [])
def get_session_client(
self, service_type, version=None, constructor=adapter.Adapter,
self, service_type, version=None,
constructor=_adapter.OpenStackSDKAdapter,
**kwargs):
"""Return a prepped keystoneauth Adapter for a given service.
@ -498,6 +510,8 @@ class CloudRegion(object):
max_version=max_api_version,
endpoint_override=endpoint_override,
default_microversion=version_request.default_microversion,
rate_limit=self.get_rate_limit(service_type),
concurrency=self.get_concurrency(service_type),
**kwargs)
if version_request.default_microversion:
default_microversion = version_request.default_microversion
@ -724,3 +738,11 @@ class CloudRegion(object):
def get_password_callback(self):
return self._password_callback
def get_rate_limit(self, service_type=None):
return self._get_service_config(
'rate_limit', service_type=service_type)
def get_concurrency(self, service_type=None):
return self._get_service_config(
'concurrency', service_type=service_type)

View File

@ -220,6 +220,7 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
strict=False,
use_direct_get=False,
task_manager=None,
rate_limit=None,
**kwargs):
"""Create a connection to a cloud.
@ -262,6 +263,12 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
Defaults to None which causes a direct-action Task Manager to be
used.
:type manager: :class:`~openstack.task_manager.TaskManager`
:param rate_limit:
Client-side rate limit, expressed in calls per second. The
parameter can either be a single float, or it can be a dict with
keys as service-type and values as floats expressing the calls
per second for that service. Defaults to None, which means no
rate-limiting is performed.
:param kwargs: If a config is not provided, the rest of the parameters
provided are assumed to be arguments to be passed to the
CloudRegion contructor.
@ -294,7 +301,8 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
self.task_manager = task_manager
else:
self.task_manager = _task_manager.TaskManager(
self.config.full_name)
self.config.full_name,
rate=rate_limit)
self.task_manager.start()
self._session = None

View File

@ -45,7 +45,9 @@ class Task(object):
the main payload at execution time.
"""
def __init__(self, main=None, name=None, run_async=False, *args, **kwargs):
def __init__(
self, main=None, name=None, run_async=False,
tag=None, *args, **kwargs):
self._exception = None
self._traceback = None
self._result = None
@ -56,6 +58,7 @@ class Task(object):
self.args = args
self.kwargs = kwargs
self.name = name or type(self).__name__
self.tag = tag
def main(self):
return self._main(*self.args, **self.kwargs)
@ -103,12 +106,22 @@ class TaskManager(object):
self.daemon = True
self.queue = queue.Queue()
self._running = True
if rate is not None:
rate = float(rate)
self.rate = rate
if isinstance(rate, dict):
self._waits = {}
for (k, v) in rate.items():
if v:
self._waits[k] = 1.0 / v
else:
if rate:
self._waits = {None: 1.0 / rate}
else:
self._waits = {}
self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True
def _get_wait(self, tag):
return self._waits.get(tag, self._waits.get(None))
@property
def executor(self):
if not self._executor:
@ -129,7 +142,7 @@ class TaskManager(object):
self._thread.join()
def run(self):
last_ts = 0
last_ts_dict = {}
try:
while True:
task = self.queue.get()
@ -137,12 +150,15 @@ class TaskManager(object):
if not self._running:
break
continue
if self.rate:
wait = self._get_wait(task.tag)
if wait:
last_ts = last_ts_dict.get(task.tag, 0)
while True:
delta = time.time() - last_ts
if delta >= self.rate:
if delta >= wait:
break
time.sleep(self.rate - delta)
time.sleep(wait - delta)
last_ts_dict[task.tag] = time.time()
self._log.debug(
"TaskManager {name} queue size: {size})".format(
name=self.name,
@ -171,12 +187,14 @@ class TaskManager(object):
return task.wait()
def submit_function(
self, method, name=None, run_async=False, *args, **kwargs):
self, method, name=None, run_async=False, tag=None,
*args, **kwargs):
""" Allows submitting an arbitrary method for work.
:param method: Callable to run in the TaskManager.
:param str name: Name to use for the generated Task object.
:param bool run_async: Whether to run this task async or not.
:param str tag: Named rate-limiting context for the task.
:param args: positional arguments to pass to the method when it runs.
:param kwargs: keyword arguments to pass to the method when it runs.
"""
@ -185,10 +203,12 @@ class TaskManager(object):
self.executor.submit, method, *args, **kwargs)
task = Task(
main=payload, name=name,
run_async=run_async)
run_async=run_async,
tag=tag)
else:
task = Task(
main=method, name=name,
tag=tag,
*args, **kwargs)
return self.submit_task(task)

View File

@ -124,6 +124,8 @@ class TestCase(base.TestCase):
self.strict_cloud = openstack.connection.Connection(
config=self.cloud_config,
strict=True)
self.addCleanup(self.cloud.task_manager.stop)
self.addCleanup(self.strict_cloud.task_manager.stop)
# FIXME(notmorgan): Convert the uri_registry, discovery.json, and
# use of keystone_v3/v2 to a proper fixtures.Fixture. For now this

View File

@ -63,6 +63,25 @@ class TaskTestSet(task_manager.Task):
return set([1, 2])
class TestRateTransforms(base.TestCase):
def test_rate_parameter_scalar(self):
manager = task_manager.TaskManager(name='test', rate=0.1234)
self.assertEqual(1 / 0.1234, manager._get_wait('compute'))
self.assertEqual(1 / 0.1234, manager._get_wait(None))
def test_rate_parameter_dict(self):
manager = task_manager.TaskManager(
name='test',
rate={
'compute': 20,
'network': 10,
})
self.assertEqual(1 / 20, manager._get_wait('compute'))
self.assertEqual(1 / 10, manager._get_wait('network'))
self.assertIsNone(manager._get_wait('object-store'))
class TestTaskManager(base.TestCase):
def setUp(self):

View File

@ -84,6 +84,22 @@ class TestConnection(base.TestCase):
self.assertEqual(mock_session, conn.session)
self.assertEqual('auth.example.com', conn.config.name)
def test_task_manager_rate_scalar(self):
conn = connection.Connection(cloud='sample', rate_limit=20)
self.assertEqual(1 / 20, conn.task_manager._get_wait('object-store'))
self.assertEqual(1 / 20, conn.task_manager._get_wait(None))
def test_task_manager_rate_dict(self):
conn = connection.Connection(
cloud='sample',
rate_limit={
'compute': 20,
'network': 10,
})
self.assertEqual(1 / 20, conn.task_manager._get_wait('compute'))
self.assertEqual(1 / 10, conn.task_manager._get_wait('network'))
self.assertIsNone(conn.task_manager._get_wait('object-store'))
def test_create_session(self):
conn = connection.Connection(cloud='sample')
self.assertIsNotNone(conn)

View File

@ -0,0 +1,8 @@
---
features:
- |
Client-side rate limiting is now directly exposed via ``rate_limit``
and ``concurrency`` parameters. A single value can be given that applies
to all services, or a dict of service-type and value if different
client-side rate or concurrency limits should be used for different
services.