diff --git a/neutron/agent/l3/ha_router.py b/neutron/agent/l3/ha_router.py index b9fba67e10a..fa7fdda172d 100644 --- a/neutron/agent/l3/ha_router.py +++ b/neutron/agent/l3/ha_router.py @@ -34,6 +34,11 @@ HA_DEV_PREFIX = 'ha-' IP_MONITOR_PROCESS_SERVICE = 'ip_monitor' SIGTERM_TIMEOUT = 10 +# The multiplier is used to compensate execution time of function sending +# SIGHUP to keepalived process. The constant multiplies ha_vrrp_advert_int +# config option and the result is the throttle delay. +THROTTLER_MULTIPLIER = 1.5 + class HaRouterNamespace(namespaces.RouterNamespace): """Namespace for HA router. @@ -124,7 +129,9 @@ class HaRouter(router.RouterInfo): keepalived.KeepalivedConf(), process_monitor, conf_path=self.agent_conf.ha_confs_path, - namespace=self.ha_namespace) + namespace=self.ha_namespace, + throttle_restart_value=( + self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER)) config = self.keepalived_manager.config diff --git a/neutron/agent/linux/keepalived.py b/neutron/agent/linux/keepalived.py index c5ba8c83f8c..e4ab6c7b884 100644 --- a/neutron/agent/linux/keepalived.py +++ b/neutron/agent/linux/keepalived.py @@ -27,6 +27,7 @@ from oslo_utils import fileutils from neutron._i18n import _, _LE from neutron.agent.linux import external_process from neutron.common import constants +from neutron.common import utils VALID_STATES = ['MASTER', 'BACKUP'] VALID_AUTH_TYPES = ['AH', 'PASS'] @@ -367,12 +368,20 @@ class KeepalivedManager(object): """ def __init__(self, resource_id, config, process_monitor, conf_path='/tmp', - namespace=None): + namespace=None, throttle_restart_value=None): self.resource_id = resource_id self.config = config self.namespace = namespace self.process_monitor = process_monitor self.conf_path = conf_path + # configure throttler for spawn to introduce delay between SIGHUPs, + # otherwise keepalived master may unnecessarily flip to slave + if throttle_restart_value is not None: + self._throttle_spawn(throttle_restart_value) + + #pylint: disable=method-hidden + def _throttle_spawn(self, threshold): + self.spawn = utils.throttler(threshold)(self.spawn) def get_conf_dir(self): confs_dir = os.path.abspath(os.path.normpath(self.conf_path)) diff --git a/neutron/common/utils.py b/neutron/common/utils.py index e84ae3b63a1..20ae9f7d6cb 100644 --- a/neutron/common/utils.py +++ b/neutron/common/utils.py @@ -25,6 +25,7 @@ import os.path import random import signal import sys +import threading import time import uuid import weakref @@ -54,6 +55,8 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" LOG = logging.getLogger(__name__) SYNCHRONIZED_PREFIX = 'neutron-' +DEFAULT_THROTTLER_VALUE = 2 + synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX) @@ -61,6 +64,51 @@ class WaitTimeout(Exception): """Default exception coming from wait_until_true() function.""" +class LockWithTimer(object): + def __init__(self, threshold): + self._threshold = threshold + self.timestamp = 0 + self._lock = threading.Lock() + + def acquire(self): + return self._lock.acquire(False) + + def release(self): + return self._lock.release() + + def time_to_wait(self): + return self.timestamp - time.time() + self._threshold + + +# REVISIT(jlibosva): Some parts of throttler may be similar to what +# neutron.notifiers.batch_notifier.BatchNotifier does. They +# could be refactored and unified. +def throttler(threshold=DEFAULT_THROTTLER_VALUE): + """Throttle number of calls to a function to only once per 'threshold'. + """ + def decorator(f): + lock_with_timer = LockWithTimer(threshold) + + @functools.wraps(f) + def wrapper(*args, **kwargs): + if lock_with_timer.acquire(): + try: + fname = f.__name__ + time_to_wait = lock_with_timer.time_to_wait() + if time_to_wait > 0: + LOG.debug("Call of function %s scheduled, sleeping " + "%.1f seconds", fname, time_to_wait) + # Decorated function has been called recently, wait. + eventlet.sleep(time_to_wait) + lock_with_timer.timestamp = time.time() + finally: + lock_with_timer.release() + LOG.debug("Calling throttled function %s", fname) + return f(*args, **kwargs) + return wrapper + return decorator + + @removals.remove( message="Use ensure_tree(path, 0o755) from oslo_utils.fileutils") def ensure_dir(dir_path): diff --git a/neutron/tests/common/agents/l3_agent.py b/neutron/tests/common/agents/l3_agent.py index 3557bdaaf09..fcc46890f4c 100755 --- a/neutron/tests/common/agents/l3_agent.py +++ b/neutron/tests/common/agents/l3_agent.py @@ -13,6 +13,7 @@ # under the License. import sys +import types import mock from oslo_config import cfg @@ -21,6 +22,7 @@ from neutron._i18n import _ from neutron.agent.l3 import agent from neutron.agent.l3 import namespaces from neutron.agent import l3_agent +from neutron.common import constants class L3NATAgentForTest(agent.L3NATAgentWithStateReport): @@ -59,6 +61,40 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport): super(L3NATAgentForTest, self).__init__(host, conf) + def _create_router(self, router_id, router): + """Create a router with suffix added to the router namespace name. + + This is needed to be able to run two agents serving the same router + on the same node. + """ + router = ( + super(L3NATAgentForTest, self)._create_router(router_id, router)) + + router.get_internal_device_name = types.MethodType( + get_internal_device_name, router) + router.get_external_device_name = types.MethodType( + get_external_device_name, router) + + return router + + +def _append_suffix(dev_name): + # If dev_name = 'xyz123' and the suffix is 'hostB' then the result + # will be 'xy_stB' + return '%s_%s' % (dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:]) + + +def get_internal_device_name(ri, port_id): + return _append_suffix( + (namespaces.INTERNAL_DEV_PREFIX + port_id) + [:constants.LINUX_DEV_LEN]) + + +def get_external_device_name(ri, port_id): + return _append_suffix( + (namespaces.EXTERNAL_DEV_PREFIX + port_id) + [:constants.LINUX_DEV_LEN]) + OPTS = [ cfg.StrOpt('test_namespace_suffix', default='testprefix', diff --git a/neutron/tests/fullstack/test_l3_agent.py b/neutron/tests/fullstack/test_l3_agent.py index 2e78bd5f88c..5c07dc5ef03 100644 --- a/neutron/tests/fullstack/test_l3_agent.py +++ b/neutron/tests/fullstack/test_l3_agent.py @@ -14,10 +14,13 @@ import functools import netaddr +import os +import time from neutron_lib import constants from oslo_utils import uuidutils +from neutron.agent.l3 import ha_router from neutron.agent.l3 import namespaces from neutron.agent.linux import ip_lib from neutron.common import utils as common_utils @@ -50,25 +53,34 @@ class TestL3Agent(base.BaseFullStackTestCase): return port['port']['status'] == 'ACTIVE' common_utils.wait_until_true(lambda: is_port_status_active(), sleep=1) + def _create_and_attach_subnet( + self, tenant_id, subnet_cidr, network_id, router_id): + # For IPv6 subnets, enable_dhcp should be set to true. + enable_dhcp = (netaddr.IPNetwork(subnet_cidr).version == + constants.IP_VERSION_6) + subnet = self.safe_client.create_subnet( + tenant_id, network_id, subnet_cidr, enable_dhcp=enable_dhcp) + + router_interface_info = self.safe_client.add_router_interface( + router_id, subnet['id']) + self.block_until_port_status_active( + router_interface_info['port_id']) + + def _boot_fake_vm_in_network(self, host, tenant_id, network_id, wait=True): + vm = self.useFixture( + machine.FakeFullstackMachine( + host, network_id, tenant_id, self.safe_client)) + if wait: + vm.block_until_boot() + return vm + def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router): network = self.safe_client.create_network(tenant_id) for cidr in subnet_cidrs: - # For IPv6 subnets, enable_dhcp should be set to true. - enable_dhcp = (netaddr.IPNetwork(cidr).version == - constants.IP_VERSION_6) - subnet = self.safe_client.create_subnet( - tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp) + self._create_and_attach_subnet( + tenant_id, cidr, network['id'], router['id']) - router_interface_info = self.safe_client.add_router_interface( - router['id'], subnet['id']) - self.block_until_port_status_active( - router_interface_info['port_id']) - - vm = self.useFixture( - machine.FakeFullstackMachine( - host, network['id'], tenant_id, self.safe_client)) - vm.block_until_boot() - return vm + return self._boot_fake_vm_in_network(host, tenant_id, network['id']) class TestLegacyL3Agent(TestL3Agent): @@ -175,7 +187,7 @@ class TestLegacyL3Agent(TestL3Agent): vm.block_until_ping(external_vm.ipv6) -class TestHAL3Agent(base.BaseFullStackTestCase): +class TestHAL3Agent(TestL3Agent): def setUp(self): host_descriptions = [ @@ -206,3 +218,64 @@ class TestHAL3Agent(base.BaseFullStackTestCase): self._is_ha_router_active_on_one_agent, router['id']), timeout=90) + + def _get_keepalived_state(self, keepalived_state_file): + with open(keepalived_state_file, "r") as fd: + return fd.read() + + def _get_state_file_for_master_agent(self, router_id): + for host in self.environment.hosts: + keepalived_state_file = os.path.join( + host.neutron_config.state_path, "ha_confs", router_id, "state") + + if self._get_keepalived_state(keepalived_state_file) == "master": + return keepalived_state_file + + def test_keepalived_multiple_sighups_does_not_forfeit_mastership(self): + """Setup a complete "Neutron stack" - both an internal and an external + network+subnet, and a router connected to both. + """ + tenant_id = uuidutils.generate_uuid() + ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id) + router = self.safe_client.create_router(tenant_id, ha=True, + external_network=ext_net['id']) + common_utils.wait_until_true( + lambda: + len(self.client.list_l3_agent_hosting_routers( + router['id'])['agents']) == 2, + timeout=90) + common_utils.wait_until_true( + functools.partial( + self._is_ha_router_active_on_one_agent, + router['id']), + timeout=90) + keepalived_state_file = self._get_state_file_for_master_agent( + router['id']) + self.assertIsNotNone(keepalived_state_file) + network = self.safe_client.create_network(tenant_id) + self._create_and_attach_subnet( + tenant_id, '13.37.0.0/24', network['id'], router['id']) + + # Create 10 fake VMs, each with a floating ip. Each floating ip + # association should send a SIGHUP to the keepalived's parent process, + # unless the Throttler works. + host = self.environment.hosts[0] + vms = [self._boot_fake_vm_in_network(host, tenant_id, network['id'], + wait=False) + for i in range(10)] + for vm in vms: + self.safe_client.create_floatingip( + tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id']) + + # Check that the keepalived's state file has not changed and is still + # master. This will indicate that the Throttler works. We want to check + # for ha_vrrp_advert_int (the default is 2 seconds), plus a bit more. + time_to_stop = (time.time() + + (common_utils.DEFAULT_THROTTLER_VALUE * + ha_router.THROTTLER_MULTIPLIER * 1.3)) + while True: + if time.time() > time_to_stop: + break + self.assertEqual( + "master", + self._get_keepalived_state(keepalived_state_file)) diff --git a/neutron/tests/unit/common/test_utils.py b/neutron/tests/unit/common/test_utils.py index db9a9423a11..0e3c49aabe9 100644 --- a/neutron/tests/unit/common/test_utils.py +++ b/neutron/tests/unit/common/test_utils.py @@ -703,3 +703,53 @@ class ImportModulesRecursivelyTestCase(base.BaseTestCase): for module in expected_modules: self.assertIn(module, modules) self.assertIn(module, sys.modules) + + +class TestThrottler(base.BaseTestCase): + def test_throttler(self): + threshold = 1 + orig_function = mock.Mock() + # Add this magic name as it's required by functools + orig_function.__name__ = 'mock_func' + throttled_func = utils.throttler(threshold)(orig_function) + + throttled_func() + + sleep = utils.eventlet.sleep + + def sleep_mock(amount_to_sleep): + sleep(amount_to_sleep) + self.assertTrue(threshold > amount_to_sleep) + + with mock.patch.object(utils.eventlet, "sleep", + side_effect=sleep_mock): + throttled_func() + + self.assertEqual(2, orig_function.call_count) + + lock_with_timer = six.get_function_closure( + throttled_func)[1].cell_contents + timestamp = lock_with_timer.timestamp - threshold + lock_with_timer.timestamp = timestamp + + throttled_func() + + self.assertEqual(3, orig_function.call_count) + self.assertTrue(timestamp < lock_with_timer.timestamp) + + def test_method_docstring_is_preserved(self): + class Klass(object): + @utils.throttler() + def method(self): + """Docstring""" + + self.assertEqual("Docstring", Klass.method.__doc__) + + def test_method_still_callable(self): + class Klass(object): + @utils.throttler() + def method(self): + pass + + obj = Klass() + obj.method()