diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 3829fa5..5636d01 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -100,7 +100,12 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_bind_port_retries', default=100, help='Number of retries to find free port number before ' - 'fail with ZMQBindError.') + 'fail with ZMQBindError.'), + + cfg.StrOpt('rpc_zmq_serialization', default='json', + choices=('json', 'msgpack'), + help='Default serialization mechanism for ' + 'serializing/deserializing outgoing/incoming messages') ] diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py index e8be2a3..63c683f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py @@ -19,6 +19,7 @@ import threading import futurist import six +from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -118,9 +119,11 @@ class ReplyReceiverProxy(ReplyReceiver): reply_id = socket.recv() assert reply_id is not None, "Reply ID expected!" message_type = int(socket.recv()) - assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!" + assert message_type == zmq_names.REPLY_TYPE, "Reply expected!" message_id = socket.recv() - reply = socket.recv_pyobj() + raw_reply = socket.recv_loaded() + assert isinstance(raw_reply, dict), "Dict expected!" + reply = zmq_response.Response(**raw_reply) LOG.debug("Received reply for %s", message_id) return reply_id, message_type, message_id, reply @@ -130,9 +133,11 @@ class ReplyReceiverDirect(ReplyReceiver): def recv_response(self, socket): empty = socket.recv() assert empty == b'', "Empty expected!" - reply = socket.recv_pyobj() + raw_reply = socket.recv_loaded() + assert isinstance(raw_reply, dict), "Dict expected!" + reply = zmq_response.Response(**raw_reply) LOG.debug("Received reply for %s", reply.message_id) - return reply.reply_id, reply.type_, reply.message_id, reply + return reply.reply_id, reply.msg_type, reply.message_id, reply class AckAndReplyReceiver(ReceiverBase): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py index b6a7b75..35c38a8 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py @@ -17,23 +17,18 @@ from oslo_messaging._drivers.zmq_driver import zmq_names class Response(object): - def __init__(self, id=None, type=None, message_id=None, + def __init__(self, msg_type=None, message_id=None, reply_id=None, reply_body=None, failure=None): - self._id = id - self._type = type + self._msg_type = msg_type self._message_id = message_id self._reply_id = reply_id self._reply_body = reply_body self._failure = failure @property - def id_(self): - return self._id - - @property - def type_(self): - return self._type + def msg_type(self): + return self._msg_type @property def message_id(self): @@ -52,11 +47,10 @@ class Response(object): return self._failure def to_dict(self): - return {zmq_names.FIELD_ID: self._id, - zmq_names.FIELD_TYPE: self._type, + return {zmq_names.FIELD_MSG_TYPE: self._msg_type, zmq_names.FIELD_MSG_ID: self._message_id, zmq_names.FIELD_REPLY_ID: self._reply_id, - zmq_names.FIELD_REPLY: self._reply_body, + zmq_names.FIELD_REPLY_BODY: self._reply_body, zmq_names.FIELD_FAILURE: self._failure} def __str__(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py index f7cde0d..2fb8191 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py @@ -44,8 +44,8 @@ class RequestSenderProxy(SenderBase): socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) socket.send(six.b(request.routing_key), zmq.SNDMORE) socket.send(six.b(request.message_id), zmq.SNDMORE) - socket.send_pyobj(request.context, zmq.SNDMORE) - socket.send_pyobj(request.message) + socket.send_dumped(request.context, zmq.SNDMORE) + socket.send_dumped(request.message) LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message " "%(msg_id)s to target %(target)s", @@ -60,20 +60,23 @@ class ReplySenderProxy(SenderBase): def send(self, socket, reply): LOG.debug("Replying to %s", reply.message_id) - assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!" + assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!" socket.send(b'', zmq.SNDMORE) - socket.send(six.b(str(reply.type_)), zmq.SNDMORE) + socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE) socket.send(reply.reply_id, zmq.SNDMORE) socket.send(reply.message_id, zmq.SNDMORE) - socket.send_pyobj(reply) + socket.send_dumped(reply.to_dict()) class RequestSenderDirect(SenderBase): def send(self, socket, request): socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request) + socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) + socket.send_string(request.message_id, zmq.SNDMORE) + socket.send_dumped(request.context, zmq.SNDMORE) + socket.send_dumped(request.message) LOG.debug("Sending %(msg_type)s message %(msg_id)s to " "target %(target)s", @@ -87,8 +90,8 @@ class ReplySenderDirect(SenderBase): def send(self, socket, reply): LOG.debug("Replying to %s", reply.message_id) - assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!" + assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!" socket.send(reply.reply_id, zmq.SNDMORE) socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(reply) + socket.send_dumped(reply.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index cfde3ad..3715b84 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -60,10 +60,12 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): reply_id = socket.recv() message_type = int(socket.recv()) message_id = socket.recv() - context = socket.recv_pyobj() - message = socket.recv_pyobj() - LOG.debug("[%(host)s] Received message %(id)s", - {"host": self.host, "id": message_id}) + context = socket.recv_loaded() + message = socket.recv_loaded() + LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s", + {"host": self.host, + "msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id}) if message_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingMessage( context, message, reply_id, message_id, socket, self.sender diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 64cbcfd..99b65ed 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -15,7 +15,7 @@ import logging from oslo_messaging._drivers.zmq_driver.client import zmq_senders -from oslo_messaging._drivers.zmq_driver.server.consumers\ +from oslo_messaging._drivers.zmq_driver.server.consumers \ import zmq_consumer_base from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_async @@ -38,29 +38,31 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): reply_id = socket.recv() empty = socket.recv() assert empty == b'', 'Bad format: empty delimiter expected' - request = socket.recv_pyobj() - return request, reply_id + msg_type = int(socket.recv()) + message_id = socket.recv_string() + context = socket.recv_loaded() + message = socket.recv_loaded() + return reply_id, msg_type, message_id, context, message def receive_message(self, socket): try: - request, reply_id = self._receive_request(socket) - LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", + reply_id, msg_type, message_id, context, message = \ + self._receive_request(socket) + LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s", {"host": self.host, - "type": request.msg_type, - "id": request.message_id, - "target": request.target}) + "msg_type": zmq_names.message_type_str(msg_type), + "msg_id": message_id}) - if request.msg_type == zmq_names.CALL_TYPE: + if msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingMessage( - request.context, request.message, reply_id, - request.message_id, socket, self.sender + context, message, reply_id, message_id, socket, self.sender ) - elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: - return zmq_incoming_message.ZmqIncomingMessage(request.context, - request.message) + elif msg_type in zmq_names.NON_BLOCKING_TYPES: + return zmq_incoming_message.ZmqIncomingMessage(context, + message) else: LOG.error(_LE("Unknown message type: %s"), - zmq_names.message_type_str(request.msg_type)) + zmq_names.message_type_str(msg_type)) except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index a6e32aa..6fd13b7 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -63,8 +63,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): def _receive_request(socket): topic_filter = socket.recv() message_id = socket.recv() - context = socket.recv_pyobj() - message = socket.recv_pyobj() + context = socket.recv_loaded() + message = socket.recv_loaded() LOG.debug("Received %(topic_filter)s topic message %(id)s", {'id': message_id, 'topic_filter': topic_filter}) return context, message diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 2c76227..0ebfef5 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -50,7 +50,7 @@ class ZmqIncomingMessage(base.RpcIncomingMessage): if self.sender is not None: if failure is not None: failure = rpc_common.serialize_remote_exception(failure) - reply = zmq_response.Response(type=zmq_names.REPLY_TYPE, + reply = zmq_response.Response(msg_type=zmq_names.REPLY_TYPE, message_id=self.message_id, reply_id=self.reply_id, reply_body=reply, diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 8b63e0e..f61003c 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -17,15 +17,11 @@ from oslo_messaging._drivers.zmq_driver import zmq_async zmq = zmq_async.import_zmq() -FIELD_TYPE = 'type' -FIELD_FAILURE = 'failure' -FIELD_REPLY = 'reply' -FIELD_ID = 'id' -FIELD_MSG_ID = 'message_id' FIELD_MSG_TYPE = 'msg_type' +FIELD_MSG_ID = 'message_id' FIELD_REPLY_ID = 'reply_id' -FIELD_TARGET = 'target' -FIELD_ROUTING_KEY = 'routing_key' +FIELD_REPLY_BODY = 'reply_body' +FIELD_FAILURE = 'failure' IDX_REPLY_TYPE = 1 diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index a97343e..11c567a 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -23,6 +23,8 @@ from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LI from oslo_messaging import exceptions +from oslo_serialization.serializer import json_serializer +from oslo_serialization.serializer import msgpack_serializer LOG = logging.getLogger(__name__) @@ -31,6 +33,11 @@ zmq = zmq_async.import_zmq() class ZmqSocket(object): + SERIALIZERS = { + 'json': json_serializer.JSONSerializer(), + 'msgpack': msgpack_serializer.MessagePackSerializer() + } + def __init__(self, conf, context, socket_type, high_watermark=0): self.conf = conf self.context = context @@ -45,6 +52,14 @@ class ZmqSocket(object): self.handle.identity = six.b(str(uuid.uuid4())) self.connections = set() + def _get_serializer(self, serialization): + serializer = self.SERIALIZERS.get(serialization, None) + if serializer is None: + raise NotImplementedError( + "Serialization '{}' is not supported".format(serialization) + ) + return serializer + def type_name(self): return zmq_names.socket_type_str(self.socket_type) @@ -77,6 +92,13 @@ class ZmqSocket(object): def send_multipart(self, *args, **kwargs): self.handle.send_multipart(*args, **kwargs) + def send_dumped(self, obj, *args, **kwargs): + serialization = kwargs.pop('serialization', + self.conf.rpc_zmq_serialization) + serializer = self._get_serializer(serialization) + s = serializer.dump_as_bytes(obj) + self.handle.send(s, *args, **kwargs) + def recv(self, *args, **kwargs): return self.handle.recv(*args, **kwargs) @@ -92,6 +114,14 @@ class ZmqSocket(object): def recv_multipart(self, *args, **kwargs): return self.handle.recv_multipart(*args, **kwargs) + def recv_loaded(self, *args, **kwargs): + serialization = kwargs.pop('serialization', + self.conf.rpc_zmq_serialization) + serializer = self._get_serializer(serialization) + s = self.handle.recv(*args, **kwargs) + obj = serializer.load_from_bytes(s) + return obj + def close(self, *args, **kwargs): self.handle.close(*args, **kwargs) @@ -106,10 +136,10 @@ class ZmqSocket(object): "address": address}) self.connect(address) except zmq.ZMQError as e: - errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\ - % (stype, address, e) - LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s"), - (stype, address, e)) + errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \ + % {"stype": stype, "address": address, "e": e} + LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"), + {"stype": stype, "address": address, "e": e}) raise rpc_common.RPCException(errmsg) def connect_to_host(self, host): diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 0287ccf..50e9d1b 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -12,9 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -import pickle +import json +import msgpack import time +import six +import testscenarios + import oslo_messaging from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher @@ -23,6 +27,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging.tests.drivers.zmq import zmq_common +load_tests = testscenarios.load_tests_apply_scenarios zmq = zmq_async.import_zmq() @@ -31,10 +36,18 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): LISTENERS_COUNT = 3 + scenarios = [ + ('json', {'serialization': 'json', + 'dumps': lambda obj: six.b(json.dumps(obj))}), + ('msgpack', {'serialization': 'msgpack', + 'dumps': msgpack.dumps}) + ] + def setUp(self): super(TestPubSub, self).setUp() - kwargs = {'use_pub_sub': True} + kwargs = {'use_pub_sub': True, + 'rpc_zmq_serialization': self.serialization} self.config(**kwargs) self.publisher = zmq_pub_publisher.PubPublisherProxy( @@ -58,8 +71,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): zmq_address.target_to_subscribe_filter(target), b"message", b"0000-0000", - pickle.dumps(context), - pickle.dumps(message)]) + self.dumps(context), + self.dumps(message)]) def _check_listener(self, listener): listener._received.wait(timeout=5)