Merge "Add support for client-side rate limiting"

This commit is contained in:
Zuul 2019-03-01 01:58:36 +00:00 committed by Gerrit Code Review
commit 0828f7048e
6 changed files with 259 additions and 7 deletions

View File

@ -0,0 +1,104 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
from six.moves import queue
class FairSemaphore(object):
"""Semaphore class that notifies in order of request.
We cannot use a normal Semaphore because it doesn't give any ordering,
which could lead to a request starving. Instead, handle them in the
order we receive them.
:param int concurrency:
How many concurrent threads can have the semaphore at once.
:param float rate_delay:
How long to wait between the start of each thread receiving the
semaphore.
"""
def __init__(self, concurrency, rate_delay):
self._lock = threading.Lock()
self._concurrency = concurrency
if concurrency:
self._count = 0
self._queue = queue.Queue()
self._rate_delay = rate_delay
self._rate_last_ts = time.time()
def __enter__(self):
"""Aquire a semaphore."""
# If concurrency is None, everyone is free to immediately execute.
if not self._concurrency:
# NOTE: Rate limiting still applies.This will ultimately impact
# concurrency a bit due to the mutex.
with self._lock:
execution_time = self._advance_timer()
else:
execution_time = self._get_ticket()
return self._wait_for_execution(execution_time)
def _wait_for_execution(self, execution_time):
"""Wait until the pre-calculated time to run."""
wait_time = execution_time - time.time()
if wait_time > 0:
time.sleep(wait_time)
def _get_ticket(self):
ticket = threading.Event()
with self._lock:
if self._count <= self._concurrency:
# We can execute, no need to wait. Take a ticket and
# move on.
self._count += 1
return self._advance_timer()
else:
# We need to wait for a ticket before we can execute.
# Put ourselves in the ticket queue to be woken up
# when available.
self._queue.put(ticket)
ticket.wait()
with self._lock:
return self._advance_timer()
def _advance_timer(self):
"""Calculate the time when it's ok to run a command again.
This runs inside of the mutex, serializing the calculation
of when it's ok to run again and setting _rate_last_ts to that
new time so that the next thread to calculate when it's safe to
run starts from the time that the current thread calculated.
"""
self._rate_last_ts = self._rate_last_ts + self._rate_delay
return self._rate_last_ts
def __exit__(self, exc_type, exc_value, traceback):
"""Release the semaphore."""
# If concurrency is None, everyone is free to immediately execute
if not self._concurrency:
return
with self._lock:
# If waiters, wake up the next item in the queue (note
# we're under the queue lock so the queue won't change
# under us).
if self._queue.qsize() > 0:
ticket = self._queue.get()
ticket.set()
else:
# Nothing else to do, give our ticket back
self._count -= 1

View File

