Catch exceptions for all rpc casts

I15cc2d6ae48e505c2da121880e27481dedf36d3b catches exceptions for
specific RPC endpoints affected by a recent change related to
push-notifications. There may be more changes like that in the future,
so instead of fixing them one by one, this patch consistently catches
exceptions from all cast calls for all RPC clients.

Change-Id: Ia7e6cd717758a9d5b18fe9cb07c55938f52040ce
Partial-Bug: #1705351
This commit is contained in:
Ihar Hrachyshka 2017-07-24 09:22:58 -07:00
parent cbe0f03f08
commit 85d8f6f1c8
3 changed files with 88 additions and 60 deletions

View File

@ -73,7 +73,7 @@ def cleanup():
assert NOTIFIER is not None
TRANSPORT.cleanup()
NOTIFICATION_TRANSPORT.cleanup()
_ContextWrapper.reset_timeouts()
_BackingOffContextWrapper.reset_timeouts()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
@ -98,6 +98,24 @@ def _get_default_method_timeouts():
class _ContextWrapper(object):
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def cast(self, ctxt, method, **kwargs):
try:
self._original_context.cast(ctxt, method, **kwargs)
except Exception:
# TODO(kevinbenton): make catch specific to missing exchange once
# bug/1705351 is resolved on the oslo.messaging side; if
# oslo.messaging auto-creates the exchange, then just remove the
# code completely
LOG.exception("Ignored exception during cast")
class _BackingOffContextWrapper(_ContextWrapper):
"""Wraps oslo messaging contexts to set the timeout for calls.
This intercepts RPC calls and sets the timeout value to the globally
@ -130,12 +148,6 @@ class _ContextWrapper(object):
})
cls._max_timeout = max_timeout
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def call(self, ctxt, method, **kwargs):
# two methods with the same name in different namespaces should
# be tracked independently
@ -178,19 +190,21 @@ class BackingOffClient(oslo_messaging.RPCClient):
"""An oslo messaging RPC Client that implements a timeout backoff.
This has all of the same interfaces as oslo_messaging.RPCClient but
if the timeout parameter is not specified, the _ContextWrapper returned
will track when call timeout exceptions occur and exponentially increase
the timeout for the given call method.
if the timeout parameter is not specified, the _BackingOffContextWrapper
returned will track when call timeout exceptions occur and exponentially
increase the timeout for the given call method.
"""
def prepare(self, *args, **kwargs):
ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
# don't enclose Contexts that explicitly set a timeout
return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx
# don't back off contexts that explicitly set a timeout
if 'timeout' in kwargs:
return _ContextWrapper(ctx)
return _BackingOffContextWrapper(ctx)
@staticmethod
def set_max_timeout(max_timeout):
'''Set RPC timeout ceiling for all backing-off RPC clients.'''
_ContextWrapper.set_max_timeout(max_timeout)
_BackingOffContextWrapper.set_max_timeout(max_timeout)
def get_client(target, version_cap=None, serializer=None):

View File

@ -22,7 +22,6 @@ from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from oslo_log import log
import oslo_messaging
import six
from sqlalchemy.orm import exc
from neutron._i18n import _LE, _LW
@ -364,19 +363,6 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
'failed_devices_down': failed_devices_down}
def _suppress_cast_exceptions(f):
"""Decorator to ignore exchange not found exceptions."""
@six.wraps(f)
def wrapped(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception:
# TODO(kevinbenton): make catch specific to missing exchange once
# bug/1705351 is resolved on the oslo.messaging side.
LOG.exception("Ignored exception during cast")
return wrapped
class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
sg_rpc.SecurityGroupAgentRpcApiMixin,
type_tunnel.TunnelAgentRpcApiMixin):
@ -407,13 +393,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
@_suppress_cast_exceptions
def network_delete(self, context, network_id):
cctxt = self.client.prepare(topic=self.topic_network_delete,
fanout=True)
cctxt.cast(context, 'network_delete', network_id=network_id)
@_suppress_cast_exceptions
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
cctxt = self.client.prepare(topic=self.topic_port_update,
@ -422,13 +406,11 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
network_type=network_type, segmentation_id=segmentation_id,
physical_network=physical_network)
@_suppress_cast_exceptions
def port_delete(self, context, port_id):
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)
@_suppress_cast_exceptions
def network_update(self, context, network):
cctxt = self.client.prepare(topic=self.topic_network_update,
fanout=True, version='1.4')

View File

@ -335,21 +335,26 @@ class TimeoutTestCase(base.DietTestCase):
# ensure that the timeout was not increased and the back-off sleep
# wasn't called
self.assertEqual(
5, rpc._ContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
5,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
self.assertFalse(self.sleep.called)
def test_timeout_store_defaults(self):
# any method should default to the configured timeout
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
# a change to an existing should not affect new or existing ones
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_3'])
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_3'])
def test_method_timeout_sleep(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 2
@ -362,7 +367,7 @@ class TimeoutTestCase(base.DietTestCase):
self.sleep.reset_mock()
def test_method_timeout_increases_on_timeout_exception(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
@ -378,15 +383,17 @@ class TimeoutTestCase(base.DietTestCase):
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_timeout_unchanged_on_other_exception(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc.TRANSPORT._send.side_effect = ValueError
with testtools.ExpectedException(ValueError):
self.client.call(self.call_context, 'method_1')
@ -398,8 +405,8 @@ class TimeoutTestCase(base.DietTestCase):
self.assertEqual([1, 1], timeouts)
def test_timeouts_for_methods_tracked_independently(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
for method in ('method_1', 'method_1', 'method_2',
'method_1', 'method_2'):
with testtools.ExpectedException(messaging.MessagingTimeout):
@ -409,8 +416,8 @@ class TimeoutTestCase(base.DietTestCase):
self.assertEqual([1, 2, 1, 4, 2], timeouts)
def test_timeouts_for_namespaces_tracked_independently(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
rpc._ContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
for ns in ('ns1', 'ns2'):
self.client.target.namespace = ns
for i in range(4):
@ -421,7 +428,7 @@ class TimeoutTestCase(base.DietTestCase):
self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts)
def test_method_timeout_increases_with_prepare(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
ctx = self.client.prepare(version='1.4')
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'method_1')
@ -435,23 +442,48 @@ class TimeoutTestCase(base.DietTestCase):
def test_set_max_timeout_caps_all_methods(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 300
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
rpc.BackingOffClient.set_max_timeout(50)
# both explicitly tracked
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
# as well as new methods
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
self.assertEqual(
50, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_2'])
def test_set_max_timeout_retains_lower_timeouts(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
rpc.BackingOffClient.set_max_timeout(50)
self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(
10, rpc._BackingOffContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_set_max_timeout_overrides_default_timeout(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 10
self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout())
rpc._ContextWrapper.set_max_timeout(10)
self.assertEqual(10, rpc._ContextWrapper.get_max_timeout())
self.assertEqual(
10 * 10, rpc._BackingOffContextWrapper.get_max_timeout())
rpc._BackingOffContextWrapper.set_max_timeout(10)
self.assertEqual(10, rpc._BackingOffContextWrapper.get_max_timeout())
class CastExceptionTestCase(base.DietTestCase):
def setUp(self):
super(CastExceptionTestCase, self).setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
rpc.TRANSPORT = mock.MagicMock()
rpc.TRANSPORT._send.side_effect = Exception
target = messaging.Target(version='1.0', topic='testing')
self.client = rpc.get_client(target)
self.cast_context = mock.Mock()
def test_cast_catches_exception(self):
self.client.cast(self.cast_context, 'method_1')
class TestConnection(base.DietTestCase):