diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index 72429f1f1..dcf9da588 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller): def __init__(self): self.incoming_queue = six.moves.queue.Queue() self.green_pool = eventlet.GreenPool() - self.threads = [] + self.thread_by_socket = {} def register(self, socket, recv_method=None): - self.threads.append( - self.green_pool.spawn(self._socket_receive, socket, - recv_method)) + if socket not in self.thread_by_socket: + self.thread_by_socket[socket] = self.green_pool.spawn( + self._socket_receive, socket, recv_method) def _socket_receive(self, socket, recv_method=None): while True: @@ -59,10 +59,10 @@ class GreenPoller(zmq_poller.ZmqPoller): return incoming[0], incoming[1] def close(self): - for thread in self.threads: + for thread in self.thread_by_socket.values(): thread.kill() - self.threads = [] + self.thread_by_socket = {} class HoldReplyPoller(GreenPoller): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py index 379d8ef3a..f1257badb 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -67,7 +67,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): if str(target) in self.outbound_sockets: dealer_socket, hosts = self.outbound_sockets[str(target)] else: - dealer_socket = self.zmq_context.socket(zmq.DEALER) + dealer_socket = zmq.Context().socket(zmq.DEALER) hosts = self.matchmaker.get_hosts(target) for host in hosts: self._connect_to_host(dealer_socket, host) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py index 098454524..38a470ba8 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py @@ -13,24 +13,15 @@ # under the License. import abc -import logging import six -from oslo_messaging._drivers.zmq_driver import zmq_async - - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - @six.add_metaclass(abc.ABCMeta) class CastPublisherBase(object): def __init__(self, conf): self.conf = conf - self.zmq_context = zmq.Context() self.outbound_sockets = {} super(CastPublisherBase, self).__init__() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index 17b04e86c..981966ddc 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -44,10 +44,10 @@ class ZmqServer(base.Listener): raise rpc_common.RPCException(errmsg) self.poller = zmq_async.get_poller() - self.poller.register(self.socket, self._receive_message) self.matchmaker = matchmaker def poll(self, timeout=None): + self.poller.register(self.socket, self._receive_message) incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout) return incoming[0]