@ -13,6 +13,7 @@
import os
import warnings
from keystoneauth1 import _fair_semaphore
from keystoneauth1 import session
@ -92,6 +93,16 @@ class Adapter(object):
If True, requests returning failing HTTP responses will raise an
exception; if False, the response is returned. This can be
overridden on a per-request basis via the kwarg of the same name.
:param float rate_limit:
A client-side rate limit to impose on requests made through this
adapter in requests per second. For instance, a rate_limit of 2
means to allow no more than 2 requests per second, and a rate_limit
of 0.5 means to allow no more than 1 request every two seconds.
(optional, defaults to None, which means no rate limiting will be
applied).
:param int concurrency:
How many simultaneous http requests this Adapter can be used for.
(optional, defaults to None, which means no limit).
"""
client_name = None
@ -106,7 +117,9 @@ class Adapter(object):
global_request_id=None,
min_version=None, max_version=None,
default_microversion=None, status_code_retries=None,
retriable_status_codes=None, raise_exc=None):
retriable_status_codes=None, raise_exc=None,
rate_limit=None, concurrency=None,
):
if version and (min_version or max_version):
raise TypeError(
"version is mutually exclusive with min_version and"
@ -144,6 +157,15 @@ class Adapter(object):
if client_version:
self.client_version = client_version
rate_delay = 0.0
if rate_limit:
# 1 / rate converts from requests per second to delay
# between requests needed to achieve that rate.
rate_delay = 1.0 / rate_limit
self._rate_semaphore = _fair_semaphore.FairSemaphore(
concurrency, rate_delay)
def _set_endpoint_filter_kwargs(self, kwargs):
if self.service_type:
kwargs.setdefault('service_type', self.service_type)
@ -210,6 +232,8 @@ class Adapter(object):
if self.raise_exc is not None:
kwargs.setdefault('raise_exc', self.raise_exc)
kwargs.setdefault('rate_semaphore', self._rate_semaphore)
return self.session.request(url, method, **kwargs)
def get_token(self, auth=None):

View File

@ -99,6 +99,18 @@ def _sanitize_headers(headers):
return str_dict
class NoOpSemaphore(object):
"""Empty context manager for use as a default semaphore."""
def __enter__(self):
"""Enter the context manager and do nothing."""
pass
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager and do nothing."""
pass
class _JSONEncoder(json.JSONEncoder):
def default(self, o):
@ -285,6 +297,9 @@ class Session(object):
:param bool collect_timing: Whether or not to collect per-method timing
information for each API call. (optional,
defaults to False)
:param rate_semaphore: Semaphore to be used to control concurrency
and rate limiting of requests. (optional,
defaults to no concurrency or rate control)
"""
user_agent = None
@ -298,7 +313,7 @@ class Session(object):
redirect=_DEFAULT_REDIRECT_LIMIT, additional_headers=None,
app_name=None, app_version=None, additional_user_agent=None,
discovery_cache=None, split_loggers=None,
collect_timing=False):
collect_timing=False, rate_semaphore=None):
self.auth = auth
self.session = _construct_session(session)
@ -320,6 +335,7 @@ class Session(object):
self._split_loggers = split_loggers
self._collect_timing = collect_timing
self._api_times = []
self._rate_semaphore = rate_semaphore or NoOpSemaphore()
if timeout is not None:
self.timeout = float(timeout)
@ -561,7 +577,7 @@ class Session(object):
allow=None, client_name=None, client_version=None,
microversion=None, microversion_service_type=None,
status_code_retries=0, retriable_status_codes=None,
**kwargs):
rate_semaphore=None, **kwargs):
"""Send an HTTP request with the specified characteristics.
Wrapper around `requests.Session.request` to handle tasks such as
@ -647,6 +663,9 @@ class Session(object):
should be retried (optional,
defaults to HTTP 503, has no effect
when status_code_retries is 0).
:param rate_semaphore: Semaphore to be used to control concurrency
and rate limiting of requests. (optional,
defaults to no concurrency or rate control)
:param kwargs: any other parameter that can be passed to
:meth:`requests.Session.request` (such as `headers`).
Except:
@ -670,6 +689,7 @@ class Session(object):
logger = logger or utils.get_logger(__name__)
# HTTP 503 - Service Unavailable
retriable_status_codes = retriable_status_codes or [503]
rate_semaphore = rate_semaphore or self._rate_semaphore
headers = kwargs.setdefault('headers', dict())
if microversion:
@ -797,7 +817,8 @@ class Session(object):
send = functools.partial(self._send_request,
url, method, redirect, log, logger,
split_loggers, connect_retries,
status_code_retries, retriable_status_codes)
status_code_retries, retriable_status_codes,
rate_semaphore)
try:
connection_params = self.get_auth_connection_params(auth=auth)
@ -885,8 +906,9 @@ class Session(object):
def _send_request(self, url, method, redirect, log, logger, split_loggers,
connect_retries, status_code_retries,
retriable_status_codes, connect_retry_delay=0.5,
status_code_retry_delay=0.5, **kwargs):
retriable_status_codes, rate_semaphore,
connect_retry_delay=0.5, status_code_retry_delay=0.5,
**kwargs):
# NOTE(jamielennox): We handle redirection manually because the
# requests lib follows some browser patterns where it will redirect
# POSTs as GETs for certain statuses which is not want we want for an
@ -900,7 +922,8 @@ class Session(object):
try:
try:
resp = self.session.request(method, url, **kwargs)
with rate_semaphore:
resp = self.session.request(method, url, **kwargs)
except requests.exceptions.SSLError as e:
msg = 'SSL exception connecting to %(url)s: %(error)s' % {
'url': url, 'error': e}
@ -934,6 +957,7 @@ class Session(object):
url, method, redirect, log, logger, split_loggers,
status_code_retries=status_code_retries,
retriable_status_codes=retriable_status_codes,
rate_semaphore=rate_semaphore,
connect_retries=connect_retries - 1,
connect_retry_delay=connect_retry_delay * 2,
**kwargs)
@ -964,6 +988,7 @@ class Session(object):
# This request actually worked so we can reset the delay count.
new_resp = self._send_request(
location, method, redirect, log, logger, split_loggers,
rate_semaphore=rate_semaphore,
connect_retries=connect_retries,
status_code_retries=status_code_retries,
retriable_status_codes=retriable_status_codes,
@ -989,6 +1014,7 @@ class Session(object):
connect_retries=connect_retries,
status_code_retries=status_code_retries - 1,
retriable_status_codes=retriable_status_codes,
rate_semaphore=rate_semaphore,
status_code_retry_delay=status_code_retry_delay * 2,
**kwargs)

