Stale VXLAN and GRE tunnel port/flow deletion

Description:
Stale GRE and VXLAN tunnel endpoints persists in neutron db this should be
deleted from the database. Also, if local_ip of L2 agent changes the
stale tunnel ports and flows persists on br-tun on other Compute Nodes and
Network Nodes for that remote ip this should also be removed.

Implementation

Plugin changes:
The plugin side changes are covered in following patch-set
https://review.openstack.org/#/c/121000/.

Agent changes:
Added tunnel_delete rpc for removing stale ports and flows from br-tun.
tunnel_sync rpc signature upgrade to obtain 'host'.
Added testcases for TunnelRpcCallbackMixin().

This patch-set agent deals with agent side changes.

Closes-Bug: #1179223
Closes-Bug: #1381071
Closes-Bug: #1276629

Co-Authored-By: Aman Kumar <amank@hp.com>
Co-Authored-By: phanipawan <ppawan@hp.com>

Change-Id: I291992ffde5c3ab7152f0d7462deca2e4ac1ba3f
This commit is contained in:
Romil Gupta 2014-11-20 11:32:07 -08:00
parent ec3f7562e0
commit 67c4c6d809
12 changed files with 200 additions and 15 deletions

View File

@ -79,6 +79,7 @@ class PluginApi(object):
1.3 - get_device_details rpc signature upgrade to obtain 'host' and
return value to include fixed_ips and device_owner for
the device port
1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
'''
def __init__(self, topic):
@ -117,7 +118,14 @@ class PluginApi(object):
return cctxt.call(context, 'update_device_up', device=device,
agent_id=agent_id, host=host)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
cctxt = self.client.prepare()
return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
try:
cctxt = self.client.prepare(version='1.4')
res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type, host=host)
except oslo_messaging.UnsupportedVersion:
LOG.warn(_LW('Tunnel synchronization requires a server upgrade.'))
cctxt = self.client.prepare()
res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)
return res

View File

@ -265,4 +265,13 @@ class TunnelAgentRpcApiMixin(object):
cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)
# TODO(romilg): Add tunnel_delete rpc in dependent patch-set
def _get_tunnel_delete_topic(self):
return topics.get_topic_name(self.topic,
TUNNEL,
topics.DELETE)
def tunnel_delete(self, context, tunnel_ip, tunnel_type):
cctxt = self.client.prepare(topic=self._get_tunnel_delete_topic(),
fanout=True)
cctxt.cast(context, 'tunnel_delete', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type)

View File

@ -45,7 +45,8 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
# 1.3 get_device_details rpc signature upgrade to obtain 'host' and
# return value to include fixed_ips and device_owner for
# the device port
target = oslo_messaging.Target(version='1.3')
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
target = oslo_messaging.Target(version='1.4')
def __init__(self, notifier, type_manager):
self.setup_tunnel_callback_mixin(notifier, type_manager)

View File

@ -787,7 +787,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
for tunnel_type in self.tunnel_types:
self.plugin_rpc.tunnel_sync(self.context,
self.local_ip,
tunnel_type)
tunnel_type,
cfg.CONF.host)
except Exception as e:
LOG.debug("Unable to sync tunnel IP %(local_ip)s: %(e)s",
{'local_ip': self.local_ip, 'e': e})

View File

@ -296,6 +296,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE],
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE]]
if self.l2_pop:
@ -354,6 +355,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
tunnel_type)
def tunnel_delete(self, context, **kwargs):
LOG.debug("tunnel_delete received")
if not self.enable_tunneling:
return
tunnel_ip = kwargs.get('tunnel_ip')
if not tunnel_ip:
LOG.error(_LE("No tunnel_ip specified, cannot delete tunnels"))
return
tunnel_type = kwargs.get('tunnel_type')
if not tunnel_type:
LOG.error(_LE("No tunnel_type specified, cannot delete tunnels"))
return
if tunnel_type not in self.tunnel_types:
LOG.error(_LE("tunnel_type %s not supported by agent"),
tunnel_type)
return
ofport = self.tun_br_ofports[tunnel_type].get(tunnel_ip)
self.cleanup_tunnel_port(self.tun_br, ofport, tunnel_type)
def fdb_add(self, context, fdb_entries):
LOG.debug("fdb_add received")
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
@ -1309,8 +1329,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
try:
return '%08x' % netaddr.IPAddress(ip_address, version=4)
except Exception:
LOG.warn(_LW("Unable to create tunnel port. "
"Invalid remote IP: %s"), ip_address)
LOG.warn(_LW("Invalid remote IP: %s"), ip_address)
return
def tunnel_sync(self):
@ -1318,7 +1337,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
for tunnel_type in self.tunnel_types:
details = self.plugin_rpc.tunnel_sync(self.context,
self.local_ip,
tunnel_type)
tunnel_type,
cfg.CONF.host)
if not self.l2_pop:
tunnels = details['tunnels']
for tunnel in tunnels:

View File

@ -142,4 +142,6 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
tunnel_type=None)
tunnel_type=None,
host='fake_host',
version='1.4')

View File

@ -29,6 +29,7 @@ from neutron.common import constants
from neutron.common import exceptions
from neutron.common import topics
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc as plugin_rpc
from neutron.tests import base
@ -37,7 +38,10 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def setUp(self):
super(RpcCallbacksTestCase, self).setUp()
self.callbacks = plugin_rpc.RpcCallbacks(mock.Mock(), mock.Mock())
self.type_manager = managers.TypeManager()
self.notifier = plugin_rpc.AgentNotifierApi(topics.AGENT)
self.callbacks = plugin_rpc.RpcCallbacks(self.notifier,
self.type_manager)
self.manager = mock.patch.object(
plugin_rpc.manager, 'NeutronManager').start()
self.l3plugin = mock.Mock()
@ -234,6 +238,17 @@ class RpcApiTestCase(base.BaseTestCase):
fanout=True,
tunnel_ip='fake_ip', tunnel_type='gre')
def test_tunnel_delete(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
self._test_rpc_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
type_tunnel.TUNNEL,
topics.DELETE),
'tunnel_delete', rpc_method='cast',
fanout=True,
tunnel_ip='fake_ip', tunnel_type='gre')
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_rpc_api(rpcapi, None,
@ -263,7 +278,9 @@ class RpcApiTestCase(base.BaseTestCase):
self._test_rpc_api(rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
tunnel_type=None)
tunnel_type=None,
host='fake_host',
version='1.4')
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)

View File

@ -17,6 +17,7 @@ import mock
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_gre
from neutron.tests.unit.ml2 import test_rpcapi
from neutron.tests.unit.ml2 import test_type_tunnel
from neutron.tests.unit import testlib_api
@ -85,5 +86,12 @@ class GreTypeTest(test_type_tunnel.TunnelTypeTestMixin,
class GreTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
testlib_api.SqlTestCase):
testlib_api.SqlTestCase):
DRIVER_CLASS = type_gre.GreTypeDriver
class GreTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
test_rpcapi.RpcCallbacksTestCase,
testlib_api.SqlTestCase):
DRIVER_CLASS = type_gre.GreTypeDriver
TYPE = p_const.TYPE_GRE

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import mock
from six import moves
import testtools
from testtools import matchers
@ -21,6 +23,10 @@ from neutron.common import exceptions as exc
from neutron.db import api as db
from neutron.plugins.ml2 import driver_api as api
TUNNEL_IP_ONE = "10.10.10.10"
TUNNEL_IP_TWO = "10.10.10.20"
HOST_ONE = 'fake_host_one'
HOST_TWO = 'fake_host_two'
TUN_MIN = 100
TUN_MAX = 109
TUNNEL_RANGES = [(TUN_MIN, TUN_MAX)]
@ -219,3 +225,94 @@ class TunnelTypeMultiRangeTestMixin(object):
self.TUN_MIN1, self.TUN_MAX1):
alloc = self.driver.get_allocation(self.session, key)
self.assertFalse(alloc.allocated)
class TunnelRpcCallbackTestMixin(object):
DRIVER_CLASS = None
TYPE = None
def setUp(self):
super(TunnelRpcCallbackTestMixin, self).setUp()
self.driver = self.DRIVER_CLASS()
def _test_tunnel_sync(self, kwargs, delete_tunnel=False):
with contextlib.nested(
mock.patch.object(self.notifier, 'tunnel_update'),
mock.patch.object(self.notifier, 'tunnel_delete')
) as (tunnel_update, tunnel_delete):
details = self.callbacks.tunnel_sync('fake_context', **kwargs)
tunnels = details['tunnels']
for tunnel in tunnels:
self.assertEqual(kwargs['tunnel_ip'], tunnel['ip_address'])
self.assertEqual(kwargs['host'], tunnel['host'])
self.assertTrue(tunnel_update.called)
if delete_tunnel:
self.assertTrue(tunnel_delete.called)
else:
self.assertFalse(tunnel_delete.called)
def _test_tunnel_sync_raises(self, kwargs):
with contextlib.nested(
mock.patch.object(self.notifier, 'tunnel_update'),
mock.patch.object(self.notifier, 'tunnel_delete')
) as (tunnel_update, tunnel_delete):
self.assertRaises(exc.InvalidInput,
self.callbacks.tunnel_sync,
'fake_context', **kwargs)
self.assertFalse(tunnel_update.called)
self.assertFalse(tunnel_delete.called)
def test_tunnel_sync_called_without_host_passed(self):
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
'host': None}
self._test_tunnel_sync(kwargs)
def test_tunnel_sync_called_with_host_passed_for_existing_tunnel_ip(self):
self.driver.add_endpoint(TUNNEL_IP_ONE, None)
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
'host': HOST_ONE}
self._test_tunnel_sync(kwargs)
def test_tunnel_sync_called_with_host_passed(self):
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
'host': HOST_ONE}
self._test_tunnel_sync(kwargs)
def test_tunnel_sync_called_for_existing_endpoint(self):
self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
'host': HOST_ONE}
self._test_tunnel_sync(kwargs)
def test_tunnel_sync_called_for_existing_host_with_tunnel_ip_changed(self):
self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
kwargs = {'tunnel_ip': TUNNEL_IP_TWO, 'tunnel_type': self.TYPE,
'host': HOST_ONE}
self._test_tunnel_sync(kwargs, True)
def test_tunnel_sync_called_with_used_tunnel_ip_case_one(self):
self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
'host': HOST_TWO}
self._test_tunnel_sync_raises(kwargs)
def test_tunnel_sync_called_with_used_tunnel_ip_case_two(self):
self.driver.add_endpoint(TUNNEL_IP_ONE, None)
self.driver.add_endpoint(TUNNEL_IP_TWO, HOST_TWO)
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
'host': HOST_TWO}
self._test_tunnel_sync_raises(kwargs)
def test_tunnel_sync_called_without_tunnel_ip(self):
kwargs = {'tunnel_type': self.TYPE, 'host': None}
self._test_tunnel_sync_raises(kwargs)
def test_tunnel_sync_called_without_tunnel_type(self):
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'host': None}
self._test_tunnel_sync_raises(kwargs)

View File

@ -17,6 +17,7 @@ import mock
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers import type_vxlan
from neutron.tests.unit.ml2 import test_rpcapi
from neutron.tests.unit.ml2 import test_type_tunnel
from neutron.tests.unit import testlib_api
@ -96,3 +97,10 @@ class VxlanTypeTest(test_type_tunnel.TunnelTypeTestMixin,
class VxlanTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
testlib_api.SqlTestCase):
DRIVER_CLASS = type_vxlan.VxlanTypeDriver
class VxlanTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
test_rpcapi.RpcCallbacksTestCase,
testlib_api.SqlTestCase):
DRIVER_CLASS = type_vxlan.VxlanTypeDriver
TYPE = p_const.TYPE_VXLAN

View File

@ -761,6 +761,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
self.agent.local_ip = 'agent_ip'
self.agent.context = 'fake_context'
self.agent.tunnel_types = ['vxlan']
self.agent.host = cfg.CONF.host
with mock.patch.object(
self.agent.plugin_rpc, 'tunnel_sync'
) as tunnel_sync_rpc_fn:
@ -768,7 +769,8 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
tunnel_sync_rpc_fn.assert_called_once_with(
self.agent.context,
self.agent.local_ip,
self.agent.tunnel_types[0])
self.agent.tunnel_types[0],
self.agent.host)
def test__get_ports(self):
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')

View File

@ -966,6 +966,18 @@ class TestOvsNeutronAgent(base.BaseTestCase):
mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
def test_tunnel_delete(self):
kwargs = {'tunnel_ip': '10.10.10.10',
'tunnel_type': 'gre'}
self.agent.enable_tunneling = True
self.agent.tunnel_types = ['gre']
self.agent.tun_br_ofports = {'gre': {'10.10.10.10': '1'}}
with mock.patch.object(
self.agent, 'cleanup_tunnel_port'
) as clean_tun_fn:
self.agent.tunnel_delete(context=None, **kwargs)
self.assertTrue(clean_tun_fn.called)
def test_ovs_status(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),