diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index 9fdd6d7a8..805f1e3de 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -13,6 +13,7 @@ # under the License. import logging +import uuid from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base @@ -29,6 +30,7 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): def __init__(self, conf, matchmaker): super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER) + self.ack_receiver = AcknowledgementReceiver() def send_request(self, request): @@ -37,6 +39,17 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): dealer_socket, hosts = self._check_hosts_connections(request.target) + if not dealer_socket.connections: + # NOTE(ozamiatin): Here we can provide + # a queue for keeping messages to send them later + # when some listener appears. However such approach + # being more reliable will consume additional memory. + LOG.warning(_LW("Request %s was dropped because no connection") + % request.msg_type) + return + + self.ack_receiver.track_socket(dealer_socket.handle) + if request.msg_type in zmq_names.MULTISEND_TYPES: for _ in range(dealer_socket.connections_count()): self._send_request(dealer_socket, request) @@ -45,18 +58,44 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): def _send_request(self, socket, request): - if not socket.connections: - # NOTE(ozamiatin): Here we can provide - # a queue for keeping messages to send them later - # when some listener appears. However such approach - # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection") - % request.msg_type) - return + message_id = str(uuid.uuid1()) socket.send(b'', zmq.SNDMORE) - super(DealerPublisher, self)._send_request(socket, request) + socket.send_string(request.msg_type, zmq.SNDMORE) + socket.send_string(message_id, zmq.SNDMORE) + socket.send_json(request.context, zmq.SNDMORE) + socket.send_json(request.message) LOG.info(_LI("Sending message %(message)s to a target %(target)s") % {"message": request.message, "target": request.target}) + + def cleanup(self): + self.ack_receiver.cleanup() + super(DealerPublisher, self).cleanup() + + +class AcknowledgementReceiver(object): + + def __init__(self): + self.poller = zmq_async.get_poller() + self.thread = zmq_async.get_executor(self.poll_for_acknowledgements) + self.thread.execute() + + def _receive_acknowledgement(self, socket): + empty = socket.recv() + assert empty == b"", "Empty delimiter expected" + ack_message = socket.recv_json() + return ack_message + + def track_socket(self, socket): + self.poller.register(socket, self._receive_acknowledgement) + + def poll_for_acknowledgements(self): + ack_message, socket = self.poller.poll() + LOG.info(_LI("Message %s acknowledged") + % ack_message[zmq_names.FIELD_ID]) + + def cleanup(self): + self.thread.stop() + self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py new file mode 100644 index 000000000..228724b6c --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -0,0 +1,47 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class PubPublisher(zmq_publisher_base.PublisherMultisend): + + def __init__(self, conf, matchmaker): + super(PubPublisher, self).__init__(conf, matchmaker, zmq.PUB) + + def send_request(self, request): + + if request.msg_type not in zmq_names.NOTIFY_TYPES: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + + pub_socket, hosts = self._check_hosts_connections(request.target) + self._send_request(pub_socket, request) + + def _send_request(self, socket, request): + + super(PubPublisher, self)._send_request(socket, request) + + LOG.info(_LI("Publishing message %(message)s to a target %(target)s") + % {"message": request.message, + "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 51de8a5e6..eff59dab9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -110,8 +110,9 @@ class PublisherMultisend(PublisherBase): super(PublisherMultisend, self).__init__(conf, matchmaker) def _check_hosts_connections(self, target): + # TODO(ozamiatin): Place for significant optimization + # Matchmaker cache should be implemented hosts = self.matchmaker.get_hosts(target) - if str(target) in self.outbound_sockets: socket = self.outbound_sockets[str(target)] else: diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py new file mode 100644 index 000000000..b8fc4fe51 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -0,0 +1,57 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LI, _LW + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class PushPublisher(zmq_publisher_base.PublisherMultisend): + + def __init__(self, conf, matchmaker): + super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH) + + def send_request(self, request): + + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + + push_socket, hosts = self._check_hosts_connections(request.target) + + if not push_socket.connections: + LOG.warning(_LW("Request %s was dropped because no connection") + % request.msg_type) + return + + if request.msg_type in zmq_names.MULTISEND_TYPES: + for _ in range(push_socket.connections_count()): + self._send_request(push_socket, request) + else: + self._send_request(push_socket, request) + + def _send_request(self, socket, request): + + super(PushPublisher, self)._send_request(socket, request) + + LOG.info(_LI("Publishing message %(message)s to a target %(target)s") + % {"message": request.message, + "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 23dfd09eb..26a358f67 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -14,7 +14,6 @@ import contextlib - from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_dealer_publisher from oslo_messaging._drivers.zmq_driver.client.publishers\ diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py new file mode 100644 index 000000000..153f03d22 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -0,0 +1,85 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +@six.add_metaclass(abc.ABCMeta) +class ConsumerBase(object): + + def __init__(self, conf, poller, server): + self.conf = conf + self.poller = poller + self.server = server + self.sockets = [] + self.context = zmq.Context() + + def subscribe_socket(self, socket_type): + try: + socket = zmq_socket.ZmqRandomPortSocket( + self.conf, self.context, socket_type) + self.sockets.append(socket) + self.poller.register(socket, self.receive_message) + LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"), + {"stype": socket_type, + "addr": socket.bind_address, + "port": socket.port}) + return socket + except zmq.ZMQError as e: + errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ + % (self.port, e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) + + @abc.abstractmethod + def listen(self, target): + """Associate new sockets with targets here""" + + @abc.abstractmethod + def receive_message(self, target): + """Method for poller - receiving message routine""" + + def cleanup(self): + for socket in self.sockets: + if not socket.handle.closed: + socket.setsockopt(zmq.LINGER, 0) + socket.close() + self.sockets = [] + + +class SingleSocketConsumer(ConsumerBase): + + def __init__(self, conf, poller, server, socket_type): + super(SingleSocketConsumer, self).__init__(conf, poller, server) + self.socket = self.subscribe_socket(socket_type) + + @property + def address(self): + return self.socket.bind_address + + @property + def port(self): + return self.socket.port diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py new file mode 100644 index 000000000..a90f71b5a --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -0,0 +1,69 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class PullIncomingMessage(base.IncomingMessage): + + def __init__(self, listener, context, message): + super(PullIncomingMessage, self).__init__(listener, context, message) + + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for non-call messages.""" + + def acknowledge(self): + """Acknowledgments are not supported by this type of consumer.""" + + def requeue(self): + """Requeueing is not supported.""" + + +class PullConsumer(zmq_consumer_base.SingleSocketConsumer): + + def __init__(self, conf, poller, server): + super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL) + + def listen(self, target): + LOG.info(_LI("Listen to target %s") % str(target)) + # Do nothing here because we have a single socket + + def receive_message(self, socket): + try: + msg_type = socket.recv_string() + assert msg_type is not None, 'Bad format: msg type expected' + context = socket.recv_json() + message = socket.recv_json() + LOG.info(_LI("Received %(msg_type)s message %(msg)s") + % {"msg_type": msg_type, + "msg": str(message)}) + + if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): + return PullIncomingMessage(self.server, context, message) + else: + LOG.error(_LE("Unknown message type: %s") % msg_type) + + except zmq.ZMQError as e: + LOG.error(_LE("Receiving message failed: %s") % str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 92b9364ba..2219b0c27 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -14,9 +14,10 @@ import logging -from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_consumer_base from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LI @@ -26,46 +27,52 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class RouterConsumer(object): +class RouterIncomingMessage(base.IncomingMessage): + + def __init__(self, listener, context, message, socket, reply_id, msg_id, + poller): + super(RouterIncomingMessage, self).__init__(listener, context, message) + self.socket = socket + self.reply_id = reply_id + self.msg_id = msg_id + self.message = message + poller.resume_polling(socket) + + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for non-call messages""" + + def acknowledge(self): + LOG.info("Sending acknowledge for %s", self.msg_id) + ack_message = {zmq_names.FIELD_ID: self.msg_id} + self.socket.send(self.reply_id, zmq.SNDMORE) + self.socket.send(b'', zmq.SNDMORE) + self.socket.send_json(ack_message) + + def requeue(self): + """Requeue is not supported""" + + +class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): - - self.conf = conf - self.poller = poller - self.server = server - - try: - self.context = zmq.Context() - self.socket = self.context.socket(zmq.ROUTER) - self.address = zmq_address.get_tcp_random_address(conf) - self.port = self.socket.bind_to_random_port(self.address) - self.poller.register(self.socket, self._receive_message) - LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"), - {"addr": self.address, - "port": self.port}) - except zmq.ZMQError as e: - errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ - % (self.port, e) - LOG.error(errmsg) - raise rpc_common.RPCException(errmsg) + super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER) def listen(self, target): LOG.info(_LI("Listen to target %s") % str(target)) - # Do nothing here because we have single socket - - def cleanup(self): - if not self.socket.closed: - self.socket.setsockopt(zmq.LINGER, 0) - self.socket.close() - - def _receive_message(self, socket): + # Do nothing here because we have a single socket + def receive_message(self, socket): try: reply_id = socket.recv() empty = socket.recv() assert empty == b'', 'Bad format: empty delimiter expected' msg_type = socket.recv_string() assert msg_type is not None, 'Bad format: msg type expected' + + msg_id = None + if msg_type != zmq_names.CALL_TYPE: + msg_id = socket.recv_string() + context = socket.recv_json() message = socket.recv_json() LOG.info(_LI("Received %(msg_type)s message %(msg)s") @@ -76,12 +83,10 @@ class RouterConsumer(object): return zmq_incoming_message.ZmqIncomingRequest( self.server, context, message, socket, reply_id, self.poller) - elif msg_type in zmq_names.CAST_TYPES: - return zmq_incoming_message.ZmqCastMessage( - self.server, context, message, socket, self.poller) - elif msg_type in zmq_names.NOTIFY_TYPES: - return zmq_incoming_message.ZmqNotificationMessage( - self.server, context, message, socket, self.poller) + elif msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): + return RouterIncomingMessage( + self.server, context, message, socket, reply_id, + msg_id, self.poller) else: LOG.error(_LE("Unknown message type: %s") % msg_type) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index d953e9334..9d1351225 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -42,48 +42,14 @@ class ZmqIncomingRequest(base.IncomingMessage): message_reply = {zmq_names.FIELD_REPLY: reply, zmq_names.FIELD_FAILURE: failure, zmq_names.FIELD_LOG_FAILURE: log_failure} - LOG.debug("Replying %s REP", (str(message_reply))) + + LOG.info("Replying %s REP", (str(message_reply))) + self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) self.reply_socket.send_json(message_reply) self.poller.resume_polling(self.reply_socket) - def acknowledge(self): - pass - def requeue(self): - pass - - -class ZmqCastMessage(base.IncomingMessage): - - def __init__(self, listener, context, message, socket, poller): - super(ZmqCastMessage, self).__init__(listener, context, message) - poller.resume_polling(socket) - - def reply(self, reply=None, failure=None, log_failure=True): - """Reply is not needed for fanout(cast) messages""" - - def acknowledge(self): - pass - - def requeue(self): - pass - - -class ZmqNotificationMessage(base.IncomingMessage): - - def __init__(self, listener, context, message, socket, poller): - super(ZmqNotificationMessage, self).__init__(listener, context, - message) - poller.resume_polling(socket) - - def reply(self, reply=None, failure=None, log_failure=True): - """Reply is not needed for notification messages""" - - def acknowledge(self): - pass - - def requeue(self): - pass + """Requeue is not supported""" diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 33fe9247c..1c3c33440 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -19,6 +19,8 @@ zmq = zmq_async.import_zmq() ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER", zmq.ROUTER: "ROUTER", + zmq.PUSH: "PUSH", + zmq.PULL: "PULL", zmq.REQ: "REQ", zmq.REP: "REP", zmq.PUB: "PUB", @@ -27,6 +29,7 @@ ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER", FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' FIELD_LOG_FAILURE = 'log_failure' +FIELD_ID = 'id' CALL_TYPE = 'call' CAST_TYPE = 'cast' diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index a4f77b7e8..59dee614e 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -14,6 +14,7 @@ import logging +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -31,7 +32,7 @@ class ZmqSocket(object): self.connections = set() def type_name(self): - return zmq_names(self.socket_type) + return zmq_names.socket_type_str(self.socket_type) def connections_count(self): return len(self.connections) @@ -53,5 +54,23 @@ class ZmqSocket(object): def send_json(self, *args, **kwargs): self.handle.send_json(*args, **kwargs) + def recv(self, *args, **kwargs): + return self.handle.recv(*args, **kwargs) + + def recv_string(self, *args, **kwargs): + return self.handle.recv_string(*args, **kwargs) + + def recv_json(self, *args, **kwargs): + return self.handle.recv_json(*args, **kwargs) + def close(self, *args, **kwargs): self.handle.close(*args, **kwargs) + + +class ZmqRandomPortSocket(ZmqSocket): + + def __init__(self, conf, context, socket_type): + super(ZmqRandomPortSocket, self).__init__(context, socket_type) + self.conf = conf + self.bind_address = zmq_address.get_tcp_random_address(self.conf) + self.port = self.handle.bind_to_random_port(self.bind_address) diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index ca15f61ca..21641dd51 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -52,6 +52,7 @@ class TestServerListener(object): try: message = self.listener.poll() if message is not None: + message.acknowledge() self._received.set() self.message = message message.reply(reply=True) @@ -188,7 +189,7 @@ class TestZmqBasics(ZmqBaseTestCase): context = {} target.topic = target.topic + '.info' self.driver.send_notification(target, context, message, '3.0') - self.listener._received.wait() + self.listener._received.wait(5) self.assertTrue(self.listener._received.isSet())