Merge "ZMQ: `Lazify` driver code" into feature/zmq
This commit is contained in:
commit
2aa35bb6d9
|
@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller):
|
|||
def __init__(self):
|
||||
self.incoming_queue = six.moves.queue.Queue()
|
||||
self.green_pool = eventlet.GreenPool()
|
||||
self.threads = []
|
||||
self.thread_by_socket = {}
|
||||
|
||||
def register(self, socket, recv_method=None):
|
||||
self.threads.append(
|
||||
self.green_pool.spawn(self._socket_receive, socket,
|
||||
recv_method))
|
||||
if socket not in self.thread_by_socket:
|
||||
self.thread_by_socket[socket] = self.green_pool.spawn(
|
||||
self._socket_receive, socket, recv_method)
|
||||
|
||||
def _socket_receive(self, socket, recv_method=None):
|
||||
while True:
|
||||
|
@ -59,10 +59,10 @@ class GreenPoller(zmq_poller.ZmqPoller):
|
|||
return incoming[0], incoming[1]
|
||||
|
||||
def close(self):
|
||||
for thread in self.threads:
|
||||
for thread in self.thread_by_socket.values():
|
||||
thread.kill()
|
||||
|
||||
self.threads = []
|
||||
self.thread_by_socket = {}
|
||||
|
||||
|
||||
class HoldReplyPoller(GreenPoller):
|
||||
|
|
|
@ -67,7 +67,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
|
|||
if str(target) in self.outbound_sockets:
|
||||
dealer_socket, hosts = self.outbound_sockets[str(target)]
|
||||
else:
|
||||
dealer_socket = self.zmq_context.socket(zmq.DEALER)
|
||||
dealer_socket = zmq.Context().socket(zmq.DEALER)
|
||||
hosts = self.matchmaker.get_hosts(target)
|
||||
for host in hosts:
|
||||
self._connect_to_host(dealer_socket, host)
|
||||
|
|
|
@ -13,24 +13,15 @@
|
|||
# under the License.
|
||||
|
||||
import abc
|
||||
import logging
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CastPublisherBase(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.zmq_context = zmq.Context()
|
||||
self.outbound_sockets = {}
|
||||
super(CastPublisherBase, self).__init__()
|
||||
|
||||
|
|
|
@ -44,10 +44,10 @@ class ZmqServer(base.Listener):
|
|||
raise rpc_common.RPCException(errmsg)
|
||||
|
||||
self.poller = zmq_async.get_poller()
|
||||
self.poller.register(self.socket, self._receive_message)
|
||||
self.matchmaker = matchmaker
|
||||
|
||||
def poll(self, timeout=None):
|
||||
self.poller.register(self.socket, self._receive_message)
|
||||
incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout)
|
||||
return incoming[0]
|
||||
|
||||
|
|
Loading…
Reference in New Issue