Warn user if needed when the process is forked

This change warns the library consumer when the process if forked and
we can't be sure that the library work as expected.

This also add some documentation about forking oslo.messaging Transport
object.

Change-Id: I2938421775aa72866adac198d70214856d45e165
Related-bug: #1330199
This commit is contained in:
Mehdi Abaakouk 2014-11-27 14:46:52 +01:00
parent 0650bde775
commit eb21f6b263
3 changed files with 42 additions and 0 deletions

View File

@ -14,3 +14,17 @@ Transport
.. autoclass:: TransportHost
.. autofunction:: set_transport_defaults
About fork oslo.messaging transport object
------------------------------------------
oslo.messaging can't ensure that forking a process that shares the same
transport object is safe for the library consumer, because it relies on
different 3rd party libraries that don't ensure that too, but in certain
case/driver it works:
* rabbit: works only if no connection have already been established.
* qpid: doesn't work (qpid library have a global state that use fd
that can't be resetted)
* amqp1: works

View File

@ -16,6 +16,7 @@
import functools
import itertools
import logging
import os
import random
import time
@ -490,6 +491,7 @@ class Connection(object):
random.shuffle(self.brokers_params)
self.brokers = itertools.cycle(self.brokers_params)
self._initial_pid = os.getpid()
self.reconnect()
def _connect(self, broker):
@ -578,6 +580,21 @@ class Connection(object):
LOG.debug("Re-established AMQP queues")
def ensure(self, error_callback, method, retry=None):
current_pid = os.getpid()
if self._initial_pid != current_pid:
# NOTE(sileht):
# to get the same level of fork support that rabbit driver have
# (ie: allow fork before the first connection established)
# we could use the kombu workaround:
# https://github.com/celery/kombu/blob/master/kombu/transport/
# qpid_patches.py#L67
LOG.warn("Process forked! "
"This can results to unpredictable behavior. "
"See: http://docs.openstack.org/developer/"
"oslo.messaging/transport.html")
self._initial_pid = current_pid
while True:
try:
return method()

View File

@ -15,6 +15,7 @@
import functools
import itertools
import logging
import os
import socket
import ssl
import time
@ -472,6 +473,8 @@ class Connection(object):
hostname, port,
virtual_host)
self._initial_pid = os.getpid()
self.do_consume = True
self.channel = None
@ -553,6 +556,14 @@ class Connection(object):
retry = N means N retries
"""
current_pid = os.getpid()
if self._initial_pid != current_pid:
LOG.warn("Process forked after connection established! "
"This can results to unpredictable behavior. "
"See: http://docs.openstack.org/developer/"
"oslo.messaging/transport.html")
self._initial_pid = current_pid
if retry is None:
retry = self.max_retries
if retry is None or retry < 0: