Throttle SIGHUPs to keepalived
Multiple SIGHUPs in quick succession might cause the master keepalived
to forfeit its mastership (which will cause keepalived to remove IPs of
its external devices, severing connectivity). This can happen when, for
example, associating or disassociating multiple floatingips.
The patch makes the agent throttle SIGHUP sent to keepalived: the very first
SIGHUP is always sent; as for subsequent signals, they are delayed till
agent threshold is reached. (It's 3 seconds by default.)
As an example, when three consequent router updates trigger keepalived
respawn then:
* the very first signal is sent as usual;
* the second signal is deferred and sent in up to 3 seconds since the
first signal;
* the third signal is ignored, though the change that triggered it will
be correctly applied by the second signal handler when it is triggered
after threshold delay.
If the last time a spawn request occurred is older than current-time
minus threshold then there is no delay.
Co-Authored-By: Jakub Libosvar <libosvar@redhat.com>
Co-Authored-By: Cedric Brandily <zzelle@gmail.com>
Co-Authored-By: Ihar Hrachyshka <ihrachys@redhat.com>
Closes-Bug: 1647432
Change-Id: I2955e0de835458a2eea4dd088addf33b656f8670
(cherry picked from commit 977d254cc6
)
This commit is contained in:
parent
e0b8d452c2
commit
23c7c8a08e
|
@ -34,6 +34,11 @@ HA_DEV_PREFIX = 'ha-'
|
|||
IP_MONITOR_PROCESS_SERVICE = 'ip_monitor'
|
||||
SIGTERM_TIMEOUT = 10
|
||||
|
||||
# The multiplier is used to compensate execution time of function sending
|
||||
# SIGHUP to keepalived process. The constant multiplies ha_vrrp_advert_int
|
||||
# config option and the result is the throttle delay.
|
||||
THROTTLER_MULTIPLIER = 1.5
|
||||
|
||||
|
||||
class HaRouterNamespace(namespaces.RouterNamespace):
|
||||
"""Namespace for HA router.
|
||||
|
@ -115,7 +120,9 @@ class HaRouter(router.RouterInfo):
|
|||
keepalived.KeepalivedConf(),
|
||||
process_monitor,
|
||||
conf_path=self.agent_conf.ha_confs_path,
|
||||
namespace=self.ha_namespace)
|
||||
namespace=self.ha_namespace,
|
||||
throttle_restart_value=(
|
||||
self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER))
|
||||
|
||||
config = self.keepalived_manager.config
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ from oslo_utils import fileutils
|
|||
from neutron._i18n import _, _LE
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.common import constants
|
||||
from neutron.common import utils
|
||||
|
||||
VALID_STATES = ['MASTER', 'BACKUP']
|
||||
VALID_AUTH_TYPES = ['AH', 'PASS']
|
||||
|
@ -367,12 +368,20 @@ class KeepalivedManager(object):
|
|||
"""
|
||||
|
||||
def __init__(self, resource_id, config, process_monitor, conf_path='/tmp',
|
||||
namespace=None):
|
||||
namespace=None, throttle_restart_value=None):
|
||||
self.resource_id = resource_id
|
||||
self.config = config
|
||||
self.namespace = namespace
|
||||
self.process_monitor = process_monitor
|
||||
self.conf_path = conf_path
|
||||
# configure throttler for spawn to introduce delay between SIGHUPs,
|
||||
# otherwise keepalived master may unnecessarily flip to slave
|
||||
if throttle_restart_value is not None:
|
||||
self._throttle_spawn(throttle_restart_value)
|
||||
|
||||
#pylint: disable=method-hidden
|
||||
def _throttle_spawn(self, threshold):
|
||||
self.spawn = utils.throttler(threshold)(self.spawn)
|
||||
|
||||
def get_conf_dir(self):
|
||||
confs_dir = os.path.abspath(os.path.normpath(self.conf_path))
|
||||
|
|
|
@ -25,6 +25,7 @@ import os.path
|
|||
import random
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
import weakref
|
||||
|
@ -57,6 +58,8 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
|
|||
LOG = logging.getLogger(__name__)
|
||||
SYNCHRONIZED_PREFIX = 'neutron-'
|
||||
|
||||
DEFAULT_THROTTLER_VALUE = 2
|
||||
|
||||
synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX)
|
||||
|
||||
|
||||
|
@ -76,6 +79,51 @@ class WaitTimeout(Exception, eventlet.TimeoutError):
|
|||
return Exception.__repr__(self)
|
||||
|
||||
|
||||
class LockWithTimer(object):
|
||||
def __init__(self, threshold):
|
||||
self._threshold = threshold
|
||||
self.timestamp = 0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def acquire(self):
|
||||
return self._lock.acquire(False)
|
||||
|
||||
def release(self):
|
||||
return self._lock.release()
|
||||
|
||||
def time_to_wait(self):
|
||||
return self.timestamp - time.time() + self._threshold
|
||||
|
||||
|
||||
# REVISIT(jlibosva): Some parts of throttler may be similar to what
|
||||
# neutron.notifiers.batch_notifier.BatchNotifier does. They
|
||||
# could be refactored and unified.
|
||||
def throttler(threshold=DEFAULT_THROTTLER_VALUE):
|
||||
"""Throttle number of calls to a function to only once per 'threshold'.
|
||||
"""
|
||||
def decorator(f):
|
||||
lock_with_timer = LockWithTimer(threshold)
|
||||
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
if lock_with_timer.acquire():
|
||||
try:
|
||||
fname = f.__name__
|
||||
time_to_wait = lock_with_timer.time_to_wait()
|
||||
if time_to_wait > 0:
|
||||
LOG.debug("Call of function %s scheduled, sleeping "
|
||||
"%.1f seconds", fname, time_to_wait)
|
||||
# Decorated function has been called recently, wait.
|
||||
eventlet.sleep(time_to_wait)
|
||||
lock_with_timer.timestamp = time.time()
|
||||
finally:
|
||||
lock_with_timer.release()
|
||||
LOG.debug("Calling throttled function %s", fname)
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
@removals.remove(
|
||||
message="Use ensure_tree(path, 0o755) from oslo_utils.fileutils")
|
||||
def ensure_dir(dir_path):
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
import sys
|
||||
import types
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
@ -21,6 +22,7 @@ from neutron._i18n import _
|
|||
from neutron.agent.l3 import agent
|
||||
from neutron.agent.l3 import namespaces
|
||||
from neutron.agent import l3_agent
|
||||
from neutron.common import constants
|
||||
|
||||
|
||||
class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
|
||||
|
@ -59,6 +61,40 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
|
|||
|
||||
super(L3NATAgentForTest, self).__init__(host, conf)
|
||||
|
||||
def _create_router(self, router_id, router):
|
||||
"""Create a router with suffix added to the router namespace name.
|
||||
|
||||
This is needed to be able to run two agents serving the same router
|
||||
on the same node.
|
||||
"""
|
||||
router = (
|
||||
super(L3NATAgentForTest, self)._create_router(router_id, router))
|
||||
|
||||
router.get_internal_device_name = types.MethodType(
|
||||
get_internal_device_name, router)
|
||||
router.get_external_device_name = types.MethodType(
|
||||
get_external_device_name, router)
|
||||
|
||||
return router
|
||||
|
||||
|
||||
def _append_suffix(dev_name):
|
||||
# If dev_name = 'xyz123' and the suffix is 'hostB' then the result
|
||||
# will be 'xy_stB'
|
||||
return '%s_%s' % (dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:])
|
||||
|
||||
|
||||
def get_internal_device_name(ri, port_id):
|
||||
return _append_suffix(
|
||||
(namespaces.INTERNAL_DEV_PREFIX + port_id)
|
||||
[:constants.LINUX_DEV_LEN])
|
||||
|
||||
|
||||
def get_external_device_name(ri, port_id):
|
||||
return _append_suffix(
|
||||
(namespaces.EXTERNAL_DEV_PREFIX + port_id)
|
||||
[:constants.LINUX_DEV_LEN])
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('test_namespace_suffix', default='testprefix',
|
||||
|
|
|
@ -14,10 +14,13 @@
|
|||
|
||||
import functools
|
||||
import netaddr
|
||||
import os
|
||||
import time
|
||||
|
||||
from neutron_lib import constants
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.l3 import ha_router
|
||||
from neutron.agent.l3 import namespaces
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import utils as common_utils
|
||||
|
@ -50,25 +53,34 @@ class TestL3Agent(base.BaseFullStackTestCase):
|
|||
return port['port']['status'] == 'ACTIVE'
|
||||
common_utils.wait_until_true(lambda: is_port_status_active(), sleep=1)
|
||||
|
||||
def _create_and_attach_subnet(
|
||||
self, tenant_id, subnet_cidr, network_id, router_id):
|
||||
# For IPv6 subnets, enable_dhcp should be set to true.
|
||||
enable_dhcp = (netaddr.IPNetwork(subnet_cidr).version ==
|
||||
constants.IP_VERSION_6)
|
||||
subnet = self.safe_client.create_subnet(
|
||||
tenant_id, network_id, subnet_cidr, enable_dhcp=enable_dhcp)
|
||||
|
||||
router_interface_info = self.safe_client.add_router_interface(
|
||||
router_id, subnet['id'])
|
||||
self.block_until_port_status_active(
|
||||
router_interface_info['port_id'])
|
||||
|
||||
def _boot_fake_vm_in_network(self, host, tenant_id, network_id, wait=True):
|
||||
vm = self.useFixture(
|
||||
machine.FakeFullstackMachine(
|
||||
host, network_id, tenant_id, self.safe_client))
|
||||
if wait:
|
||||
vm.block_until_boot()
|
||||
return vm
|
||||
|
||||
def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router):
|
||||
network = self.safe_client.create_network(tenant_id)
|
||||
for cidr in subnet_cidrs:
|
||||
# For IPv6 subnets, enable_dhcp should be set to true.
|
||||
enable_dhcp = (netaddr.IPNetwork(cidr).version ==
|
||||
constants.IP_VERSION_6)
|
||||
subnet = self.safe_client.create_subnet(
|
||||
tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp)
|
||||
self._create_and_attach_subnet(
|
||||
tenant_id, cidr, network['id'], router['id'])
|
||||
|
||||
router_interface_info = self.safe_client.add_router_interface(
|
||||
router['id'], subnet['id'])
|
||||
self.block_until_port_status_active(
|
||||
router_interface_info['port_id'])
|
||||
|
||||
vm = self.useFixture(
|
||||
machine.FakeFullstackMachine(
|
||||
host, network['id'], tenant_id, self.safe_client))
|
||||
vm.block_until_boot()
|
||||
return vm
|
||||
return self._boot_fake_vm_in_network(host, tenant_id, network['id'])
|
||||
|
||||
|
||||
class TestLegacyL3Agent(TestL3Agent):
|
||||
|
@ -175,7 +187,7 @@ class TestLegacyL3Agent(TestL3Agent):
|
|||
vm.block_until_ping(external_vm.ipv6)
|
||||
|
||||
|
||||
class TestHAL3Agent(base.BaseFullStackTestCase):
|
||||
class TestHAL3Agent(TestL3Agent):
|
||||
|
||||
def setUp(self):
|
||||
host_descriptions = [
|
||||
|
@ -206,3 +218,64 @@ class TestHAL3Agent(base.BaseFullStackTestCase):
|
|||
self._is_ha_router_active_on_one_agent,
|
||||
router['id']),
|
||||
timeout=90)
|
||||
|
||||
def _get_keepalived_state(self, keepalived_state_file):
|
||||
with open(keepalived_state_file, "r") as fd:
|
||||
return fd.read()
|
||||
|
||||
def _get_state_file_for_master_agent(self, router_id):
|
||||
for host in self.environment.hosts:
|
||||
keepalived_state_file = os.path.join(
|
||||
host.neutron_config.state_path, "ha_confs", router_id, "state")
|
||||
|
||||
if self._get_keepalived_state(keepalived_state_file) == "master":
|
||||
return keepalived_state_file
|
||||
|
||||
def test_keepalived_multiple_sighups_does_not_forfeit_mastership(self):
|
||||
"""Setup a complete "Neutron stack" - both an internal and an external
|
||||
network+subnet, and a router connected to both.
|
||||
"""
|
||||
tenant_id = uuidutils.generate_uuid()
|
||||
ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id)
|
||||
router = self.safe_client.create_router(tenant_id, ha=True,
|
||||
external_network=ext_net['id'])
|
||||
common_utils.wait_until_true(
|
||||
lambda:
|
||||
len(self.client.list_l3_agent_hosting_routers(
|
||||
router['id'])['agents']) == 2,
|
||||
timeout=90)
|
||||
common_utils.wait_until_true(
|
||||
functools.partial(
|
||||
self._is_ha_router_active_on_one_agent,
|
||||
router['id']),
|
||||
timeout=90)
|
||||
keepalived_state_file = self._get_state_file_for_master_agent(
|
||||
router['id'])
|
||||
self.assertIsNotNone(keepalived_state_file)
|
||||
network = self.safe_client.create_network(tenant_id)
|
||||
self._create_and_attach_subnet(
|
||||
tenant_id, '13.37.0.0/24', network['id'], router['id'])
|
||||
|
||||
# Create 10 fake VMs, each with a floating ip. Each floating ip
|
||||
# association should send a SIGHUP to the keepalived's parent process,
|
||||
# unless the Throttler works.
|
||||
host = self.environment.hosts[0]
|
||||
vms = [self._boot_fake_vm_in_network(host, tenant_id, network['id'],
|
||||
wait=False)
|
||||
for i in range(10)]
|
||||
for vm in vms:
|
||||
self.safe_client.create_floatingip(
|
||||
tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id'])
|
||||
|
||||
# Check that the keepalived's state file has not changed and is still
|
||||
# master. This will indicate that the Throttler works. We want to check
|
||||
# for ha_vrrp_advert_int (the default is 2 seconds), plus a bit more.
|
||||
time_to_stop = (time.time() +
|
||||
(common_utils.DEFAULT_THROTTLER_VALUE *
|
||||
ha_router.THROTTLER_MULTIPLIER * 1.3))
|
||||
while True:
|
||||
if time.time() > time_to_stop:
|
||||
break
|
||||
self.assertEqual(
|
||||
"master",
|
||||
self._get_keepalived_state(keepalived_state_file))
|
||||
|
|
|
@ -818,3 +818,53 @@ class ImportModulesRecursivelyTestCase(base.BaseTestCase):
|
|||
for module in expected_modules:
|
||||
self.assertIn(module, modules)
|
||||
self.assertIn(module, sys.modules)
|
||||
|
||||
|
||||
class TestThrottler(base.BaseTestCase):
|
||||
def test_throttler(self):
|
||||
threshold = 1
|
||||
orig_function = mock.Mock()
|
||||
# Add this magic name as it's required by functools
|
||||
orig_function.__name__ = 'mock_func'
|
||||
throttled_func = utils.throttler(threshold)(orig_function)
|
||||
|
||||
throttled_func()
|
||||
|
||||
sleep = utils.eventlet.sleep
|
||||
|
||||
def sleep_mock(amount_to_sleep):
|
||||
sleep(amount_to_sleep)
|
||||
self.assertTrue(threshold > amount_to_sleep)
|
||||
|
||||
with mock.patch.object(utils.eventlet, "sleep",
|
||||
side_effect=sleep_mock):
|
||||
throttled_func()
|
||||
|
||||
self.assertEqual(2, orig_function.call_count)
|
||||
|
||||
lock_with_timer = six.get_function_closure(
|
||||
throttled_func)[1].cell_contents
|
||||
timestamp = lock_with_timer.timestamp - threshold
|
||||
lock_with_timer.timestamp = timestamp
|
||||
|
||||
throttled_func()
|
||||
|
||||
self.assertEqual(3, orig_function.call_count)
|
||||
self.assertTrue(timestamp < lock_with_timer.timestamp)
|
||||
|
||||
def test_method_docstring_is_preserved(self):
|
||||
class Klass(object):
|
||||
@utils.throttler()
|
||||
def method(self):
|
||||
"""Docstring"""
|
||||
|
||||
self.assertEqual("Docstring", Klass.method.__doc__)
|
||||
|
||||
def test_method_still_callable(self):
|
||||
class Klass(object):
|
||||
@utils.throttler()
|
||||
def method(self):
|
||||
pass
|
||||
|
||||
obj = Klass()
|
||||
obj.method()
|
||||
|
|
Loading…
Reference in New Issue