242 lines
7.4 KiB
Python
242 lines
7.4 KiB
Python
import io
|
|
|
|
import pytest
|
|
import six
|
|
|
|
import falcon
|
|
import falcon.testing as testing
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
return testing.TestClient(falcon.API())
|
|
|
|
|
|
# NOTE(kgriffs): Concept from Gunicorn's source (wsgi.py)
|
|
class FileWrapper(object):
|
|
|
|
def __init__(self, file_like, block_size=8192):
|
|
self.file_like = file_like
|
|
self.block_size = block_size
|
|
|
|
def __getitem__(self, key):
|
|
data = self.file_like.read(self.block_size)
|
|
if data:
|
|
return data
|
|
|
|
raise IndexError
|
|
|
|
|
|
class HelloResource(object):
|
|
sample_status = '200 OK'
|
|
sample_unicode = (u'Hello World! \x80' +
|
|
six.text_type(testing.rand_string(0, 0)))
|
|
|
|
sample_utf8 = sample_unicode.encode('utf-8')
|
|
|
|
def __init__(self, mode):
|
|
self.called = False
|
|
self.mode = mode
|
|
|
|
def on_get(self, req, resp):
|
|
self.called = True
|
|
self.req, self.resp = req, resp
|
|
|
|
resp.status = falcon.HTTP_200
|
|
|
|
if 'stream' in self.mode:
|
|
if 'filelike' in self.mode:
|
|
stream = io.BytesIO(self.sample_utf8)
|
|
else:
|
|
stream = [self.sample_utf8]
|
|
|
|
if 'stream_len' in self.mode:
|
|
stream_len = len(self.sample_utf8)
|
|
else:
|
|
stream_len = None
|
|
|
|
if 'use_helper' in self.mode:
|
|
resp.set_stream(stream, stream_len)
|
|
else:
|
|
resp.stream = stream
|
|
resp.stream_len = stream_len
|
|
|
|
if 'body' in self.mode:
|
|
if 'bytes' in self.mode:
|
|
resp.body = self.sample_utf8
|
|
else:
|
|
resp.body = self.sample_unicode
|
|
|
|
if 'data' in self.mode:
|
|
resp.data = self.sample_utf8
|
|
|
|
def on_head(self, req, resp):
|
|
self.on_get(req, resp)
|
|
|
|
|
|
class ClosingBytesIO(io.BytesIO):
|
|
|
|
close_called = False
|
|
|
|
def close(self):
|
|
super(ClosingBytesIO, self).close()
|
|
self.close_called = True
|
|
|
|
|
|
class NonClosingBytesIO(io.BytesIO):
|
|
|
|
# Not callable; test that CloseableStreamIterator ignores it
|
|
close = False
|
|
|
|
|
|
class ClosingFilelikeHelloResource(object):
|
|
sample_status = '200 OK'
|
|
sample_unicode = (u'Hello World! \x80' +
|
|
six.text_type(testing.rand_string(0, 0)))
|
|
|
|
sample_utf8 = sample_unicode.encode('utf-8')
|
|
|
|
def __init__(self, stream_factory):
|
|
self.called = False
|
|
self.stream = stream_factory(self.sample_utf8)
|
|
self.stream_len = len(self.sample_utf8)
|
|
|
|
def on_get(self, req, resp):
|
|
self.called = True
|
|
self.req, self.resp = req, resp
|
|
resp.status = falcon.HTTP_200
|
|
resp.set_stream(self.stream, self.stream_len)
|
|
|
|
|
|
class NoStatusResource(object):
|
|
def on_get(self, req, resp):
|
|
pass
|
|
|
|
|
|
class TestHelloWorld(object):
|
|
|
|
def test_env_headers_list_of_tuples(self):
|
|
env = testing.create_environ(headers=[('User-Agent', 'Falcon-Test')])
|
|
assert env['HTTP_USER_AGENT'] == 'Falcon-Test'
|
|
|
|
def test_root_route(self, client):
|
|
doc = {u'message': u'Hello world!'}
|
|
resource = testing.SimpleTestResource(json=doc)
|
|
client.app.add_route('/', resource)
|
|
|
|
result = client.simulate_get()
|
|
assert result.json == doc
|
|
|
|
def test_no_route(self, client):
|
|
result = client.simulate_get('/seenoevil')
|
|
assert result.status_code == 404
|
|
|
|
@pytest.mark.parametrize('path,resource,get_body', [
|
|
('/body', HelloResource('body'), lambda r: r.body.encode('utf-8')),
|
|
('/bytes', HelloResource('body, bytes'), lambda r: r.body),
|
|
('/data', HelloResource('data'), lambda r: r.data),
|
|
])
|
|
def test_body(self, client, path, resource, get_body):
|
|
client.app.add_route(path, resource)
|
|
|
|
result = client.simulate_get(path)
|
|
resp = resource.resp
|
|
|
|
content_length = int(result.headers['content-length'])
|
|
assert content_length == len(resource.sample_utf8)
|
|
|
|
assert result.status == resource.sample_status
|
|
assert resp.status == resource.sample_status
|
|
assert get_body(resp) == resource.sample_utf8
|
|
assert result.content == resource.sample_utf8
|
|
|
|
def test_no_body_on_head(self, client):
|
|
client.app.add_route('/body', HelloResource('body'))
|
|
result = client.simulate_head('/body')
|
|
|
|
assert not result.content
|
|
assert result.status_code == 200
|
|
|
|
def test_stream_chunked(self, client):
|
|
resource = HelloResource('stream')
|
|
client.app.add_route('/chunked-stream', resource)
|
|
|
|
result = client.simulate_get('/chunked-stream')
|
|
|
|
assert result.content == resource.sample_utf8
|
|
assert 'content-length' not in result.headers
|
|
|
|
def test_stream_known_len(self, client):
|
|
resource = HelloResource('stream, stream_len')
|
|
client.app.add_route('/stream', resource)
|
|
|
|
result = client.simulate_get('/stream')
|
|
assert resource.called
|
|
|
|
expected_len = resource.resp.stream_len
|
|
actual_len = int(result.headers['content-length'])
|
|
assert actual_len == expected_len
|
|
assert len(result.content) == expected_len
|
|
assert result.content == resource.sample_utf8
|
|
|
|
def test_filelike(self, client):
|
|
resource = HelloResource('stream, stream_len, filelike')
|
|
client.app.add_route('/filelike', resource)
|
|
|
|
for file_wrapper in (None, FileWrapper):
|
|
result = client.simulate_get('/filelike', file_wrapper=file_wrapper)
|
|
assert resource.called
|
|
|
|
expected_len = resource.resp.stream_len
|
|
actual_len = int(result.headers['content-length'])
|
|
assert actual_len == expected_len
|
|
assert len(result.content) == expected_len
|
|
|
|
for file_wrapper in (None, FileWrapper):
|
|
result = client.simulate_get('/filelike', file_wrapper=file_wrapper)
|
|
assert resource.called
|
|
|
|
expected_len = resource.resp.stream_len
|
|
actual_len = int(result.headers['content-length'])
|
|
assert actual_len == expected_len
|
|
assert len(result.content) == expected_len
|
|
|
|
@pytest.mark.parametrize('stream_factory,assert_closed', [
|
|
(ClosingBytesIO, True), # Implements close()
|
|
(NonClosingBytesIO, False), # Has a non-callable "close" attr
|
|
])
|
|
def test_filelike_closing(self, client, stream_factory, assert_closed):
|
|
resource = ClosingFilelikeHelloResource(stream_factory)
|
|
client.app.add_route('/filelike-closing', resource)
|
|
|
|
result = client.simulate_get('/filelike-closing', file_wrapper=None)
|
|
assert resource.called
|
|
|
|
expected_len = resource.resp.stream_len
|
|
actual_len = int(result.headers['content-length'])
|
|
assert actual_len == expected_len
|
|
assert len(result.content) == expected_len
|
|
|
|
if assert_closed:
|
|
assert resource.stream.close_called
|
|
|
|
def test_filelike_using_helper(self, client):
|
|
resource = HelloResource('stream, stream_len, filelike, use_helper')
|
|
client.app.add_route('/filelike-helper', resource)
|
|
|
|
result = client.simulate_get('/filelike-helper')
|
|
assert resource.called
|
|
|
|
expected_len = resource.resp.stream_len
|
|
actual_len = int(result.headers['content-length'])
|
|
assert actual_len == expected_len
|
|
assert len(result.content) == expected_len
|
|
|
|
def test_status_not_set(self, client):
|
|
client.app.add_route('/nostatus', NoStatusResource())
|
|
|
|
result = client.simulate_get('/nostatus')
|
|
|
|
assert not result.content
|
|
assert result.status_code == 200
|