Fix fork-related issues

Many services make use of 'fork' system call to start
new instances of 'workers'. Such approach forces
messaging drivers to perform their
initialization in lazy manner.

Added LazyDriverItem object to init any part of the driver
by first request.

Fixed DEALER-publisher not to block on sending when
no listener connected.

Introduced ZmqSocket wrapper to track connections
in outgoing sockets.

Refactoring of publishers, introduced PublisherMultisend.

Change-Id: I125c946ee9e36061d1b21aa29adcef0611dff201
This commit is contained in:
Oleksii Zamiatin 2015-08-10 18:07:38 +03:00
parent c5a6bfdca3
commit 1adf880a23
8 changed files with 200 additions and 50 deletions

View File

@ -15,6 +15,7 @@
import logging import logging
import pprint import pprint
import socket import socket
import threading
from oslo_config import cfg from oslo_config import cfg
from stevedore import driver from stevedore import driver
@ -82,6 +83,36 @@ zmq_opts = [
] ]
class LazyDriverItem(object):
def __init__(self, item_cls, *args, **kwargs):
self._lock = threading.Lock()
self.item = None
self.item_class = item_cls
self.args = args
self.kwargs = kwargs
def get(self):
# NOTE(ozamiatin): Lazy initialization.
# All init stuff moved closer to usage point - lazy init.
# Better design approach is to initialize in the driver's
# __init__, but 'fork' extensively used by services
# breaks all things.
if self.item is not None:
return self.item
self._lock.acquire()
if self.item is None:
self.item = self.item_class(*self.args, **self.kwargs)
self._lock.release()
return self.item
def cleanup(self):
if self.item:
self.item.cleanup()
class ZmqDriver(base.BaseDriver): class ZmqDriver(base.BaseDriver):
"""ZeroMQ Driver implementation. """ZeroMQ Driver implementation.
@ -115,15 +146,27 @@ class ZmqDriver(base.BaseDriver):
conf.register_opts(zmq_opts) conf.register_opts(zmq_opts)
conf.register_opts(executor_base._pool_opts) conf.register_opts(executor_base._pool_opts)
self.conf = conf self.conf = conf
self.allowed_remote_exmods = allowed_remote_exmods
self.matchmaker = driver.DriverManager( self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker', 'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker, self.conf.rpc_zmq_matchmaker,
).driver(self.conf) ).driver(self.conf)
self.server = zmq_server.ZmqServer(self.conf, self.matchmaker) self.server = LazyDriverItem(
self.client = zmq_client.ZmqClient(self.conf, self.matchmaker, zmq_server.ZmqServer, self, self.conf, self.matchmaker)
allowed_remote_exmods)
self.notify_server = LazyDriverItem(
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
self.client = LazyDriverItem(
zmq_client.ZmqClient, self.conf, self.matchmaker,
self.allowed_remote_exmods)
self.notifier = LazyDriverItem(
zmq_client.ZmqClient, self.conf, self.matchmaker,
self.allowed_remote_exmods)
super(ZmqDriver, self).__init__(conf, url, default_exchange, super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods) allowed_remote_exmods)
@ -147,13 +190,14 @@ class ZmqDriver(base.BaseDriver):
N means N retries N means N retries
:type retry: int :type retry: int
""" """
client = self.client.get()
timeout = timeout or self.conf.rpc_response_timeout timeout = timeout or self.conf.rpc_response_timeout
if wait_for_reply: if wait_for_reply:
return self.client.send_call(target, ctxt, message, timeout, retry) return client.send_call(target, ctxt, message, timeout, retry)
elif target.fanout: elif target.fanout:
self.client.send_fanout(target, ctxt, message, timeout, retry) client.send_fanout(target, ctxt, message, timeout, retry)
else: else:
self.client.send_cast(target, ctxt, message, timeout, retry) client.send_cast(target, ctxt, message, timeout, retry)
def send_notification(self, target, ctxt, message, version, retry=None): def send_notification(self, target, ctxt, message, version, retry=None):
"""Send notification to server """Send notification to server
@ -172,11 +216,11 @@ class ZmqDriver(base.BaseDriver):
N means N retries N means N retries
:type retry: int :type retry: int
""" """
client = self.notifier.get()
if target.fanout: if target.fanout:
self.client.send_notify_fanout(target, ctxt, message, version, client.send_notify_fanout(target, ctxt, message, version, retry)
retry)
else: else:
self.client.send_notify(target, ctxt, message, version, retry) client.send_notify(target, ctxt, message, version, retry)
def listen(self, target): def listen(self, target):
"""Listen to a specified target on a server side """Listen to a specified target on a server side
@ -184,8 +228,9 @@ class ZmqDriver(base.BaseDriver):
:param target: Message destination target :param target: Message destination target
:type target: oslo_messaging.Target :type target: oslo_messaging.Target
""" """
self.server.listen(target) server = self.server.get()
return self.server server.listen(target)
return server
def listen_for_notifications(self, targets_and_priorities, pool): def listen_for_notifications(self, targets_and_priorities, pool):
"""Listen to a specified list of targets on a server side """Listen to a specified list of targets on a server side
@ -195,11 +240,14 @@ class ZmqDriver(base.BaseDriver):
:param pool: Not used for zmq implementation :param pool: Not used for zmq implementation
:type pool: object :type pool: object
""" """
self.server.listen_notification(targets_and_priorities) server = self.notify_server.get()
return self.server server.listen_notification(targets_and_priorities)
return server
def cleanup(self): def cleanup(self):
"""Cleanup all driver's connections finally """Cleanup all driver's connections finally
""" """
self.client.cleanup() self.client.cleanup()
self.server.cleanup() self.server.cleanup()
self.notify_server.cleanup()
self.notifier.cleanup()

