From 79f64ab04190d605f5720a05715cfa8a21643782 Mon Sep 17 00:00:00 2001
From: Assaf Muller
Date: Sun, 28 Sep 2014 14:26:42 +0300
Subject: [PATCH] Send notification to controller about HA router state change

The L3 agent gets keepalived state change notifications via a unix
domain socket. These events are now batched and sent out as a single
RPC to the server. If the same router was updated multiple times during
the batch period, only its latest state is sent.

Partially-Implements: blueprint report-ha-router-master
Change-Id: I36834ad3d9e8a49a702f01acc29c7c38f2d48833
---
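Editorial note, not part of the patch: the coalescing that notify_server()
in neutron/agent/l3/ha.py performs boils down to the sketch below. The
router IDs are made-up placeholders; BatchNotifier and queue_event are the
real helpers this patch uses.

    # batched_events is the list of (router_id, state) tuples the
    # BatchNotifier accumulated during one batch period, oldest first.
    batched_events = [('r1', 'master'), ('r2', 'backup'), ('r1', 'fault')]

    # keepalived vocabulary -> server vocabulary
    translation_map = {'master': 'active',
                       'backup': 'standby',
                       'fault': 'standby'}

    # dict() keeps the last value seen per key, so 'r1' ends up
    # 'standby' despite the earlier 'master' event in the same batch:
    # {'r1': 'standby', 'r2': 'standby'}
    translated_states = dict((router_id, translation_map[state])
                             for router_id, state in batched_events)

Because dict construction keeps the last value per key, a router that
flapped during the batch period reports only its final state.
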
 neutron/agent/l3/agent.py                     |  9 +++++
 neutron/agent/l3/ha.py                        | 25 +++++++++++++
 neutron/api/rpc/handlers/l3_rpc.py            | 21 +++++++++--
 neutron/db/l3_hamode_db.py                    | 17 +++++++++
 .../tests/functional/agent/test_l3_agent.py   | 36 ++++++++++++++++--
 .../tests/unit/agent/metadata/test_driver.py  |  2 +
 neutron/tests/unit/db/test_l3_ha_db.py        | 37 +++++++++++++++++++
 7 files changed, 139 insertions(+), 8 deletions(-)

diff --git a/neutron/agent/l3/agent.py b/neutron/agent/l3/agent.py
index 99998d66756..c39d3a1b6d7 100644
--- a/neutron/agent/l3/agent.py
+++ b/neutron/agent/l3/agent.py
@@ -73,6 +73,9 @@ class L3PluginApi(object):
         - get_agent_gateway_port
           Needed by the agent when operating in DVR/DVR_SNAT mode
         1.3 - Get the list of activated services
+        1.4 - Added L3 HA update_router_state. This method was reworked
+              into update_ha_routers_states
+        1.5 - Added update_ha_routers_states
 
     """
 
@@ -120,6 +123,12 @@ class L3PluginApi(object):
         cctxt = self.client.prepare(version='1.3')
         return cctxt.call(context, 'get_service_plugin_list')
 
+    def update_ha_routers_states(self, context, states):
+        """Update HA routers states."""
+        cctxt = self.client.prepare(version='1.5')
+        return cctxt.call(context, 'update_ha_routers_states',
+                          host=self.host, states=states)
+
 
 class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
                  ha.AgentMixin,
diff --git a/neutron/agent/l3/ha.py b/neutron/agent/l3/ha.py
index 7f75864d767..66a61913f35 100644
--- a/neutron/agent/l3/ha.py
+++ b/neutron/agent/l3/ha.py
@@ -24,6 +24,7 @@ from neutron.agent.linux import keepalived
 from neutron.agent.linux import utils as agent_utils
 from neutron.common import constants as l3_constants
 from neutron.i18n import _LE, _LI
+from neutron.notifiers import batch_notifier
 
 LOG = logging.getLogger(__name__)
 
@@ -91,6 +92,8 @@ class AgentMixin(object):
     def __init__(self, host):
         self._init_ha_conf_path()
         super(AgentMixin, self).__init__(host)
+        self.state_change_notifier = batch_notifier.BatchNotifier(
+            self._calculate_batch_duration(), self.notify_server)
         eventlet.spawn(self._start_keepalived_notifications_server)
 
     def _start_keepalived_notifications_server(self):
@@ -98,11 +101,22 @@
             L3AgentKeepalivedStateChangeServer(self, self.conf))
         state_change_server.run()
 
+    def _calculate_batch_duration(self):
+        # A slave becomes master after missing 3 advertisements
+        detection_time = self.conf.ha_vrrp_advert_int * 3
+
+        # Keepalived takes a couple of seconds to configure the VIPs
+        configuration_time = 2
+
+        # Give it enough slack to batch all events due to the same failure
+        return (detection_time + configuration_time) * 2
+
     def enqueue_state_change(self, router_id, state):
         LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
                  {'router_id': router_id,
                   'state': state})
         self._update_metadata_proxy(router_id, state)
+        self.state_change_notifier.queue_event((router_id, state))
 
     def _update_metadata_proxy(self, router_id, state):
         try:
@@ -122,6 +136,17 @@ class AgentMixin(object):
             self.metadata_driver.destroy_monitored_metadata_proxy(
                 self.process_monitor, ri.router_id, ri.ns_name, self.conf)
 
+    def notify_server(self, batched_events):
+        translation_map = {'master': 'active',
+                           'backup': 'standby',
+                           'fault': 'standby'}
+        translated_states = dict((router_id, translation_map[state]) for
+                                 router_id, state in batched_events)
+        LOG.debug('Updating server with HA routers states %s',
+                  translated_states)
+        self.plugin_rpc.update_ha_routers_states(
+            self.context, translated_states)
+
     def _init_ha_conf_path(self):
         ha_full_path = os.path.dirname("/%s/" % self.conf.ha_confs_path)
         agent_utils.ensure_dir(ha_full_path)
+ """ + states = kwargs.get('states') + host = kwargs.get('host') + + LOG.debug('Updating HA routers states on host %s: %s', host, states) + self.l3plugin.update_routers_states(context, states, host) diff --git a/neutron/db/l3_hamode_db.py b/neutron/db/l3_hamode_db.py index 1c0a6e59576..a7ddfdadc7d 100644 --- a/neutron/db/l3_hamode_db.py +++ b/neutron/db/l3_hamode_db.py @@ -462,3 +462,20 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin): router_ids, active) return self._process_sync_ha_data(context, sync_data, host) + + @classmethod + def _set_router_states(cls, context, bindings, states): + for binding in bindings: + try: + with context.session.begin(subtransactions=True): + binding.state = states[binding.router_id] + except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError): + # Take concurrently deleted routers in to account + pass + + def update_routers_states(self, context, states, host): + """Receive dict of router ID to state and update them all.""" + + bindings = self.get_ha_router_port_bindings( + context, router_ids=states.keys(), host=host) + self._set_router_states(context, bindings, states) diff --git a/neutron/tests/functional/agent/test_l3_agent.py b/neutron/tests/functional/agent/test_l3_agent.py index a822eb2640e..729b083605b 100755 --- a/neutron/tests/functional/agent/test_l3_agent.py +++ b/neutron/tests/functional/agent/test_l3_agent.py @@ -271,6 +271,12 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase): external_port['mac_address'], namespace=router.ns_name) for fip in floating_ips) + def fail_ha_router(self, router): + device_name = router.get_ha_device_name( + router.router[l3_constants.HA_INTERFACE_KEY]['id']) + ha_device = ip_lib.IPDevice(device_name, router.ns_name) + ha_device.link.set_down() + class L3AgentTestCase(L3AgentTestFramework): def test_observer_notifications_legacy_router(self): @@ -286,10 +292,7 @@ class L3AgentTestCase(L3AgentTestFramework): router = self.manage_router(self.agent, router_info) utils.wait_until_true(lambda: router.ha_state == 'master') - device_name = router.get_ha_device_name( - router.router[l3_constants.HA_INTERFACE_KEY]['id']) - ha_device = ip_lib.IPDevice(device_name, router.ns_name) - ha_device.link.set_down() + self.fail_ha_router(router) utils.wait_until_true(lambda: router.ha_state == 'backup') utils.wait_until_true(lambda: enqueue_mock.call_count == 3) @@ -298,6 +301,31 @@ class L3AgentTestCase(L3AgentTestFramework): self.assertEqual((router.router_id, 'master'), calls[1]) self.assertEqual((router.router_id, 'backup'), calls[2]) + def _expected_rpc_report(self, expected): + calls = (args[0][1] for args in + self.agent.plugin_rpc.update_ha_routers_states.call_args_list) + + # Get the last state reported for each router + actual_router_states = {} + for call in calls: + for router_id, state in call.iteritems(): + actual_router_states[router_id] = state + + return actual_router_states == expected + + def test_keepalived_state_change_bulk_rpc(self): + router_info = self.generate_router_info(enable_ha=True) + router1 = self.manage_router(self.agent, router_info) + self.fail_ha_router(router1) + router_info = self.generate_router_info(enable_ha=True) + router2 = self.manage_router(self.agent, router_info) + + utils.wait_until_true(lambda: router1.ha_state == 'backup') + utils.wait_until_true(lambda: router2.ha_state == 'master') + utils.wait_until_true( + lambda: self._expected_rpc_report( + {router1.router_id: 'standby', router2.router_id: 'active'})) + def _test_observer_notifications(self, enable_ha): 
"""Test create, update, delete of router and notifications.""" with mock.patch.object( diff --git a/neutron/tests/unit/agent/metadata/test_driver.py b/neutron/tests/unit/agent/metadata/test_driver.py index 4f74cde1bcb..025b78a414a 100644 --- a/neutron/tests/unit/agent/metadata/test_driver.py +++ b/neutron/tests/unit/agent/metadata/test_driver.py @@ -22,6 +22,7 @@ from oslo_config import cfg from neutron.agent.common import config as agent_config from neutron.agent.l3 import agent as l3_agent from neutron.agent.l3 import config as l3_config +from neutron.agent.l3 import ha as l3_ha_agent from neutron.agent.metadata import driver as metadata_driver from neutron.openstack.common import uuidutils from neutron.tests import base @@ -74,6 +75,7 @@ class TestMetadataDriverProcess(base.BaseTestCase): '._init_ha_conf_path').start() cfg.CONF.register_opts(l3_config.OPTS) + cfg.CONF.register_opts(l3_ha_agent.OPTS) cfg.CONF.register_opts(metadata_driver.MetadataDriver.OPTS) def _test_spawn_metadata_proxy(self, expected_user, expected_group, diff --git a/neutron/tests/unit/db/test_l3_ha_db.py b/neutron/tests/unit/db/test_l3_ha_db.py index b3513516979..b1bbf3e9789 100644 --- a/neutron/tests/unit/db/test_l3_ha_db.py +++ b/neutron/tests/unit/db/test_l3_ha_db.py @@ -387,6 +387,43 @@ class L3HATestCase(L3HATestFramework): routers_after = self.plugin.get_routers(self.admin_ctx) self.assertEqual(routers_before, routers_after) + def test_update_routers_states(self): + router1 = self._create_router() + self._bind_router(router1['id']) + router2 = self._create_router() + self._bind_router(router2['id']) + + routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx, + self.agent1['host']) + for router in routers: + self.assertEqual('standby', router[constants.HA_ROUTER_STATE_KEY]) + + states = {router1['id']: 'active', + router2['id']: 'standby'} + self.plugin.update_routers_states( + self.admin_ctx, states, self.agent1['host']) + + routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx, + self.agent1['host']) + for router in routers: + self.assertEqual(states[router['id']], + router[constants.HA_ROUTER_STATE_KEY]) + + def test_set_router_states_handles_concurrently_deleted_router(self): + router1 = self._create_router() + self._bind_router(router1['id']) + router2 = self._create_router() + self._bind_router(router2['id']) + bindings = self.plugin.get_ha_router_port_bindings( + self.admin_ctx, [router1['id'], router2['id']]) + self.plugin.delete_router(self.admin_ctx, router1['id']) + self.plugin._set_router_states( + self.admin_ctx, bindings, {router1['id']: 'active', + router2['id']: 'active'}) + routers = self.plugin.get_ha_sync_data_for_host(self.admin_ctx, + self.agent1['host']) + self.assertEqual('active', routers[0][constants.HA_ROUTER_STATE_KEY]) + def test_exclude_dvr_agents_for_ha_candidates(self): """Test dvr agents are not counted in the ha candidates.