From 9bd4bbeb2d6871e60a2159ac0c8f86a52abc0a73 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Mon, 11 Apr 2016 17:38:42 +0300 Subject: [PATCH] Add configurable serialization to pika This patch makes message serialization in the pika driver configurable. Change-Id: Ib86cb91110aa89663a54afdd9932d60f6f45f585 Depends-On: Idb12666255a990dfc8f8ff6b43e941b3481b9c1c --- oslo_messaging/_drivers/impl_pika.py | 8 +++ .../_drivers/pika_driver/pika_commons.py | 7 +++ .../_drivers/pika_driver/pika_engine.py | 4 ++ .../_drivers/pika_driver/pika_message.py | 49 +++++++++---------- oslo_messaging/opts.py | 4 +- .../tests/drivers/pika/test_message.py | 38 ++++++-------- 6 files changed, 60 insertions(+), 50 deletions(-) diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 7ad0744b3..812846801 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -50,6 +50,13 @@ pika_pool_opts = [ "staleness. Stale connections are closed on acquire.") ] +message_opts = [ + cfg.StrOpt('default_serializer_type', default='json', + choices=('json', 'msgpack'), + help="Default serialization mechanism for " + "serializing/deserializing outgoing/incoming messages") +] + notification_opts = [ cfg.BoolOpt('notification_persistence', default=False, help="Persist notification messages."), @@ -124,6 +131,7 @@ class PikaDriver(base.BaseDriver): conf.register_group(opt_group) conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group) conf.register_opts(pika_pool_opts, group=opt_group) + conf.register_opts(message_opts, group=opt_group) conf.register_opts(rpc_opts, group=opt_group) conf.register_opts(notification_opts, group=opt_group) diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py index 0737043bd..f5e9086cf 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_commons.py +++ b/oslo_messaging/_drivers/pika_driver/pika_commons.py @@ -15,6 +15,8 @@ import select import socket +from oslo_serialization.serializer import json_serializer +from oslo_serialization.serializer import msgpack_serializer from oslo_utils import timeutils from pika import exceptions as pika_exceptions import six @@ -31,3 +33,8 @@ PIKA_CONNECTIVITY_ERRORS = ( EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start() + +MESSAGE_SERIALIZERS = { + 'application/json': json_serializer.JSONSerializer(), + 'application/msgpack': msgpack_serializer.MessagePackSerializer() +} diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 97b6792d2..b31751f4c 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -137,6 +137,10 @@ class PikaEngine(object): raise ValueError("notification_retry_delay should be non-negative " "integer") + self.default_content_type = ( + 'application/' + conf.oslo_messaging_pika.default_serializer_type + ) + def _init_if_needed(self): cur_pid = os.getpid() diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 2802bedb1..86ede6af1 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -20,7 +20,6 @@ import uuid from concurrent import futures from oslo_log import log as logging -from oslo_serialization import jsonutils from oslo_utils import importutils from oslo_utils import timeutils from pika import exceptions as pika_exceptions @@ -101,7 +100,6 @@ class PikaIncomingMessage(base.IncomingMessage): self._version = version self._content_type = properties.content_type - self._content_encoding = properties.content_encoding self.unique_id = properties.message_id self.expiration_time = ( @@ -109,15 +107,16 @@ class PikaIncomingMessage(base.IncomingMessage): time.time() + float(properties.expiration) / 1000 ) - if self._content_type != "application/json": + try: + serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type] + except KeyError: raise NotImplementedError( - "Content-type['{}'] is not valid, " - "'application/json' only is supported.".format( + "Content-type['{}'] is not supported.".format( self._content_type ) ) - message_dict = jsonutils.loads(body, encoding=self._content_encoding) + message_dict = serializer.load_from_bytes(body) context_dict = {} @@ -190,7 +189,6 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage): reply_outgoing_message = RpcReplyPikaOutgoingMessage( self._pika_engine, self.msg_id, reply=reply, failure_info=failure, content_type=self._content_type, - content_encoding=self._content_encoding ) def on_exception(ex): @@ -297,8 +295,7 @@ class PikaOutgoingMessage(object): and send it """ - def __init__(self, pika_engine, message, context, - content_type="application/json", content_encoding="utf-8"): + def __init__(self, pika_engine, message, context, content_type=None): """Parse RabbitMQ message :param pika_engine: PikaEngine, shared object with configuration and @@ -306,19 +303,23 @@ class PikaOutgoingMessage(object): :param message: Dictionary, user's message fields :param context: Dictionary, request context's fields :param content_type: String, content-type header, defines serialization - mechanism - :param content_encoding: String, defines encoding for text data + mechanism, if None default content-type from pika_engine is used """ self._pika_engine = pika_engine - self._content_type = content_type - self._content_encoding = content_encoding + self._content_type = ( + content_type if content_type is not None else + self._pika_engine.default_content_type + ) - if self._content_type != "application/json": + try: + self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[ + self._content_type + ] + except KeyError: raise NotImplementedError( - "Content-type['{}'] is not valid, " - "'application/json' only is supported.".format( + "Content-type['{}'] is not supported.".format( self._content_type ) ) @@ -340,7 +341,6 @@ class PikaOutgoingMessage(object): msg['_$_' + key] = value props = pika_spec.BasicProperties( - content_encoding=self._content_encoding, content_type=self._content_type, headers={_VERSION_HEADER: _VERSION}, message_id=self.unique_id, @@ -447,8 +447,7 @@ class PikaOutgoingMessage(object): if confirm else self._pika_engine.connection_without_confirmation_pool) - body = jsonutils.dump_as_bytes(msg_dict, - encoding=self._content_encoding) + body = self._serializer.dump_as_bytes(msg_dict) LOG.debug( "Sending message:[body:%s; properties: %s] to target: " @@ -490,10 +489,9 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage): """PikaOutgoingMessage implementation for RPC messages. It adds possibility to wait and receive RPC reply """ - def __init__(self, pika_engine, message, context, - content_type="application/json", content_encoding="utf-8"): + def __init__(self, pika_engine, message, context, content_type=None): super(RpcPikaOutgoingMessage, self).__init__( - pika_engine, message, context, content_type, content_encoding + pika_engine, message, context, content_type ) self.msg_id = None self.reply_q = None @@ -549,7 +547,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): correlation_id AMQP property to link this reply with response """ def __init__(self, pika_engine, msg_id, reply=None, failure_info=None, - content_type="application/json", content_encoding="utf-8"): + content_type=None): """Initialize with reply information for sending :param pika_engine: PikaEngine, shared object with configuration and @@ -559,8 +557,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): :param failure_info: Tuple, should be a sys.exc_info() tuple. Should be None if RPC request was successfully processed. :param content_type: String, content-type header, defines serialization - mechanism - :param content_encoding: String, defines encoding for text data + mechanism, if None default content-type from pika_engine is used """ self.msg_id = msg_id @@ -588,7 +585,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): msg = {'s': reply} super(RpcReplyPikaOutgoingMessage, self).__init__( - pika_engine, msg, None, content_type, content_encoding + pika_engine, msg, None, content_type ) def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 9fa87f118..7c373c353 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -51,8 +51,8 @@ _opts = [ ('oslo_messaging_rabbit', list( itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts, pika_connection_factory.pika_opts, - impl_pika.pika_pool_opts, impl_pika.notification_opts, - impl_pika.rpc_opts))), + impl_pika.pika_pool_opts, impl_pika.message_opts, + impl_pika.notification_opts, impl_pika.rpc_opts))), ] diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py index 5d29c8ab5..354f3a399 100644 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -116,7 +116,6 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): ) self._properties = pika.BasicProperties( content_type="application/json", - content_encoding="utf-8", headers={"version": "1.0"}, ) @@ -197,7 +196,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): outgoing_message_mock.assert_called_once_with( self._pika_engine, 123456789, failure_info=None, reply='all_fine', - content_encoding='utf-8', content_type='application/json' + content_type='application/json' ) outgoing_message_mock().send.assert_called_once_with( reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY @@ -236,7 +235,6 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase): self._pika_engine, 123456789, failure_info=failure_info, reply=None, - content_encoding='utf-8', content_type='application/json' ) outgoing_message_mock().send.assert_called_once_with( @@ -263,7 +261,6 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): self._properties = pika.BasicProperties( content_type="application/json", - content_encoding="utf-8", headers={"version": "1.0"}, correlation_id=123456789 ) @@ -311,6 +308,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): class PikaOutgoingMessageTestCase(unittest.TestCase): def setUp(self): self._pika_engine = mock.MagicMock() + self._pika_engine.default_content_type = "application/json" self._exchange = "it is exchange" self._routing_key = "it is routing key" self._expiration = 1 @@ -322,8 +320,8 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): self._message = {"msg_type": 1, "msg_str": "hello"} self._context = {"request_id": 555, "token": "it is a token"} - @patch("oslo_serialization.jsonutils.dumps", - new=functools.partial(jsonutils.dumps, sort_keys=True)) + @patch("oslo_serialization.jsonutils.dump_as_bytes", + new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) def test_send_with_confirmation(self): message = pika_drv_msg.PikaOutgoingMessage( self._pika_engine, self._message, self._context @@ -359,7 +357,6 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): props = self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.call_args[1]["properties"] - self.assertEqual('utf-8', props.content_encoding) self.assertEqual('application/json', props.content_type) self.assertEqual(2, props.delivery_mode) self.assertTrue(self._expiration * 1000 - float(props.expiration) < @@ -367,8 +364,8 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): self.assertEqual({'version': '1.0'}, props.headers) self.assertTrue(props.message_id) - @patch("oslo_serialization.jsonutils.dumps", - new=functools.partial(jsonutils.dumps, sort_keys=True)) + @patch("oslo_serialization.jsonutils.dump_as_bytes", + new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) def test_send_without_confirmation(self): message = pika_drv_msg.PikaOutgoingMessage( self._pika_engine, self._message, self._context @@ -404,7 +401,6 @@ class PikaOutgoingMessageTestCase(unittest.TestCase): props = self._pika_engine.connection_without_confirmation_pool.acquire( ).__enter__().channel.publish.call_args[1]["properties"] - self.assertEqual('utf-8', props.content_encoding) self.assertEqual('application/json', props.content_type) self.assertEqual(1, props.delivery_mode) self.assertTrue(self._expiration * 1000 - float(props.expiration) @@ -421,12 +417,13 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): self._pika_engine = mock.MagicMock() self._pika_engine.get_rpc_exchange_name.return_value = self._exchange self._pika_engine.get_rpc_queue_name.return_value = self._routing_key + self._pika_engine.default_content_type = "application/json" self._message = {"msg_type": 1, "msg_str": "hello"} self._context = {"request_id": 555, "token": "it is a token"} - @patch("oslo_serialization.jsonutils.dumps", - new=functools.partial(jsonutils.dumps, sort_keys=True)) + @patch("oslo_serialization.jsonutils.dump_as_bytes", + new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) def test_send_cast_message(self): message = pika_drv_msg.RpcPikaOutgoingMessage( self._pika_engine, self._message, self._context @@ -463,7 +460,6 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): props = self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.call_args[1]["properties"] - self.assertEqual('utf-8', props.content_encoding) self.assertEqual('application/json', props.content_type) self.assertEqual(1, props.delivery_mode) self.assertTrue(expiration * 1000 - float(props.expiration) < 100) @@ -472,8 +468,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): self.assertIsNone(props.reply_to) self.assertTrue(props.message_id) - @patch("oslo_serialization.jsonutils.dumps", - new=functools.partial(jsonutils.dumps, sort_keys=True)) + @patch("oslo_serialization.jsonutils.dump_as_bytes", + new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) def test_send_call_message(self): message = pika_drv_msg.RpcPikaOutgoingMessage( self._pika_engine, self._message, self._context @@ -521,7 +517,6 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase): props = self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.call_args[1]["properties"] - self.assertEqual('utf-8', props.content_encoding) self.assertEqual('application/json', props.content_type) self.assertEqual(1, props.delivery_mode) self.assertTrue(expiration * 1000 - float(props.expiration) < 100) @@ -544,11 +539,12 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): self._rpc_reply_exchange = "rpc_reply_exchange" self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange + self._pika_engine.default_content_type = "application/json" self._msg_id = 12345567 - @patch("oslo_serialization.jsonutils.dumps", - new=functools.partial(jsonutils.dumps, sort_keys=True)) + @patch("oslo_serialization.jsonutils.dump_as_bytes", + new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) def test_success_message_send(self): message = pika_drv_msg.RpcReplyPikaOutgoingMessage( self._pika_engine, self._msg_id, reply="all_fine" @@ -567,7 +563,6 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): props = self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.call_args[1]["properties"] - self.assertEqual('utf-8', props.content_encoding) self.assertEqual('application/json', props.content_type) self.assertEqual(1, props.delivery_mode) self.assertTrue(self._expiration * 1000 - float(props.expiration) < @@ -578,8 +573,8 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): self.assertTrue(props.message_id) @patch("traceback.format_exception", new=lambda x, y, z: z) - @patch("oslo_serialization.jsonutils.dumps", - new=functools.partial(jsonutils.dumps, sort_keys=True)) + @patch("oslo_serialization.jsonutils.dump_as_bytes", + new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) def test_failure_message_send(self): failure_info = (oslo_messaging.MessagingException, oslo_messaging.MessagingException("Error message"), @@ -612,7 +607,6 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): props = self._pika_engine.connection_with_confirmation_pool.acquire( ).__enter__().channel.publish.call_args[1]["properties"] - self.assertEqual('utf-8', props.content_encoding) self.assertEqual('application/json', props.content_type) self.assertEqual(1, props.delivery_mode) self.assertTrue(self._expiration * 1000 - float(props.expiration) <