View File

@ -14,20 +14,21 @@
import logging import logging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
class DealerPublisher(zmq_publisher_base.PublisherBase): class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
def send_request(self, request): def send_request(self, request):
@ -37,41 +38,25 @@ class DealerPublisher(zmq_publisher_base.PublisherBase):
dealer_socket, hosts = self._check_hosts_connections(request.target) dealer_socket, hosts = self._check_hosts_connections(request.target)
if request.msg_type in zmq_names.MULTISEND_TYPES: if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(len(hosts)): for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, request) self._send_request(dealer_socket, request)
else: else:
self._send_request(dealer_socket, request) self._send_request(dealer_socket, request)
def _send_request(self, socket, request): def _send_request(self, socket, request):
if not socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% request.msg_type)
return
socket.send(b'', zmq.SNDMORE) socket.send(b'', zmq.SNDMORE)
super(DealerPublisher, self)._send_request(socket, request) super(DealerPublisher, self)._send_request(socket, request)
LOG.info(_LI("Sending message %(message)s to a target %(target)s") LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": request.message, % {"message": request.message,
"target": request.target}) "target": request.target})
def _check_hosts_connections(self, target):
if str(target) in self.outbound_sockets:
dealer_socket, hosts = self.outbound_sockets[str(target)]
else:
dealer_socket = zmq.Context().socket(zmq.DEALER)
hosts = self.matchmaker.get_hosts(target)
for host in hosts:
self._connect_to_host(dealer_socket, host, target)
self.outbound_sockets[str(target)] = (dealer_socket, hosts)
return dealer_socket, hosts
@staticmethod
def _connect_to_host(socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
try:
LOG.info(_LI("Connecting DEALER to %(address)s for %(target)s")
% {"address": address,
"target": target})
socket.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\
% (address, e)
LOG.error(errmsg)
raise rpc_common.RPCException(errmsg)

View File

@ -13,13 +13,18 @@
# under the License. # under the License.
import abc import abc
import logging
import six import six
from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
@ -93,6 +98,42 @@ class PublisherBase(object):
def cleanup(self): def cleanup(self):
"""Cleanup publisher. Close allocated connections.""" """Cleanup publisher. Close allocated connections."""
for socket, hosts in self.outbound_sockets.values(): for socket in self.outbound_sockets.values():
socket.setsockopt(zmq.LINGER, 0) socket.setsockopt(zmq.LINGER, 0)
socket.close() socket.close()
class PublisherMultisend(PublisherBase):
def __init__(self, conf, matchmaker, socket_type):
self.socket_type = socket_type
super(PublisherMultisend, self).__init__(conf, matchmaker)
def _check_hosts_connections(self, target):
hosts = self.matchmaker.get_hosts(target)
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
return socket, hosts
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
% {"stype": stype,
"address": address,
"target": target})
socket.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
% (stype, address, e)
LOG.error(errmsg)
raise rpc_common.RPCException(errmsg)

