Add configurable serialization to pika

This patch makes message serialization in the pika driver
configurable.

Change-Id: Ib86cb91110aa89663a54afdd9932d60f6f45f585
Depends-On: Idb12666255a990dfc8f8ff6b43e941b3481b9c1c
This commit is contained in:
Gevorg Davoian 2016-04-11 17:38:42 +03:00
parent dbcd48a8ea
commit 9bd4bbeb2d
6 changed files with 60 additions and 50 deletions

View File

@ -50,6 +50,13 @@ pika_pool_opts = [
"staleness. Stale connections are closed on acquire.") "staleness. Stale connections are closed on acquire.")
] ]
message_opts = [
cfg.StrOpt('default_serializer_type', default='json',
choices=('json', 'msgpack'),
help="Default serialization mechanism for "
"serializing/deserializing outgoing/incoming messages")
]
notification_opts = [ notification_opts = [
cfg.BoolOpt('notification_persistence', default=False, cfg.BoolOpt('notification_persistence', default=False,
help="Persist notification messages."), help="Persist notification messages."),
@ -124,6 +131,7 @@ class PikaDriver(base.BaseDriver):
conf.register_group(opt_group) conf.register_group(opt_group)
conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group) conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
conf.register_opts(pika_pool_opts, group=opt_group) conf.register_opts(pika_pool_opts, group=opt_group)
conf.register_opts(message_opts, group=opt_group)
conf.register_opts(rpc_opts, group=opt_group) conf.register_opts(rpc_opts, group=opt_group)
conf.register_opts(notification_opts, group=opt_group) conf.register_opts(notification_opts, group=opt_group)

View File

@ -15,6 +15,8 @@
import select import select
import socket import socket
from oslo_serialization.serializer import json_serializer
from oslo_serialization.serializer import msgpack_serializer
from oslo_utils import timeutils from oslo_utils import timeutils
from pika import exceptions as pika_exceptions from pika import exceptions as pika_exceptions
import six import six
@ -31,3 +33,8 @@ PIKA_CONNECTIVITY_ERRORS = (
EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start() INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start()
MESSAGE_SERIALIZERS = {
'application/json': json_serializer.JSONSerializer(),
'application/msgpack': msgpack_serializer.MessagePackSerializer()
}

View File

@ -137,6 +137,10 @@ class PikaEngine(object):
raise ValueError("notification_retry_delay should be non-negative " raise ValueError("notification_retry_delay should be non-negative "
"integer") "integer")
self.default_content_type = (
'application/' + conf.oslo_messaging_pika.default_serializer_type
)
def _init_if_needed(self): def _init_if_needed(self):
cur_pid = os.getpid() cur_pid = os.getpid()

View File

@ -20,7 +20,6 @@ import uuid
from concurrent import futures from concurrent import futures
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import importutils from oslo_utils import importutils
from oslo_utils import timeutils from oslo_utils import timeutils
from pika import exceptions as pika_exceptions from pika import exceptions as pika_exceptions
@ -101,7 +100,6 @@ class PikaIncomingMessage(base.IncomingMessage):
self._version = version self._version = version
self._content_type = properties.content_type self._content_type = properties.content_type
self._content_encoding = properties.content_encoding
self.unique_id = properties.message_id self.unique_id = properties.message_id
self.expiration_time = ( self.expiration_time = (
@ -109,15 +107,16 @@ class PikaIncomingMessage(base.IncomingMessage):
time.time() + float(properties.expiration) / 1000 time.time() + float(properties.expiration) / 1000
) )
if self._content_type != "application/json": try:
serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type]
except KeyError:
raise NotImplementedError( raise NotImplementedError(
"Content-type['{}'] is not valid, " "Content-type['{}'] is not supported.".format(
"'application/json' only is supported.".format(
self._content_type self._content_type
) )
) )
message_dict = jsonutils.loads(body, encoding=self._content_encoding) message_dict = serializer.load_from_bytes(body)
context_dict = {} context_dict = {}
@ -190,7 +189,6 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
reply_outgoing_message = RpcReplyPikaOutgoingMessage( reply_outgoing_message = RpcReplyPikaOutgoingMessage(
self._pika_engine, self.msg_id, reply=reply, failure_info=failure, self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
content_type=self._content_type, content_type=self._content_type,
content_encoding=self._content_encoding
) )
def on_exception(ex): def on_exception(ex):
@ -297,8 +295,7 @@ class PikaOutgoingMessage(object):
and send it and send it
""" """
def __init__(self, pika_engine, message, context, def __init__(self, pika_engine, message, context, content_type=None):
content_type="application/json", content_encoding="utf-8"):
"""Parse RabbitMQ message """Parse RabbitMQ message
:param pika_engine: PikaEngine, shared object with configuration and :param pika_engine: PikaEngine, shared object with configuration and
@ -306,19 +303,23 @@ class PikaOutgoingMessage(object):
:param message: Dictionary, user's message fields :param message: Dictionary, user's message fields
:param context: Dictionary, request context's fields :param context: Dictionary, request context's fields
:param content_type: String, content-type header, defines serialization :param content_type: String, content-type header, defines serialization
mechanism mechanism, if None default content-type from pika_engine is used
:param content_encoding: String, defines encoding for text data
""" """
self._pika_engine = pika_engine self._pika_engine = pika_engine
self._content_type = content_type self._content_type = (
self._content_encoding = content_encoding content_type if content_type is not None else
self._pika_engine.default_content_type
)
if self._content_type != "application/json": try:
self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[
self._content_type
]
except KeyError:
raise NotImplementedError( raise NotImplementedError(
"Content-type['{}'] is not valid, " "Content-type['{}'] is not supported.".format(
"'application/json' only is supported.".format(
self._content_type self._content_type
) )
) )
@ -340,7 +341,6 @@ class PikaOutgoingMessage(object):
msg['_$_' + key] = value msg['_$_' + key] = value
props = pika_spec.BasicProperties( props = pika_spec.BasicProperties(
content_encoding=self._content_encoding,
content_type=self._content_type, content_type=self._content_type,
headers={_VERSION_HEADER: _VERSION}, headers={_VERSION_HEADER: _VERSION},
message_id=self.unique_id, message_id=self.unique_id,
@ -447,8 +447,7 @@ class PikaOutgoingMessage(object):
if confirm else if confirm else
self._pika_engine.connection_without_confirmation_pool) self._pika_engine.connection_without_confirmation_pool)
body = jsonutils.dump_as_bytes(msg_dict, body = self._serializer.dump_as_bytes(msg_dict)
encoding=self._content_encoding)
LOG.debug( LOG.debug(
"Sending message:[body:%s; properties: %s] to target: " "Sending message:[body:%s; properties: %s] to target: "
@ -490,10 +489,9 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
"""PikaOutgoingMessage implementation for RPC messages. It adds """PikaOutgoingMessage implementation for RPC messages. It adds
possibility to wait and receive RPC reply possibility to wait and receive RPC reply
""" """
def __init__(self, pika_engine, message, context, def __init__(self, pika_engine, message, context, content_type=None):
content_type="application/json", content_encoding="utf-8"):
super(RpcPikaOutgoingMessage, self).__init__( super(RpcPikaOutgoingMessage, self).__init__(
pika_engine, message, context, content_type, content_encoding pika_engine, message, context, content_type
) )
self.msg_id = None self.msg_id = None
self.reply_q = None self.reply_q = None
@ -549,7 +547,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
correlation_id AMQP property to link this reply with response correlation_id AMQP property to link this reply with response
""" """
def __init__(self, pika_engine, msg_id, reply=None, failure_info=None, def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
content_type="application/json", content_encoding="utf-8"): content_type=None):
"""Initialize with reply information for sending """Initialize with reply information for sending
:param pika_engine: PikaEngine, shared object with configuration and :param pika_engine: PikaEngine, shared object with configuration and
@ -559,8 +557,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
:param failure_info: Tuple, should be a sys.exc_info() tuple. :param failure_info: Tuple, should be a sys.exc_info() tuple.
Should be None if RPC request was successfully processed. Should be None if RPC request was successfully processed.
:param content_type: String, content-type header, defines serialization :param content_type: String, content-type header, defines serialization
mechanism mechanism, if None default content-type from pika_engine is used
:param content_encoding: String, defines encoding for text data
""" """
self.msg_id = msg_id self.msg_id = msg_id
@ -588,7 +585,7 @@ class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
msg = {'s': reply} msg = {'s': reply}
super(RpcReplyPikaOutgoingMessage, self).__init__( super(RpcReplyPikaOutgoingMessage, self).__init__(
pika_engine, msg, None, content_type, content_encoding pika_engine, msg, None, content_type
) )
def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,

View File

@ -51,8 +51,8 @@ _opts = [
('oslo_messaging_rabbit', list( ('oslo_messaging_rabbit', list(
itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts, itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
pika_connection_factory.pika_opts, pika_connection_factory.pika_opts,
impl_pika.pika_pool_opts, impl_pika.notification_opts, impl_pika.pika_pool_opts, impl_pika.message_opts,
impl_pika.rpc_opts))), impl_pika.notification_opts, impl_pika.rpc_opts))),
] ]

