
248 lines
9.8 KiB

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json as jsonutils
from requests.adapters import HTTPAdapter
from requests.cookies import MockRequest, MockResponse
from requests.cookies import RequestsCookieJar
from requests.cookies import merge_cookies, cookiejar_from_dict
from requests.packages.urllib3.response import HTTPResponse
import six
from requests_mock import compat
from requests_mock import exceptions
# Keyword arguments that each describe a response body. At most one of them
# may be supplied for a response (enforced by _check_body_arguments).
_BODY_ARGS = frozenset(['raw', 'body', 'content', 'text', 'json'])
# Keyword arguments that describe the non-body parts of a response.
_HTTP_ARGS = frozenset(['status_code', 'reason', 'headers', 'cookies'])

# Status code used whenever the caller does not supply 'status_code'.
_DEFAULT_STATUS = 200
# Shared adapter instance; create_response uses its build_response() to turn
# a raw urllib3 response into a requests Response.
_http_adapter = HTTPAdapter()
class CookieJar(RequestsCookieJar):
    """A RequestsCookieJar whose ``set`` documents the accepted options."""

    def set(self, name, value, **kwargs):
        """Add a cookie to the Jar.

        :param str name: cookie name/key.
        :param str value: cookie value.
        :param int version: Integer or None. Netscape cookies have version 0.
            RFC 2965 and RFC 2109 cookies have a version cookie-attribute of 1.
            However, note that cookielib may 'downgrade' RFC 2109 cookies to
            Netscape cookies, in which case version is 0.
        :param str port: String representing a port or a set of ports
            (eg. '80', or '80,8080'),
        :param str domain: The domain the cookie should apply to.
        :param str path: Cookie path (a string, eg. '/acme/rocket_launchers').
        :param bool secure: True if cookie should only be returned over a
            secure connection.
        :param int expires: Integer expiry date in seconds since epoch or None.
        :param bool discard: True if this is a session cookie.
        :param str comment: String comment from the server explaining the
            function of this cookie.
        :param str comment_url: URL linking to a comment from the server
            explaining the function of this cookie.
        """
        # just here to provide the function documentation
        return super(CookieJar, self).set(name, value, **kwargs)
def _check_body_arguments(**kwargs):
    """Validate the keyword arguments that describe a mocked response.

    :raises RuntimeError: if more than one of the _BODY_ARGS keywords has a
        value other than None.
    :raises TypeError: if any keyword that is not a body argument is also
        missing from _HTTP_ARGS.
    """
    # mutual exclusion, only 1 body method may be provided
    # Iterate in sorted order so the error message is stable; frozenset
    # iteration order for strings can differ between interpreter runs.
    provided = [x for x in sorted(_BODY_ARGS)
                if kwargs.pop(x, None) is not None]

    if len(provided) > 1:
        raise RuntimeError('You may only supply one body element. You '
                           'supplied %s' % ', '.join(provided))

    # Every body keyword has been popped above, so whatever remains must be
    # one of the HTTP-level arguments.
    extra = [x for x in kwargs if x not in _HTTP_ARGS]

    if extra:
        raise TypeError('Too many arguments provided. Unexpected '
                        'arguments %s.' % ', '.join(extra))
class _FakeConnection(object):
    """An object that can mock the necessary parts of a socket interface."""

    def send(self, request, **kwargs):
        """Refuse to send anything, as no real connection exists.

        :raises exceptions.InvalidRequest: always.
        """
        msg = 'This response was created without a connection. You are ' \
              'therefore unable to make a request directly on that connection.'
        raise exceptions.InvalidRequest(msg)

    def close(self):
        """Do nothing; there is no underlying connection to release."""
        pass
def _extract_cookies(request, response, cookies):
    """Add cookies to the response.

    Cookies in requests are extracted from the headers in the original_response
    httplib.HTTPMessage which we don't create so we have to do this step
    manually.

    :param request: The request the response answers.
    :param response: The response whose ``cookies`` jar is updated in place.
    :param cookies: Extra cookies (a CookieJar or a dictionary) to merge into
        the response, or a false value to merge nothing.
    """
    # This will add cookies set manually via the Set-Cookie or Set-Cookie2
    # header but this only allows 1 cookie to be set.
    http_message = compat._FakeHTTPMessage(response.headers)
    response.cookies.extract_cookies(MockResponse(http_message),
                                     MockRequest(request))

    # This allows you to pass either a CookieJar or a dictionary to request_uri
    # or directly to create_response. To allow more than one cookie to be set.
    if cookies:
        merge_cookies(response.cookies, cookies)
class _IOReader(six.BytesIO):
    """A reader that makes a BytesIO look like a HTTPResponse.

    A HTTPResponse will return an empty string when you read from it after
    the socket has been closed. A BytesIO will raise a ValueError. For
    compatibility we want to do the same thing a HTTPResponse does.
    """

    def read(self, *args, **kwargs):
        """Read from the buffer, returning empty bytes once it is closed."""
        if self.closed:
            return six.b('')

        # not a new style object in python 2, so call the base class
        # explicitly rather than through super()
        return six.BytesIO.read(self, *args, **kwargs)