View File

@ -52,7 +52,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
LOG.info(_LI("Connecting REQ to %s") % connect_address) LOG.info(_LI("Connecting REQ to %s") % connect_address)
socket.connect(connect_address) socket.connect(connect_address)
self.outbound_sockets[str(target)] = (socket, [host]) self.outbound_sockets[str(target)] = socket
return socket return socket
except zmq.ZMQError as e: except zmq.ZMQError as e:

View File

@ -30,6 +30,7 @@ class RouterConsumer(object):
def __init__(self, conf, poller, server): def __init__(self, conf, poller, server):
self.conf = conf
self.poller = poller self.poller = poller
self.server = server self.server = server
@ -38,6 +39,7 @@ class RouterConsumer(object):
self.socket = self.context.socket(zmq.ROUTER) self.socket = self.context.socket(zmq.ROUTER)
self.address = zmq_address.get_tcp_random_address(conf) self.address = zmq_address.get_tcp_random_address(conf)
self.port = self.socket.bind_to_random_port(self.address) self.port = self.socket.bind_to_random_port(self.address)
self.poller.register(self.socket, self._receive_message)
LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"), LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"),
{"addr": self.address, {"addr": self.address,
"port": self.port}) "port": self.port})
@ -49,7 +51,7 @@ class RouterConsumer(object):
def listen(self, target): def listen(self, target):
LOG.info(_LI("Listen to target %s") % str(target)) LOG.info(_LI("Listen to target %s") % str(target))
self.poller.register(self.socket, self._receive_message) # Do nothing here because we have single socket
def cleanup(self): def cleanup(self):
if not self.socket.closed: if not self.socket.closed:
@ -66,7 +68,9 @@ class RouterConsumer(object):
assert msg_type is not None, 'Bad format: msg type expected' assert msg_type is not None, 'Bad format: msg type expected'
context = socket.recv_json() context = socket.recv_json()
message = socket.recv_json() message = socket.recv_json()
LOG.debug("Received %s message %s" % (msg_type, str(message))) LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": msg_type,
"msg": str(message)})
if msg_type == zmq_names.CALL_TYPE: if msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest( return zmq_incoming_message.ZmqIncomingRequest(

View File

@ -28,8 +28,8 @@ zmq = zmq_async.import_zmq()
class ZmqServer(base.Listener): class ZmqServer(base.Listener):
def __init__(self, conf, matchmaker=None): def __init__(self, driver, conf, matchmaker=None):
self.conf = conf super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.poller = zmq_async.get_poller() self.poller = zmq_async.get_poller()
self.rpc_consumer = zmq_router_consumer.RouterConsumer( self.rpc_consumer = zmq_router_consumer.RouterConsumer(

View File

@ -12,6 +12,17 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER",
zmq.ROUTER: "ROUTER",
zmq.REQ: "REQ",
zmq.REP: "REP",
zmq.PUB: "PUB",
zmq.SUB: "SUB"}
FIELD_FAILURE = 'failure' FIELD_FAILURE = 'failure'
FIELD_REPLY = 'reply' FIELD_REPLY = 'reply'
@ -33,3 +44,7 @@ MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE) DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
def socket_type_str(socket_type):
return ZMQ_SOCKET_STR[socket_type]

View File

@ -0,0 +1,57 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class ZmqSocket(object):
def __init__(self, context, socket_type):
self.context = context
self.socket_type = socket_type
self.handle = context.socket(socket_type)
self.connections = set()
def type_name(self):
return zmq_names(self.socket_type)
def connections_count(self):
return len(self.connections)
def connect(self, address):
if address not in self.connections:
self.handle.connect(address)
self.connections.add(address)
def setsockopt(self, *args, **kwargs):
self.handle.setsockopt(*args, **kwargs)
def send(self, *args, **kwargs):
self.handle.send(*args, **kwargs)
def send_string(self, *args, **kwargs):
self.handle.send_string(*args, **kwargs)
def send_json(self, *args, **kwargs):
self.handle.send_json(*args, **kwargs)
def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs)