Add exponential back-off RPC client

This adds an exponential backoff mechanism for timeout values
on any RPC calls in Neutron that don't explicitly request a timeout
value. This will prevent the clients from DDoSing the server by
giving up on requests and retrying them before they are fulfilled.

Each RPC call method in each namespace gets its own timeout value since
some calls are expected to be much more expensive than others and we
don't want to modify the timeouts of cheap calls.

The backoff currently has no reduction mechanism under the assumption
that timeouts not legitimately caused by heavy system load
(i.e. messages completely dropped by AMQP) are rare enough that the
cost of shrinking the timeout back down and potentially causing
another server timeout isn't worth it. The timeout does have a ceiling
of 10 times the configured default timeout value.

Whenever a timeout exception occurs, the client will also sleep for a
random value between 0 and the configured default timeout value to
introduce a splay across all of the agents that may be trying to
communicate with the server.

This patch is intended to be uninvasive for candidacy to be
back-ported. A larger refactor of delivering data to the agents
is being discussed in I3af200ad84483e6e1fe619d516ff20bc87041f7c.

Closes-Bug: #1554332
Change-Id: I923e415c1b8e9a431be89221c78c14f39c42c80f
This commit is contained in:
Kevin Benton 2016-02-16 01:50:23 -08:00
parent 5024077fc9
commit 3e668b6a37
2 changed files with 210 additions and 5 deletions

View File

@ -14,13 +14,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import random
import time
from neutron_lib import exceptions as lib_exceptions
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
from oslo_service import service
from oslo_utils import excutils
from neutron._i18n import _LE, _LW
from neutron.common import exceptions
from neutron import context
@ -77,6 +83,7 @@ def cleanup():
assert NOTIFIER is not None
TRANSPORT.cleanup()
NOTIFICATION_TRANSPORT.cleanup()
_ContextWrapper.reset_timeouts()
TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None
@ -92,13 +99,84 @@ def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class _ContextWrapper(object):
"""Wraps oslo messaging contexts to set the timeout for calls.
This intercepts RPC calls and sets the timeout value to the globally
adapting value for each method. An oslo messaging timeout results in
a doubling of the timeout value for the method on which it timed out.
There currently is no logic to reduce the timeout since busy Neutron
servers are more frequently the cause of timeouts rather than lost
messages.
"""
_METHOD_TIMEOUTS = collections.defaultdict(
lambda: TRANSPORT.conf.rpc_response_timeout)
@classmethod
def reset_timeouts(cls):
cls._METHOD_TIMEOUTS.clear()
def __init__(self, original_context):
self._original_context = original_context
def __getattr__(self, name):
return getattr(self._original_context, name)
def call(self, ctxt, method, **kwargs):
# two methods with the same name in different namespaces should
# be tracked independently
if self._original_context.target.namespace:
scoped_method = '%s.%s' % (self._original_context.target.namespace,
method)
else:
scoped_method = method
# set the timeout from the global method timeout tracker for this
# method
self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method]
try:
return self._original_context.call(ctxt, method, **kwargs)
except oslo_messaging.MessagingTimeout:
with excutils.save_and_reraise_exception():
wait = random.uniform(0, TRANSPORT.conf.rpc_response_timeout)
LOG.error(_LE("Timeout in RPC method %(method)s. Waiting for "
"%(wait)s seconds before next attempt. If the "
"server is not down, consider increasing the "
"rpc_response_timeout option as Neutron "
"server(s) may be overloaded and unable to "
"respond quickly enough."),
{'wait': int(round(wait)), 'method': scoped_method})
ceiling = TRANSPORT.conf.rpc_response_timeout * 10
new_timeout = min(self._original_context.timeout * 2, ceiling)
if new_timeout > self._METHOD_TIMEOUTS[scoped_method]:
LOG.warning(_LW("Increasing timeout for %(method)s calls "
"to %(new)s seconds. Restart the agent to "
"restore it to the default value."),
{'method': scoped_method, 'new': new_timeout})
self._METHOD_TIMEOUTS[scoped_method] = new_timeout
time.sleep(wait)
class BackingOffClient(oslo_messaging.RPCClient):
"""An oslo messaging RPC Client that implements a timeout backoff.
This has all of the same interfaces as oslo_messaging.RPCClient but
if the timeout parameter is not specified, the _ContextWrapper returned
will track when call timeout exceptions occur and exponentially increase
the timeout for the given call method.
"""
def prepare(self, *args, **kwargs):
ctx = super(BackingOffClient, self).prepare(*args, **kwargs)
# don't enclose Contexts that explicitly set a timeout
return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None
serializer = RequestContextSerializer(serializer)
return oslo_messaging.RPCClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
return BackingOffClient(TRANSPORT,
target,
version_cap=version_cap,
serializer=serializer)
def get_server(target, endpoints, serializer=None):

View File

@ -20,6 +20,7 @@ import mock
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging import conffixture as messaging_conffixture
import testtools
from neutron.common import rpc
from neutron import context
@ -140,7 +141,7 @@ class TestRPC(base.DietTestCase):
self.assertEqual(['foo', 'bar'], exmods)
@mock.patch.object(rpc, 'RequestContextSerializer')
@mock.patch.object(messaging, 'RPCClient')
@mock.patch.object(rpc, 'BackingOffClient')
def test_get_client(self, mock_client, mock_ser):
rpc.TRANSPORT = mock.Mock()
tgt = mock.Mock()
@ -304,6 +305,132 @@ class ServiceTestCase(base.DietTestCase):
rpc_server.wait.assert_called_once_with()
class TimeoutTestCase(base.DietTestCase):
def setUp(self):
super(TimeoutTestCase, self).setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
rpc.TRANSPORT = mock.MagicMock()
rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
target = messaging.Target(version='1.0', topic='testing')
self.client = rpc.get_client(target)
self.call_context = mock.Mock()
self.sleep = mock.patch('time.sleep').start()
rpc.TRANSPORT.conf.rpc_response_timeout = 10
def test_timeout_unaffected_when_explicitly_set(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 5
ctx = self.client.prepare(topic='sandwiches', timeout=77)
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'create_pb_and_j')
# ensure that the timeout was not increased and the back-off sleep
# wasn't called
self.assertEqual(
5, rpc._ContextWrapper._METHOD_TIMEOUTS['create_pb_and_j'])
self.assertFalse(self.sleep.called)
def test_timeout_store_defaults(self):
# any method should default to the configured timeout
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
# a change to an existing should not affect new or existing ones
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 7000
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
self.assertEqual(rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_3'])
def test_method_timeout_sleep(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 2
for i in range(100):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
# sleep value should always be between 0 and configured timeout
self.assertGreaterEqual(self.sleep.call_args_list[0][0][0], 0)
self.assertLessEqual(self.sleep.call_args_list[0][0][0], 2)
self.sleep.reset_mock()
def test_method_timeout_increases_on_timeout_exception(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
# we only care to check the timeouts sent to the transport
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2, 4, 8, 16], timeouts)
def test_method_timeout_10x_config_ceiling(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 10
# 5 doublings should max out at the 10xdefault ceiling
for i in range(5):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
self.assertEqual(10 * rpc.TRANSPORT.conf.rpc_response_timeout,
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_timeout_unchanged_on_other_exception(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc.TRANSPORT._send.side_effect = ValueError
with testtools.ExpectedException(ValueError):
self.client.call(self.call_context, 'method_1')
rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method_1')
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 1], timeouts)
def test_timeouts_for_methods_tracked_independently(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'] = 1
for method in ('method_1', 'method_1', 'method_2',
'method_1', 'method_2'):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, method)
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2, 1, 4, 2], timeouts)
def test_timeouts_for_namespaces_tracked_independently(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['ns1.method'] = 1
rpc._ContextWrapper._METHOD_TIMEOUTS['ns2.method'] = 1
for ns in ('ns1', 'ns2'):
self.client.target.namespace = ns
for i in range(4):
with testtools.ExpectedException(messaging.MessagingTimeout):
self.client.call(self.call_context, 'method')
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2, 4, 8, 1, 2, 4, 8], timeouts)
def test_method_timeout_increases_with_prepare(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 1
ctx = self.client.prepare(version='1.4')
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'method_1')
with testtools.ExpectedException(messaging.MessagingTimeout):
ctx.call(self.call_context, 'method_1')
# we only care to check the timeouts sent to the transport
timeouts = [call[1]['timeout']
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2], timeouts)
class TestConnection(base.DietTestCase):
def setUp(self):
super(TestConnection, self).setUp()