def create_response(request, **kwargs):
    """Build a requests Response object for a mocked request.

    At most one of raw, body, content, text and json may be supplied.

    :param request: The request being answered; it is handed to
        HTTPAdapter.build_response together with the raw response.
    :param connection: Object stored as the ``connection`` attribute of the
        response. Defaults to a new _FakeConnection.
    :param int status_code: The status code to return upon a successful
        match. Defaults to 200.
    :param HTTPResponse raw: A HTTPResponse object to return upon a
        successful match.
    :param io.IOBase body: An IO object with a read() method that can
        return a body on successful match.
    :param bytes content: A byte string to return upon a successful match.
    :param unicode text: A text string to return upon a successful match.
    :param object json: A python object to be converted to a JSON string
        and returned upon a successful match.
    :param dict headers: A dictionary object containing headers that are
        returned upon a successful match.
    :param CookieJar cookies: A cookie jar with cookies to set on the
        response.
    :raises TypeError: if content is not binary data, if text is not string
        data, or if an unexpected keyword argument is supplied.
    :raises RuntimeError: if more than one body argument is supplied.
    :returns: The response built by the adapter, with its connection,
        encoding and cookies filled in.
    """
    connection = kwargs.pop('connection', _FakeConnection())

    # Reject conflicting body arguments and unknown keywords before any of
    # them are consumed below.
    _check_body_arguments(**kwargs)

    raw = kwargs.pop('raw', None)
    body = kwargs.pop('body', None)
    content = kwargs.pop('content', None)
    text = kwargs.pop('text', None)
    json = kwargs.pop('json', None)
    encoding = None

    if content is not None and not isinstance(content, six.binary_type):
        raise TypeError('Content should be binary data')
    if text is not None and not isinstance(text, six.string_types):
        raise TypeError('Text should be string data')

    # Each richer body form is reduced to the next simpler one:
    # json -> text -> content -> body.
    if json is not None:
        text = jsonutils.dumps(json)
    if text is not None:
        encoding = 'utf-8'
        content = text.encode(encoding)
    if content is not None:
        body = _IOReader(content)

    if not raw:
        # preload_content=False leaves the body unread so that requests can
        # stream it later; decode_content=False hands the bytes over as given.
        # NOTE(review): the tail of this call was lost from the source; confirm
        # against the project whether further keyword arguments belong here.
        raw = HTTPResponse(status=kwargs.get('status_code', _DEFAULT_STATUS),
                           headers=kwargs.get('headers', {}),
                           reason=kwargs.get('reason'),
                           body=body or _IOReader(six.b('')),
                           decode_content=False,
                           preload_content=False)

    response = _http_adapter.build_response(request, raw)
    response.connection = connection
    response.encoding = encoding

    _extract_cookies(request, response, kwargs.get('cookies'))

    return response
class _Context(object):
    """Stores the data being used to process a current URL match."""

    def __init__(self, headers, status_code, reason, cookies):
        """Record the response attributes for the current match.

        :param headers: Headers for the response.
        :param status_code: Status code for the response.
        :param reason: Reason phrase for the response.
        :param cookies: Cookies for the response.
        """
        self.headers = headers
        self.status_code = status_code
        self.reason = reason
        self.cookies = cookies
class _MatcherResponse(object):
def __init__(self, **kwargs):
    """Store and validate the arguments describing how to respond.

    :param exc: Optional exception to raise instead of building a response.
        It may not be combined with any other argument.
    :param content: Optional callback or binary data.
    :param text: Optional callback or string data.
    :raises TypeError: if exc is combined with other arguments, or if
        content or text is neither a callback nor of the expected type.

    All remaining keyword arguments are kept unchanged in ``self._params``.
    """
    self._exc = kwargs.pop('exc', None)

    # If the user is asking for an exception to be thrown then prevent them
    # specifying any sort of body or status response as it won't be used.
    # This may be protecting the user too much but can be removed later.
    if self._exc and kwargs:
        raise TypeError('Cannot provide other arguments with exc.')

    self._params = kwargs

    # whilst in general you shouldn't do type checking in python this
    # makes sure we don't end up with differences between the way types
    # are handled between python 2 and 3.
    content = self._params.get('content')
    text = self._params.get('text')

    if content is not None and not (callable(content) or
                                    isinstance(content, six.binary_type)):
        raise TypeError('Content should be a callback or binary data')

    if text is not None and not (callable(text) or
                                 isinstance(text, six.string_types)):
        raise TypeError('Text should be a callback or string data')
def get_response(self, request):
# if an error was requested then raise that instead of doing response
if self._exc:
raise self._exc
# If a cookie dict is passed convert it into a CookieJar so that the
# cookies object available in a callback context is always a jar.
cookies = self._params.get('cookies', CookieJar())
if isinstance(cookies, dict):
cookies = cookiejar_from_dict(cookies, CookieJar())
context = _Context(self._params.get('headers', {}).copy(),
self._params.get('status_code', _DEFAULT_STATUS),
# if a body element is a callback then execute it
def _call(f, *args, **kwargs):
return f(request, context, *args, **kwargs) if callable(f) else f
return create_response(request,