From aac17c6be90c2640ce9df4b02027d8fc01944fd8 Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Thu, 9 Feb 2017 06:43:55 +0000 Subject: [PATCH] Reconcile quitting_rpc_timeout with backoff RPC client With the backoff client, setting the .timeout property on it has no effect. This means that, starting from Mitaka, the quitting_rpc_timeout option has been broken. Now, when the TERM signal is received, we reset the dict capturing per-method timeouts and cap waiting times at the value of the option. This significantly reduces the time needed for the agent to gracefully shut down. Change-Id: I2d86ed7a6f337395bfcfdb0698ec685cf384f172 Related-Bug: #1663458 --- neutron/common/rpc.py | 45 ++++++++++++++++--- .../openvswitch/agent/ovs_neutron_agent.py | 5 ++- neutron/tests/unit/common/test_rpc.py | 20 +++++++++ .../agent/test_ovs_neutron_agent.py | 15 ++++--- 4 files changed, 72 insertions(+), 13 deletions(-) diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index 01bd99d6b74..0c1ad09307b 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -89,6 +89,14 @@ def get_allowed_exmods(): return ALLOWED_EXMODS + EXTRA_EXMODS +def _get_default_method_timeout(): + return TRANSPORT.conf.rpc_response_timeout + + +def _get_default_method_timeouts(): + return collections.defaultdict(_get_default_method_timeout) + + class _ContextWrapper(object): """Wraps oslo messaging contexts to set the timeout for calls. @@ -99,12 +107,28 @@ class _ContextWrapper(object): servers are more frequently the cause of timeouts rather than lost messages. 
""" - _METHOD_TIMEOUTS = collections.defaultdict( - lambda: TRANSPORT.conf.rpc_response_timeout) + _METHOD_TIMEOUTS = _get_default_method_timeouts() + _max_timeout = None @classmethod def reset_timeouts(cls): - cls._METHOD_TIMEOUTS.clear() + # restore the original default timeout factory + cls._METHOD_TIMEOUTS = _get_default_method_timeouts() + cls._max_timeout = None + + @classmethod + def get_max_timeout(cls): + return cls._max_timeout or _get_default_method_timeout() * 10 + + @classmethod + def set_max_timeout(cls, max_timeout): + if max_timeout < cls.get_max_timeout(): + cls._METHOD_TIMEOUTS = collections.defaultdict( + lambda: max_timeout, **{ + k: min(v, max_timeout) + for k, v in cls._METHOD_TIMEOUTS.items() + }) + cls._max_timeout = max_timeout def __init__(self, original_context): self._original_context = original_context @@ -127,7 +151,11 @@ class _ContextWrapper(object): return self._original_context.call(ctxt, method, **kwargs) except oslo_messaging.MessagingTimeout: with excutils.save_and_reraise_exception(): - wait = random.uniform(0, TRANSPORT.conf.rpc_response_timeout) + wait = random.uniform( + 0, + min(self._METHOD_TIMEOUTS[scoped_method], + TRANSPORT.conf.rpc_response_timeout) + ) LOG.error(_LE("Timeout in RPC method %(method)s. Waiting for " "%(wait)s seconds before next attempt. If the " "server is not down, consider increasing the " @@ -135,8 +163,8 @@ class _ContextWrapper(object): "server(s) may be overloaded and unable to " "respond quickly enough."), {'wait': int(round(wait)), 'method': scoped_method}) - ceiling = TRANSPORT.conf.rpc_response_timeout * 10 - new_timeout = min(self._original_context.timeout * 2, ceiling) + new_timeout = min( + self._original_context.timeout * 2, self.get_max_timeout()) if new_timeout > self._METHOD_TIMEOUTS[scoped_method]: LOG.warning(_LW("Increasing timeout for %(method)s calls " "to %(new)s seconds. 
Restart the agent to " @@ -159,6 +187,11 @@ class BackingOffClient(oslo_messaging.RPCClient): # don't enclose Contexts that explicitly set a timeout return _ContextWrapper(ctx) if 'timeout' not in kwargs else ctx + @staticmethod + def set_max_timeout(max_timeout): + '''Set RPC timeout ceiling for all backing-off RPC clients.''' + _ContextWrapper.set_max_timeout(max_timeout) + def get_client(target, version_cap=None, serializer=None): assert TRANSPORT is not None diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 72cb4159ba5..825013816f7 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -2090,6 +2090,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def _handle_sigterm(self, signum, frame): self.catch_sigterm = True if self.quitting_rpc_timeout: + LOG.info( + _LI('SIGTERM received, capping RPC timeout by %d seconds.'), + self.quitting_rpc_timeout) self.set_rpc_timeout(self.quitting_rpc_timeout) def _handle_sighup(self, signum, frame): @@ -2112,7 +2115,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def set_rpc_timeout(self, timeout): for rpc_api in (self.plugin_rpc, self.sg_plugin_rpc, self.dvr_plugin_rpc, self.state_rpc): - rpc_api.client.timeout = timeout + rpc_api.client.set_max_timeout(timeout) def _check_agent_configurations(self): if (self.enable_distributed_routing and self.enable_tunneling diff --git a/neutron/tests/unit/common/test_rpc.py b/neutron/tests/unit/common/test_rpc.py index d09931a19c3..80fef50de72 100644 --- a/neutron/tests/unit/common/test_rpc.py +++ b/neutron/tests/unit/common/test_rpc.py @@ -433,6 +433,26 @@ class TimeoutTestCase(base.DietTestCase): for call in rpc.TRANSPORT._send.call_args_list] self.assertEqual([1, 2], timeouts) + def test_set_max_timeout_caps_all_methods(self): + 
rpc.TRANSPORT.conf.rpc_response_timeout = 300 + rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 100 + rpc.BackingOffClient.set_max_timeout(50) + # both explicitly tracked + self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + # as well as new methods + self.assertEqual(50, rpc._ContextWrapper._METHOD_TIMEOUTS['method_2']) + + def test_set_max_timeout_retains_lower_timeouts(self): + rpc._ContextWrapper._METHOD_TIMEOUTS['method_1'] = 10 + rpc.BackingOffClient.set_max_timeout(50) + self.assertEqual(10, rpc._ContextWrapper._METHOD_TIMEOUTS['method_1']) + + def test_set_max_timeout_overrides_default_timeout(self): + rpc.TRANSPORT.conf.rpc_response_timeout = 10 + self.assertEqual(10 * 10, rpc._ContextWrapper.get_max_timeout()) + rpc._ContextWrapper.set_max_timeout(10) + self.assertEqual(10, rpc._ContextWrapper.get_max_timeout()) + class TestConnection(base.DietTestCase): def setUp(self): diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index af9ee84b2cc..028ad02e8a7 100755 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -28,6 +28,7 @@ from neutron.agent.common import utils from neutron.agent.linux import async_process from neutron.agent.linux import ip_lib from neutron.common import constants as c_const +from neutron.common import rpc as n_rpc from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants @@ -1929,12 +1930,14 @@ class TestOvsNeutronAgent(object): self.assertFalse(cleanup.called) def test_set_rpc_timeout(self): - self.agent._handle_sigterm(None, None) - for rpc_client in (self.agent.plugin_rpc.client, - self.agent.sg_plugin_rpc.client, - 
self.agent.dvr_plugin_rpc.client, - self.agent.state_rpc.client): - self.assertEqual(10, rpc_client.timeout) + with mock.patch.object( + n_rpc.BackingOffClient, 'set_max_timeout') as smt: + self.agent._handle_sigterm(None, None) + for rpc_client in (self.agent.plugin_rpc.client, + self.agent.sg_plugin_rpc.client, + self.agent.dvr_plugin_rpc.client, + self.agent.state_rpc.client): + smt.assert_called_with(10) def test_set_rpc_timeout_no_value(self): self.agent.quitting_rpc_timeout = None