Merge "Reconcile quitting_rpc_timeout with backoff RPC client" into stable/newton

This commit is contained in:
Jenkins 2017-03-27 23:21:55 +00:00 committed by Gerrit Code Review
commit 882aa97f4a
4 changed files with 72 additions and 13 deletions

View File

@ -100,6 +100,14 @@ def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
def _get_default_method_timeout():
return TRANSPORT.conf.rpc_response_timeout
def _get_default_method_timeouts():
return collections.defaultdict(_get_default_method_timeout)
class _ContextWrapper(object):
"""Wraps oslo messaging contexts to set the timeout for calls.
@ -110,12 +118,28 @@ class _ContextWrapper(object):
servers are more frequently the cause of timeouts rather than lost
messages.
"""
_METHOD_TIMEOUTS = collections.defaultdict(
lambda: TRANSPORT.conf.rpc_response_timeout)
_METHOD_TIMEOUTS = _get_default_method_timeouts()
_max_timeout = None
@classmethod
def reset_timeouts(cls):
cls._METHOD_TIMEOUTS.clear()
# restore the original default timeout factory
cls._METHOD_TIMEOUTS = _get_default_method_timeouts()
cls._max_timeout = None
@classmethod
def get_max_timeout(cls):
return cls._max_timeout or _get_default_method_timeout() * 10
@classmethod
def set_max_timeout(cls, max_timeout):
if max_timeout < cls.get_max_timeout():
cls._METHOD_TIMEOUTS = collections.defaultdict(
lambda: max_timeout, **{
k: min(v, max_timeout)
for k, v in cls._METHOD_TIMEOUTS.items()
})
cls._max_timeout = max_timeout
def __init__(self, original_context):
self._original_context = original_context
@ -138,7 +162,11 @@ class _ContextWrapper(object):
return self._original_context.call(ctxt, method, **kwargs)
except oslo_messaging.MessagingTimeout:
with excutils.save_and_reraise_exception():
wait = random.uniform(0, TRANSPORT.conf.rpc_response_timeout)
wait = random.uniform(
0,
min(self._METHOD_TIMEOUTS[scoped_method],
TRANSPORT.conf.rpc_response_timeout)
)
LOG.error(_LE("Timeout in RPC method %(method)s. Waiting for "
"%(wait)s seconds before next attempt. If the "
"server is not down, consider increasing the "
@ -146,8 +174,8 @@ class _ContextWrapper(object):
"server(s) may be overloaded and unable to "
"respond quickly enough."),
{'wait': int(round(wait)), 'method': scoped_method})
ceiling = TRANSPORT.conf.rpc_response_timeout * 10
new_timeout = min(self._original_context.timeout * 2, ceiling)
new_timeout = min(
self._original_context.timeout * 2, self.get_max_timeout())
if new_timeout > self._METHOD_TIMEOUTS[scoped_method]:
LOG.warning(_LW("Increasing timeout for %(method)s calls "
"to %(new)s seconds. Restart the agent to "
@ -170,6 +198,11 @@ class BackingOffClient(oslo_messaging.RPCClient):
# don't enclose Contexts that explicitly set a timeout
return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx
@staticmethod
def set_max_timeout(max_timeout):
'''Set RPC timeout ceiling for all backing-off RPC clients.'''
_ContextWrapper.set_max_timeout(max_timeout)
def get_client(target, version_cap=None, serializer=None):
assert TRANSPORT is not None

View File

@ -2095,6 +2095,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def _handle_sigterm(self, signum, frame):
self.catch_sigterm = True
if self.quitting_rpc_timeout:
LOG.info(
_LI('SIGTERM received, capping RPC timeout by %d seconds.'),
self.quitting_rpc_timeout)
self.set_rpc_timeout(self.quitting_rpc_timeout)
def _handle_sighup(self, signum, frame):
@ -2117,7 +2120,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def set_rpc_timeout(self, timeout):
for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc,
self.dvr_plugin_rpc, self.state_rpc):
rpc_api.client.timeout = timeout
rpc_api.client.set_max_timeout(timeout)
def _check_agent_configurations(self):
if (self.enable_distributed_routing and self.enable_tunneling

View File

@ -437,6 +437,26 @@ class TimeoutTestCase(base.DietTestCase):
for call in rpc.TRANSPORT._send.call_args_list]
self.assertEqual([1, 2], timeouts)
def test_set_max_timeout_caps_all_methods(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 300
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100
rpc.BackingOffClient.set_max_timeout(50)
# both explicitly tracked
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
# as well as new methods
self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2'])
def test_set_max_timeout_retains_lower_timeouts(self):
rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10
rpc.BackingOffClient.set_max_timeout(50)
self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'])
def test_set_max_timeout_overrides_default_timeout(self):
rpc.TRANSPORT.conf.rpc_response_timeout = 10
self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout())
rpc._ContextWrapper.set_max_timeout(10)
self.assertEqual(10, rpc._ContextWrapper.get_max_timeout())
class TestConnection(base.DietTestCase):
def setUp(self):

View File

@ -28,6 +28,7 @@ from neutron.agent.common import utils
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.common import constants as c_const
from neutron.common import rpc as n_rpc
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
@ -1883,12 +1884,14 @@ class TestOvsNeutronAgent(object):
self.assertFalse(cleanup.called)
def test_set_rpc_timeout(self):
self.agent._handle_sigterm(None, None)
for rpc_client in (self.agent.plugin_rpc.client,
self.agent.sg_plugin_rpc.client,
self.agent.dvr_plugin_rpc.client,
self.agent.state_rpc.client):
self.assertEqual(10, rpc_client.timeout)
with mock.patch.object(
n_rpc.BackingOffClient, 'set_max_timeout') as smt:
self.agent._handle_sigterm(None, None)
for rpc_client in (self.agent.plugin_rpc.client,
self.agent.sg_plugin_rpc.client,
self.agent.dvr_plugin_rpc.client,
self.agent.state_rpc.client):
smt.assert_called_with(10)
def test_set_rpc_timeout_no_value(self):
self.agent.quitting_rpc_timeout = None