Gracefully handle qpid errors
If a qpid error occurs when Glance is sending notifications, the
qpid connection may not be cleaned up properly. This change catches
qpid exceptions to allow proper error handling.
Fixes bug 1164681
Conflicts:
glance/tests/unit/test_notifier.py
(cherry picked from commit 1e98e108c0
)
Change-Id: Ica3ef54b958e4efe3932b90d721324e7860b1ea4
This commit is contained in:
parent
a5522f0bf2
commit
83d1efb880
|
@ -80,7 +80,7 @@ CONF.register_opts(qpid_opts)
|
|||
class QpidStrategy(strategy.Strategy):
|
||||
"""A notifier that puts a message on a queue when called."""
|
||||
|
||||
def _get_session(self):
|
||||
def _open_connection(self):
|
||||
"""Initialize the Qpid notification strategy."""
|
||||
broker = CONF.qpid_hostname + ":" + CONF.qpid_port
|
||||
self.connection = qpid.messaging.Connection(broker)
|
||||
|
@ -106,12 +106,9 @@ class QpidStrategy(strategy.Strategy):
|
|||
self.connection.protocol = CONF.qpid_protocol
|
||||
self.connection.tcp_nodelay = CONF.qpid_tcp_nodelay
|
||||
self.connection.open()
|
||||
session = self.connection.session()
|
||||
LOG.info(_('Connected to AMQP server on %s') % broker)
|
||||
|
||||
return session
|
||||
|
||||
def _sender(self, priority):
|
||||
def _send(self, priority, msg):
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
|
@ -127,19 +124,28 @@ class QpidStrategy(strategy.Strategy):
|
|||
topic = "%s.%s" % (CONF.qpid_notification_topic, priority)
|
||||
address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic,
|
||||
json.dumps(addr_opts))
|
||||
return self._get_session().sender(address)
|
||||
|
||||
try:
|
||||
self.connection = None
|
||||
self._open_connection()
|
||||
session = self.connection.session()
|
||||
sender = session.sender(address)
|
||||
qpid_msg = qpid.messaging.Message(content=msg)
|
||||
sender.send(qpid_msg)
|
||||
except Exception:
|
||||
details = dict(priority=priority, msg=msg)
|
||||
LOG.exception(_('Notification error. Priority: %(priority)s '
|
||||
'Message: %(msg)s' % details))
|
||||
raise
|
||||
finally:
|
||||
if self.connection and self.connection.opened():
|
||||
self.connection.close()
|
||||
|
||||
def warn(self, msg):
|
||||
qpid_msg = qpid.messaging.Message(content=msg)
|
||||
self._sender('warn').send(qpid_msg)
|
||||
self.connection.close()
|
||||
self._send('warn', msg)
|
||||
|
||||
def info(self, msg):
|
||||
qpid_msg = qpid.messaging.Message(content=msg)
|
||||
self._sender('info').send(qpid_msg)
|
||||
self.connection.close()
|
||||
self._send('info', msg)
|
||||
|
||||
def error(self, msg):
|
||||
qpid_msg = qpid.messaging.Message(content=msg)
|
||||
self._sender('error').send(qpid_msg)
|
||||
self.connection.close()
|
||||
self._send('error', msg)
|
||||
|
|
|
@ -353,7 +353,7 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||
qpid.messaging.Sender = self.orig_sender
|
||||
qpid.messaging.Receiver = self.orig_receiver
|
||||
|
||||
def _test_notify(self, priority):
|
||||
def _test_notify(self, priority, exception=False, opened=True):
|
||||
test_msg = {'a': 'b'}
|
||||
|
||||
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
|
||||
|
@ -361,27 +361,42 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||
self.mock_sender = self.mocker.CreateMock(self.orig_sender)
|
||||
|
||||
self.mock_connection.username = ""
|
||||
self.mock_connection.open()
|
||||
self.mock_connection.session().AndReturn(self.mock_session)
|
||||
for p in ["info", "warn", "error"]:
|
||||
if exception:
|
||||
self.mock_connection.open().AndRaise(
|
||||
Exception('Test Exception'))
|
||||
else:
|
||||
self.mock_connection.open()
|
||||
self.mock_connection.session().AndReturn(self.mock_session)
|
||||
expected_address = ('glance/notifications.%s ; '
|
||||
'{"node": {"x-declare": {"auto-delete": true, '
|
||||
'"durable": false}, "type": "topic"}, '
|
||||
'"create": "always"}' % p)
|
||||
'"create": "always"}' % priority)
|
||||
self.mock_session.sender(expected_address).AndReturn(
|
||||
self.mock_sender)
|
||||
self.mock_sender.send(mox.IgnoreArg())
|
||||
self.mock_sender.send(mox.IgnoreArg())
|
||||
self.mock_connection.opened().AndReturn(opened)
|
||||
if opened:
|
||||
self.mock_connection.close()
|
||||
|
||||
self.mocker.ReplayAll()
|
||||
|
||||
self.config(notifier_strategy="qpid")
|
||||
notifier = self.notify_qpid.QpidStrategy()
|
||||
if priority == 'info':
|
||||
notifier.info(test_msg)
|
||||
if exception:
|
||||
self.assertRaises(Exception, notifier.info, test_msg)
|
||||
else:
|
||||
notifier.info(test_msg)
|
||||
elif priority == 'warn':
|
||||
notifier.warn(test_msg)
|
||||
if exception:
|
||||
self.assertRaises(Exception, notifier.warn, test_msg)
|
||||
else:
|
||||
notifier.warn(test_msg)
|
||||
elif priority == 'error':
|
||||
notifier.error(test_msg)
|
||||
if exception:
|
||||
self.assertRaises(Exception, notifier.error, test_msg)
|
||||
else:
|
||||
notifier.error(test_msg)
|
||||
|
||||
self.mocker.VerifyAll()
|
||||
|
||||
|
@ -394,6 +409,12 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||
def test_error(self):
|
||||
self._test_notify('error')
|
||||
|
||||
def test_exception_open_successful(self):
|
||||
self._test_notify('info', exception=True)
|
||||
|
||||
def test_exception_open_failed(self):
|
||||
self._test_notify('info', exception=True, opened=False)
|
||||
|
||||
|
||||
class TestRabbitContentType(utils.BaseTestCase):
|
||||
"""Test AMQP/Rabbit notifier works."""
|
||||
|
|
Loading…
Reference in New Issue