View File

@ -0,0 +1,86 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from threading import Thread
from timeit import default_timer as timer
import mock
from six.moves import queue
import testtools
from keystoneauth1 import _fair_semaphore
class SemaphoreTests(testtools.TestCase):
def _thread_worker(self):
while True:
# get returns the Item, but we don't care about the value so we
# purposely don't assign it to anything.
self.q.get()
with self.s:
self.mock_payload.do_something()
self.q.task_done()
# Have 5 threads do 10 different "things" coordinated by the fair
# semaphore.
def _concurrency_core(self, concurrency, delay):
self.s = _fair_semaphore.FairSemaphore(concurrency, delay)
self.q = queue.Queue()
for i in range(5):
t = Thread(target=self._thread_worker)
t.daemon = True
t.start()
for item in range(0, 10):
self.q.put(item)
self.q.join()
def setUp(self):
super(SemaphoreTests, self).setUp()
self.mock_payload = mock.Mock()
# We should be waiting at least 0.1s between operations, so
# the 10 operations must take at *least* 1 second
def test_semaphore_no_concurrency(self):
start = timer()
self._concurrency_core(None, 0.1)
end = timer()
self.assertTrue((end - start) > 1.0)
self.assertEqual(self.mock_payload.do_something.call_count, 10)
def test_semaphore_single_concurrency(self):
start = timer()
self._concurrency_core(1, 0.1)
end = timer()
self.assertTrue((end - start) > 1.0)
self.assertEqual(self.mock_payload.do_something.call_count, 10)
def test_semaphore_multiple_concurrency(self):
start = timer()
self._concurrency_core(5, 0.1)
end = timer()
self.assertTrue((end - start) > 1.0)
self.assertEqual(self.mock_payload.do_something.call_count, 10)
# do some high speed tests; I don't think we can really assert
# much about these other than they don't deadlock...
def test_semaphore_fast_no_concurrency(self):
self._concurrency_core(None, 0.0)
def test_semaphore_fast_single_concurrency(self):
self._concurrency_core(1, 0.0)
def test_semaphore_fast_multiple_concurrency(self):
self._concurrency_core(5, 0.0)

View File

@ -1565,6 +1565,7 @@ class AdapterTest(utils.TestCase):
with mock.patch.object(sess, 'request') as m:
adapter.Adapter(sess, **adap_kwargs).get(url, **get_kwargs)
m.assert_called_once_with(url, 'GET', endpoint_filter={},
rate_semaphore=mock.ANY,
**exp_kwargs)
# No default_microversion in Adapter, no microversion in get()
@ -1588,6 +1589,7 @@ class AdapterTest(utils.TestCase):
with mock.patch.object(sess, 'request') as m:
adapter.Adapter(sess, **adap_kwargs).get(url, **get_kwargs)
m.assert_called_once_with(url, 'GET', endpoint_filter={},
rate_semaphore=mock.ANY,
**exp_kwargs)
# No raise_exc in Adapter or get()

View File

@ -0,0 +1,10 @@
---
features:
- |
Support added for client-side rate limiting. Two new parameters now
exist for ``keystoneauth1.adapter.Adapter``. ``rate`` expresses a
maximum rate at which to execute requests. ``parallel_limit`` allows
for the creation of a semaphore to control the maximum number of
requests that can be active at any one given point in time.
Both default to ``None`` which has the normal behavior or not limiting
requests in any manner.