Throttle SIGHUPs to keepalived

Multiple SIGHUPs in quick succession might cause the master keepalived
to forfeit its mastership (which will cause keepalived to remove IPs of
its external devices, severing connectivity). This can happen when, for
example, associating or disassociating multiple floatingips.

The patch makes the agent throttle the SIGHUPs sent to keepalived: the very
first SIGHUP is always sent; subsequent signals are delayed until the
agent threshold is reached. (It's 3 seconds by default.)

As an example, when three consecutive router updates each trigger a
keepalived respawn, then:
* the very first signal is sent as usual;
* the second signal is deferred and sent up to 3 seconds after the
  first signal;
* the third signal is ignored, though the change that triggered it will
  be correctly applied by the second signal handler when it is triggered
  after threshold delay.

If the previous spawn request occurred longer ago than the threshold (i.e.
it is older than current time minus threshold), there is no delay.

Co-Authored-By: Jakub Libosvar <libosvar@redhat.com>
Co-Authored-By: Cedric Brandily <zzelle@gmail.com>
Co-Authored-By: Ihar Hrachyshka <ihrachys@redhat.com>

Closes-Bug: 1647432
Change-Id: I2955e0de835458a2eea4dd088addf33b656f8670
(cherry picked from commit 977d254cc6)
This commit is contained in:
John Schwarz 2016-12-05 14:15:17 +02:00 committed by Ihar Hrachyshka
parent e0b8d452c2
commit 23c7c8a08e
6 changed files with 241 additions and 18 deletions

View File

@ -34,6 +34,11 @@ HA_DEV_PREFIX = 'ha-'
IP_MONITOR_PROCESS_SERVICE = 'ip_monitor'
SIGTERM_TIMEOUT = 10
# The multiplier compensates for the execution time of the function that
# sends SIGHUP to the keepalived process. The constant is multiplied by the
# ha_vrrp_advert_int config option and the result is the throttle delay.
THROTTLER_MULTIPLIER = 1.5
class HaRouterNamespace(namespaces.RouterNamespace):
"""Namespace for HA router.
@ -115,7 +120,9 @@ class HaRouter(router.RouterInfo):
keepalived.KeepalivedConf(),
process_monitor,
conf_path=self.agent_conf.ha_confs_path,
namespace=self.ha_namespace)
namespace=self.ha_namespace,
throttle_restart_value=(
self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER))
config = self.keepalived_manager.config

View File

@ -27,6 +27,7 @@ from oslo_utils import fileutils
from neutron._i18n import _, _LE
from neutron.agent.linux import external_process
from neutron.common import constants
from neutron.common import utils
VALID_STATES = ['MASTER', 'BACKUP']
VALID_AUTH_TYPES = ['AH', 'PASS']
@ -367,12 +368,20 @@ class KeepalivedManager(object):
"""
def __init__(self, resource_id, config, process_monitor, conf_path='/tmp',
namespace=None):
namespace=None, throttle_restart_value=None):
self.resource_id = resource_id
self.config = config
self.namespace = namespace
self.process_monitor = process_monitor
self.conf_path = conf_path
# configure throttler for spawn to introduce delay between SIGHUPs,
# otherwise keepalived master may unnecessarily flip to slave
if throttle_restart_value is not None:
self._throttle_spawn(throttle_restart_value)
#pylint: disable=method-hidden
def _throttle_spawn(self, threshold):
    """Replace this instance's ``spawn`` with a throttled version of it.

    :param threshold: value handed to ``utils.throttler`` to build the
                      decorator that wraps ``self.spawn``.
    """
    throttle = utils.throttler(threshold)
    # Assigning on the instance hides the class-level method, hence the
    # pylint pragma above.
    self.spawn = throttle(self.spawn)
def get_conf_dir(self):
confs_dir = os.path.abspath(os.path.normpath(self.conf_path))

View File

