diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py similarity index 58% rename from oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py rename to oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py index 89031ecc4..4a5eba4e7 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import abc from concurrent import futures import logging @@ -21,6 +22,7 @@ import oslo_messaging from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE @@ -30,34 +32,23 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerPublisher(zmq_publisher_base.PublisherBase): - """Non-CALL publisher using direct connections.""" +class DealerPublisherBase(zmq_publisher_base.PublisherBase): + """Abstract DEALER-publisher.""" - def send_request(self, request): - if request.msg_type == zmq_names.CALL_TYPE: + def __init__(self, conf, matchmaker, sender, receiver): + sockets_manager = zmq_sockets_manager.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER + ) + super(DealerPublisherBase, self).__init__(sockets_manager, sender, + receiver) + + @staticmethod + def _check_pattern(request, supported_pattern): + if request.msg_type != supported_pattern: raise zmq_publisher_base.UnsupportedSendPattern( zmq_names.message_type_str(request.msg_type) ) - try: - socket = self.sockets_manager.get_socket(request.target) - except retrying.RetryError: - return - - if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(socket.connections_count()): - self.sender.send(socket, request) - else: - self.sender.send(socket, request) - - -class DealerCallPublisher(zmq_publisher_base.PublisherBase): - """CALL publisher using direct connections.""" - - def __init__(self, sockets_manager, sender, reply_receiver): - super(DealerCallPublisher, self).__init__(sockets_manager, sender) - self.reply_receiver = reply_receiver - @staticmethod def _raise_timeout(request): raise oslo_messaging.MessagingTimeout( @@ -65,26 +56,12 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase): {"tout": request.timeout, "msg_id": request.message_id} ) - def send_request(self, request): - if request.msg_type != zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern( - zmq_names.message_type_str(request.msg_type) - ) - - try: - socket = self._connect_socket(request.target) - except retrying.RetryError: - self._raise_timeout(request) - - self.sender.send(socket, request) - self.reply_receiver.register_socket(socket) - return self._recv_reply(request) - - def _connect_socket(self, target): - return self.sockets_manager.get_socket(target) + @abc.abstractmethod + def _connect_socket(self, request): + pass def _recv_reply(self, request): - reply_future, = self.reply_receiver.track_request(request) + reply_future, = self.receiver.track_request(request) try: _, reply = reply_future.result(timeout=request.timeout) @@ -95,7 +72,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase): except futures.TimeoutError: self._raise_timeout(request) finally: - self.reply_receiver.untrack_request(request) + self.receiver.untrack_request(request) if reply.failure: raise rpc_common.deserialize_remote_exception( @@ -104,6 +81,30 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase): else: return reply.reply_body - def cleanup(self): - self.reply_receiver.stop() - super(DealerCallPublisher, self).cleanup() + def send_call(self, request): + self._check_pattern(request, zmq_names.CALL_TYPE) + + try: + socket = self._connect_socket(request) + except retrying.RetryError: + self._raise_timeout(request) + + self.sender.send(socket, request) + self.receiver.register_socket(socket) + return self._recv_reply(request) + + @abc.abstractmethod + def _send_non_blocking(self, request): + pass + + def send_cast(self, request): + self._check_pattern(request, zmq_names.CAST_TYPE) + self._send_non_blocking(request) + + def send_fanout(self, request): + self._check_pattern(request, zmq_names.CAST_FANOUT_TYPE) + self._send_non_blocking(request) + + def send_notify(self, request): + self._check_pattern(request, zmq_names.NOTIFY_TYPE) + self._send_non_blocking(request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py new file mode 100644 index 000000000..56d8b4923 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -0,0 +1,53 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import retrying + +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher_base +from oslo_messaging._drivers.zmq_driver.client import zmq_receivers +from oslo_messaging._drivers.zmq_driver.client import zmq_senders +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): + """DEALER-publisher using direct connections.""" + + def __init__(self, conf, matchmaker): + sender = zmq_senders.RequestSenderDirect(conf) + receiver = zmq_receivers.ReplyReceiverDirect(conf) + super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender, + receiver) + + def _connect_socket(self, request): + return self.sockets_manager.get_socket(request.target) + + def _send_non_blocking(self, request): + try: + socket = self._connect_socket(request) + except retrying.RetryError: + return + + if request.msg_type in zmq_names.MULTISEND_TYPES: + for _ in range(socket.connections_count()): + self.sender.send(socket, request) + else: + self.sender.send(socket, request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 9f53bed15..29dd3fcd3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -17,10 +17,10 @@ import logging import retrying from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher -from oslo_messaging._drivers.zmq_driver.client.publishers \ - import zmq_publisher_base + import zmq_dealer_publisher_base +from oslo_messaging._drivers.zmq_driver.client import zmq_receivers from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table +from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -31,17 +31,31 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerPublisherProxy(zmq_publisher_base.PublisherBase): - """Non-CALL publisher via proxy.""" +class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): + """DEALER-publisher via proxy.""" - def __init__(self, sockets_manager, sender): - super(DealerPublisherProxy, self).__init__(sockets_manager, sender) - self.socket = sockets_manager.get_socket_to_publishers() + def __init__(self, conf, matchmaker): + sender = zmq_senders.RequestSenderProxy(conf) + receiver = zmq_receivers.ReplyReceiverProxy(conf) + super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender, + receiver) + self.socket = self.sockets_manager.get_socket_to_publishers() self.routing_table = zmq_routing_table.RoutingTable(self.conf, self.matchmaker) self.connection_updater = \ PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket) + def _connect_socket(self, request): + return self.socket + + def send_call(self, request): + try: + request.routing_key = \ + self.routing_table.get_routable_host(request.target) + except retrying.RetryError: + self._raise_timeout(request) + return super(DealerPublisherProxy, self).send_call(request) + def _get_routing_keys(self, request): try: if request.msg_type in zmq_names.DIRECT_TYPES: @@ -54,48 +68,14 @@ class DealerPublisherProxy(zmq_publisher_base.PublisherBase): except retrying.RetryError: return [] - def send_request(self, request): - if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern( - zmq_names.message_type_str(request.msg_type) - ) + def _send_non_blocking(self, request): for routing_key in self._get_routing_keys(request): request.routing_key = routing_key self.sender.send(self.socket, request) def cleanup(self): - self.connection_updater.stop() - self.socket.close() super(DealerPublisherProxy, self).cleanup() - - -class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher): - """CALL publisher via proxy.""" - - def __init__(self, sockets_manager, sender, reply_waiter): - super(DealerCallPublisherProxy, self).__init__( - sockets_manager, sender, reply_waiter - ) - self.socket = self.sockets_manager.get_socket_to_publishers() - self.routing_table = zmq_routing_table.RoutingTable(self.conf, - self.matchmaker) - self.connection_updater = \ - PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket) - - def send_request(self, request): - try: - request.routing_key = \ - self.routing_table.get_routable_host(request.target) - except retrying.RetryError: - self._raise_timeout(request) - return super(DealerCallPublisherProxy, self).send_request(request) - - def _connect_socket(self, target): - return self.socket - - def cleanup(self): self.connection_updater.stop() - super(DealerCallPublisherProxy, self).cleanup() self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index bb5f29484..9da0c056e 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -53,29 +53,42 @@ class PublisherBase(object): Publisher can send request objects from zmq_request. """ - def __init__(self, sockets_manager, sender): + def __init__(self, sockets_manager, sender, receiver): """Construct publisher - Accept configuration object and Name Service interface object. - Create zmq.Context and connected sockets dictionary. + Accept sockets manager, sender and receiver objects. - :param conf: configuration object - :type conf: oslo_config.CONF + :param sockets_manager: sockets manager object + :type sockets_manager: zmq_sockets_manager.SocketsManager + :param senders: request sender object + :type senders: zmq_senders.RequestSender + :param receiver: reply receiver object + :type receiver: zmq_receivers.ReplyReceiver """ self.sockets_manager = sockets_manager self.conf = sockets_manager.conf self.matchmaker = sockets_manager.matchmaker self.sender = sender + self.receiver = receiver @abc.abstractmethod - def send_request(self, request): - """Send request to consumer + def send_call(self, request): + pass - :param request: Message data and destination container object - :type request: zmq_request.Request - """ + @abc.abstractmethod + def send_cast(self, request): + pass + + @abc.abstractmethod + def send_fanout(self, request): + pass + + @abc.abstractmethod + def send_notify(self, request): + pass def cleanup(self): """Cleanup publisher. Close allocated connections.""" + self.receiver.stop() self.sockets_manager.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index a8cfe934a..e7362e2f6 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -15,13 +15,10 @@ from oslo_messaging._drivers import common from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher + import zmq_dealer_publisher_direct from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_proxy from oslo_messaging._drivers.zmq_driver.client import zmq_client_base -from oslo_messaging._drivers.zmq_driver.client import zmq_receivers -from oslo_messaging._drivers.zmq_driver.client import zmq_senders -from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -45,34 +42,18 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase): if conf.use_router_proxy or not conf.use_pub_sub: raise WrongClientException() - self.sockets_manager = zmq_sockets_manager.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER - ) + publisher_direct = \ + zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker) - sender_proxy = zmq_senders.RequestSenderProxy(conf) - sender_direct = zmq_senders.RequestSenderDirect(conf) - - receiver_direct = zmq_receivers.ReplyReceiverDirect(conf) - - fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy( - self.sockets_manager, sender_proxy - ) + publisher_proxy = \ + zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker) super(ZmqClientMixDirectPubSub, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ - zmq_names.CALL_TYPE: - zmq_dealer_publisher.DealerCallPublisher( - self.sockets_manager, sender_direct, receiver_direct - ), - - zmq_names.CAST_FANOUT_TYPE: fanout_publisher, - - zmq_names.NOTIFY_TYPE: fanout_publisher, - - "default": - zmq_dealer_publisher.DealerPublisher(self.sockets_manager, - sender_direct) + zmq_names.CAST_FANOUT_TYPE: publisher_proxy, + zmq_names.NOTIFY_TYPE: publisher_proxy, + "default": publisher_direct } ) @@ -90,26 +71,12 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase): if conf.use_pub_sub or conf.use_router_proxy: raise WrongClientException() - self.sockets_manager = zmq_sockets_manager.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER - ) - - sender = zmq_senders.RequestSenderDirect(conf) - - receiver = zmq_receivers.ReplyReceiverDirect(conf) + publisher = \ + zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker) super(ZmqClientDirect, self).__init__( conf, matchmaker, allowed_remote_exmods, - publishers={ - zmq_names.CALL_TYPE: - zmq_dealer_publisher.DealerCallPublisher( - self.sockets_manager, sender, receiver - ), - - "default": - zmq_dealer_publisher.DealerPublisher(self.sockets_manager, - sender) - } + publishers={"default": publisher} ) @@ -128,25 +95,10 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase): if not conf.use_router_proxy: raise WrongClientException() - self.sockets_manager = zmq_sockets_manager.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER - ) - - sender = zmq_senders.RequestSenderProxy(conf) - - receiver = zmq_receivers.ReplyReceiverProxy(conf) + publisher = \ + zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker) super(ZmqClientProxy, self).__init__( conf, matchmaker, allowed_remote_exmods, - publishers={ - zmq_names.CALL_TYPE: - zmq_dealer_publisher_proxy.DealerCallPublisherProxy( - self.sockets_manager, sender, receiver - ), - - "default": - zmq_dealer_publisher_proxy.DealerPublisherProxy( - self.sockets_manager, sender - ) - } + publishers={"default": publisher} ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py index 7630cc7f0..4643ff359 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py @@ -24,45 +24,44 @@ class ZmqClientBase(object): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None, publishers=None): self.conf = conf - self.context = zmq.Context() self.matchmaker = matchmaker self.allowed_remote_exmods = allowed_remote_exmods or [] self.publishers = publishers - self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \ - or publishers["default"] - self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \ - or publishers["default"] - self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \ - or publishers["default"] - self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \ - or publishers["default"] + self.call_publisher = publishers.get(zmq_names.CALL_TYPE, + publishers["default"]) + self.cast_publisher = publishers.get(zmq_names.CAST_TYPE, + publishers["default"]) + self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE, + publishers["default"]) + self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE, + publishers["default"]) def send_call(self, target, context, message, timeout=None, retry=None): request = zmq_request.CallRequest( target, context=context, message=message, retry=retry, timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods ) - return self.call_publisher.send_request(request) + return self.call_publisher.send_call(request) def send_cast(self, target, context, message, retry=None): request = zmq_request.CastRequest( target, context=context, message=message, retry=retry ) - self.cast_publisher.send_request(request) + self.cast_publisher.send_cast(request) def send_fanout(self, target, context, message, retry=None): request = zmq_request.FanoutRequest( target, context=context, message=message, retry=retry ) - self.fanout_publisher.send_request(request) + self.fanout_publisher.send_fanout(request) def send_notify(self, target, context, message, version, retry=None): request = zmq_request.NotificationRequest( target, context=context, message=message, retry=retry, version=version ) - self.notify_publisher.send_request(request) + self.notify_publisher.send_notify(request) def cleanup(self): cleaned = set() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py index 2fb819135..3b83d9a50 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py @@ -37,7 +37,15 @@ class SenderBase(object): pass -class RequestSenderProxy(SenderBase): +class RequestSender(SenderBase): + pass + + +class ReplySender(SenderBase): + pass + + +class RequestSenderProxy(RequestSender): def send(self, socket, request): socket.send(b'', zmq.SNDMORE) @@ -55,7 +63,7 @@ class RequestSenderProxy(SenderBase): "target": request.target}) -class ReplySenderProxy(SenderBase): +class ReplySenderProxy(ReplySender): def send(self, socket, reply): LOG.debug("Replying to %s", reply.message_id) @@ -69,7 +77,7 @@ class ReplySenderProxy(SenderBase): socket.send_dumped(reply.to_dict()) -class RequestSenderDirect(SenderBase): +class RequestSenderDirect(RequestSender): def send(self, socket, request): socket.send(b'', zmq.SNDMORE) @@ -85,7 +93,7 @@ class RequestSenderDirect(SenderBase): "target": request.target}) -class ReplySenderDirect(SenderBase): +class ReplySenderDirect(ReplySender): def send(self, socket, reply): LOG.debug("Replying to %s", reply.message_id)