View File

@ -116,7 +116,6 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
) )
self._properties = pika.BasicProperties( self._properties = pika.BasicProperties(
content_type="application/json", content_type="application/json",
content_encoding="utf-8",
headers={"version": "1.0"}, headers={"version": "1.0"},
) )
@ -197,7 +196,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
outgoing_message_mock.assert_called_once_with( outgoing_message_mock.assert_called_once_with(
self._pika_engine, 123456789, failure_info=None, reply='all_fine', self._pika_engine, 123456789, failure_info=None, reply='all_fine',
content_encoding='utf-8', content_type='application/json' content_type='application/json'
) )
outgoing_message_mock().send.assert_called_once_with( outgoing_message_mock().send.assert_called_once_with(
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
@ -236,7 +235,6 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
self._pika_engine, 123456789, self._pika_engine, 123456789,
failure_info=failure_info, failure_info=failure_info,
reply=None, reply=None,
content_encoding='utf-8',
content_type='application/json' content_type='application/json'
) )
outgoing_message_mock().send.assert_called_once_with( outgoing_message_mock().send.assert_called_once_with(
@ -263,7 +261,6 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
self._properties = pika.BasicProperties( self._properties = pika.BasicProperties(
content_type="application/json", content_type="application/json",
content_encoding="utf-8",
headers={"version": "1.0"}, headers={"version": "1.0"},
correlation_id=123456789 correlation_id=123456789
) )
@ -311,6 +308,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
class PikaOutgoingMessageTestCase(unittest.TestCase): class PikaOutgoingMessageTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self._pika_engine = mock.MagicMock() self._pika_engine = mock.MagicMock()
self._pika_engine.default_content_type = "application/json"
self._exchange = "it is exchange" self._exchange = "it is exchange"
self._routing_key = "it is routing key" self._routing_key = "it is routing key"
self._expiration = 1 self._expiration = 1
@ -322,8 +320,8 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
self._message = {"msg_type": 1, "msg_str": "hello"} self._message = {"msg_type": 1, "msg_str": "hello"}
self._context = {"request_id": 555, "token": "it is a token"} self._context = {"request_id": 555, "token": "it is a token"}
@patch("oslo_serialization.jsonutils.dumps", @patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dumps, sort_keys=True)) new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_with_confirmation(self): def test_send_with_confirmation(self):
message = pika_drv_msg.PikaOutgoingMessage( message = pika_drv_msg.PikaOutgoingMessage(
self._pika_engine, self._message, self._context self._pika_engine, self._message, self._context
@ -359,7 +357,6 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire( props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"] ).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('utf-8', props.content_encoding)
self.assertEqual('application/json', props.content_type) self.assertEqual('application/json', props.content_type)
self.assertEqual(2, props.delivery_mode) self.assertEqual(2, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) < self.assertTrue(self._expiration * 1000 - float(props.expiration) <
@ -367,8 +364,8 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
self.assertEqual({'version': '1.0'}, props.headers) self.assertEqual({'version': '1.0'}, props.headers)
self.assertTrue(props.message_id) self.assertTrue(props.message_id)
@patch("oslo_serialization.jsonutils.dumps", @patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dumps, sort_keys=True)) new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_without_confirmation(self): def test_send_without_confirmation(self):
message = pika_drv_msg.PikaOutgoingMessage( message = pika_drv_msg.PikaOutgoingMessage(
self._pika_engine, self._message, self._context self._pika_engine, self._message, self._context
@ -404,7 +401,6 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_without_confirmation_pool.acquire( props = self._pika_engine.connection_without_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"] ).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('utf-8', props.content_encoding)
self.assertEqual('application/json', props.content_type) self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode) self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) self.assertTrue(self._expiration * 1000 - float(props.expiration)
@ -421,12 +417,13 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
self._pika_engine = mock.MagicMock() self._pika_engine = mock.MagicMock()
self._pika_engine.get_rpc_exchange_name.return_value = self._exchange self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
self._pika_engine.get_rpc_queue_name.return_value = self._routing_key self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
self._pika_engine.default_content_type = "application/json"
self._message = {"msg_type": 1, "msg_str": "hello"} self._message = {"msg_type": 1, "msg_str": "hello"}
self._context = {"request_id": 555, "token": "it is a token"} self._context = {"request_id": 555, "token": "it is a token"}
@patch("oslo_serialization.jsonutils.dumps", @patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dumps, sort_keys=True)) new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_cast_message(self): def test_send_cast_message(self):
message = pika_drv_msg.RpcPikaOutgoingMessage( message = pika_drv_msg.RpcPikaOutgoingMessage(
self._pika_engine, self._message, self._context self._pika_engine, self._message, self._context
@ -463,7 +460,6 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire( props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"] ).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('utf-8', props.content_encoding)
self.assertEqual('application/json', props.content_type) self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode) self.assertEqual(1, props.delivery_mode)
self.assertTrue(expiration * 1000 - float(props.expiration) < 100) self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
@ -472,8 +468,8 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
self.assertIsNone(props.reply_to) self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id) self.assertTrue(props.message_id)
@patch("oslo_serialization.jsonutils.dumps", @patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dumps, sort_keys=True)) new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_call_message(self): def test_send_call_message(self):
message = pika_drv_msg.RpcPikaOutgoingMessage( message = pika_drv_msg.RpcPikaOutgoingMessage(
self._pika_engine, self._message, self._context self._pika_engine, self._message, self._context
@ -521,7 +517,6 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire( props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"] ).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('utf-8', props.content_encoding)
self.assertEqual('application/json', props.content_type) self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode) self.assertEqual(1, props.delivery_mode)
self.assertTrue(expiration * 1000 - float(props.expiration) < 100) self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
@ -544,11 +539,12 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
self._rpc_reply_exchange = "rpc_reply_exchange" self._rpc_reply_exchange = "rpc_reply_exchange"
self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
self._pika_engine.default_content_type = "application/json"
self._msg_id = 12345567 self._msg_id = 12345567
@patch("oslo_serialization.jsonutils.dumps", @patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dumps, sort_keys=True)) new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_success_message_send(self): def test_success_message_send(self):
message = pika_drv_msg.RpcReplyPikaOutgoingMessage( message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
self._pika_engine, self._msg_id, reply="all_fine" self._pika_engine, self._msg_id, reply="all_fine"
@ -567,7 +563,6 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire( props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"] ).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('utf-8', props.content_encoding)
self.assertEqual('application/json', props.content_type) self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode) self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) < self.assertTrue(self._expiration * 1000 - float(props.expiration) <
@ -578,8 +573,8 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
self.assertTrue(props.message_id) self.assertTrue(props.message_id)
@patch("traceback.format_exception", new=lambda x, y, z: z) @patch("traceback.format_exception", new=lambda x, y, z: z)
@patch("oslo_serialization.jsonutils.dumps", @patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dumps, sort_keys=True)) new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_failure_message_send(self): def test_failure_message_send(self):
failure_info = (oslo_messaging.MessagingException, failure_info = (oslo_messaging.MessagingException,
oslo_messaging.MessagingException("Error message"), oslo_messaging.MessagingException("Error message"),
@ -612,7 +607,6 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire( props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"] ).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('utf-8', props.content_encoding)
self.assertEqual('application/json', props.content_type) self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode) self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) < self.assertTrue(self._expiration * 1000 - float(props.expiration) <