Merge "Increase unit test coverage for Common WSGI."

This commit is contained in:
Jenkins 2017-01-17 01:10:18 +00:00 committed by Gerrit Code Review
commit 1e60635123
2 changed files with 316 additions and 61 deletions

View File

@ -461,7 +461,6 @@ class Resource(object):
method = getattr(obj, action)
except AttributeError:
method = getattr(obj, 'default')
return method(*args, **kwargs)
def get_action_args(self, request_environment):
@ -981,13 +980,13 @@ class JSONPatchDeserializer(TextDeserializer):
msg = _('Pointer `%s` contains adjacent "/".') % pointer
raise webob.exc.HTTPBadRequest(explanation=msg)
if len(pointer) > 1 and pointer.endswith('/'):
msg = _('Pointer `%s` end with "/".') % pointer
msg = _('Pointer `%s` ends with "/".') % pointer
raise webob.exc.HTTPBadRequest(explanation=msg)
if pointer[1:].strip() == '/':
msg = _('Pointer `%s` does not contains valid token.') % pointer
msg = _('Pointer `%s` does not contain a valid token.') % pointer
raise webob.exc.HTTPBadRequest(explanation=msg)
if re.search('~[^01]', pointer) or pointer.endswith('~'):
msg = _('Pointer `%s` contains "~" not part of'
msg = _('Pointer `%s` contains "~", which is not part of'
' a recognized escape sequence.') % pointer
raise webob.exc.HTTPBadRequest(explanation=msg)
@ -1108,11 +1107,11 @@ class FormDataDeserializer(TextDeserializer):
def _from_json(self, datastring):
value = datastring
try:
LOG.debug("Trying deserialize '{data}' to json".format(
LOG.debug("Trying to deserialize '{data}' to json".format(
data=datastring))
value = jsonutils.loads(datastring)
except ValueError:
LOG.warning(_LW("Unable deserialize to json, using raw text"))
LOG.warning(_LW("Unable to deserialize to json, using raw text"))
return value
def default(self, request):

View File

@ -23,18 +23,88 @@ from oslo_config import cfg
from xml.dom import minidom
from murano.common import exceptions
from murano.common.i18n import _, _LW
from murano.common import wsgi
from murano.tests.unit import base
CONF = cfg.CONF
class WsgiTest(base.MuranoTestCase):
class TestServiceBrokerResponseSerializer(base.MuranoTestCase):
def test_service_broker_response_serializer(self):
self.sbrs = wsgi.ServiceBrokerResponseSerializer()
result = self.sbrs.serialize("test_response", "application/json")
self.assertEqual(200, result.status_code)
response = webob.Response("test_body")
response.data = "test_data"
result = self.sbrs.serialize(response, "application/json")
self.assertEqual(200, result.status_code)
class TestResponseSerializer(base.MuranoTestCase):
def test_response_serializer(self):
self.response_serializer = wsgi.ResponseSerializer(None)
self.assertRaises(exceptions.UnsupportedContentType,
self.response_serializer.get_body_serializer,
"not_valid")
class TestRequestDeserializer(base.MuranoTestCase):
def test_request_deserializer_deserialize_body(self):
self.request_deserializer = wsgi.RequestDeserializer()
request = mock.Mock()
request.get_content_type.side_effect = (
exceptions.UnsupportedContentType
)
request.body = "body"
self.assertRaises(exceptions.UnsupportedContentType,
self.request_deserializer.deserialize_body,
request, "act")
request.get_content_type.side_effect = None
request.get_content_type.return_value = None
result = self.request_deserializer.deserialize_body(request, "act")
self.assertEqual({}, result)
request.get_content_type.return_value = ""
self.assertRaises(exceptions.UnsupportedContentType,
self.request_deserializer.deserialize_body,
request, "act")
def test_request_deserializer_get_action_args(self):
self.request_deserializer = wsgi.RequestDeserializer()
request_environment = []
result = self.request_deserializer.get_action_args(request_environment)
self.assertEqual({}, result)
request_environment = {'wsgiorg.routing_args': ["", {}]}
result = self.request_deserializer.get_action_args(request_environment)
self.assertEqual({}, result)
class TestService(base.MuranoTestCase):
def setUp(self):
super(TestService, self).setUp()
# Greatly speed up the time it takes to run the tests that call
# wsgi._get_socket below: retry_until will be set to 31, the 1st
# iteration of the while loop will execute because it will evaluate to
# 30 < 31, but 2nd one will not because it will evaluate to 31 < 31.
self.mock_time = mock.patch.object(
wsgi, 'time', **{'time.side_effect': [1, 30, 31]}).start()
self.addCleanup(mock.patch.stopall)
@mock.patch("murano.common.wsgi.socket")
def test_service_get_socket(self, socket):
self.service = wsgi.Service(None, 1)
new_socket = self.service._get_socket(None, None, None)
self.assertIsInstance(new_socket, eventlet.greenio.base.GreenSocket)
self.mock_time.time.side_effect = [1, 30, 31]
socket.TCP_KEEPIDLE = True
new_socket_2 = self.service._get_socket(None, None, None)
self.assertIsInstance(new_socket_2, eventlet.greenio.base.GreenSocket)
@ -54,31 +124,23 @@ class WsgiTest(base.MuranoTestCase):
self.assertRaises(RuntimeError, self.service._get_socket,
None, None, None)
@mock.patch("murano.common.wsgi.time")
@mock.patch("murano.common.wsgi.socket")
@mock.patch("murano.common.wsgi.eventlet")
def test_service_get_socket_os_error(self, eventlet, mock_socket, time):
def test_service_get_socket_os_error(self, eventlet, mock_socket):
mock_socket.error = socket.error
self.service = wsgi.Service(None, 1)
sock_err = socket.error(1)
eventlet.listen.side_effect = sock_err
# Speed up unit test in case of RuntimeError.
time.time.side_effect = [1, 30, 31]
self.assertRaises(socket.error, self.service._get_socket,
None, None, None)
@mock.patch("murano.common.wsgi.time")
@mock.patch("murano.common.wsgi.socket")
@mock.patch("murano.common.wsgi.eventlet")
def test_service_get_socket_socket_error_EADDRINUSE(self, eventlet,
mock_socket, time):
mock_socket):
mock_socket.error = socket.error
self.service = wsgi.Service(None, 1)
sock_err = socket.error(errno.EADDRINUSE)
# Speed up unit test in case of RuntimeError.
time.time.side_effect = [1, 30, 31]
eventlet.listen.side_effect = sock_err
self.assertRaises(RuntimeError, self.service._get_socket,
None, None, None)
@ -116,12 +178,32 @@ class WsgiTest(base.MuranoTestCase):
self.service._run(None, None)
self.assertTrue(eventlet.wsgi.server.called)
def test_backlog_prop(self):
service = wsgi.Service(None, None)
service._backlog = mock.sentinel.backlog
self.assertEqual(mock.sentinel.backlog, service.backlog)
class TestMiddleware(base.MuranoTestCase):
def test_middleware_call(self):
self.middleware = wsgi.Middleware(None)
mock_request = mock.Mock()
mock_request.get_response.return_value = "a response"
self.assertEqual("a response", self.middleware(mock_request))
def test_call_with_response(self):
middleware = wsgi.Middleware(None)
middleware.process_request = mock.Mock(return_value=mock.sentinel.resp)
resp = middleware(mock.sentinel.req)
self.assertEqual(mock.sentinel.resp, resp)
middleware.process_request.assert_called_once_with(mock.sentinel.req)
class TestDebug(base.MuranoTestCase):
def test_debug_call(self):
self.debug = wsgi.Debug(None)
mock_request = mock.Mock()
@ -133,12 +215,33 @@ class WsgiTest(base.MuranoTestCase):
self.assertEqual(mock_response, self.debug(mock_request))
def test_router_dispatch_not_found(self):
self.router = wsgi.Router(None)
req = mock.Mock()
req.environ = {'wsgiorg.routing_args': [False, False]}
response = self.router._dispatch(req)
self.assertIsInstance(response, webob.exc.HTTPNotFound)
@mock.patch('sys.stdout')
def test_print_generator(self, mock_stdout):
for x in wsgi.Debug.print_generator(['foo', 'bar', 'baz']):
pass
mock_stdout.write.assert_has_calls([
mock.call('**************************************** BODY'),
mock.call('\n'),
mock.call('foo'),
mock.call('bar'),
mock.call('baz'),
mock.call(''),
mock.call('\n')
])
class TestRequest(base.MuranoTestCase):
@mock.patch.object(wsgi.webob.BaseRequest, 'path',
**{'rsplit.return_value': ['foo.bar', 'baz']})
def test_best_match_content_type_with_multi_part_path(self, mock_path):
request = wsgi.Request({})
supported_content_types = ['application/baz']
result = request.best_match_content_type(None, supported_content_types)
self.assertEqual('application/baz', result)
class TestResource(base.MuranoTestCase):
def test_resource_call_exceptions(self):
self.resource = wsgi.Resource(None)
@ -175,6 +278,41 @@ class WsgiTest(base.MuranoTestCase):
result = self.resource.get_action_args(request_environment)
self.assertEqual({"k": "v"}, result)
def test_dispatch_except_attribute_error(self):
mock_obj = mock.Mock(spec=wsgi.Resource)
setattr(mock_obj, 'default',
mock.Mock(return_value=mock.sentinel.ret_value))
resource = wsgi.Resource(None)
result = resource.dispatch(mock_obj, 'invalid_action')
self.assertEqual(mock.sentinel.ret_value, result)
mock_obj.default.assert_called_once_with()
class TestActionDispatcher(base.MuranoTestCase):
def test_default(self):
action_dispatcher = wsgi.ActionDispatcher()
self.assertRaises(NotImplementedError, action_dispatcher.default, None)
class TestDictSerializer(base.MuranoTestCase):
def test_default(self):
dict_serializer = wsgi.DictSerializer()
self.assertEqual("", dict_serializer.default(None))
class TestXMLDictSerializer(base.MuranoTestCase):
def test_router_dispatch_not_found(self):
self.router = wsgi.Router(None)
req = mock.Mock()
req.environ = {'wsgiorg.routing_args': [False, False]}
response = self.router._dispatch(req)
self.assertIsInstance(response, webob.exc.HTTPNotFound)
def test_XML_Dict_Serializer_default_string(self):
self.serializer = wsgi.XMLDictSerializer()
actual_xml = self.serializer.default({"XML Root": "some data"})
@ -237,52 +375,100 @@ class WsgiTest(base.MuranoTestCase):
self.assertEqual('<atom:link href="href" rel="rel" type="type"/>',
link_nodes[0].toxml())
def test_service_broker_response_serializer(self):
self.sbrs = wsgi.ServiceBrokerResponseSerializer()
result = self.sbrs.serialize("test_response", "application/json")
self.assertEqual(200, result.status_code)
def test_add_xmlns(self):
xml_dict_serializer = wsgi.XMLDictSerializer()
xml_dict_serializer.xmlns = mock.sentinel.xmlns
mock_node = mock.Mock()
response = webob.Response("test_body")
response.data = "test_data"
result = self.sbrs.serialize(response, "application/json")
self.assertEqual(200, result.status_code)
xml_dict_serializer._add_xmlns(mock_node, has_atom=False)
mock_node.setAttribute.assert_called_once_with(
'xmlns', mock.sentinel.xmlns)
def test_response_serializer(self):
self.response_serializer = wsgi.ResponseSerializer(None)
self.assertRaises(exceptions.UnsupportedContentType,
self.response_serializer.get_body_serializer,
"not_valid")
mock_node.reset_mock()
xml_dict_serializer._add_xmlns(mock_node, has_atom=True)
mock_node.setAttribute.assert_has_calls([
mock.call('xmlns', mock.sentinel.xmlns),
mock.call('xmlns:atom', "http://www.w3.org/2005/Atom")
])
def test_request_deserializer_deserialize_body(self):
self.request_deserializer = wsgi.RequestDeserializer()
request = mock.Mock()
request.get_content_type.side_effect = (
exceptions.UnsupportedContentType
class TestTextDeserializer(base.MuranoTestCase):
def test_default(self):
text_deserializer = wsgi.TextDeserializer()
self.assertEqual({}, text_deserializer.default(None))
class TestJSONDeserializer(base.MuranoTestCase):
def test_from_json_except_value_error(self):
json_deserializer = wsgi.JSONDeserializer()
e = self.assertRaises(exceptions.MalformedRequestBody,
json_deserializer._from_json,
"throw value error")
self.assertIn('cannot understand JSON', str(e))
class TestJSONPatchDeserializer(base.MuranoTestCase):
def test_from_json_patch_except_value_error(self):
json_patch_deserializer = wsgi.JSONPatchDeserializer()
e = self.assertRaises(exceptions.MalformedRequestBody,
json_patch_deserializer._from_json_patch,
"throw value error")
self.assertIn('cannot understand JSON', str(e))
def test_validate_allowed_methods_except_http_forbidden(self):
json_patch_deserializer = wsgi.JSONPatchDeserializer()
json_patch_deserializer.allowed_operations = mock.Mock(
spec_set=wsgi.JSONPatchDeserializer.allowed_operations,
**{'get.return_value': None}
)
request.body = "body"
self.assertRaises(exceptions.UnsupportedContentType,
self.request_deserializer.deserialize_body,
request, "act")
request.get_content_type.side_effect = None
request.get_content_type.return_value = None
result = self.request_deserializer.deserialize_body(request, "act")
self.assertEqual({}, result)
e = self.assertRaises(
webob.exc.HTTPForbidden,
json_patch_deserializer._validate_allowed_methods,
{'op': 'foo_op', 'path': 'foo_path'}, allow_unknown_path=False)
self.assertEqual("Attribute 'f/o/o/_/p/a/t/h' is invalid", str(e))
request.get_content_type.return_value = ""
self.assertRaises(exceptions.UnsupportedContentType,
self.request_deserializer.deserialize_body,
request, "act")
def test_validate_json_pointer(self):
json_patch_deserializer = wsgi.JSONPatchDeserializer()
expected_exception = webob.exc.HTTPBadRequest
bad_pointers = ['pointer', '/ /\n/', '//', '// ', '/~2']
def test_request_deserializer_get_action_args(self):
self.request_deserializer = wsgi.RequestDeserializer()
request_environment = []
result = self.request_deserializer.get_action_args(request_environment)
self.assertEqual({}, result)
for pointer in bad_pointers:
self.assertRaises(expected_exception,
json_patch_deserializer._validate_json_pointer,
pointer)
request_environment = {'wsgiorg.routing_args': ["", {}]}
result = self.request_deserializer.get_action_args(request_environment)
self.assertEqual({}, result)
@mock.patch.object(wsgi, 'jsonschema', autospec=True)
def test_validate_schema_cannot_validate(self, mock_jsonschema):
json_patch_deserializer = wsgi.JSONPatchDeserializer()
json_patch_deserializer.schema = {'type': 'array'}
test_change = {'path': ['foo_path'], 'value': 'foo_value'}
# Assert that jsonschema validation was not performed because
# can_validate = False.
json_patch_deserializer._validate_schema(test_change)
self.assertFalse(mock_jsonschema.validate.called)
mock_jsonschema.validate.reset_mock()
json_patch_deserializer = wsgi.JSONPatchDeserializer()
json_patch_deserializer.schema = {'type': 'object'}
json_patch_deserializer._validate_schema(test_change)
self.assertFalse(mock_jsonschema.validate.called)
class TestMuranoPackageJSONPatchDeserializer(base.MuranoTestCase):
def test_validate_path(self):
patch_deserializer = wsgi.MuranoPackageJSONPatchDeserializer()
e = self.assertRaises(webob.exc.HTTPBadRequest,
patch_deserializer._validate_path,
['foo_path', 'bar_path'])
self.assertEqual(_('Nested paths are not allowed'), str(e))
class TestXMLDeserializer(base.MuranoTestCase):
def test_XMLDeserializer_from_xml(self):
self.deserializer = wsgi.XMLDeserializer()
@ -349,3 +535,73 @@ class WsgiTest(base.MuranoTestCase):
mock_node.childNodes = []
result = self.deserializer.extract_text(mock_node)
self.assertEqual("", result)
def test_from_xml_node_all_branches(self):
mock_grandchild_node = mock.MagicMock(**{
'nodeName': 'qux',
'nodeType': mock.sentinel.grandchild_node_type,
'childNodes': [
mock.Mock(nodeType=3, nodeValue=mock.sentinel.qux_node)
]
})
mock_child_node = mock.MagicMock(**{
'nodeName': mock.sentinel.child_node,
'attributes.keys.return_value': ['foo', 'bar', 'baz'],
'attributes.__getitem__.side_effect': [
mock.Mock(nodeValue=mock.sentinel.foo_node),
mock.Mock(nodeValue=mock.sentinel.bar_node),
mock.Mock(nodeValue=mock.sentinel.baz_node)
],
'childNodes': [
mock_grandchild_node
],
'TEXT_NODE': mock.sentinel.child_node_type
})
mock_node = mock.Mock(**{
'nodeName': mock.sentinel.parent_node,
'childNodes': [
mock_child_node
]
})
listnames = [mock.sentinel.parent_node]
xml_deserializer = wsgi.XMLDeserializer()
result = xml_deserializer._from_xml_node(mock_node, listnames)
expected_result = [{
'foo': mock.sentinel.foo_node,
'bar': mock.sentinel.bar_node,
'baz': mock.sentinel.baz_node,
'qux': mock.sentinel.qux_node
}]
self.assertEqual(1, len(result))
self.assertEqual(len(expected_result[0]), len(result[0]))
for key, val in expected_result[0].items():
self.assertEqual(val, result[0][key])
def test_find_children_named(self):
child_node = mock.Mock(nodeName=mock.sentinel.node_name)
parent_node = mock.Mock(childNodes=[child_node])
result = []
xml_deserializer = wsgi.XMLDeserializer()
for n in xml_deserializer.\
find_children_named(parent_node, mock.sentinel.node_name):
result.append(n)
self.assertEqual(1, len(result))
self.assertEqual(child_node, result[0])
class TestFormDataDeserializer(base.MuranoTestCase):
@mock.patch.object(wsgi, 'LOG', autospec=True)
def test_from_json_except_value_error(self, mock_log):
data_serializer = wsgi.FormDataDeserializer()
value = data_serializer._from_json('value error')
self.assertEqual('value error', value)
mock_log.debug.assert_called_once_with(
"Trying to deserialize 'value error' to json")
mock_log.warning.assert_called_once_with(
_LW('Unable to deserialize to json, using raw text'))