From 0cc889f3185a2e8fe3fef45190e4fb457ce40150 Mon Sep 17 00:00:00 2001 From: Swaminathan Vasudevan Date: Mon, 28 Sep 2015 11:43:03 -0700 Subject: [PATCH] Cache the ARP entries in L3 Agent for DVR There seems to be a timing issue between the ARP entries that arrive from the server to the agent and the internal qr-device getting created by the agent. So those unsuccessful arp entries are dropped. This patch makes sure that the early ARP entries are cached in the agent and then utilized when the internal device is up. Closes-Bug: #1501086 Change-Id: I9ec5412f14808de73e8dd86e3d51593946d312a0 (cherry picked from commit d9fb3a66b4aead65fac2df08a5f34538e7af4d7b) --- neutron/agent/l3/dvr_local_router.py | 66 +++++++++++++++++-- neutron/tests/unit/agent/l3/test_agent.py | 3 + .../unit/agent/l3/test_dvr_local_router.py | 47 ++++++++++++- 3 files changed, 109 insertions(+), 7 deletions(-) diff --git a/neutron/agent/l3/dvr_local_router.py b/neutron/agent/l3/dvr_local_router.py index 1ecc79fc477..257fd4594af 100644 --- a/neutron/agent/l3/dvr_local_router.py +++ b/neutron/agent/l3/dvr_local_router.py @@ -13,6 +13,7 @@ # under the License. import binascii +import collections import netaddr from oslo_log import log as logging @@ -25,12 +26,16 @@ from neutron.agent.linux import ip_lib from neutron.common import constants as l3_constants from neutron.common import exceptions from neutron.common import utils as common_utils -from neutron.i18n import _LE +from neutron.i18n import _LE, _LW LOG = logging.getLogger(__name__) # xor-folding mask used for IPv6 rule index MASK_30 = 0x3fffffff +# Tracks the arp entry cache +Arp_entry = collections.namedtuple( + 'Arp_entry', 'ip mac subnet_id operation') + class DvrLocalRouter(dvr_router_base.DvrRouterBase): def __init__(self, agent, host, *args, **kwargs): @@ -41,6 +46,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): self.rtr_fip_subnet = None self.dist_fip_count = None self.fip_ns = None + self._pending_arp_set = set() def get_floating_ips(self): """Filter Floating IPs to be hosted on this agent.""" @@ -173,21 +179,65 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): if f['subnet_id'] == subnet_id: return port + def _cache_arp_entry(self, ip, mac, subnet_id, operation): + """Cache the arp entries if device not ready.""" + arp_entry_tuple = Arp_entry(ip=ip, + mac=mac, + subnet_id=subnet_id, + operation=operation) + self._pending_arp_set.add(arp_entry_tuple) + + def _process_arp_cache_for_internal_port(self, subnet_id): + """Function to process the cached arp entries.""" + arp_remove = set() + for arp_entry in self._pending_arp_set: + if subnet_id == arp_entry.subnet_id: + try: + state = self._update_arp_entry( + arp_entry.ip, arp_entry.mac, + arp_entry.subnet_id, arp_entry.operation) + except Exception: + state = False + if state: + # If the arp update was successful, then + # go ahead and add it to the remove set + arp_remove.add(arp_entry) + + self._pending_arp_set -= arp_remove + + def _delete_arp_cache_for_internal_port(self, subnet_id): + """Function to delete the cached arp entries.""" + arp_delete = set() + for arp_entry in self._pending_arp_set: + if subnet_id == arp_entry.subnet_id: + arp_delete.add(arp_entry) + self._pending_arp_set -= arp_delete + def _update_arp_entry(self, ip, mac, subnet_id, operation): """Add or delete arp entry into router namespace for the subnet.""" port = self._get_internal_port(subnet_id) # update arp entry only if the subnet is attached to the router if not port: - return + return False try: # TODO(mrsmith): optimize the calls below for bulk calls interface_name = self.get_internal_device_name(port['id']) device = ip_lib.IPDevice(interface_name, namespace=self.ns_name) - if operation == 'add': - device.neigh.add(ip, mac) - elif operation == 'delete': - device.neigh.delete(ip, mac) + if ip_lib.device_exists(interface_name, self.ns_name): + if operation == 'add': + device.neigh.add(ip, mac) + elif operation == 'delete': + device.neigh.delete(ip, mac) + return True + else: + if operation == 'add': + LOG.warn(_LW("Device %s does not exist so ARP entry " + "cannot be updated, will cache information " + "to be applied later when the device exists"), + device) + self._cache_arp_entry(ip, mac, subnet_id, operation) + return False except Exception: with excutils.save_and_reraise_exception(): LOG.exception(_LE("DVR: Failed updating arp entry")) @@ -205,6 +255,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): p['mac_address'], subnet_id, 'add') + self._process_arp_cache_for_internal_port(subnet_id) @staticmethod def _get_snat_idx(ip_cidr): @@ -321,6 +372,9 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): # DVR handling code for SNAT interface_name = self.get_internal_device_name(port['id']) self._snat_redirect_remove(sn_port, port, interface_name) + # Clean up the cached arp entries related to the port subnet + for subnet in port['subnets']: + self._delete_arp_cache_for_internal_port(subnet) def internal_network_removed(self, port): self._dvr_internal_network_removed(port) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index 1099f1e4195..a81d4d3c0e6 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -360,8 +360,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): self.device_exists.return_value = False ri.get_snat_port_for_internal_port = mock.Mock( return_value=sn_port) + ri._delete_arp_cache_for_internal_port = mock.Mock() ri._snat_redirect_modify = mock.Mock() ri.internal_network_removed(port) + self.assertEqual( + 1, ri._delete_arp_cache_for_internal_port.call_count) ri._snat_redirect_modify.assert_called_with( sn_port, port, ri.get_internal_device_name(port['id']), diff --git a/neutron/tests/unit/agent/l3/test_dvr_local_router.py b/neutron/tests/unit/agent/l3/test_dvr_local_router.py index 06175302b59..a723b9b5103 100644 --- a/neutron/tests/unit/agent/l3/test_dvr_local_router.py +++ b/neutron/tests/unit/agent/l3/test_dvr_local_router.py @@ -346,7 +346,10 @@ class TestDvrRouterOperations(base.BaseTestCase): # Test basic case ports[0]['subnets'] = [{'id': subnet_id, 'cidr': '1.2.3.0/24'}] - ri._set_subnet_arp_info(subnet_id) + with mock.patch.object(ri, + '_process_arp_cache_for_internal_port') as parp: + ri._set_subnet_arp_info(subnet_id) + self.assertEqual(1, parp.call_count) self.mock_ip_dev.neigh.add.assert_called_once_with( '1.2.3.4', '00:11:22:33:44:55') @@ -395,6 +398,48 @@ class TestDvrRouterOperations(base.BaseTestCase): ri._update_arp_entry(mock.ANY, mock.ANY, 'foo_subnet_id', 'add') self.assertFalse(f.call_count) + def _setup_test_for_arp_entry_cache(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + router = l3_test_common.prepare_router_data(num_internal_ports=2) + router['distributed'] = True + ri = dvr_router.DvrLocalRouter( + agent, HOSTNAME, router['id'], router, **self.ri_kwargs) + subnet_id = l3_test_common.get_subnet_id( + ri.router[l3_constants.INTERFACE_KEY][0]) + return ri, subnet_id + + def test__update_arp_entry_calls_arp_cache_with_no_device(self): + ri, subnet_id = self._setup_test_for_arp_entry_cache() + state = True + with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as rtrdev,\ + mock.patch.object(ri, '_cache_arp_entry') as arp_cache: + self.device_exists.return_value = False + state = ri._update_arp_entry( + mock.ANY, mock.ANY, subnet_id, 'add') + self.assertFalse(state) + self.assertTrue(arp_cache.called) + arp_cache.assert_called_once_with(mock.ANY, mock.ANY, + subnet_id, 'add') + self.assertFalse(rtrdev.neigh.add.called) + + def test__process_arp_cache_for_internal_port(self): + ri, subnet_id = self._setup_test_for_arp_entry_cache() + ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55', + subnet_id, 'add') + self.assertEqual(1, len(ri._pending_arp_set)) + with mock.patch.object(ri, '_update_arp_entry') as update_arp: + update_arp.return_value = True + ri._process_arp_cache_for_internal_port(subnet_id) + self.assertEqual(0, len(ri._pending_arp_set)) + + def test__delete_arp_cache_for_internal_port(self): + ri, subnet_id = self._setup_test_for_arp_entry_cache() + ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55', + subnet_id, 'add') + self.assertEqual(1, len(ri._pending_arp_set)) + ri._delete_arp_cache_for_internal_port(subnet_id) + self.assertEqual(0, len(ri._pending_arp_set)) + def test_del_arp_entry(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) router = l3_test_common.prepare_router_data(num_internal_ports=2)