Cache the ARP entries in L3 Agent for DVR

There seems to be a timing issue between the
ARP entries that arrive from the server to
the agent and the internal qr-device getting
created by the agent.
So those unsuccessful arp entries are dropped.

This patch makes sure that the early ARP entries
are cached in the agent and then utilized when
the internal device is up.

Closes-Bug: #1501086
Change-Id: I9ec5412f14808de73e8dd86e3d51593946d312a0
(cherry picked from commit d9fb3a66b4)
This commit is contained in:
Swaminathan Vasudevan 2015-09-28 11:43:03 -07:00 committed by Sergey Belous
parent 68e88eb28a
commit 0cc889f318
3 changed files with 109 additions and 7 deletions

View File

@ -13,6 +13,7 @@
# under the License.
import binascii
import collections
import netaddr
from oslo_log import log as logging
@ -25,12 +26,16 @@ from neutron.agent.linux import ip_lib
from neutron.common import constants as l3_constants
from neutron.common import exceptions
from neutron.common import utils as common_utils
from neutron.i18n import _LE
from neutron.i18n import _LE, _LW
LOG = logging.getLogger(__name__)
# xor-folding mask used for IPv6 rule index
MASK_30 = 0x3fffffff
# Tracks the arp entry cache
Arp_entry = collections.namedtuple(
'Arp_entry', 'ip mac subnet_id operation')
class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def __init__(self, agent, host, *args, **kwargs):
@ -41,6 +46,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
self.rtr_fip_subnet = None
self.dist_fip_count = None
self.fip_ns = None
self._pending_arp_set = set()
def get_floating_ips(self):
"""Filter Floating IPs to be hosted on this agent."""
@ -173,21 +179,65 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
if f['subnet_id'] == subnet_id:
return port
def _cache_arp_entry(self, ip, mac, subnet_id, operation):
"""Cache the arp entries if device not ready."""
arp_entry_tuple = Arp_entry(ip=ip,
mac=mac,
subnet_id=subnet_id,
operation=operation)
self._pending_arp_set.add(arp_entry_tuple)
def _process_arp_cache_for_internal_port(self, subnet_id):
"""Function to process the cached arp entries."""
arp_remove = set()
for arp_entry in self._pending_arp_set:
if subnet_id == arp_entry.subnet_id:
try:
state = self._update_arp_entry(
arp_entry.ip, arp_entry.mac,
arp_entry.subnet_id, arp_entry.operation)
except Exception:
state = False
if state:
# If the arp update was successful, then
# go ahead and add it to the remove set
arp_remove.add(arp_entry)
self._pending_arp_set -= arp_remove
def _delete_arp_cache_for_internal_port(self, subnet_id):
"""Function to delete the cached arp entries."""
arp_delete = set()
for arp_entry in self._pending_arp_set:
if subnet_id == arp_entry.subnet_id:
arp_delete.add(arp_entry)
self._pending_arp_set -= arp_delete
def _update_arp_entry(self, ip, mac, subnet_id, operation):
"""Add or delete arp entry into router namespace for the subnet."""
port = self._get_internal_port(subnet_id)
# update arp entry only if the subnet is attached to the router
if not port:
return
return False
try:
# TODO(mrsmith): optimize the calls below for bulk calls
interface_name = self.get_internal_device_name(port['id'])
device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
if operation == 'add':
device.neigh.add(ip, mac)
elif operation == 'delete':
device.neigh.delete(ip, mac)
if ip_lib.device_exists(interface_name, self.ns_name):
if operation == 'add':
device.neigh.add(ip, mac)
elif operation == 'delete':
device.neigh.delete(ip, mac)
return True
else:
if operation == 'add':
LOG.warn(_LW("Device %s does not exist so ARP entry "
"cannot be updated, will cache information "
"to be applied later when the device exists"),
device)
self._cache_arp_entry(ip, mac, subnet_id, operation)
return False
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("DVR: Failed updating arp entry"))
@ -205,6 +255,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
p['mac_address'],
subnet_id,
'add')
self._process_arp_cache_for_internal_port(subnet_id)
@staticmethod
def _get_snat_idx(ip_cidr):
@ -321,6 +372,9 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
# DVR handling code for SNAT
interface_name = self.get_internal_device_name(port['id'])
self._snat_redirect_remove(sn_port, port, interface_name)
# Clean up the cached arp entries related to the port subnet
for subnet in port['subnets']:
self._delete_arp_cache_for_internal_port(subnet)
def internal_network_removed(self, port):
self._dvr_internal_network_removed(port)

