Fixing glance-api hangs in the qpid notifier

Glance-api was able to hang in qpid notifier under heavy image creation load.

The ``thread`` and ``select`` modules used by the python-qpid for managing
the AMQP connection. When the eventlet was not able to switch between threads
because leaded to hang and/or pipe(2) leaking issues.

* Monkey patching the ``select`` and ``thread`` modules to be  eventlet friendly
  in order to avoid hanging issues.

* The reference to the connection object in the QpidStrategy
  was replaceable by a concurrent thread, which could cause various issues.
  Using just local variables for storing connection object in order to avoid
  concurrent unsafe manipulation.

Fixing bug 1229042

Change-Id: I8fa8c4f36892b96d406216cb3c64854a94ca9df7
(cherry picked from commit 2e7aa761b6)
This commit is contained in:
Attila Fazekas 2013-09-23 08:44:37 +02:00 committed by Flavio Percoco
parent 83d1efb880
commit 9a557c8f54
3 changed files with 44 additions and 32 deletions

View File

@ -27,8 +27,9 @@ import gettext
import os
import sys
# Monkey patch socket and time
eventlet.patcher.monkey_patch(all=False, socket=True, time=True)
# Monkey patch socket, time, select, threads
eventlet.patcher.monkey_patch(all=False, socket=True, time=True,
select=True, thread=True)
# If ../glance/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...

View File

@ -83,30 +83,31 @@ class QpidStrategy(strategy.Strategy):
def _open_connection(self):
"""Initialize the Qpid notification strategy."""
broker = CONF.qpid_hostname + ":" + CONF.qpid_port
self.connection = qpid.messaging.Connection(broker)
self.connection.username = CONF.qpid_username
self.connection.password = CONF.qpid_password
self.connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
connection = qpid.messaging.Connection(broker)
connection.username = CONF.qpid_username
connection.password = CONF.qpid_password
connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
# Hard code this option as enabled so that reconnect logic isn't needed
# in this file at all.
self.connection.reconnect = True
connection.reconnect = True
if CONF.qpid_reconnect_timeout:
self.connection.reconnect_timeout = CONF.qpid_reconnect_timeout
connection.reconnect_timeout = CONF.qpid_reconnect_timeout
if CONF.qpid_reconnect_limit:
self.connection.reconnect_limit = CONF.qpid_reconnect_limit
connection.reconnect_limit = CONF.qpid_reconnect_limit
if CONF.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
connection.reconnect_interval_max = (
CONF.qpid_reconnect_interval_max)
if CONF.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
connection.reconnect_interval_min = (
CONF.qpid_reconnect_interval_min)
if CONF.qpid_reconnect_interval:
self.connection.reconnect_interval = CONF.qpid_reconnect_interval
self.connection.heartbeat = CONF.qpid_heartbeat
self.connection.protocol = CONF.qpid_protocol
self.connection.tcp_nodelay = CONF.qpid_tcp_nodelay
self.connection.open()
connection.reconnect_interval = CONF.qpid_reconnect_interval
connection.heartbeat = CONF.qpid_heartbeat
connection.protocol = CONF.qpid_protocol
connection.tcp_nodelay = CONF.qpid_tcp_nodelay
connection.open()
LOG.info(_('Connected to AMQP server on %s') % broker)
return connection
def _send(self, priority, msg):
addr_opts = {
@ -124,11 +125,10 @@ class QpidStrategy(strategy.Strategy):
topic = "%s.%s" % (CONF.qpid_notification_topic, priority)
address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic,
json.dumps(addr_opts))
connection = None
try:
self.connection = None
self._open_connection()
session = self.connection.session()
connection = self._open_connection()
session = connection.session()
sender = session.sender(address)
qpid_msg = qpid.messaging.Message(content=msg)
sender.send(qpid_msg)
@ -138,8 +138,8 @@ class QpidStrategy(strategy.Strategy):
'Message: %(msg)s' % details))
raise
finally:
if self.connection and self.connection.opened():
self.connection.close()
if connection and connection.opened():
connection.close()
def warn(self, msg):
self._send('warn', msg)

View File

@ -353,7 +353,7 @@ class TestQpidNotifier(utils.BaseTestCase):
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver
def _test_notify(self, priority, exception=False, opened=True):
def _test_notify(self, priority, exception=False, exception_send=False):
test_msg = {'a': 'b'}
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
@ -363,7 +363,7 @@ class TestQpidNotifier(utils.BaseTestCase):
self.mock_connection.username = ""
if exception:
self.mock_connection.open().AndRaise(
Exception('Test Exception'))
Exception('Test open Exception'))
else:
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
@ -373,9 +373,14 @@ class TestQpidNotifier(utils.BaseTestCase):
'"create": "always"}' % priority)
self.mock_session.sender(expected_address).AndReturn(
self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
self.mock_connection.opened().AndReturn(opened)
if opened:
if exception_send:
self.mock_sender.send(mox.IgnoreArg()).AndRaise(
Exception('Test send Exception'))
# NOTE(afazekas): the opened and close call is expected
# in this case, but not expected if the open fails
else:
self.mock_sender.send(mox.IgnoreArg())
self.mock_connection.opened().AndReturn(True)
self.mock_connection.close()
self.mocker.ReplayAll()
@ -383,17 +388,17 @@ class TestQpidNotifier(utils.BaseTestCase):
self.config(notifier_strategy="qpid")
notifier = self.notify_qpid.QpidStrategy()
if priority == 'info':
if exception:
if exception or exception_send:
self.assertRaises(Exception, notifier.info, test_msg)
else:
notifier.info(test_msg)
elif priority == 'warn':
if exception:
if exception or exception_send:
self.assertRaises(Exception, notifier.warn, test_msg)
else:
notifier.warn(test_msg)
elif priority == 'error':
if exception:
if exception or exception_send:
self.assertRaises(Exception, notifier.error, test_msg)
else:
notifier.error(test_msg)
@ -412,8 +417,14 @@ class TestQpidNotifier(utils.BaseTestCase):
def test_exception_open_successful(self):
self._test_notify('info', exception=True)
def test_exception_open_failed(self):
self._test_notify('info', exception=True, opened=False)
def test_info_fail(self):
self._test_notify('info', exception_send=True)
def test_warn_fail(self):
self._test_notify('warn', exception_send=True)
def test_error_fail(self):
self._test_notify('error', exception_send=True)
class TestRabbitContentType(utils.BaseTestCase):