# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import collections import datetime import ipaddress import json import mock import netaddr from oslo_i18n import fixture from oslotest import base as test_base import six import six.moves.xmlrpc_client as xmlrpclib from oslo_serialization import jsonutils class JSONUtilsTestMixin(object): json_impl = None def setUp(self): super(JSONUtilsTestMixin, self).setUp() self.json_patcher = mock.patch.multiple( jsonutils, json=self.json_impl, ) self.json_impl_mock = self.json_patcher.start() def tearDown(self): self.json_patcher.stop() super(JSONUtilsTestMixin, self).tearDown() def test_dumps(self): self.assertEqual('{"a": "b"}', jsonutils.dumps({'a': 'b'})) def test_dump_as_bytes(self): self.assertEqual(b'{"a": "b"}', jsonutils.dump_as_bytes({'a': 'b'})) def test_dumps_namedtuple(self): n = collections.namedtuple("foo", "bar baz")(1, 2) self.assertEqual('[1, 2]', jsonutils.dumps(n)) def test_dump(self): expected = '{"a": "b"}' json_dict = {'a': 'b'} fp = six.StringIO() jsonutils.dump(json_dict, fp) self.assertEqual(expected, fp.getvalue()) def test_dump_namedtuple(self): expected = '[1, 2]' json_dict = collections.namedtuple("foo", "bar baz")(1, 2) fp = six.StringIO() jsonutils.dump(json_dict, fp) self.assertEqual(expected, fp.getvalue()) def test_loads(self): self.assertEqual({'a': 'b'}, jsonutils.loads('{"a": "b"}')) def test_loads_unicode(self): self.assertIsInstance(jsonutils.loads(b'"foo"'), six.text_type) self.assertIsInstance(jsonutils.loads(u'"foo"'), six.text_type) # 'test' in Ukrainian i18n_str_unicode = u'"\u0442\u0435\u0441\u0442"' self.assertIsInstance(jsonutils.loads(i18n_str_unicode), six.text_type) i18n_str = i18n_str_unicode.encode('utf-8') self.assertIsInstance(jsonutils.loads(i18n_str), six.text_type) def test_loads_with_kwargs(self): jsontext = u'{"foo": 3}' result = jsonutils.loads(jsontext, parse_int=lambda x: 5) self.assertEqual(5, result['foo']) def test_load(self): jsontext = u'{"a": "\u0442\u044d\u0441\u0442"}' expected = {u'a': u'\u0442\u044d\u0441\u0442'} for encoding in ('utf-8', 'cp1251'): fp = six.BytesIO(jsontext.encode(encoding)) result = jsonutils.load(fp, encoding=encoding) self.assertEqual(expected, result) for key, val in result.items(): self.assertIsInstance(key, six.text_type) self.assertIsInstance(val, six.text_type) class JSONUtilsTestJson(JSONUtilsTestMixin, test_base.BaseTestCase): json_impl = json class ToPrimitiveTestCase(test_base.BaseTestCase): def setUp(self): super(ToPrimitiveTestCase, self).setUp() self.trans_fixture = self.useFixture(fixture.Translation()) def test_list(self): self.assertEqual(jsonutils.to_primitive([1, 2, 3]), [1, 2, 3]) def test_empty_list(self): self.assertEqual(jsonutils.to_primitive([]), []) def test_tuple(self): self.assertEqual(jsonutils.to_primitive((1, 2, 3)), [1, 2, 3]) def test_dict(self): self.assertEqual(jsonutils.to_primitive(dict(a=1, b=2, c=3)), dict(a=1, b=2, c=3)) def test_empty_dict(self): self.assertEqual(jsonutils.to_primitive({}), {}) def test_datetime(self): x = datetime.datetime(1920, 2, 3, 4, 5, 6, 7) self.assertEqual(jsonutils.to_primitive(x), '1920-02-03T04:05:06.000007') def test_datetime_preserve(self): x = datetime.datetime(1920, 2, 3, 4, 5, 6, 7) self.assertEqual(jsonutils.to_primitive(x, convert_datetime=False), x) def test_DateTime(self): x = xmlrpclib.DateTime() x.decode("19710203T04:05:06") self.assertEqual(jsonutils.to_primitive(x), '1971-02-03T04:05:06.000000') def test_iter(self): class IterClass(object): def __init__(self): self.data = [1, 2, 3, 4, 5] self.index = 0 def __iter__(self): return self def next(self): if self.index == len(self.data): raise StopIteration self.index = self.index + 1 return self.data[self.index - 1] __next__ = next x = IterClass() self.assertEqual(jsonutils.to_primitive(x), [1, 2, 3, 4, 5]) def test_iteritems(self): class IterItemsClass(object): def __init__(self): self.data = dict(a=1, b=2, c=3).items() self.index = 0 def iteritems(self): return self.data x = IterItemsClass() p = jsonutils.to_primitive(x) self.assertEqual(p, {'a': 1, 'b': 2, 'c': 3}) def test_iteritems_with_cycle(self): class IterItemsClass(object): def __init__(self): self.data = dict(a=1, b=2, c=3) self.index = 0 def iteritems(self): return self.data.items() x = IterItemsClass() x2 = IterItemsClass() x.data['other'] = x2 x2.data['other'] = x # If the cycle isn't caught, to_primitive() will eventually result in # an exception due to excessive recursion depth. jsonutils.to_primitive(x) def test_items(self): # Use items() when iteritems() is not available. class ItemsClass(object): def __init__(self): self.data = dict(a=1, b=2, c=3) def items(self): return self.data.items() x = ItemsClass() p = jsonutils.to_primitive(x) self.assertEqual(p, {'a': 1, 'b': 2, 'c': 3}) def test_precedence_items_iteritems(self): class ItemsIterItemsClass(object): def items(self): return {'items': 'items'} def iteritems(self): return {'iteritems': 'iteritems'} x = ItemsIterItemsClass() p = jsonutils.to_primitive(x) # Prefer iteritems over items self.assertEqual(p, {'iteritems': 'iteritems'}) def test_mapping(self): # Make sure collections.Mapping is converted to a dict # and not a list. class MappingClass(collections.Mapping): def __init__(self): self.data = dict(a=1, b=2, c=3) def __getitem__(self, val): return self.data[val] def __iter__(self): return iter(self.data) def __len__(self): return len(self.data) x = MappingClass() p = jsonutils.to_primitive(x) self.assertEqual(p, {'a': 1, 'b': 2, 'c': 3}) def test_instance(self): class MysteryClass(object): a = 10 def __init__(self): self.b = 1 x = MysteryClass() self.assertEqual(jsonutils.to_primitive(x, convert_instances=True), dict(b=1)) self.assertEqual(jsonutils.to_primitive(x), x) def test_typeerror(self): x = bytearray # Class, not instance if six.PY3: self.assertEqual(jsonutils.to_primitive(x), u"") else: self.assertEqual(jsonutils.to_primitive(x), u"") def test_nasties(self): def foo(): pass x = [datetime, foo, dir] ret = jsonutils.to_primitive(x) self.assertEqual(len(ret), 3) self.assertTrue(ret[0].startswith(u".foo at 0x' )) else: self.assertTrue(ret[1].startswith('') def test_depth(self): class LevelsGenerator(object): def __init__(self, levels): self._levels = levels def iteritems(self): if self._levels == 0: return iter([]) else: return iter([(0, LevelsGenerator(self._levels - 1))]) l4_obj = LevelsGenerator(4) json_l2 = {0: {0: '?'}} json_l3 = {0: {0: {0: '?'}}} json_l4 = {0: {0: {0: {0: '?'}}}} ret = jsonutils.to_primitive(l4_obj, max_depth=2) self.assertEqual(ret, json_l2) ret = jsonutils.to_primitive(l4_obj, max_depth=3) self.assertEqual(ret, json_l3) ret = jsonutils.to_primitive(l4_obj, max_depth=4) self.assertEqual(ret, json_l4) def test_ipaddr_using_netaddr(self): thing = {'ip_addr': netaddr.IPAddress('1.2.3.4')} ret = jsonutils.to_primitive(thing) self.assertEqual({'ip_addr': '1.2.3.4'}, ret) def test_ipaddr_using_ipaddress_v4(self): thing = {'ip_addr': ipaddress.ip_address(u'192.168.0.1')} ret = jsonutils.to_primitive(thing) self.assertEqual({'ip_addr': '192.168.0.1'}, ret) def test_ipaddr_using_ipaddress_v6(self): thing = {'ip_addr': ipaddress.ip_address(u'2001:db8::')} ret = jsonutils.to_primitive(thing) self.assertEqual({'ip_addr': '2001:db8::'}, ret) def test_message_with_param(self): msg = self.trans_fixture.lazy('A message with param: %s') msg = msg % 'test_domain' ret = jsonutils.to_primitive(msg) self.assertEqual(msg, ret) def test_message_with_named_param(self): msg = self.trans_fixture.lazy('A message with params: %(param)s') msg = msg % {'param': 'hello'} ret = jsonutils.to_primitive(msg) self.assertEqual(msg, ret)