831 lines
27 KiB
Python
831 lines
27 KiB
Python
# Copyright 2011 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
Tests dealing with HTTP rate-limiting.
|
|
"""
|
|
|
|
import httplib
|
|
from trove.quota.models import Quota
|
|
import testtools
|
|
import webob
|
|
|
|
from mock import Mock, MagicMock
|
|
import six
|
|
|
|
from trove.common import limits
|
|
from trove.common.limits import Limit
|
|
from trove.limits import views
|
|
from trove.limits.service import LimitsController
|
|
from oslo.serialization import jsonutils
|
|
from trove.quota.quota import QUOTAS
|
|
|
|
TEST_LIMITS = [
|
|
Limit("GET", "/delayed", "^/delayed", 1, limits.PER_MINUTE),
|
|
Limit("POST", "*", ".*", 7, limits.PER_MINUTE),
|
|
Limit("POST", "/mgmt", "^/mgmt", 3, limits.PER_MINUTE),
|
|
Limit("PUT", "*", "", 10, limits.PER_MINUTE),
|
|
]
|
|
|
|
|
|
class BaseLimitTestSuite(testtools.TestCase):
|
|
"""Base test suite which provides relevant stubs and time abstraction."""
|
|
|
|
def setUp(self):
|
|
super(BaseLimitTestSuite, self).setUp()
|
|
self.absolute_limits = {"max_instances": 55,
|
|
"max_volumes": 100,
|
|
"max_backups": 40}
|
|
|
|
|
|
class LimitsControllerTest(BaseLimitTestSuite):
|
|
def setUp(self):
|
|
super(LimitsControllerTest, self).setUp()
|
|
|
|
def test_limit_index_empty(self):
|
|
limit_controller = LimitsController()
|
|
|
|
req = MagicMock()
|
|
req.environ = {}
|
|
|
|
QUOTAS.get_all_quotas_by_tenant = MagicMock(return_value={})
|
|
|
|
view = limit_controller.index(req, "test_tenant_id")
|
|
expected = {'limits': [{'verb': 'ABSOLUTE'}]}
|
|
self.assertEqual(expected, view._data)
|
|
|
|
def test_limit_index(self):
|
|
tenant_id = "test_tenant_id"
|
|
limit_controller = LimitsController()
|
|
|
|
limits = [
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "POST",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
},
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "PUT",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
},
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "DELETE",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
},
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "GET",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
}
|
|
]
|
|
|
|
abs_limits = {"instances": Quota(tenant_id=tenant_id,
|
|
resource="instances",
|
|
hard_limit=100),
|
|
|
|
"backups": Quota(tenant_id=tenant_id,
|
|
resource="backups",
|
|
hard_limit=40),
|
|
|
|
"volumes": Quota(tenant_id=tenant_id,
|
|
resource="volumes",
|
|
hard_limit=55)}
|
|
|
|
req = MagicMock()
|
|
req.environ = {"trove.limits": limits}
|
|
|
|
QUOTAS.get_all_quotas_by_tenant = MagicMock(return_value=abs_limits)
|
|
|
|
view = limit_controller.index(req, tenant_id)
|
|
|
|
expected = {
|
|
'limits': [
|
|
{
|
|
'max_instances': 100,
|
|
'max_backups': 40,
|
|
'verb': 'ABSOLUTE',
|
|
'max_volumes': 55
|
|
},
|
|
{
|
|
'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'POST',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'
|
|
},
|
|
{
|
|
'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'PUT',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'
|
|
},
|
|
{
|
|
'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'DELETE',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'
|
|
},
|
|
{
|
|
'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'GET',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'
|
|
}
|
|
]
|
|
}
|
|
|
|
self.assertEqual(expected, view._data)
|
|
|
|
|
|
class TestLimiter(limits.Limiter):
|
|
"""Note: This was taken from Nova."""
|
|
pass
|
|
|
|
|
|
class LimitMiddlewareTest(BaseLimitTestSuite):
|
|
"""
|
|
Tests for the `limits.RateLimitingMiddleware` class.
|
|
"""
|
|
|
|
@webob.dec.wsgify
|
|
def _empty_app(self, request):
|
|
"""Do-nothing WSGI app."""
|
|
pass
|
|
|
|
def setUp(self):
|
|
"""Prepare middleware for use through fake WSGI app."""
|
|
super(LimitMiddlewareTest, self).setUp()
|
|
_limits = '(GET, *, .*, 1, MINUTE)'
|
|
self.app = limits.RateLimitingMiddleware(self._empty_app, _limits,
|
|
"%s.TestLimiter" %
|
|
self.__class__.__module__)
|
|
|
|
def test_limit_class(self):
|
|
# Test that middleware selected correct limiter class.
|
|
assert isinstance(self.app._limiter, TestLimiter)
|
|
|
|
def test_good_request(self):
|
|
# Test successful GET request through middleware.
|
|
request = webob.Request.blank("/")
|
|
response = request.get_response(self.app)
|
|
self.assertEqual(200, response.status_int)
|
|
|
|
def test_limited_request_json(self):
|
|
# Test a rate-limited (413) GET request through middleware.
|
|
request = webob.Request.blank("/")
|
|
response = request.get_response(self.app)
|
|
self.assertEqual(200, response.status_int)
|
|
|
|
request = webob.Request.blank("/")
|
|
response = request.get_response(self.app)
|
|
self.assertEqual(413, response.status_int)
|
|
|
|
self.assertTrue('Retry-After' in response.headers)
|
|
retry_after = int(response.headers['Retry-After'])
|
|
self.assertAlmostEqual(retry_after, 60, 1)
|
|
|
|
body = jsonutils.loads(response.body)
|
|
expected = "Only 1 GET request(s) can be made to * every minute."
|
|
value = body["overLimit"]["details"].strip()
|
|
self.assertEqual(expected, value)
|
|
|
|
self.assertTrue("retryAfter" in body["overLimit"])
|
|
retryAfter = body["overLimit"]["retryAfter"]
|
|
self.assertEqual("60", retryAfter)
|
|
|
|
|
|
class LimitTest(BaseLimitTestSuite):
|
|
"""
|
|
Tests for the `limits.Limit` class.
|
|
"""
|
|
|
|
def test_GET_no_delay(self):
|
|
# Test a limit handles 1 GET per second.
|
|
limit = Limit("GET", "*", ".*", 1, 1)
|
|
|
|
limit._get_time = MagicMock(return_value=0.0)
|
|
delay = limit("GET", "/anything")
|
|
self.assertEqual(None, delay)
|
|
self.assertEqual(0, limit.next_request)
|
|
self.assertEqual(0, limit.last_request)
|
|
|
|
def test_GET_delay(self):
|
|
# Test two calls to 1 GET per second limit.
|
|
limit = Limit("GET", "*", ".*", 1, 1)
|
|
limit._get_time = MagicMock(return_value=0.0)
|
|
|
|
delay = limit("GET", "/anything")
|
|
self.assertEqual(None, delay)
|
|
|
|
delay = limit("GET", "/anything")
|
|
self.assertEqual(1, delay)
|
|
self.assertEqual(1, limit.next_request)
|
|
self.assertEqual(0, limit.last_request)
|
|
|
|
limit._get_time = MagicMock(return_value=4.0)
|
|
|
|
delay = limit("GET", "/anything")
|
|
self.assertEqual(None, delay)
|
|
self.assertEqual(4, limit.next_request)
|
|
self.assertEqual(4, limit.last_request)
|
|
|
|
|
|
class ParseLimitsTest(BaseLimitTestSuite):
|
|
"""
|
|
Tests for the default limits parser in the in-memory
|
|
`limits.Limiter` class.
|
|
"""
|
|
|
|
def test_invalid(self):
|
|
# Test that parse_limits() handles invalid input correctly.
|
|
self.assertRaises(ValueError, limits.Limiter.parse_limits,
|
|
';;;;;')
|
|
|
|
def test_bad_rule(self):
|
|
# Test that parse_limits() handles bad rules correctly.
|
|
self.assertRaises(ValueError, limits.Limiter.parse_limits,
|
|
'GET, *, .*, 20, minute')
|
|
|
|
def test_missing_arg(self):
|
|
# Test that parse_limits() handles missing args correctly.
|
|
self.assertRaises(ValueError, limits.Limiter.parse_limits,
|
|
'(GET, *, .*, 20)')
|
|
|
|
def test_bad_value(self):
|
|
# Test that parse_limits() handles bad values correctly.
|
|
self.assertRaises(ValueError, limits.Limiter.parse_limits,
|
|
'(GET, *, .*, foo, minute)')
|
|
|
|
def test_bad_unit(self):
|
|
# Test that parse_limits() handles bad units correctly.
|
|
self.assertRaises(ValueError, limits.Limiter.parse_limits,
|
|
'(GET, *, .*, 20, lightyears)')
|
|
|
|
def test_multiple_rules(self):
|
|
# Test that parse_limits() handles multiple rules correctly.
|
|
try:
|
|
l = limits.Limiter.parse_limits('(get, *, .*, 20, minute);'
|
|
'(PUT, /foo*, /foo.*, 10, hour);'
|
|
'(POST, /bar*, /bar.*, 5, second);'
|
|
'(Say, /derp*, /derp.*, 1, day)')
|
|
except ValueError as e:
|
|
assert False, str(e)
|
|
|
|
# Make sure the number of returned limits are correct
|
|
self.assertEqual(4, len(l))
|
|
|
|
# Check all the verbs...
|
|
expected = ['GET', 'PUT', 'POST', 'SAY']
|
|
self.assertEqual(expected, [t.verb for t in l])
|
|
|
|
# ...the URIs...
|
|
expected = ['*', '/foo*', '/bar*', '/derp*']
|
|
self.assertEqual(expected, [t.uri for t in l])
|
|
|
|
# ...the regexes...
|
|
expected = ['.*', '/foo.*', '/bar.*', '/derp.*']
|
|
self.assertEqual(expected, [t.regex for t in l])
|
|
|
|
# ...the values...
|
|
expected = [20, 10, 5, 1]
|
|
self.assertEqual(expected, [t.value for t in l])
|
|
|
|
# ...and the units...
|
|
expected = [limits.PER_MINUTE, limits.PER_HOUR,
|
|
limits.PER_SECOND, limits.PER_DAY]
|
|
self.assertEqual(expected, [t.unit for t in l])
|
|
|
|
|
|
class LimiterTest(BaseLimitTestSuite):
|
|
"""
|
|
Tests for the in-memory `limits.Limiter` class.
|
|
"""
|
|
|
|
def update_limits(self, delay, limit_list):
|
|
for ln in limit_list:
|
|
ln._get_time = Mock(return_value=delay)
|
|
|
|
def setUp(self):
|
|
"""Run before each test."""
|
|
super(LimiterTest, self).setUp()
|
|
userlimits = {'user:user3': ''}
|
|
|
|
self.update_limits(0.0, TEST_LIMITS)
|
|
self.limiter = limits.Limiter(TEST_LIMITS, **userlimits)
|
|
|
|
def _check(self, num, verb, url, username=None):
|
|
"""Check and yield results from checks."""
|
|
for x in xrange(num):
|
|
yield self.limiter.check_for_delay(verb, url, username)[0]
|
|
|
|
def _check_sum(self, num, verb, url, username=None):
|
|
"""Check and sum results from checks."""
|
|
results = self._check(num, verb, url, username)
|
|
return sum(item for item in results if item)
|
|
|
|
def test_no_delay_GET(self):
|
|
"""
|
|
Simple test to ensure no delay on a single call for a limit verb we
|
|
didn"t set.
|
|
"""
|
|
delay = self.limiter.check_for_delay("GET", "/anything")
|
|
self.assertEqual((None, None), delay)
|
|
|
|
def test_no_delay_PUT(self):
|
|
# Simple test to ensure no delay on a single call for a known limit.
|
|
delay = self.limiter.check_for_delay("PUT", "/anything")
|
|
self.assertEqual((None, None), delay)
|
|
|
|
def test_delay_PUT(self):
|
|
"""
|
|
Ensure the 11th PUT will result in a delay of 6.0 seconds until
|
|
the next request will be granted.
|
|
"""
|
|
expected = [None] * 10 + [6.0]
|
|
results = list(self._check(11, "PUT", "/anything"))
|
|
|
|
self.assertEqual(expected, results)
|
|
|
|
def test_delay_POST(self):
|
|
"""
|
|
Ensure the 8th POST will result in a delay of 6.0 seconds until
|
|
the next request will be granted.
|
|
"""
|
|
expected = [None] * 7
|
|
results = list(self._check(7, "POST", "/anything"))
|
|
self.assertEqual(expected, results)
|
|
|
|
expected = 60.0 / 7.0
|
|
results = self._check_sum(1, "POST", "/anything")
|
|
self.assertAlmostEqual(expected, results, 8)
|
|
|
|
def test_delay_POST_mgmt(self):
|
|
"""
|
|
Ensure the 4th mgmt POST will result in a delay of 6.0 seconds until
|
|
the next request will be granted.
|
|
"""
|
|
expected = [None] * 3
|
|
results = list(self._check(3, "POST", "/mgmt"))
|
|
self.assertEqual(expected, results)
|
|
|
|
expected = 60.0 / 3.0
|
|
results = self._check_sum(1, "POST", "/mgmt")
|
|
self.assertAlmostEqual(expected, results, 4)
|
|
|
|
def test_delay_GET(self):
|
|
# Ensure the 11th GET will result in NO delay.
|
|
expected = [None] * 11
|
|
results = list(self._check(11, "GET", "/mgmt"))
|
|
|
|
self.assertEqual(expected, results)
|
|
|
|
def test_delay_PUT_wait(self):
|
|
"""
|
|
Ensure after hitting the limit and then waiting for the correct
|
|
amount of time, the limit will be lifted.
|
|
"""
|
|
expected = [None] * 10 + [6.0]
|
|
results = list(self._check(11, "PUT", "/anything"))
|
|
self.assertEqual(expected, results)
|
|
|
|
# Advance time
|
|
self.update_limits(6.0, self.limiter.levels[None])
|
|
|
|
expected = [None, 6.0]
|
|
results = list(self._check(2, "PUT", "/anything"))
|
|
self.assertEqual(expected, results)
|
|
|
|
def test_multiple_delays(self):
|
|
# Ensure multiple requests still get a delay.
|
|
expected = [None] * 10 + [6.0] * 10
|
|
results = list(self._check(20, "PUT", "/anything"))
|
|
self.assertEqual(expected, results)
|
|
|
|
self.update_limits(1.0, self.limiter.levels[None])
|
|
|
|
expected = [5.0] * 10
|
|
results = list(self._check(10, "PUT", "/anything"))
|
|
self.assertEqual(expected, results)
|
|
|
|
def test_user_limit(self):
|
|
# Test user-specific limits.
|
|
self.assertEqual([], self.limiter.levels['user3'])
|
|
|
|
def test_multiple_users(self):
|
|
# Tests involving multiple users.
|
|
# User1
|
|
self.update_limits(0.0, self.limiter.levels["user1"])
|
|
expected = [None] * 10 + [6.0] * 10
|
|
results = list(self._check(20, "PUT", "/anything", "user1"))
|
|
self.assertEqual(expected, results)
|
|
|
|
# User2
|
|
expected = [None] * 10 + [6.0] * 5
|
|
results = list(self._check(15, "PUT", "/anything", "user2"))
|
|
self.assertEqual(expected, results)
|
|
|
|
# User3
|
|
expected = [None] * 20
|
|
results = list(self._check(20, "PUT", "/anything", "user3"))
|
|
self.assertEqual(expected, results)
|
|
|
|
# User1 again
|
|
self.update_limits(1.0, self.limiter.levels["user1"])
|
|
expected = [5.0] * 10
|
|
results = list(self._check(10, "PUT", "/anything", "user1"))
|
|
self.assertEqual(expected, results)
|
|
|
|
# User2 again
|
|
self.update_limits(2.0, self.limiter.levels["user2"])
|
|
expected = [4.0] * 5
|
|
results = list(self._check(5, "PUT", "/anything", "user2"))
|
|
self.assertEqual(expected, results)
|
|
|
|
|
|
class WsgiLimiterTest(BaseLimitTestSuite):
|
|
"""
|
|
Tests for `limits.WsgiLimiter` class.
|
|
"""
|
|
|
|
def setUp(self):
|
|
"""Run before each test."""
|
|
super(WsgiLimiterTest, self).setUp()
|
|
self.app = limits.WsgiLimiter(TEST_LIMITS)
|
|
|
|
def _request_data(self, verb, path):
|
|
"""Get data describing a limit request verb/path."""
|
|
return jsonutils.dumps({"verb": verb, "path": path})
|
|
|
|
def _request(self, verb, url, username=None):
|
|
"""Make sure that POSTing to the given url causes the given username
|
|
to perform the given action. Make the internal rate limiter return
|
|
delay and make sure that the WSGI app returns the correct response.
|
|
"""
|
|
if username:
|
|
request = webob.Request.blank("/%s" % username)
|
|
else:
|
|
request = webob.Request.blank("/")
|
|
|
|
request.method = "POST"
|
|
request.body = self._request_data(verb, url)
|
|
response = request.get_response(self.app)
|
|
|
|
if "X-Wait-Seconds" in response.headers:
|
|
self.assertEqual(403, response.status_int)
|
|
return response.headers["X-Wait-Seconds"]
|
|
|
|
self.assertEqual(204, response.status_int)
|
|
|
|
def test_invalid_methods(self):
|
|
# Only POSTs should work.
|
|
for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
|
|
request = webob.Request.blank("/", method=method)
|
|
response = request.get_response(self.app)
|
|
self.assertEqual(405, response.status_int)
|
|
|
|
def test_good_url(self):
|
|
delay = self._request("GET", "/something")
|
|
self.assertEqual(None, delay)
|
|
|
|
def test_escaping(self):
|
|
delay = self._request("GET", "/something/jump%20up")
|
|
self.assertEqual(None, delay)
|
|
|
|
def test_response_to_delays(self):
|
|
delay = self._request("GET", "/delayed")
|
|
self.assertEqual(None, delay)
|
|
|
|
delay = self._request("GET", "/delayed")
|
|
self.assertAlmostEqual(float(delay), 60, 1)
|
|
|
|
def test_response_to_delays_usernames(self):
|
|
delay = self._request("GET", "/delayed", "user1")
|
|
self.assertEqual(None, delay)
|
|
|
|
delay = self._request("GET", "/delayed", "user2")
|
|
self.assertEqual(None, delay)
|
|
|
|
delay = self._request("GET", "/delayed", "user1")
|
|
self.assertAlmostEqual(float(delay), 60, 1)
|
|
|
|
delay = self._request("GET", "/delayed", "user2")
|
|
self.assertAlmostEqual(float(delay), 60, 1)
|
|
|
|
|
|
class FakeHttplibSocket(object):
|
|
"""
|
|
Fake `httplib.HTTPResponse` replacement.
|
|
"""
|
|
|
|
def __init__(self, response_string):
|
|
"""Initialize new `FakeHttplibSocket`."""
|
|
self._buffer = six.StringIO(response_string)
|
|
|
|
def makefile(self, _mode, _other):
|
|
"""Returns the socket's internal buffer."""
|
|
return self._buffer
|
|
|
|
|
|
class FakeHttplibConnection(object):
|
|
"""
|
|
Fake `httplib.HTTPConnection`.
|
|
"""
|
|
|
|
def __init__(self, app, host):
|
|
"""
|
|
Initialize `FakeHttplibConnection`.
|
|
"""
|
|
self.app = app
|
|
self.host = host
|
|
|
|
def request(self, method, path, body="", headers=None):
|
|
"""
|
|
Requests made via this connection actually get translated and routed
|
|
into our WSGI app, we then wait for the response and turn it back into
|
|
an `httplib.HTTPResponse`.
|
|
"""
|
|
if not headers:
|
|
headers = {}
|
|
|
|
req = webob.Request.blank(path)
|
|
req.method = method
|
|
req.headers = headers
|
|
req.host = self.host
|
|
req.body = body
|
|
|
|
resp = str(req.get_response(self.app))
|
|
resp = "HTTP/1.0 %s" % resp
|
|
sock = FakeHttplibSocket(resp)
|
|
self.http_response = httplib.HTTPResponse(sock)
|
|
self.http_response.begin()
|
|
|
|
def getresponse(self):
|
|
"""Return our generated response from the request."""
|
|
return self.http_response
|
|
|
|
|
|
def wire_HTTPConnection_to_WSGI(host, app):
|
|
"""Monkeypatches HTTPConnection so that if you try to connect to host, you
|
|
are instead routed straight to the given WSGI app.
|
|
|
|
After calling this method, when any code calls
|
|
|
|
httplib.HTTPConnection(host)
|
|
|
|
the connection object will be a fake. Its requests will be sent directly
|
|
to the given WSGI app rather than through a socket.
|
|
|
|
Code connecting to hosts other than host will not be affected.
|
|
|
|
This method may be called multiple times to map different hosts to
|
|
different apps.
|
|
|
|
This method returns the original HTTPConnection object, so that the caller
|
|
can restore the default HTTPConnection interface (for all hosts).
|
|
"""
|
|
|
|
class HTTPConnectionDecorator(object):
|
|
"""Wraps the real HTTPConnection class so that when you instantiate
|
|
the class you might instead get a fake instance.
|
|
"""
|
|
|
|
def __init__(self, wrapped):
|
|
self.wrapped = wrapped
|
|
|
|
def __call__(self, connection_host, *args, **kwargs):
|
|
if connection_host == host:
|
|
return FakeHttplibConnection(app, host)
|
|
else:
|
|
return self.wrapped(connection_host, *args, **kwargs)
|
|
|
|
oldHTTPConnection = httplib.HTTPConnection
|
|
httplib.HTTPConnection = HTTPConnectionDecorator(httplib.HTTPConnection)
|
|
return oldHTTPConnection
|
|
|
|
|
|
class WsgiLimiterProxyTest(BaseLimitTestSuite):
|
|
"""
|
|
Tests for the `limits.WsgiLimiterProxy` class.
|
|
"""
|
|
|
|
def setUp(self):
|
|
"""
|
|
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
|
|
directly by something like the `httplib` library.
|
|
"""
|
|
super(WsgiLimiterProxyTest, self).setUp()
|
|
self.app = limits.WsgiLimiter(TEST_LIMITS)
|
|
self.oldHTTPConnection = (
|
|
wire_HTTPConnection_to_WSGI("169.254.0.1:80", self.app))
|
|
self.proxy = limits.WsgiLimiterProxy("169.254.0.1:80")
|
|
|
|
def test_200(self):
|
|
# Successful request test.
|
|
delay = self.proxy.check_for_delay("GET", "/anything")
|
|
self.assertEqual((None, None), delay)
|
|
|
|
def test_403(self):
|
|
# Forbidden request test.
|
|
delay = self.proxy.check_for_delay("GET", "/delayed")
|
|
self.assertEqual((None, None), delay)
|
|
|
|
delay, error = self.proxy.check_for_delay("GET", "/delayed")
|
|
error = error.strip()
|
|
|
|
self.assertAlmostEqual(float(delay), 60, 1)
|
|
self.assertEqual("403 Forbidden\n\nOnly 1 GET request(s) can be"
|
|
" made to /delayed every minute.", error)
|
|
|
|
def tearDown(self):
|
|
# restore original HTTPConnection object
|
|
httplib.HTTPConnection = self.oldHTTPConnection
|
|
super(WsgiLimiterProxyTest, self).tearDown()
|
|
|
|
|
|
class LimitsViewTest(testtools.TestCase):
|
|
def setUp(self):
|
|
super(LimitsViewTest, self).setUp()
|
|
|
|
def test_empty_data(self):
|
|
"""
|
|
Test the default returned results if an empty dictionary is given
|
|
"""
|
|
rate_limit = {}
|
|
view = views.LimitView(rate_limit)
|
|
self.assertIsNotNone(view)
|
|
|
|
data = view.data()
|
|
expected = {'limit': {'regex': '',
|
|
'nextAvailable': '1970-01-01T00:00:00Z',
|
|
'uri': '',
|
|
'value': '',
|
|
'verb': '',
|
|
'remaining': 0,
|
|
'unit': ''}}
|
|
|
|
self.assertEqual(expected, data)
|
|
|
|
def test_data(self):
|
|
"""
|
|
Test the returned results for a fully populated dictionary
|
|
"""
|
|
rate_limit = {
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "POST",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
}
|
|
|
|
view = views.LimitView(rate_limit)
|
|
self.assertIsNotNone(view)
|
|
|
|
data = view.data()
|
|
expected = {'limit': {'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'POST',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'}}
|
|
|
|
self.assertEqual(expected, data)
|
|
|
|
|
|
class LimitsViewsTest(testtools.TestCase):
|
|
def setUp(self):
|
|
super(LimitsViewsTest, self).setUp()
|
|
|
|
def test_empty_data(self):
|
|
rate_limits = []
|
|
abs_view = dict()
|
|
|
|
view_data = views.LimitViews(abs_view, rate_limits)
|
|
self.assertIsNotNone(view_data)
|
|
|
|
data = view_data.data()
|
|
expected = {'limits': [{'verb': 'ABSOLUTE'}]}
|
|
|
|
self.assertEqual(expected, data)
|
|
|
|
def test_data(self):
|
|
rate_limits = [
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "POST",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
},
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "PUT",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
},
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "DELETE",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
},
|
|
{
|
|
"URI": "*",
|
|
"regex": ".*",
|
|
"value": 10,
|
|
"verb": "GET",
|
|
"remaining": 2,
|
|
"unit": "MINUTE",
|
|
"resetTime": 1311272226
|
|
}
|
|
]
|
|
abs_view = {"instances": 55, "volumes": 100, "backups": 40}
|
|
|
|
view_data = views.LimitViews(abs_view, rate_limits)
|
|
self.assertIsNotNone(view_data)
|
|
|
|
data = view_data.data()
|
|
expected = {'limits': [{'max_instances': 55,
|
|
'max_backups': 40,
|
|
'verb': 'ABSOLUTE',
|
|
'max_volumes': 100},
|
|
{'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'POST',
|
|
'remaining': 2, 'unit': 'MINUTE'},
|
|
{'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'PUT',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'},
|
|
{'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'DELETE',
|
|
'remaining': 2,
|
|
'unit': 'MINUTE'},
|
|
{'regex': '.*',
|
|
'nextAvailable': '2011-07-21T18:17:06Z',
|
|
'uri': '*',
|
|
'value': 10,
|
|
'verb': 'GET',
|
|
'remaining': 2, 'unit': 'MINUTE'}]}
|
|
|
|
self.assertEqual(expected, data)
|