View File

@ -360,8 +360,11 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
self.device_exists.return_value = False
ri.get_snat_port_for_internal_port = mock.Mock(
return_value=sn_port)
ri._delete_arp_cache_for_internal_port = mock.Mock()
ri._snat_redirect_modify = mock.Mock()
ri.internal_network_removed(port)
self.assertEqual(
1, ri._delete_arp_cache_for_internal_port.call_count)
ri._snat_redirect_modify.assert_called_with(
sn_port, port,
ri.get_internal_device_name(port['id']),

View File

@ -346,7 +346,10 @@ class TestDvrRouterOperations(base.BaseTestCase):
# Test basic case
ports[0]['subnets'] = [{'id': subnet_id,
'cidr': '1.2.3.0/24'}]
ri._set_subnet_arp_info(subnet_id)
with mock.patch.object(ri,
'_process_arp_cache_for_internal_port') as parp:
ri._set_subnet_arp_info(subnet_id)
self.assertEqual(1, parp.call_count)
self.mock_ip_dev.neigh.add.assert_called_once_with(
'1.2.3.4', '00:11:22:33:44:55')
@ -395,6 +398,48 @@ class TestDvrRouterOperations(base.BaseTestCase):
ri._update_arp_entry(mock.ANY, mock.ANY, 'foo_subnet_id', 'add')
self.assertFalse(f.call_count)
def _setup_test_for_arp_entry_cache(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = l3_test_common.prepare_router_data(num_internal_ports=2)
router['distributed'] = True
ri = dvr_router.DvrLocalRouter(
agent, HOSTNAME, router['id'], router, **self.ri_kwargs)
subnet_id = l3_test_common.get_subnet_id(
ri.router[l3_constants.INTERFACE_KEY][0])
return ri, subnet_id
def test__update_arp_entry_calls_arp_cache_with_no_device(self):
ri, subnet_id = self._setup_test_for_arp_entry_cache()
state = True
with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as rtrdev,\
mock.patch.object(ri, '_cache_arp_entry') as arp_cache:
self.device_exists.return_value = False
state = ri._update_arp_entry(
mock.ANY, mock.ANY, subnet_id, 'add')
self.assertFalse(state)
self.assertTrue(arp_cache.called)
arp_cache.assert_called_once_with(mock.ANY, mock.ANY,
subnet_id, 'add')
self.assertFalse(rtrdev.neigh.add.called)
def test__process_arp_cache_for_internal_port(self):
ri, subnet_id = self._setup_test_for_arp_entry_cache()
ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
subnet_id, 'add')
self.assertEqual(1, len(ri._pending_arp_set))
with mock.patch.object(ri, '_update_arp_entry') as update_arp:
update_arp.return_value = True
ri._process_arp_cache_for_internal_port(subnet_id)
self.assertEqual(0, len(ri._pending_arp_set))
def test__delete_arp_cache_for_internal_port(self):
ri, subnet_id = self._setup_test_for_arp_entry_cache()
ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
subnet_id, 'add')
self.assertEqual(1, len(ri._pending_arp_set))
ri._delete_arp_cache_for_internal_port(subnet_id)
self.assertEqual(0, len(ri._pending_arp_set))
def test_del_arp_entry(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = l3_test_common.prepare_router_data(num_internal_ports=2)