From 67c4c6d809e4c9e112d9fb848b5bdce9d5cd04ac Mon Sep 17 00:00:00 2001 From: Romil Gupta Date: Thu, 20 Nov 2014 11:32:07 -0800 Subject: [PATCH] Stale VXLAN and GRE tunnel port/flow deletion Description: Stale GRE and VXLAN tunnel endpoints persists in neutron db this should be deleted from the database. Also, if local_ip of L2 agent changes the stale tunnel ports and flows persists on br-tun on other Compute Nodes and Network Nodes for that remote ip this should also be removed. Implementation Plugin changes: The plugin side changes are covered in following patch-set https://review.openstack.org/#/c/121000/. Agent changes: Added tunnel_delete rpc for removing stale ports and flows from br-tun. tunnel_sync rpc signature upgrade to obtain 'host'. Added testcases for TunnelRpcCallbackMixin(). This patch-set agent deals with agent side changes. Closes-Bug: #1179223 Closes-Bug: #1381071 Closes-Bug: #1276629 Co-Authored-By: Aman Kumar Co-Authored-By: phanipawan Change-Id: I291992ffde5c3ab7152f0d7462deca2e4ac1ba3f --- neutron/agent/rpc.py | 16 ++- neutron/plugins/ml2/drivers/type_tunnel.py | 11 ++- neutron/plugins/ml2/rpc.py | 3 +- .../ofagent/agent/ofa_neutron_agent.py | 3 +- .../openvswitch/agent/ovs_neutron_agent.py | 26 ++++- .../tests/unit/hyperv/test_hyperv_rpcapi.py | 4 +- neutron/tests/unit/ml2/test_rpcapi.py | 21 +++- neutron/tests/unit/ml2/test_type_gre.py | 10 +- neutron/tests/unit/ml2/test_type_tunnel.py | 97 +++++++++++++++++++ neutron/tests/unit/ml2/test_type_vxlan.py | 8 ++ .../unit/ofagent/test_ofa_neutron_agent.py | 4 +- .../openvswitch/test_ovs_neutron_agent.py | 12 +++ 12 files changed, 200 insertions(+), 15 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 9909856e0f3..f8adbc2b936 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -79,6 +79,7 @@ class PluginApi(object): 1.3 - get_device_details rpc signature upgrade to obtain 'host' and return value to include fixed_ips and device_owner for the device port + 1.4 - tunnel_sync rpc signature upgrade to obtain 'host' ''' def __init__(self, topic): @@ -117,7 +118,14 @@ class PluginApi(object): return cctxt.call(context, 'update_device_up', device=device, agent_id=agent_id, host=host) - def tunnel_sync(self, context, tunnel_ip, tunnel_type=None): - cctxt = self.client.prepare() - return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip, - tunnel_type=tunnel_type) + def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None): + try: + cctxt = self.client.prepare(version='1.4') + res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip, + tunnel_type=tunnel_type, host=host) + except oslo_messaging.UnsupportedVersion: + LOG.warn(_LW('Tunnel synchronization requires a server upgrade.')) + cctxt = self.client.prepare() + res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip, + tunnel_type=tunnel_type) + return res diff --git a/neutron/plugins/ml2/drivers/type_tunnel.py b/neutron/plugins/ml2/drivers/type_tunnel.py index c64e24e8e18..25472438ac4 100644 --- a/neutron/plugins/ml2/drivers/type_tunnel.py +++ b/neutron/plugins/ml2/drivers/type_tunnel.py @@ -265,4 +265,13 @@ class TunnelAgentRpcApiMixin(object): cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip, tunnel_type=tunnel_type) - # TODO(romilg): Add tunnel_delete rpc in dependent patch-set + def _get_tunnel_delete_topic(self): + return topics.get_topic_name(self.topic, + TUNNEL, + topics.DELETE) + + def tunnel_delete(self, context, tunnel_ip, tunnel_type): + cctxt = self.client.prepare(topic=self._get_tunnel_delete_topic(), + fanout=True) + cctxt.cast(context, 'tunnel_delete', tunnel_ip=tunnel_ip, + tunnel_type=tunnel_type) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index b08d06be169..c92e5474046 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -45,7 +45,8 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin): # 1.3 get_device_details rpc signature upgrade to obtain 'host' and # return value to include fixed_ips and device_owner for # the device port - target = oslo_messaging.Target(version='1.3') + # 1.4 tunnel_sync rpc signature upgrade to obtain 'host' + target = oslo_messaging.Target(version='1.4') def __init__(self, notifier, type_manager): self.setup_tunnel_callback_mixin(notifier, type_manager) diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 6525df0af18..98856c5a530 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -787,7 +787,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, for tunnel_type in self.tunnel_types: self.plugin_rpc.tunnel_sync(self.context, self.local_ip, - tunnel_type) + tunnel_type, + cfg.CONF.host) except Exception as e: LOG.debug("Unable to sync tunnel IP %(local_ip)s: %(e)s", {'local_ip': self.local_ip, 'e': e}) diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index b1a3565bb7c..311b4a2af47 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -296,6 +296,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], [constants.TUNNEL, topics.UPDATE], + [constants.TUNNEL, topics.DELETE], [topics.SECURITY_GROUP, topics.UPDATE], [topics.DVR, topics.UPDATE]] if self.l2_pop: @@ -354,6 +355,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip, tunnel_type) + def tunnel_delete(self, context, **kwargs): + LOG.debug("tunnel_delete received") + if not self.enable_tunneling: + return + tunnel_ip = kwargs.get('tunnel_ip') + if not tunnel_ip: + LOG.error(_LE("No tunnel_ip specified, cannot delete tunnels")) + return + tunnel_type = kwargs.get('tunnel_type') + if not tunnel_type: + LOG.error(_LE("No tunnel_type specified, cannot delete tunnels")) + return + if tunnel_type not in self.tunnel_types: + LOG.error(_LE("tunnel_type %s not supported by agent"), + tunnel_type) + return + ofport = self.tun_br_ofports[tunnel_type].get(tunnel_ip) + self.cleanup_tunnel_port(self.tun_br, ofport, tunnel_type) + def fdb_add(self, context, fdb_entries): LOG.debug("fdb_add received") for lvm, agent_ports in self.get_agent_ports(fdb_entries, @@ -1309,8 +1329,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, try: return '%08x' % netaddr.IPAddress(ip_address, version=4) except Exception: - LOG.warn(_LW("Unable to create tunnel port. " - "Invalid remote IP: %s"), ip_address) + LOG.warn(_LW("Invalid remote IP: %s"), ip_address) return def tunnel_sync(self): @@ -1318,7 +1337,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, for tunnel_type in self.tunnel_types: details = self.plugin_rpc.tunnel_sync(self.context, self.local_ip, - tunnel_type) + tunnel_type, + cfg.CONF.host) if not self.l2_pop: tunnels = details['tunnels'] for tunnel in tunnels: diff --git a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py index 7f2c8672ff3..6c144ad92f9 100644 --- a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -142,4 +142,6 @@ class rpcHyperVApiTestCase(base.BaseTestCase): rpcapi, None, 'tunnel_sync', rpc_method='call', tunnel_ip='fake_tunnel_ip', - tunnel_type=None) + tunnel_type=None, + host='fake_host', + version='1.4') diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a3b4955f62a..efb1dbd5784 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -29,6 +29,7 @@ from neutron.common import constants from neutron.common import exceptions from neutron.common import topics from neutron.plugins.ml2.drivers import type_tunnel +from neutron.plugins.ml2 import managers from neutron.plugins.ml2 import rpc as plugin_rpc from neutron.tests import base @@ -37,7 +38,10 @@ class RpcCallbacksTestCase(base.BaseTestCase): def setUp(self): super(RpcCallbacksTestCase, self).setUp() - self.callbacks = plugin_rpc.RpcCallbacks(mock.Mock(), mock.Mock()) + self.type_manager = managers.TypeManager() + self.notifier = plugin_rpc.AgentNotifierApi(topics.AGENT) + self.callbacks = plugin_rpc.RpcCallbacks(self.notifier, + self.type_manager) self.manager = mock.patch.object( plugin_rpc.manager, 'NeutronManager').start() self.l3plugin = mock.Mock() @@ -234,6 +238,17 @@ class RpcApiTestCase(base.BaseTestCase): fanout=True, tunnel_ip='fake_ip', tunnel_type='gre') + def test_tunnel_delete(self): + rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) + self._test_rpc_api( + rpcapi, + topics.get_topic_name(topics.AGENT, + type_tunnel.TUNNEL, + topics.DELETE), + 'tunnel_delete', rpc_method='cast', + fanout=True, + tunnel_ip='fake_ip', tunnel_type='gre') + def test_device_details(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) self._test_rpc_api(rpcapi, None, @@ -263,7 +278,9 @@ class RpcApiTestCase(base.BaseTestCase): self._test_rpc_api(rpcapi, None, 'tunnel_sync', rpc_method='call', tunnel_ip='fake_tunnel_ip', - tunnel_type=None) + tunnel_type=None, + host='fake_host', + version='1.4') def test_update_device_up(self): rpcapi = agent_rpc.PluginApi(topics.PLUGIN) diff --git a/neutron/tests/unit/ml2/test_type_gre.py b/neutron/tests/unit/ml2/test_type_gre.py index 3bdb4e103af..6c4051792ce 100644 --- a/neutron/tests/unit/ml2/test_type_gre.py +++ b/neutron/tests/unit/ml2/test_type_gre.py @@ -17,6 +17,7 @@ import mock from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers import type_gre +from neutron.tests.unit.ml2 import test_rpcapi from neutron.tests.unit.ml2 import test_type_tunnel from neutron.tests.unit import testlib_api @@ -85,5 +86,12 @@ class GreTypeTest(test_type_tunnel.TunnelTypeTestMixin, class GreTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin, - testlib_api.SqlTestCase): + testlib_api.SqlTestCase): DRIVER_CLASS = type_gre.GreTypeDriver + + +class GreTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin, + test_rpcapi.RpcCallbacksTestCase, + testlib_api.SqlTestCase): + DRIVER_CLASS = type_gre.GreTypeDriver + TYPE = p_const.TYPE_GRE diff --git a/neutron/tests/unit/ml2/test_type_tunnel.py b/neutron/tests/unit/ml2/test_type_tunnel.py index 2095f90eb31..6637c6035b2 100644 --- a/neutron/tests/unit/ml2/test_type_tunnel.py +++ b/neutron/tests/unit/ml2/test_type_tunnel.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import mock from six import moves import testtools from testtools import matchers @@ -21,6 +23,10 @@ from neutron.common import exceptions as exc from neutron.db import api as db from neutron.plugins.ml2 import driver_api as api +TUNNEL_IP_ONE = "10.10.10.10" +TUNNEL_IP_TWO = "10.10.10.20" +HOST_ONE = 'fake_host_one' +HOST_TWO = 'fake_host_two' TUN_MIN = 100 TUN_MAX = 109 TUNNEL_RANGES = [(TUN_MIN, TUN_MAX)] @@ -219,3 +225,94 @@ class TunnelTypeMultiRangeTestMixin(object): self.TUN_MIN1, self.TUN_MAX1): alloc = self.driver.get_allocation(self.session, key) self.assertFalse(alloc.allocated) + + +class TunnelRpcCallbackTestMixin(object): + + DRIVER_CLASS = None + TYPE = None + + def setUp(self): + super(TunnelRpcCallbackTestMixin, self).setUp() + self.driver = self.DRIVER_CLASS() + + def _test_tunnel_sync(self, kwargs, delete_tunnel=False): + with contextlib.nested( + mock.patch.object(self.notifier, 'tunnel_update'), + mock.patch.object(self.notifier, 'tunnel_delete') + ) as (tunnel_update, tunnel_delete): + details = self.callbacks.tunnel_sync('fake_context', **kwargs) + tunnels = details['tunnels'] + for tunnel in tunnels: + self.assertEqual(kwargs['tunnel_ip'], tunnel['ip_address']) + self.assertEqual(kwargs['host'], tunnel['host']) + self.assertTrue(tunnel_update.called) + if delete_tunnel: + self.assertTrue(tunnel_delete.called) + else: + self.assertFalse(tunnel_delete.called) + + def _test_tunnel_sync_raises(self, kwargs): + with contextlib.nested( + mock.patch.object(self.notifier, 'tunnel_update'), + mock.patch.object(self.notifier, 'tunnel_delete') + ) as (tunnel_update, tunnel_delete): + self.assertRaises(exc.InvalidInput, + self.callbacks.tunnel_sync, + 'fake_context', **kwargs) + self.assertFalse(tunnel_update.called) + self.assertFalse(tunnel_delete.called) + + def test_tunnel_sync_called_without_host_passed(self): + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE, + 'host': None} + self._test_tunnel_sync(kwargs) + + def test_tunnel_sync_called_with_host_passed_for_existing_tunnel_ip(self): + self.driver.add_endpoint(TUNNEL_IP_ONE, None) + + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE, + 'host': HOST_ONE} + self._test_tunnel_sync(kwargs) + + def test_tunnel_sync_called_with_host_passed(self): + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE, + 'host': HOST_ONE} + self._test_tunnel_sync(kwargs) + + def test_tunnel_sync_called_for_existing_endpoint(self): + self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE) + + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE, + 'host': HOST_ONE} + self._test_tunnel_sync(kwargs) + + def test_tunnel_sync_called_for_existing_host_with_tunnel_ip_changed(self): + self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE) + + kwargs = {'tunnel_ip': TUNNEL_IP_TWO, 'tunnel_type': self.TYPE, + 'host': HOST_ONE} + self._test_tunnel_sync(kwargs, True) + + def test_tunnel_sync_called_with_used_tunnel_ip_case_one(self): + self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE) + + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE, + 'host': HOST_TWO} + self._test_tunnel_sync_raises(kwargs) + + def test_tunnel_sync_called_with_used_tunnel_ip_case_two(self): + self.driver.add_endpoint(TUNNEL_IP_ONE, None) + self.driver.add_endpoint(TUNNEL_IP_TWO, HOST_TWO) + + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE, + 'host': HOST_TWO} + self._test_tunnel_sync_raises(kwargs) + + def test_tunnel_sync_called_without_tunnel_ip(self): + kwargs = {'tunnel_type': self.TYPE, 'host': None} + self._test_tunnel_sync_raises(kwargs) + + def test_tunnel_sync_called_without_tunnel_type(self): + kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'host': None} + self._test_tunnel_sync_raises(kwargs) diff --git a/neutron/tests/unit/ml2/test_type_vxlan.py b/neutron/tests/unit/ml2/test_type_vxlan.py index bf57b1d4edf..b794261b2ac 100644 --- a/neutron/tests/unit/ml2/test_type_vxlan.py +++ b/neutron/tests/unit/ml2/test_type_vxlan.py @@ -17,6 +17,7 @@ import mock from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers import type_vxlan +from neutron.tests.unit.ml2 import test_rpcapi from neutron.tests.unit.ml2 import test_type_tunnel from neutron.tests.unit import testlib_api @@ -96,3 +97,10 @@ class VxlanTypeTest(test_type_tunnel.TunnelTypeTestMixin, class VxlanTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin, testlib_api.SqlTestCase): DRIVER_CLASS = type_vxlan.VxlanTypeDriver + + +class VxlanTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin, + test_rpcapi.RpcCallbacksTestCase, + testlib_api.SqlTestCase): + DRIVER_CLASS = type_vxlan.VxlanTypeDriver + TYPE = p_const.TYPE_VXLAN diff --git a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py index 8398a2394b1..ee728a85e8a 100644 --- a/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py +++ b/neutron/tests/unit/ofagent/test_ofa_neutron_agent.py @@ -761,6 +761,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase): self.agent.local_ip = 'agent_ip' self.agent.context = 'fake_context' self.agent.tunnel_types = ['vxlan'] + self.agent.host = cfg.CONF.host with mock.patch.object( self.agent.plugin_rpc, 'tunnel_sync' ) as tunnel_sync_rpc_fn: @@ -768,7 +769,8 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase): tunnel_sync_rpc_fn.assert_called_once_with( self.agent.context, self.agent.local_ip, - self.agent.tunnel_types[0]) + self.agent.tunnel_types[0], + self.agent.host) def test__get_ports(self): ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 3e124fbf69a..03d5a724b85 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -966,6 +966,18 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')] self.agent._setup_tunnel_port.assert_has_calls(expected_calls) + def test_tunnel_delete(self): + kwargs = {'tunnel_ip': '10.10.10.10', + 'tunnel_type': 'gre'} + self.agent.enable_tunneling = True + self.agent.tunnel_types = ['gre'] + self.agent.tun_br_ofports = {'gre': {'10.10.10.10': '1'}} + with mock.patch.object( + self.agent, 'cleanup_tunnel_port' + ) as clean_tun_fn: + self.agent.tunnel_delete(context=None, **kwargs) + self.assertTrue(clean_tun_fn.called) + def test_ovs_status(self): reply2 = {'current': set(['tap0']), 'added': set(['tap2']),