145 lines
5.0 KiB
Python
145 lines
5.0 KiB
Python
import io
|
|
|
|
import falcon
|
|
from falcon import request_helpers
|
|
import falcon.testing as testing
|
|
|
|
SIZE_1_KB = 1024
|
|
|
|
|
|
class TestRequestBody(testing.TestBase):
|
|
|
|
def before(self):
|
|
self.resource = testing.TestResource()
|
|
self.api.add_route('/', self.resource)
|
|
|
|
def test_empty_body(self):
|
|
self.simulate_request('/', body='')
|
|
stream = self.resource.req.stream
|
|
|
|
stream.seek(0, 2)
|
|
self.assertEqual(stream.tell(), 0)
|
|
|
|
def test_tiny_body(self):
|
|
expected_body = '.'
|
|
self.simulate_request('', body=expected_body)
|
|
stream = self.resource.req.stream
|
|
|
|
actual_body = stream.read(1)
|
|
self.assertEqual(actual_body, expected_body.encode('utf-8'))
|
|
|
|
stream.seek(0, 2)
|
|
self.assertEqual(stream.tell(), 1)
|
|
|
|
def test_tiny_body_overflow(self):
|
|
expected_body = '.'
|
|
self.simulate_request('', body=expected_body)
|
|
stream = self.resource.req.stream
|
|
|
|
# Read too many bytes; shouldn't block
|
|
actual_body = stream.read(len(expected_body) + 1)
|
|
self.assertEqual(actual_body, expected_body.encode('utf-8'))
|
|
|
|
def test_read_body(self):
|
|
expected_body = testing.rand_string(SIZE_1_KB / 2, SIZE_1_KB)
|
|
expected_len = len(expected_body)
|
|
headers = {'Content-Length': str(expected_len)}
|
|
|
|
self.simulate_request('', body=expected_body, headers=headers)
|
|
|
|
content_len = self.resource.req.get_header('content-length')
|
|
self.assertEqual(content_len, str(expected_len))
|
|
|
|
stream = self.resource.req.stream
|
|
|
|
actual_body = stream.read()
|
|
self.assertEqual(actual_body, expected_body.encode('utf-8'))
|
|
|
|
stream.seek(0, 2)
|
|
self.assertEqual(stream.tell(), expected_len)
|
|
|
|
self.assertEqual(stream.tell(), expected_len)
|
|
|
|
def test_bounded_stream_property_empty_body(self):
|
|
"""Test that we can get a bounded stream outside of wsgiref."""
|
|
|
|
environ = testing.create_environ()
|
|
req = falcon.Request(environ)
|
|
|
|
bounded_stream = req.bounded_stream
|
|
|
|
# NOTE(kgriffs): Verify that we aren't creating a new object
|
|
# each time the property is called. Also ensures branch
|
|
# coverage of the property implementation.
|
|
assert bounded_stream is req.bounded_stream
|
|
|
|
data = bounded_stream.read()
|
|
self.assertEqual(len(data), 0)
|
|
|
|
def test_body_stream_wrapper(self):
|
|
data = testing.rand_string(SIZE_1_KB / 2, SIZE_1_KB)
|
|
expected_body = data.encode('utf-8')
|
|
expected_len = len(expected_body)
|
|
|
|
# NOTE(kgriffs): Append newline char to each line
|
|
# to match readlines behavior
|
|
expected_lines = [(line + '\n').encode('utf-8')
|
|
for line in data.split('\n')]
|
|
|
|
# NOTE(kgriffs): Remove trailing newline to simulate
|
|
# what readlines does
|
|
expected_lines[-1] = expected_lines[-1][:-1]
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.read(), expected_body)
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.read(2), expected_body[0:2])
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.read(expected_len + 1), expected_body)
|
|
|
|
# NOTE(kgriffs): Test that reading past the end does not
|
|
# hang, but returns the empty string.
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
for i in range(expected_len + 1):
|
|
expected_value = expected_body[i:i + 1] if i < expected_len else b''
|
|
self.assertEqual(body.read(1), expected_value)
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.readline(), expected_lines[0])
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.readline(-1), expected_lines[0])
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.readline(expected_len + 1), expected_lines[0])
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.readlines(), expected_lines)
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.readlines(-1), expected_lines)
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(body.readlines(expected_len + 1), expected_lines)
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
self.assertEqual(next(body), expected_lines[0])
|
|
|
|
stream = io.BytesIO(expected_body)
|
|
body = request_helpers.Body(stream, expected_len)
|
|
for i, line in enumerate(body):
|
|
self.assertEqual(line, expected_lines[i])
|