@ -25,6 +25,7 @@ import os.path
import random
import signal
import sys
import threading
import time
import uuid
import weakref
@ -57,6 +58,8 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LOG = logging.getLogger(__name__)
SYNCHRONIZED_PREFIX = 'neutron-'
DEFAULT_THROTTLER_VALUE = 2
synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX)
@ -76,6 +79,51 @@ class WaitTimeout(Exception, eventlet.TimeoutError):
return Exception.__repr__(self)
class LockWithTimer(object):
    """Non-blocking lock bundled with a timestamp and a threshold."""

    def __init__(self, threshold):
        self._lock = threading.Lock()
        self._threshold = threshold
        # Starts at 0; users of the lock update it themselves.
        self.timestamp = 0

    def acquire(self):
        """Try to take the lock without blocking.

        :returns: True if the lock was taken, False if it is already held.
        """
        acquired = self._lock.acquire(False)
        return acquired

    def release(self):
        """Release the underlying lock."""
        return self._lock.release()

    def time_to_wait(self):
        """Return the time left until 'threshold' has passed since timestamp.

        The value is expressed in the units of ``time.time()`` and is zero
        or negative once the threshold has already elapsed.
        """
        elapsed = time.time() - self.timestamp
        return self._threshold - elapsed
# REVISIT(jlibosva): Some parts of throttler may be similar to what
# neutron.notifiers.batch_notifier.BatchNotifier does. They
# could be refactored and unified.
def throttler(threshold=DEFAULT_THROTTLER_VALUE):
"""Throttle number of calls to a function to only once per 'threshold'.
"""
# Decorator factory: 'threshold' is handed to LockWithTimer, whose
# time_to_wait() compares it against time.time().
def decorator(f):
# Created once per decorated function, so every call of that function
# shares the same lock and the same timestamp.
lock_with_timer = LockWithTimer(threshold)
@functools.wraps(f)
def wrapper(*args, **kwargs):
# acquire() does not block. If another caller already holds the lock
# (it is in the delay section below), this call skips the whole body and
# the wrapper returns None without calling f.
if lock_with_timer.acquire():
try:
fname = f.__name__
# Positive while less than 'threshold' has passed since the timestamp
# recorded by the previous call that got through.
time_to_wait = lock_with_timer.time_to_wait()
if time_to_wait > 0:
LOG.debug("Call of function %s scheduled, sleeping "
"%.1f seconds", fname, time_to_wait)
# Decorated function has been called recently, wait.
eventlet.sleep(time_to_wait)
# Record the time of this call while still holding the lock; the next
# caller computes its delay from this value.
lock_with_timer.timestamp = time.time()
finally:
# Release even if the code above raised.
lock_with_timer.release()
# f runs after the lock is released: the lock guards only the delay and
# the timestamp update, not the execution of f.
LOG.debug("Calling throttled function %s", fname)
return f(*args, **kwargs)
return wrapper
return decorator
@removals.remove(
message="Use ensure_tree(path, 0o755) from oslo_utils.fileutils")
def ensure_dir(dir_path):

View File

@ -13,6 +13,7 @@
# under the License.
import sys
import types
import mock
from oslo_config import cfg
@ -21,6 +22,7 @@ from neutron._i18n import _
from neutron.agent.l3 import agent
from neutron.agent.l3 import namespaces
from neutron.agent import l3_agent
from neutron.common import constants
class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
@ -59,6 +61,40 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
super(L3NATAgentForTest, self).__init__(host, conf)
def _create_router(self, router_id, router):
    """Create a router whose device names carry the test suffix.

    The router built by the parent class gets its internal and external
    device name getters replaced with the suffix-appending functions
    defined in this module. This is needed to be able to run two agents
    serving the same router on the same node.
    """
    parent = super(L3NATAgentForTest, self)
    new_router = parent._create_router(router_id, router)

    # Bind the module-level helpers as methods of this router instance.
    new_router.get_internal_device_name = types.MethodType(
        get_internal_device_name, new_router)
    new_router.get_external_device_name = types.MethodType(
        get_external_device_name, new_router)
    return new_router
def _append_suffix(dev_name):
    """Swap the last four characters of dev_name for '_' plus a suffix tail.

    The tail is the last three characters of the configured
    ``test_namespace_suffix`` option. For example, dev_name 'xyz123' with
    the suffix 'hostB' gives 'xy_stB'.
    """
    trimmed_name = dev_name[:-4]
    suffix_tail = cfg.CONF.test_namespace_suffix[-3:]
    return '%s_%s' % (trimmed_name, suffix_tail)
