156 lines
4.8 KiB
Python
156 lines
4.8 KiB
Python
import os
|
|
import sys
|
|
import time
|
|
from wsgiref.simple_server import make_server
|
|
|
|
try:
|
|
import multiprocessing
|
|
except ImportError:
|
|
pass # Jython
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
import falcon
|
|
from falcon.request_helpers import BoundedStream
|
|
import falcon.testing as testing
|
|
|
|
# Host name used both to bind the test server and to build request URLs.
_SERVER_HOST = 'localhost'

# Offsetting the base port by the low digits of the PID facilitates
# parallel test execution: concurrent runs are unlikely to share a port.
_SERVER_PORT = 9800 + (os.getpid() % 100)

# Root URL of the test server; always ends with a trailing slash.
_SERVER_BASE_URL = 'http://{host}:{port}/'.format(
    host=_SERVER_HOST,
    port=_SERVER_PORT,
)

# Number of bytes in one kibibyte; used to size request bodies and reads.
_SIZE_1_KB = 1024
|
|
|
|
|
|
@pytest.mark.skipif(
    # NOTE(kgriffs): Jython does not support the multiprocessing
    # module. We could alternatively implement these tests
    # using threads, but then we have to force a garbage
    # collection in between each test in order to make
    # the server relinquish its socket, and the gc module
    # doesn't appear to do anything under Jython.

    'java' in sys.platform,
    reason='Incompatible with Jython'
)
@pytest.mark.usefixtures('_setup_wsgi_server')
class TestWSGIServer(object):
    """Issue real HTTP requests against the server at ``_SERVER_BASE_URL``.

    Every test requests the ``_setup_wsgi_server`` fixture by name (via
    ``usefixtures``) and talks to the server with ``requests``.
    """

    def test_get(self):
        """GET on the root returns 200 and the client address as the body."""
        resp = requests.get(_SERVER_BASE_URL)
        assert resp.status_code == 200
        assert resp.text == '127.0.0.1'

    def test_put(self):
        """PUT on the root returns 200 and echoes the request body."""
        body = '{}'
        resp = requests.put(_SERVER_BASE_URL, data=body)
        assert resp.status_code == 200
        assert resp.text == '{}'

    def test_head_405(self):
        """HEAD on the root is answered with 405."""
        body = '{}'
        resp = requests.head(_SERVER_BASE_URL, data=body)
        assert resp.status_code == 405

    def test_post(self):
        """POST on the root returns 200 and echoes a random body."""
        # NOTE: Floor division keeps the lower bound an int. On Python 3,
        # `/` would produce 512.0, and a float is not a valid length bound.
        body = testing.rand_string(_SIZE_1_KB // 2, _SIZE_1_KB)
        resp = requests.post(_SERVER_BASE_URL, data=body)
        assert resp.status_code == 200
        assert resp.text == body

    def test_post_invalid_content_length(self):
        """POST with a non-numeric Content-Length yields 200 and no body."""
        headers = {'Content-Length': 'invalid'}
        resp = requests.post(_SERVER_BASE_URL, headers=headers)
        assert resp.status_code == 200
        assert resp.text == ''

    def test_post_read_bounded_stream(self):
        """POST on ``bucket`` returns 200 and echoes a random body."""
        # NOTE: Floor division keeps the lower bound an int. On Python 3,
        # `/` would produce 512.0, and a float is not a valid length bound.
        body = testing.rand_string(_SIZE_1_KB // 2, _SIZE_1_KB)
        resp = requests.post(_SERVER_BASE_URL + 'bucket', data=body)
        assert resp.status_code == 200
        assert resp.text == body

    def test_post_read_bounded_stream_no_body(self):
        """POST on ``bucket`` without a body returns an empty response body."""
        resp = requests.post(_SERVER_BASE_URL + 'bucket')
        assert not resp.text
|
|
|
|
|
|
def _run_server(stop_event):
    """Serve a small falcon app with wsgiref until ``stop_event`` is set.

    The app exposes two routes: ``/`` (``Things``) and ``/bucket``
    (``Bucket``). Requests are handled one at a time; the event is only
    checked between requests, so one more request may be needed to
    unblock the loop after the event has been set.

    Args:
        stop_event: Object exposing ``is_set()``; serving stops once it
            returns a true value.
    """

    class Things(object):
        """Resource mounted at ``/``."""

        def on_get(self, req, resp):
            # Respond with the address of the connecting client.
            resp.body = req.remote_addr

        def on_post(self, req, resp):
            # Echo back up to 1 KiB of the request body.
            resp.body = req.stream.read(_SIZE_1_KB)

        def on_put(self, req, resp):
            # NOTE(kgriffs): Test that reading past the end does
            # not hang.
            req_body = (req.stream.read(1)
                        for i in range(req.content_length + 1))

            resp.body = b''.join(req_body)

    class Bucket(object):
        """Resource mounted at ``/bucket``."""

        def on_post(self, req, resp):
            # NOTE(kgriffs): The framework automatically detects
            # wsgiref's input object type and wraps it; we'll probably
            # do away with this at some point, but for now we
            # verify the functionality,
            assert isinstance(req.stream, BoundedStream)

            # NOTE(kgriffs): Ensure we are reusing the same object for
            # the sake of efficiency and to ensure a shared state of the
            # stream. (only in the case that we have decided to
            # automatically wrap the WSGI input object, i.e. when
            # running under wsgiref or similar).
            assert req.stream is req.bounded_stream

            # NOTE(kgriffs): Reading the raw WSGI input object would
            # normally block when Content-Length is 0. BoundedStream
            # fixes that. This is just a sanity check to
            # make sure req.bounded_stream is what we think it is;
            # BoundedStream itself has its own unit tests in
            # test_request_body.py
            resp.body = req.bounded_stream.read()

            # NOTE(kgriffs): No need to also test the same read() for
            # req.stream, since we already asserted they are the same
            # objects.

    api = application = falcon.API()
    api.add_route('/', Things())
    api.add_route('/bucket', Bucket())

    server = make_server(_SERVER_HOST, _SERVER_PORT, application)

    try:
        while not stop_event.is_set():
            server.handle_request()
    finally:
        # Release the listening socket explicitly, even if a request
        # handler raised, rather than relying on process exit.
        server.server_close()
|
|
|
|
|
|
@pytest.fixture
def _setup_wsgi_server():
    """Run ``_run_server`` in a child process around a single test.

    Setup starts the process and waits briefly for it to come up.
    Teardown sets the stop event, sends one last request to unblock the
    server loop, and then waits for the process to exit. Teardown is
    bounded in time: a child that does not exit is terminated so it
    cannot hang the test run.
    """
    # Upper bound, in seconds, for each blocking teardown step.
    shutdown_timeout = 5

    stop_event = multiprocessing.Event()
    process = multiprocessing.Process(
        target=_run_server,
        args=(stop_event,)
    )

    process.start()

    # NOTE(kgriffs): Let the server start up
    time.sleep(0.2)

    yield

    stop_event.set()

    # NOTE(kgriffs): Pump the request handler loop in case execution
    # made it to the next server.handle_request() before we sent the
    # event.
    try:
        requests.get(_SERVER_BASE_URL, timeout=shutdown_timeout)
    except requests.exceptions.RequestException:
        pass  # Process already exited (or did not answer in time)

    process.join(shutdown_timeout)

    if process.is_alive():
        # The child is still blocked; force it down instead of waiting
        # forever, then reap it.
        process.terminate()
        process.join()
|