Prevent rabbit from raising unexpected exceptions

Publishing a message using the kombu connection autoretry method may
allow exceptions from the py-amqp library to be raised up to the
application. This does not conform to the documented oslo.messaging
API.

Enhance the try except block to capture any exception and translate it
into a MessageDeliveryFailure.

There are a few cases where exceptions will be raised during autoretry
publishing: recoverable connection or channel errors, and
non-recoverable connection or channel errors.

autoretry will only retry if the error is recoverable. Non recoverable
errors are re-raised immediately regardless of the retry count.

In the case of a recoverable error it seems unlikely that retrying
either the connection or the channel yet again is going to get us
anywhere, so in this case we simply clean up the channel state, log an
error and fail the operation.

In the case of non-recoverable error we are out of luck (think
authentication failure) - further retrying will not help.  Best we can
do is clean up state and log the heck out of it.

Change-Id: I2f65d2ee19a8c3e9a323b30404abbf0cbb45a216
Closes-Bug: #1705351
Closes-Bug: #1707160
This commit is contained in:
Kenneth Giusti 2017-07-24 13:36:17 -04:00
parent ad4401c475
commit f059bba6ae
2 changed files with 20 additions and 10 deletions

View File

@ -807,9 +807,11 @@ class Connection(object):
ret, channel = autoretry_method()
self._set_current_channel(channel)
return ret
except kombu.exceptions.OperationalError as exc:
LOG.debug("Received recoverable error from kombu:",
exc_info=True)
except rpc_amqp.AMQPDestinationNotFound:
# NOTE(sileht): we must reraise this without
# trigger error_callback
raise
except Exception as exc:
error_callback and error_callback(exc)
self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection
@ -821,13 +823,6 @@ class Connection(object):
'tries: %(err_str)s') % info
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
except rpc_amqp.AMQPDestinationNotFound:
# NOTE(sileht): we must reraise this without
# trigger error_callback
raise
except Exception as exc:
error_callback and error_callback(exc)
raise
def _set_current_channel(self, new_channel):
"""Change the channel to use.

View File

@ -30,6 +30,7 @@ import oslo_messaging
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
from oslo_messaging.exceptions import MessageDeliveryFailure
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@ -285,6 +286,20 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
try_send(e_active)
self.assertIn('foobar', conn._declared_exchanges)
def test_send_exception_remap(self):
bad_exc = Exception("Non-oslo.messaging exception")
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
exchange_mock = mock.Mock()
with transport._driver._get_connection(
driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
with mock.patch('kombu.messaging.Producer.publish',
side_effect=bad_exc):
self.assertRaises(MessageDeliveryFailure,
conn._ensure_publishing,
conn._publish, exchange_mock, 'msg')
class TestRabbitConsume(test_utils.BaseTestCase):