def get_internal_device_name(ri, port_id):
    """Return the suffixed internal device name for port_id.

    The prefixed port id is cut to ``constants.LINUX_DEV_LEN`` characters
    before the suffix is applied. ``ri`` is accepted so the function can
    be bound as a router method; it is not used.
    """
    prefixed_name = namespaces.INTERNAL_DEV_PREFIX + port_id
    truncated_name = prefixed_name[:constants.LINUX_DEV_LEN]
    return _append_suffix(truncated_name)
def get_external_device_name(ri, port_id):
    """Return the suffixed external device name for port_id.

    The prefixed port id is cut to ``constants.LINUX_DEV_LEN`` characters
    before the suffix is applied. ``ri`` is accepted so the function can
    be bound as a router method; it is not used.
    """
    prefixed_name = namespaces.EXTERNAL_DEV_PREFIX + port_id
    truncated_name = prefixed_name[:constants.LINUX_DEV_LEN]
    return _append_suffix(truncated_name)
OPTS = [
cfg.StrOpt('test_namespace_suffix', default='testprefix',

View File

@ -14,10 +14,13 @@
import functools
import netaddr
import os
import time
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.agent.l3 import ha_router
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.common import utils as common_utils
@ -50,25 +53,34 @@ class TestL3Agent(base.BaseFullStackTestCase):
return port['port']['status'] == 'ACTIVE'
common_utils.wait_until_true(lambda: is_port_status_active(), sleep=1)
def _create_and_attach_subnet(
        self, tenant_id, subnet_cidr, network_id, router_id):
    """Create a subnet, plug it into the router and wait for the port.

    Returns once the router interface port reports an ACTIVE status.
    """
    # For IPv6 subnets, enable_dhcp should be set to true.
    ip_version = netaddr.IPNetwork(subnet_cidr).version
    wants_dhcp = ip_version == constants.IP_VERSION_6

    subnet = self.safe_client.create_subnet(
        tenant_id, network_id, subnet_cidr, enable_dhcp=wants_dhcp)
    interface = self.safe_client.add_router_interface(
        router_id, subnet['id'])
    self.block_until_port_status_active(interface['port_id'])
def _boot_fake_vm_in_network(self, host, tenant_id, network_id, wait=True):
    """Register a fake fullstack machine fixture and return it.

    :param wait: when true, block until the machine reports it has booted
                 before returning.
    """
    fixture = machine.FakeFullstackMachine(
        host, network_id, tenant_id, self.safe_client)
    vm = self.useFixture(fixture)
    if not wait:
        return vm
    vm.block_until_boot()
    return vm
def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router):
network = self.safe_client.create_network(tenant_id)
for cidr in subnet_cidrs:
# For IPv6 subnets, enable_dhcp should be set to true.
enable_dhcp = (netaddr.IPNetwork(cidr).version ==
constants.IP_VERSION_6)
subnet = self.safe_client.create_subnet(
tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp)
self._create_and_attach_subnet(
tenant_id, cidr, network['id'], router['id'])
router_interface_info = self.safe_client.add_router_interface(
router['id'], subnet['id'])
self.block_until_port_status_active(
router_interface_info['port_id'])
vm = self.useFixture(
machine.FakeFullstackMachine(
host, network['id'], tenant_id, self.safe_client))
vm.block_until_boot()
return vm
return self._boot_fake_vm_in_network(host, tenant_id, network['id'])
class TestLegacyL3Agent(TestL3Agent):
@ -175,7 +187,7 @@ class TestLegacyL3Agent(TestL3Agent):
vm.block_until_ping(external_vm.ipv6)
class TestHAL3Agent(base.BaseFullStackTestCase):
class TestHAL3Agent(TestL3Agent):
def setUp(self):
host_descriptions = [
@ -206,3 +218,64 @@ class TestHAL3Agent(base.BaseFullStackTestCase):
self._is_ha_router_active_on_one_agent,
router['id']),
timeout=90)
def _get_keepalived_state(self, keepalived_state_file):
with open(keepalived_state_file, "r") as fd:
return fd.read()
def _get_state_file_for_master_agent(self, router_id):
for host in self.environment.hosts:
keepalived_state_file = os.path.join(
host.neutron_config.state_path, "ha_confs", router_id, "state")
if self._get_keepalived_state(keepalived_state_file) == "master":
return keepalived_state_file
def test_keepalived_multiple_sighups_does_not_forfeit_mastership(self):
    """Check that a burst of SIGHUPs does not cost keepalived mastership.

    Setup a complete "Neutron stack" - both an internal and an external
    network+subnet, and an HA router connected to both. Then associate
    several floating ips in a row and verify that the state file of the
    agent that was master keeps reading "master".
    """
    tenant_id = uuidutils.generate_uuid()
    ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id)
    router = self.safe_client.create_router(tenant_id, ha=True,
                                            external_network=ext_net['id'])
    # Wait until the router is hosted by two agents and one of them is
    # active before looking for the master's state file.
    common_utils.wait_until_true(
        lambda:
        len(self.client.list_l3_agent_hosting_routers(
            router['id'])['agents']) == 2,
        timeout=90)
    common_utils.wait_until_true(
        functools.partial(
            self._is_ha_router_active_on_one_agent,
            router['id']),
        timeout=90)

    keepalived_state_file = self._get_state_file_for_master_agent(
        router['id'])
    self.assertIsNotNone(keepalived_state_file)

    network = self.safe_client.create_network(tenant_id)
    self._create_and_attach_subnet(
        tenant_id, '13.37.0.0/24', network['id'], router['id'])

    # Create 10 fake VMs, each with a floating ip. Each floating ip
    # association should send a SIGHUP to the keepalived's parent process,
    # unless the Throttler works.
    host = self.environment.hosts[0]
    vms = [self._boot_fake_vm_in_network(host, tenant_id, network['id'],
                                         wait=False)
           for _ in range(10)]
    for vm in vms:
        self.safe_client.create_floatingip(
            tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id'])

    # Check that the keepalived's state file has not changed and is still
    # master. This will indicate that the Throttler works. The observation
    # window is the throttle delay (DEFAULT_THROTTLER_VALUE multiplied by
    # THROTTLER_MULTIPLIER), plus a bit more.
    time_to_stop = (time.time() +
                    (common_utils.DEFAULT_THROTTLER_VALUE *
                     ha_router.THROTTLER_MULTIPLIER * 1.3))
    while time.time() <= time_to_stop:
        self.assertEqual(
            "master",
            self._get_keepalived_state(keepalived_state_file))
        # Pause between samples instead of re-reading the file in a tight
        # loop for the whole window.
        time.sleep(0.1)

View File

@ -818,3 +818,53 @@ class ImportModulesRecursivelyTestCase(base.BaseTestCase):
for module in expected_modules:
self.assertIn(module, modules)
self.assertIn(module, sys.modules)
class TestThrottler(base.BaseTestCase):
    """Unit tests for the utils.throttler decorator."""

    def test_throttler(self):
        """Exercise an immediate call, a delayed call and an expired one.

        The first call goes straight through. The second one, made right
        after, must sleep for less than the threshold. After the recorded
        timestamp is moved back by a full threshold, the third call goes
        through and refreshes the timestamp.
        """
        threshold = 1
        orig_function = mock.Mock()
        # Add this magic name as it's required by functools
        orig_function.__name__ = 'mock_func'
        throttled_func = utils.throttler(threshold)(orig_function)

        throttled_func()

        sleep = utils.eventlet.sleep

        def sleep_mock(amount_to_sleep):
            sleep(amount_to_sleep)
            self.assertGreater(threshold, amount_to_sleep)

        with mock.patch.object(utils.eventlet, "sleep",
                               side_effect=sleep_mock):
            throttled_func()

        self.assertEqual(2, orig_function.call_count)

        # Find the LockWithTimer among the wrapper's closure cells by type
        # rather than by position, so the lookup does not depend on how the
        # closure variables happen to be ordered.
        lock_with_timer = next(
            cell.cell_contents
            for cell in six.get_function_closure(throttled_func)
            if isinstance(cell.cell_contents, utils.LockWithTimer))
        timestamp = lock_with_timer.timestamp - threshold
        lock_with_timer.timestamp = timestamp

        throttled_func()

        self.assertEqual(3, orig_function.call_count)
        self.assertLess(timestamp, lock_with_timer.timestamp)

    def test_method_docstring_is_preserved(self):
        """A throttled method keeps the docstring of the original."""
        class Klass(object):
            @utils.throttler()
            def method(self):
                """Docstring"""

        self.assertEqual("Docstring", Klass.method.__doc__)

    def test_method_still_callable(self):
        """A throttled method can still be called on an instance."""
        class Klass(object):
            @utils.throttler()
            def method(self):
                pass

        obj = Klass()
        obj.method()