test: Migrate several test modules away from the deprecated framework (#901)
* Migrate additional test modules away from the deprecated framework, taking care to demonstrate both unittest- and pytest-style testing. * Add an additional Cookie class and parse cookies in the Result object to facilitate cookie testing. * Extend BoundStream to implement more of the IO protocol so that it works with mock WSGI input objects (and potentially improves compatibility as a drop-in replacement for the native WSGI input object). * Disable pypy3 testing on Travis, since it uses a buggy cookie impl, and also we don't support pypy3 (yet) since overall it isn't fully baked. * Specify latest pypy version for Travis testing, which has improved cookie handling.
This commit is contained in:
parent
b36ffe6179
commit
2ee5e549d7
|
@ -9,10 +9,10 @@ matrix:
|
|||
include:
|
||||
- python: 2.7 # these are just to make travis's UI a bit prettier
|
||||
env: JYTHON=true
|
||||
- python: pypy
|
||||
- python: pypy-5.3
|
||||
env: TOXENV=pypy
|
||||
- python: pypy3
|
||||
env: TOXENV=pypy3
|
||||
# - python: pypy3
|
||||
# env: TOXENV=pypy3
|
||||
- python: 2.7
|
||||
env: TOXENV=pep8
|
||||
- python: 2.6
|
||||
|
|
|
@ -4,7 +4,7 @@ Testing
|
|||
=======
|
||||
|
||||
.. automodule:: falcon.testing
|
||||
:members: Result,
|
||||
:members: Result, Cookie,
|
||||
simulate_request, simulate_get, simulate_head, simulate_post,
|
||||
simulate_put, simulate_options, simulate_patch, simulate_delete,
|
||||
TestClient, TestCase, SimpleTestResource, StartResponseMock,
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
"""Utilities for the Request class."""
|
||||
|
||||
import io
|
||||
|
||||
|
||||
def header_property(wsgi_name):
|
||||
"""Creates a read-only header property.
|
||||
|
@ -36,7 +38,7 @@ def header_property(wsgi_name):
|
|||
return property(fget)
|
||||
|
||||
|
||||
class BoundedStream(object):
|
||||
class BoundedStream(io.IOBase):
|
||||
"""Wrap *wsgi.input* streams to make them more robust.
|
||||
|
||||
``socket._fileobject`` and ``io.BufferedReader`` are sometimes used
|
||||
|
@ -96,6 +98,18 @@ class BoundedStream(object):
|
|||
self._bytes_remaining -= size
|
||||
return target(size)
|
||||
|
||||
def readable(self):
|
||||
"""Always returns ``True``."""
|
||||
return True
|
||||
|
||||
def seekable(self):
|
||||
"""Always returns ``False``."""
|
||||
return False
|
||||
|
||||
def writeable(self):
|
||||
"""Always returns ``False``."""
|
||||
return False
|
||||
|
||||
def read(self, size=None):
|
||||
"""Read from the stream.
|
||||
|
||||
|
@ -138,6 +152,11 @@ class BoundedStream(object):
|
|||
|
||||
return self._read(hint, self.stream.readlines)
|
||||
|
||||
def write(self, data):
|
||||
"""Always raises IOError; writing is not supported."""
|
||||
|
||||
raise IOError('Stream is not writeable')
|
||||
|
||||
|
||||
# NOTE(kgriffs): Alias for backwards-compat
|
||||
Body = BoundedStream
|
||||
|
|
|
@ -33,11 +33,15 @@ WSGI callable, without having to stand up a WSGI server.
|
|||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import wsgiref.validate
|
||||
|
||||
from six.moves import http_cookies
|
||||
|
||||
from falcon.testing import helpers
|
||||
from falcon.testing.srmock import StartResponseMock
|
||||
from falcon.util import CaseInsensitiveDict, to_query_str
|
||||
from falcon.util import CaseInsensitiveDict, http_date_to_dt, to_query_str
|
||||
|
||||
|
||||
class Result(object):
|
||||
|
@ -55,7 +59,19 @@ class Result(object):
|
|||
status (str): HTTP status string given in the response
|
||||
status_code (int): The code portion of the HTTP status string
|
||||
headers (CaseInsensitiveDict): A case-insensitive dictionary
|
||||
containing all the headers in the response
|
||||
containing all the headers in the response, except for
|
||||
cookies, which may be accessed via the `cookies`
|
||||
attribute.
|
||||
|
||||
Note:
|
||||
|
||||
Multiple instances of a header in the response are
|
||||
currently not supported; it is unspecified which value
|
||||
will "win" and be represented in `headers`.
|
||||
|
||||
cookies (dict): A dictionary of
|
||||
:py:class:`falcon.testing.Cookie` values parsed from the
|
||||
response, by name.
|
||||
encoding (str): Text encoding of the response body, or ``None``
|
||||
if the encoding can not be determined.
|
||||
content (bytes): Raw response body, or ``bytes`` if the
|
||||
|
@ -79,6 +95,41 @@ class Result(object):
|
|||
self._status_code = int(status[:3])
|
||||
self._headers = CaseInsensitiveDict(headers)
|
||||
|
||||
cookies = http_cookies.SimpleCookie()
|
||||
for name, value in headers:
|
||||
if name.lower() == 'set-cookie':
|
||||
cookies.load(value)
|
||||
|
||||
if sys.version_info < (2, 7):
|
||||
match = re.match('([^=]+)=', value)
|
||||
assert match
|
||||
|
||||
cookie_name = match.group(1)
|
||||
|
||||
# NOTE(kgriffs): py26 has a bug that causes
|
||||
# SimpleCookie to incorrectly parse the "expires"
|
||||
# attribute, so we have to do it ourselves. This
|
||||
# algorithm is obviously very naive, but it should
|
||||
# work well enough until we stop supporting
|
||||
# 2.6, at which time we can remove this code.
|
||||
match = re.search('expires=([^;]+)', value)
|
||||
if match:
|
||||
cookies[cookie_name]['expires'] = match.group(1)
|
||||
|
||||
# NOTE(kgriffs): py26's SimpleCookie won't parse
|
||||
# the "httponly" and "secure" attributes, so we
|
||||
# have to do it ourselves.
|
||||
if 'httponly' in value:
|
||||
cookies[cookie_name]['httponly'] = True
|
||||
|
||||
if 'secure' in value:
|
||||
cookies[cookie_name]['secure'] = True
|
||||
|
||||
self._cookies = dict(
|
||||
(morsel.key, Cookie(morsel))
|
||||
for morsel in cookies.values()
|
||||
)
|
||||
|
||||
self._encoding = helpers.get_encoding_from_headers(self._headers)
|
||||
|
||||
@property
|
||||
|
@ -93,6 +144,10 @@ class Result(object):
|
|||
def headers(self):
|
||||
return self._headers
|
||||
|
||||
@property
|
||||
def cookies(self):
|
||||
return self._cookies
|
||||
|
||||
@property
|
||||
def encoding(self):
|
||||
return self._encoding
|
||||
|
@ -121,6 +176,81 @@ class Result(object):
|
|||
return json.loads(self.text)
|
||||
|
||||
|
||||
class Cookie(object):
|
||||
"""Represents a cookie returned by a simulated request.
|
||||
|
||||
Args:
|
||||
morsel: A ``Morsel`` object from which to derive the cookie
|
||||
data.
|
||||
|
||||
Attributes:
|
||||
name (str): The cookie's name.
|
||||
value (str): The value of the cookie.
|
||||
expires(datetime.datetime): Expiration timestamp for the cookie,
|
||||
or ``None`` if not specified.
|
||||
path (str): The path prefix to which this cookie is restricted,
|
||||
or ``None`` if not specified.
|
||||
domain (str): The domain to which this cookie is restricted,
|
||||
or ``None`` if not specified.
|
||||
max_age (int): The lifetime of the cookie in seconds, or
|
||||
``None`` if not specified.
|
||||
secure (bool): Whether or not the cookie may only only be
|
||||
transmitted from the client via HTTPS.
|
||||
http_only (bool): Whether or not the cookie may only be
|
||||
included in unscripted requests from the client.
|
||||
"""
|
||||
|
||||
def __init__(self, morsel):
|
||||
self._name = morsel.key
|
||||
self._value = morsel.value
|
||||
|
||||
for name in (
|
||||
'expires',
|
||||
'path',
|
||||
'domain',
|
||||
'max_age',
|
||||
'secure',
|
||||
'httponly',
|
||||
):
|
||||
value = morsel[name.replace('_', '-')] or None
|
||||
setattr(self, '_' + name, value)
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
return self._value
|
||||
|
||||
@property
|
||||
def expires(self):
|
||||
if self._expires:
|
||||
return http_date_to_dt(self._expires, obs_date=True)
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
|
||||
@property
|
||||
def domain(self):
|
||||
return self._domain
|
||||
|
||||
@property
|
||||
def max_age(self):
|
||||
return int(self._max_age) if self._max_age else None
|
||||
|
||||
@property
|
||||
def secure(self):
|
||||
return bool(self._secure)
|
||||
|
||||
@property
|
||||
def http_only(self):
|
||||
return bool(self._httponly)
|
||||
|
||||
|
||||
def simulate_request(app, method='GET', path='/', query_string=None,
|
||||
headers=None, body=None, file_wrapper=None,
|
||||
params=None, params_csv=True):
|
||||
|
|
|
@ -2,81 +2,98 @@ from falcon.request import Request
|
|||
import falcon.testing as testing
|
||||
|
||||
|
||||
class TestAccessRoute(testing.TestBase):
|
||||
def test_remote_addr_only():
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'Forwarded': ('for=192.0.2.43, for="[2001:db8:cafe::17]:555",'
|
||||
'for="unknown", by=_hidden,for="\\"\\\\",'
|
||||
'for="198\\.51\\.100\\.17\\:1236";'
|
||||
'proto=https;host=example.com')
|
||||
}))
|
||||
|
||||
def test_remote_addr_only(self):
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'Forwarded': ('for=192.0.2.43, for="[2001:db8:cafe::17]:555",'
|
||||
'for="unknown", by=_hidden,for="\\"\\\\",'
|
||||
'for="198\\.51\\.100\\.17\\:1236";'
|
||||
'proto=https;host=example.com')
|
||||
}))
|
||||
self.assertEqual(req.remote_addr, '127.0.0.1')
|
||||
assert req.remote_addr == '127.0.0.1'
|
||||
|
||||
def test_rfc_forwarded(self):
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'Forwarded': ('for=192.0.2.43,for=,'
|
||||
'for="[2001:db8:cafe::17]:555",'
|
||||
'for=x,'
|
||||
'for="unknown", by=_hidden,for="\\"\\\\",'
|
||||
'for="_don\\\"t_\\try_this\\\\at_home_\\42",'
|
||||
'for="198\\.51\\.100\\.17\\:1236";'
|
||||
'proto=https;host=example.com')
|
||||
}))
|
||||
compares = ['192.0.2.43', '2001:db8:cafe::17', 'x',
|
||||
'unknown', '"\\', '_don"t_try_this\\at_home_42',
|
||||
'198.51.100.17']
|
||||
self.assertEqual(req.access_route, compares)
|
||||
# test cached
|
||||
self.assertEqual(req.access_route, compares)
|
||||
|
||||
def test_malformed_rfc_forwarded(self):
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'Forwarded': 'for'
|
||||
}))
|
||||
self.assertEqual(req.access_route, [])
|
||||
# test cached
|
||||
self.assertEqual(req.access_route, [])
|
||||
def test_rfc_forwarded():
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'Forwarded': ('for=192.0.2.43,for=,'
|
||||
'for="[2001:db8:cafe::17]:555",'
|
||||
'for=x,'
|
||||
'for="unknown", by=_hidden,for="\\"\\\\",'
|
||||
'for="_don\\\"t_\\try_this\\\\at_home_\\42",'
|
||||
'for="198\\.51\\.100\\.17\\:1236";'
|
||||
'proto=https;host=example.com')
|
||||
}))
|
||||
|
||||
def test_x_forwarded_for(self):
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'X-Forwarded-For': ('192.0.2.43, 2001:db8:cafe::17,'
|
||||
'unknown, _hidden, 203.0.113.60')
|
||||
}))
|
||||
self.assertEqual(req.access_route,
|
||||
['192.0.2.43', '2001:db8:cafe::17',
|
||||
'unknown', '_hidden', '203.0.113.60'])
|
||||
compares = ['192.0.2.43', '2001:db8:cafe::17', 'x',
|
||||
'unknown', '"\\', '_don"t_try_this\\at_home_42',
|
||||
'198.51.100.17']
|
||||
|
||||
def test_x_real_ip(self):
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'X-Real-IP': '2001:db8:cafe::17'
|
||||
}))
|
||||
self.assertEqual(req.access_route, ['2001:db8:cafe::17'])
|
||||
req.access_route == compares
|
||||
|
||||
def test_remote_addr(self):
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route'))
|
||||
self.assertEqual(req.access_route, ['127.0.0.1'])
|
||||
# test cached
|
||||
req.access_route == compares
|
||||
|
||||
def test_remote_addr_missing(self):
|
||||
env = testing.create_environ(host='example.com', path='/access_route')
|
||||
del env['REMOTE_ADDR']
|
||||
|
||||
req = Request(env)
|
||||
self.assertEqual(req.access_route, [])
|
||||
def test_malformed_rfc_forwarded():
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'Forwarded': 'for'
|
||||
}))
|
||||
|
||||
req.access_route == []
|
||||
|
||||
# test cached
|
||||
req.access_route == []
|
||||
|
||||
|
||||
def test_x_forwarded_for():
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'X-Forwarded-For': ('192.0.2.43, 2001:db8:cafe::17,'
|
||||
'unknown, _hidden, 203.0.113.60')
|
||||
}))
|
||||
|
||||
assert req.access_route == [
|
||||
'192.0.2.43',
|
||||
'2001:db8:cafe::17',
|
||||
'unknown',
|
||||
'_hidden',
|
||||
'203.0.113.60'
|
||||
]
|
||||
|
||||
|
||||
def test_x_real_ip():
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route',
|
||||
headers={
|
||||
'X-Real-IP': '2001:db8:cafe::17'
|
||||
}))
|
||||
|
||||
assert req.access_route == ['2001:db8:cafe::17']
|
||||
|
||||
|
||||
def test_remote_addr():
|
||||
req = Request(testing.create_environ(
|
||||
host='example.com',
|
||||
path='/access_route'))
|
||||
|
||||
assert req.access_route == ['127.0.0.1']
|
||||
|
||||
|
||||
def test_remote_addr_missing():
|
||||
env = testing.create_environ(host='example.com', path='/access_route')
|
||||
del env['REMOTE_ADDR']
|
||||
|
||||
req = Request(env)
|
||||
assert req.access_route == []
|
||||
|
|
|
@ -173,11 +173,13 @@ class ZooResource(object):
|
|||
self.fish = fish
|
||||
|
||||
|
||||
class TestHooks(testing.TestBase):
|
||||
class TestHooks(testing.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestHooks, self).setUp()
|
||||
|
||||
def before(self):
|
||||
self.resource = WrappedRespondersResource()
|
||||
self.api.add_route(self.test_route, self.resource)
|
||||
self.api.add_route('/', self.resource)
|
||||
|
||||
self.field_resource = TestFieldResource()
|
||||
self.api.add_route('/queue/{id}/messages', self.field_resource)
|
||||
|
@ -190,76 +192,73 @@ class TestHooks(testing.TestBase):
|
|||
|
||||
def test_multiple_resource_hooks(self):
|
||||
zoo_resource = ZooResource()
|
||||
self.api.add_route(self.test_route, zoo_resource)
|
||||
self.api.add_route('/', zoo_resource)
|
||||
|
||||
self.simulate_request(self.test_route)
|
||||
result = self.simulate_get('/')
|
||||
|
||||
self.assertEqual('not fluffy', self.srmock.headers_dict['X-Frogs'])
|
||||
self.assertEqual('fluffy', self.srmock.headers_dict['X-Bunnies'])
|
||||
self.assertEqual('not fluffy', result.headers['X-Frogs'])
|
||||
self.assertEqual('fluffy', result.headers['X-Bunnies'])
|
||||
|
||||
self.assertEqual('fluffy', zoo_resource.bunnies)
|
||||
self.assertEqual('not fluffy', zoo_resource.frogs)
|
||||
self.assertEqual('slippery', zoo_resource.fish)
|
||||
|
||||
def test_input_validator(self):
|
||||
self.simulate_request(self.test_route, method='PUT')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
result = self.simulate_put('/')
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def test_param_validator(self):
|
||||
self.simulate_request(self.test_route, query_string='limit=10',
|
||||
body='{}')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result = self.simulate_get('/', query_string='limit=10', body='{}')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
|
||||
self.simulate_request(self.test_route, query_string='limit=101')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
result = self.simulate_get('/', query_string='limit=101')
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def test_field_validator(self):
|
||||
self.simulate_request('/queue/10/messages')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result = self.simulate_get('/queue/10/messages')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertEqual(self.field_resource.id, 10)
|
||||
|
||||
self.simulate_request('/queue/bogus/messages')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
result = self.simulate_get('/queue/bogus/messages')
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def test_parser(self):
|
||||
self.simulate_request(self.test_route,
|
||||
body=json.dumps({'animal': 'falcon'}))
|
||||
|
||||
self.simulate_get('/', body=json.dumps({'animal': 'falcon'}))
|
||||
self.assertEqual(self.resource.doc, {'animal': 'falcon'})
|
||||
|
||||
def test_wrapped_resource(self):
|
||||
self.simulate_request('/wrapped', method='PATCH')
|
||||
self.assertEqual(falcon.HTTP_405, self.srmock.status)
|
||||
result = self.simulate_patch('/wrapped')
|
||||
self.assertEqual(result.status_code, 405)
|
||||
|
||||
self.simulate_request('/wrapped', query_string='limit=10')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result = self.simulate_get('/wrapped', query_string='limit=10')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertEqual('fuzzy', self.wrapped_resource.bunnies)
|
||||
|
||||
self.simulate_request('/wrapped', method='HEAD')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result = self.simulate_head('/wrapped')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertEqual('fuzzy', self.wrapped_resource.bunnies)
|
||||
|
||||
self.simulate_request('/wrapped', method='POST')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
result = self.simulate_post('/wrapped')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertEqual('slippery', self.wrapped_resource.fish)
|
||||
|
||||
self.simulate_request('/wrapped', query_string='limit=101')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
self.assertEqual('fuzzy', self.wrapped_resource.bunnies)
|
||||
result = self.simulate_get('/wrapped', query_string='limit=101')
|
||||
self.assertEqual(result.status_code, 400)
|
||||
self.assertEqual(self.wrapped_resource.bunnies, 'fuzzy')
|
||||
|
||||
def test_wrapped_resource_with_hooks_aware_of_resource(self):
|
||||
self.simulate_request('/wrapped_aware', method='PATCH')
|
||||
self.assertEqual(falcon.HTTP_405, self.srmock.status)
|
||||
result = self.simulate_patch('/wrapped_aware')
|
||||
self.assertEqual(result.status_code, 405)
|
||||
|
||||
self.simulate_request('/wrapped_aware', query_string='limit=10')
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
self.assertEqual('fuzzy', self.wrapped_aware_resource.bunnies)
|
||||
result = self.simulate_get('/wrapped_aware', query_string='limit=10')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertEqual(self.wrapped_aware_resource.bunnies, 'fuzzy')
|
||||
|
||||
for method in ('HEAD', 'PUT', 'POST'):
|
||||
self.simulate_request('/wrapped_aware', method=method)
|
||||
self.assertEqual(falcon.HTTP_200, self.srmock.status)
|
||||
self.assertEqual('fuzzy', self.wrapped_aware_resource.bunnies)
|
||||
result = self.simulate_request(method, '/wrapped_aware')
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertEqual(self.wrapped_aware_resource.bunnies, 'fuzzy')
|
||||
|
||||
self.simulate_request('/wrapped_aware', query_string='limit=101')
|
||||
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||
self.assertEqual('fuzzy', self.wrapped_aware_resource.bunnies)
|
||||
result = self.simulate_get('/wrapped_aware', query_string='limit=101')
|
||||
self.assertEqual(result.status_code, 400)
|
||||
self.assertEqual(self.wrapped_aware_resource.bunnies, 'fuzzy')
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
import io
|
||||
|
||||
import pytest
|
||||
|
||||
from falcon.request_helpers import BoundedStream
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bounded_stream():
|
||||
return BoundedStream(io.BytesIO(), 1024)
|
||||
|
||||
|
||||
def test_not_writeable(bounded_stream):
|
||||
assert not bounded_stream.writeable()
|
||||
|
||||
with pytest.raises(IOError):
|
||||
bounded_stream.write(b'something something')
|
|
@ -1,10 +1,8 @@
|
|||
from datetime import datetime, timedelta, tzinfo
|
||||
import re
|
||||
import sys
|
||||
|
||||
import ddt
|
||||
import pytest
|
||||
from six.moves.http_cookies import Morsel
|
||||
from testtools.matchers import LessThan
|
||||
|
||||
import falcon
|
||||
import falcon.testing as testing
|
||||
|
@ -59,167 +57,241 @@ class CookieResourceMaxAgeFloatString:
|
|||
'foostring', 'bar', max_age='15', secure=False, http_only=False)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestCookies(testing.TestBase):
|
||||
@pytest.fixture(scope='module')
|
||||
def client():
|
||||
app = falcon.API()
|
||||
app.add_route('/', CookieResource())
|
||||
app.add_route('/test-convert', CookieResourceMaxAgeFloatString())
|
||||
|
||||
#
|
||||
# Response
|
||||
#
|
||||
return testing.TestClient(app)
|
||||
|
||||
def test_response_base_case(self):
|
||||
self.resource = CookieResource()
|
||||
self.api.add_route(self.test_route, self.resource)
|
||||
self.simulate_request(self.test_route, method='GET')
|
||||
if sys.version_info >= (3, 4, 3):
|
||||
value = 'foo=bar; Domain=example.com; HttpOnly; Path=/; Secure'
|
||||
else:
|
||||
value = 'foo=bar; Domain=example.com; httponly; Path=/; secure'
|
||||
self.assertIn(('set-cookie', value), self.srmock.headers)
|
||||
|
||||
def test_response_complex_case(self):
|
||||
self.resource = CookieResource()
|
||||
self.api.add_route(self.test_route, self.resource)
|
||||
self.simulate_request(self.test_route, method='HEAD')
|
||||
if sys.version_info >= (3, 4, 3):
|
||||
value = 'foo=bar; HttpOnly; Max-Age=300; Secure'
|
||||
else:
|
||||
value = 'foo=bar; httponly; Max-Age=300; secure'
|
||||
self.assertIn(('set-cookie', value), self.srmock.headers)
|
||||
if sys.version_info >= (3, 4, 3):
|
||||
value = 'bar=baz; Secure'
|
||||
else:
|
||||
value = 'bar=baz; secure'
|
||||
self.assertIn(('set-cookie', value), self.srmock.headers)
|
||||
self.assertNotIn(('set-cookie', 'bad=cookie'), self.srmock.headers)
|
||||
# =====================================================================
|
||||
# Response
|
||||
# =====================================================================
|
||||
|
||||
def test_cookie_expires_naive(self):
|
||||
self.resource = CookieResource()
|
||||
self.api.add_route(self.test_route, self.resource)
|
||||
self.simulate_request(self.test_route, method='POST')
|
||||
self.assertIn(
|
||||
('set-cookie', 'foo=bar; expires=Sat, 01 Jan 2050 00:00:00 GMT'),
|
||||
self.srmock.headers)
|
||||
|
||||
def test_cookie_expires_aware(self):
|
||||
self.resource = CookieResource()
|
||||
self.api.add_route(self.test_route, self.resource)
|
||||
self.simulate_request(self.test_route, method='PUT')
|
||||
self.assertIn(
|
||||
('set-cookie', 'foo=bar; expires=Fri, 31 Dec 2049 23:00:00 GMT'),
|
||||
self.srmock.headers)
|
||||
def test_response_base_case(client):
|
||||
result = client.simulate_get('/')
|
||||
|
||||
def test_cookies_setable(self):
|
||||
resp = falcon.Response()
|
||||
cookie = result.cookies['foo']
|
||||
assert cookie.name == 'foo'
|
||||
assert cookie.value == 'bar'
|
||||
assert cookie.domain == 'example.com'
|
||||
assert cookie.http_only
|
||||
|
||||
self.assertIsNone(resp._cookies)
|
||||
# NOTE(kgriffs): Explicitly test for None to ensure
|
||||
# falcon.testing.Cookie is returning exactly what we
|
||||
# expect. Apps using falcon.testing.Cookie can be a
|
||||
# bit more cavalier if they wish.
|
||||
assert cookie.max_age is None
|
||||
assert cookie.expires is None
|
||||
|
||||
resp.set_cookie('foo', 'wrong-cookie', max_age=301)
|
||||
resp.set_cookie('foo', 'bar', max_age=300)
|
||||
morsel = resp._cookies['foo']
|
||||
assert cookie.path == '/'
|
||||
assert cookie.secure
|
||||
|
||||
self.assertIsInstance(morsel, Morsel)
|
||||
self.assertEqual(morsel.key, 'foo')
|
||||
self.assertEqual(morsel.value, 'bar')
|
||||
self.assertEqual(morsel['max-age'], 300)
|
||||
|
||||
def test_cookie_max_age_float_and_string(self):
|
||||
# Falcon implicitly converts max-age values to integers,
|
||||
# for ensuring RFC 6265-compliance of the attribute value.
|
||||
self.resource = CookieResourceMaxAgeFloatString()
|
||||
self.api.add_route(self.test_route, self.resource)
|
||||
self.simulate_request(self.test_route, method='GET')
|
||||
self.assertIn(
|
||||
('set-cookie', 'foofloat=bar; Max-Age=15'), self.srmock.headers)
|
||||
self.assertIn(
|
||||
('set-cookie', 'foostring=bar; Max-Age=15'), self.srmock.headers)
|
||||
def test_response_complex_case(client):
|
||||
result = client.simulate_head('/')
|
||||
|
||||
def test_response_unset_cookie(self):
|
||||
resp = falcon.Response()
|
||||
resp.unset_cookie('bad')
|
||||
resp.set_cookie('bad', 'cookie', max_age=300)
|
||||
resp.unset_cookie('bad')
|
||||
assert len(result.cookies) == 3
|
||||
|
||||
morsels = list(resp._cookies.values())
|
||||
self.assertEqual(len(morsels), 1)
|
||||
cookie = result.cookies['foo']
|
||||
assert cookie.value == 'bar'
|
||||
assert cookie.domain is None
|
||||
assert cookie.expires is None
|
||||
assert cookie.http_only
|
||||
assert cookie.max_age == 300
|
||||
assert cookie.path is None
|
||||
assert cookie.secure
|
||||
|
||||
bad_cookie = morsels[0]
|
||||
self.assertEqual(bad_cookie['expires'], -1)
|
||||
cookie = result.cookies['bar']
|
||||
assert cookie.value == 'baz'
|
||||
assert cookie.domain is None
|
||||
assert cookie.expires is None
|
||||
assert not cookie.http_only
|
||||
assert cookie.max_age is None
|
||||
assert cookie.path is None
|
||||
assert cookie.secure
|
||||
|
||||
output = bad_cookie.OutputString()
|
||||
self.assertTrue('bad=;' in output or 'bad="";' in output)
|
||||
cookie = result.cookies['bad']
|
||||
assert cookie.value == '' # An unset cookie has an empty value
|
||||
assert cookie.domain is None
|
||||
|
||||
match = re.search('expires=([^;]+)', output)
|
||||
self.assertIsNotNone(match)
|
||||
assert cookie.expires < datetime.utcnow()
|
||||
|
||||
expiration = http_date_to_dt(match.group(1), obs_date=True)
|
||||
self.assertThat(expiration, LessThan(datetime.utcnow()))
|
||||
# NOTE(kgriffs): I know accessing a private attr like this is
|
||||
# naughty of me, but we just need to sanity-check that the
|
||||
# string is GMT.
|
||||
assert cookie._expires.endswith('GMT')
|
||||
|
||||
def test_cookie_timezone(self):
|
||||
tz = TimezoneGMT()
|
||||
self.assertEqual('GMT', tz.tzname(timedelta(0)))
|
||||
assert cookie.http_only
|
||||
assert cookie.max_age is None
|
||||
assert cookie.path is None
|
||||
assert cookie.secure
|
||||
|
||||
#
|
||||
# Request
|
||||
#
|
||||
|
||||
def test_request_cookie_parsing(self):
|
||||
# testing with a github-ish set of cookies
|
||||
headers = [
|
||||
('Cookie', '''
|
||||
logged_in=no;_gh_sess=eyJzZXXzaW9uX2lkIjoiN2;
|
||||
tz=Europe/Berlin; _ga=GA1.2.332347814.1422308165;
|
||||
_gat=1;
|
||||
_octo=GH1.1.201722077.1422308165'''),
|
||||
]
|
||||
def test_cookie_expires_naive(client):
|
||||
result = client.simulate_post('/')
|
||||
|
||||
environ = testing.create_environ(headers=headers)
|
||||
req = falcon.Request(environ)
|
||||
cookie = result.cookies['foo']
|
||||
assert cookie.value == 'bar'
|
||||
assert cookie.domain is None
|
||||
assert cookie.expires == datetime(year=2050, month=1, day=1)
|
||||
assert not cookie.http_only
|
||||
assert cookie.max_age is None
|
||||
assert cookie.path is None
|
||||
assert not cookie.secure
|
||||
|
||||
self.assertEqual('no', req.cookies['logged_in'])
|
||||
self.assertEqual('Europe/Berlin', req.cookies['tz'])
|
||||
self.assertEqual('GH1.1.201722077.1422308165', req.cookies['_octo'])
|
||||
|
||||
self.assertIn('logged_in', req.cookies)
|
||||
self.assertIn('_gh_sess', req.cookies)
|
||||
self.assertIn('tz', req.cookies)
|
||||
self.assertIn('_ga', req.cookies)
|
||||
self.assertIn('_gat', req.cookies)
|
||||
self.assertIn('_octo', req.cookies)
|
||||
def test_cookie_expires_aware(client):
|
||||
result = client.simulate_put('/')
|
||||
|
||||
def test_unicode_inside_ascii_range(self):
|
||||
resp = falcon.Response()
|
||||
cookie = result.cookies['foo']
|
||||
assert cookie.value == 'bar'
|
||||
assert cookie.domain is None
|
||||
assert cookie.expires == datetime(year=2049, month=12, day=31, hour=23)
|
||||
assert not cookie.http_only
|
||||
assert cookie.max_age is None
|
||||
assert cookie.path is None
|
||||
assert not cookie.secure
|
||||
|
||||
# should be ok
|
||||
resp.set_cookie('non_unicode_ascii_name_1', 'ascii_value')
|
||||
resp.set_cookie(u'unicode_ascii_name_1', 'ascii_value')
|
||||
resp.set_cookie('non_unicode_ascii_name_2', u'unicode_ascii_value')
|
||||
resp.set_cookie(u'unicode_ascii_name_2', u'unicode_ascii_value')
|
||||
|
||||
@ddt.data(
|
||||
def test_cookies_setable(client):
|
||||
resp = falcon.Response()
|
||||
|
||||
assert resp._cookies is None
|
||||
|
||||
resp.set_cookie('foo', 'wrong-cookie', max_age=301)
|
||||
resp.set_cookie('foo', 'bar', max_age=300)
|
||||
morsel = resp._cookies['foo']
|
||||
|
||||
assert isinstance(morsel, Morsel)
|
||||
assert morsel.key == 'foo'
|
||||
assert morsel.value == 'bar'
|
||||
assert morsel['max-age'] == 300
|
||||
|
||||
|
||||
@pytest.mark.parametrize('cookie_name', ('foofloat', 'foostring'))
|
||||
def test_cookie_max_age_float_and_string(client, cookie_name):
|
||||
# NOTE(tbug): Falcon implicitly converts max-age values to integers,
|
||||
# to ensure RFC 6265-compliance of the attribute value.
|
||||
|
||||
result = client.simulate_get('/test-convert')
|
||||
|
||||
cookie = result.cookies[cookie_name]
|
||||
assert cookie.value == 'bar'
|
||||
assert cookie.domain is None
|
||||
assert cookie.expires is None
|
||||
assert not cookie.http_only
|
||||
assert cookie.max_age == 15
|
||||
assert cookie.path is None
|
||||
assert not cookie.secure
|
||||
|
||||
|
||||
def test_response_unset_cookie(client):
|
||||
resp = falcon.Response()
|
||||
resp.unset_cookie('bad')
|
||||
resp.set_cookie('bad', 'cookie', max_age=300)
|
||||
resp.unset_cookie('bad')
|
||||
|
||||
morsels = list(resp._cookies.values())
|
||||
len(morsels) == 1
|
||||
|
||||
bad_cookie = morsels[0]
|
||||
bad_cookie['expires'] == -1
|
||||
|
||||
output = bad_cookie.OutputString()
|
||||
assert 'bad=;' in output or 'bad="";' in output
|
||||
|
||||
match = re.search('expires=([^;]+)', output)
|
||||
assert match
|
||||
|
||||
expiration = http_date_to_dt(match.group(1), obs_date=True)
|
||||
assert expiration < datetime.utcnow()
|
||||
|
||||
|
||||
def test_cookie_timezone(client):
|
||||
tz = TimezoneGMT()
|
||||
assert tz.tzname(timedelta(0)) == 'GMT'
|
||||
|
||||
|
||||
# =====================================================================
|
||||
# Request
|
||||
# =====================================================================
|
||||
|
||||
|
||||
def test_request_cookie_parsing():
|
||||
# testing with a github-ish set of cookies
|
||||
headers = [
|
||||
(
|
||||
'Cookie',
|
||||
'''
|
||||
logged_in=no;_gh_sess=eyJzZXXzaW9uX2lkIjoiN2;
|
||||
tz=Europe/Berlin; _ga=GA1.2.332347814.1422308165;
|
||||
_gat=1;
|
||||
_octo=GH1.1.201722077.1422308165
|
||||
'''
|
||||
),
|
||||
]
|
||||
|
||||
environ = testing.create_environ(headers=headers)
|
||||
req = falcon.Request(environ)
|
||||
|
||||
assert req.cookies['logged_in'] == 'no'
|
||||
assert req.cookies['tz'] == 'Europe/Berlin'
|
||||
assert req.cookies['_octo'] == 'GH1.1.201722077.1422308165'
|
||||
|
||||
assert 'logged_in' in req.cookies
|
||||
assert '_gh_sess' in req.cookies
|
||||
assert 'tz' in req.cookies
|
||||
assert '_ga' in req.cookies
|
||||
assert '_gat' in req.cookies
|
||||
assert '_octo' in req.cookies
|
||||
|
||||
|
||||
def test_unicode_inside_ascii_range():
|
||||
resp = falcon.Response()
|
||||
|
||||
# should be ok
|
||||
resp.set_cookie('non_unicode_ascii_name_1', 'ascii_value')
|
||||
resp.set_cookie(u'unicode_ascii_name_1', 'ascii_value')
|
||||
resp.set_cookie('non_unicode_ascii_name_2', u'unicode_ascii_value')
|
||||
resp.set_cookie(u'unicode_ascii_name_2', u'unicode_ascii_value')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'name',
|
||||
(
|
||||
UNICODE_TEST_STRING,
|
||||
UNICODE_TEST_STRING.encode('utf-8'),
|
||||
42
|
||||
)
|
||||
def test_non_ascii_name(self, name):
|
||||
resp = falcon.Response()
|
||||
self.assertRaises(KeyError, resp.set_cookie,
|
||||
name, 'ok_value')
|
||||
)
|
||||
def test_non_ascii_name(name):
|
||||
resp = falcon.Response()
|
||||
with pytest.raises(KeyError):
|
||||
resp.set_cookie(name, 'ok_value')
|
||||
|
||||
@ddt.data(
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'value',
|
||||
(
|
||||
UNICODE_TEST_STRING,
|
||||
UNICODE_TEST_STRING.encode('utf-8'),
|
||||
42
|
||||
)
|
||||
def test_non_ascii_value(self, value):
|
||||
resp = falcon.Response()
|
||||
)
|
||||
def test_non_ascii_value(value):
|
||||
resp = falcon.Response()
|
||||
|
||||
# NOTE(tbug): we need to grab the exception to check
|
||||
# that it is not instance of UnicodeEncodeError, so
|
||||
# we cannot simply use assertRaises
|
||||
try:
|
||||
resp.set_cookie('ok_name', value)
|
||||
except ValueError as e:
|
||||
self.assertIsInstance(e, ValueError)
|
||||
self.assertNotIsInstance(e, UnicodeEncodeError)
|
||||
else:
|
||||
self.fail('set_bad_cookie_value did not fail as expected')
|
||||
# NOTE(tbug): we need to grab the exception to check
|
||||
# that it is not instance of UnicodeEncodeError, so
|
||||
# we cannot simply use pytest.raises
|
||||
try:
|
||||
resp.set_cookie('ok_name', value)
|
||||
except ValueError as e:
|
||||
assert isinstance(e, ValueError)
|
||||
assert not isinstance(e, UnicodeEncodeError)
|
||||
else:
|
||||
pytest.fail('set_bad_cookie_value did not fail as expected')
|
||||
|
|
Loading…
Reference in New Issue