Stale VXLAN and GRE tunnel port/flow deletion
Description: Stale GRE and VXLAN tunnel endpoints persists in neutron db this should be deleted from the database. Also, if local_ip of L2 agent changes the stale tunnel ports and flows persists on br-tun on other Compute Nodes and Network Nodes for that remote ip this should also be removed. Implementation Plugin changes: The plugin side changes are covered in following patch-set https://review.openstack.org/#/c/121000/. Agent changes: Added tunnel_delete rpc for removing stale ports and flows from br-tun. tunnel_sync rpc signature upgrade to obtain 'host'. Added testcases for TunnelRpcCallbackMixin(). This patch-set agent deals with agent side changes. Closes-Bug: #1179223 Closes-Bug: #1381071 Closes-Bug: #1276629 Co-Authored-By: Aman Kumar <amank@hp.com> Co-Authored-By: phanipawan <ppawan@hp.com> Change-Id: I291992ffde5c3ab7152f0d7462deca2e4ac1ba3f
This commit is contained in:
parent
ec3f7562e0
commit
67c4c6d809
|
@ -79,6 +79,7 @@ class PluginApi(object):
|
|||
1.3 - get_device_details rpc signature upgrade to obtain 'host' and
|
||||
return value to include fixed_ips and device_owner for
|
||||
the device port
|
||||
1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
|
||||
'''
|
||||
|
||||
def __init__(self, topic):
|
||||
|
@ -117,7 +118,14 @@ class PluginApi(object):
|
|||
return cctxt.call(context, 'update_device_up', device=device,
|
||||
agent_id=agent_id, host=host)
|
||||
|
||||
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
|
||||
cctxt = self.client.prepare()
|
||||
return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
|
||||
tunnel_type=tunnel_type)
|
||||
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None, host=None):
|
||||
try:
|
||||
cctxt = self.client.prepare(version='1.4')
|
||||
res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
|
||||
tunnel_type=tunnel_type, host=host)
|
||||
except oslo_messaging.UnsupportedVersion:
|
||||
LOG.warn(_LW('Tunnel synchronization requires a server upgrade.'))
|
||||
cctxt = self.client.prepare()
|
||||
res = cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
|
||||
tunnel_type=tunnel_type)
|
||||
return res
|
||||
|
|
|
@ -265,4 +265,13 @@ class TunnelAgentRpcApiMixin(object):
|
|||
cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
|
||||
tunnel_type=tunnel_type)
|
||||
|
||||
# TODO(romilg): Add tunnel_delete rpc in dependent patch-set
|
||||
def _get_tunnel_delete_topic(self):
|
||||
return topics.get_topic_name(self.topic,
|
||||
TUNNEL,
|
||||
topics.DELETE)
|
||||
|
||||
def tunnel_delete(self, context, tunnel_ip, tunnel_type):
|
||||
cctxt = self.client.prepare(topic=self._get_tunnel_delete_topic(),
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'tunnel_delete', tunnel_ip=tunnel_ip,
|
||||
tunnel_type=tunnel_type)
|
||||
|
|
|
@ -45,7 +45,8 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
|
|||
# 1.3 get_device_details rpc signature upgrade to obtain 'host' and
|
||||
# return value to include fixed_ips and device_owner for
|
||||
# the device port
|
||||
target = oslo_messaging.Target(version='1.3')
|
||||
# 1.4 tunnel_sync rpc signature upgrade to obtain 'host'
|
||||
target = oslo_messaging.Target(version='1.4')
|
||||
|
||||
def __init__(self, notifier, type_manager):
|
||||
self.setup_tunnel_callback_mixin(notifier, type_manager)
|
||||
|
|
|
@ -787,7 +787,8 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
for tunnel_type in self.tunnel_types:
|
||||
self.plugin_rpc.tunnel_sync(self.context,
|
||||
self.local_ip,
|
||||
tunnel_type)
|
||||
tunnel_type,
|
||||
cfg.CONF.host)
|
||||
except Exception as e:
|
||||
LOG.debug("Unable to sync tunnel IP %(local_ip)s: %(e)s",
|
||||
{'local_ip': self.local_ip, 'e': e})
|
||||
|
|
|
@ -296,6 +296,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
consumers = [[topics.PORT, topics.UPDATE],
|
||||
[topics.NETWORK, topics.DELETE],
|
||||
[constants.TUNNEL, topics.UPDATE],
|
||||
[constants.TUNNEL, topics.DELETE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE],
|
||||
[topics.DVR, topics.UPDATE]]
|
||||
if self.l2_pop:
|
||||
|
@ -354,6 +355,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self._setup_tunnel_port(self.tun_br, tun_name, tunnel_ip,
|
||||
tunnel_type)
|
||||
|
||||
def tunnel_delete(self, context, **kwargs):
|
||||
LOG.debug("tunnel_delete received")
|
||||
if not self.enable_tunneling:
|
||||
return
|
||||
tunnel_ip = kwargs.get('tunnel_ip')
|
||||
if not tunnel_ip:
|
||||
LOG.error(_LE("No tunnel_ip specified, cannot delete tunnels"))
|
||||
return
|
||||
tunnel_type = kwargs.get('tunnel_type')
|
||||
if not tunnel_type:
|
||||
LOG.error(_LE("No tunnel_type specified, cannot delete tunnels"))
|
||||
return
|
||||
if tunnel_type not in self.tunnel_types:
|
||||
LOG.error(_LE("tunnel_type %s not supported by agent"),
|
||||
tunnel_type)
|
||||
return
|
||||
ofport = self.tun_br_ofports[tunnel_type].get(tunnel_ip)
|
||||
self.cleanup_tunnel_port(self.tun_br, ofport, tunnel_type)
|
||||
|
||||
def fdb_add(self, context, fdb_entries):
|
||||
LOG.debug("fdb_add received")
|
||||
for lvm, agent_ports in self.get_agent_ports(fdb_entries,
|
||||
|
@ -1309,8 +1329,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
try:
|
||||
return '%08x' % netaddr.IPAddress(ip_address, version=4)
|
||||
except Exception:
|
||||
LOG.warn(_LW("Unable to create tunnel port. "
|
||||
"Invalid remote IP: %s"), ip_address)
|
||||
LOG.warn(_LW("Invalid remote IP: %s"), ip_address)
|
||||
return
|
||||
|
||||
def tunnel_sync(self):
|
||||
|
@ -1318,7 +1337,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
for tunnel_type in self.tunnel_types:
|
||||
details = self.plugin_rpc.tunnel_sync(self.context,
|
||||
self.local_ip,
|
||||
tunnel_type)
|
||||
tunnel_type,
|
||||
cfg.CONF.host)
|
||||
if not self.l2_pop:
|
||||
tunnels = details['tunnels']
|
||||
for tunnel in tunnels:
|
||||
|
|
|
@ -142,4 +142,6 @@ class rpcHyperVApiTestCase(base.BaseTestCase):
|
|||
rpcapi, None,
|
||||
'tunnel_sync', rpc_method='call',
|
||||
tunnel_ip='fake_tunnel_ip',
|
||||
tunnel_type=None)
|
||||
tunnel_type=None,
|
||||
host='fake_host',
|
||||
version='1.4')
|
||||
|
|
|
@ -29,6 +29,7 @@ from neutron.common import constants
|
|||
from neutron.common import exceptions
|
||||
from neutron.common import topics
|
||||
from neutron.plugins.ml2.drivers import type_tunnel
|
||||
from neutron.plugins.ml2 import managers
|
||||
from neutron.plugins.ml2 import rpc as plugin_rpc
|
||||
from neutron.tests import base
|
||||
|
||||
|
@ -37,7 +38,10 @@ class RpcCallbacksTestCase(base.BaseTestCase):
|
|||
|
||||
def setUp(self):
|
||||
super(RpcCallbacksTestCase, self).setUp()
|
||||
self.callbacks = plugin_rpc.RpcCallbacks(mock.Mock(), mock.Mock())
|
||||
self.type_manager = managers.TypeManager()
|
||||
self.notifier = plugin_rpc.AgentNotifierApi(topics.AGENT)
|
||||
self.callbacks = plugin_rpc.RpcCallbacks(self.notifier,
|
||||
self.type_manager)
|
||||
self.manager = mock.patch.object(
|
||||
plugin_rpc.manager, 'NeutronManager').start()
|
||||
self.l3plugin = mock.Mock()
|
||||
|
@ -234,6 +238,17 @@ class RpcApiTestCase(base.BaseTestCase):
|
|||
fanout=True,
|
||||
tunnel_ip='fake_ip', tunnel_type='gre')
|
||||
|
||||
def test_tunnel_delete(self):
|
||||
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
|
||||
self._test_rpc_api(
|
||||
rpcapi,
|
||||
topics.get_topic_name(topics.AGENT,
|
||||
type_tunnel.TUNNEL,
|
||||
topics.DELETE),
|
||||
'tunnel_delete', rpc_method='cast',
|
||||
fanout=True,
|
||||
tunnel_ip='fake_ip', tunnel_type='gre')
|
||||
|
||||
def test_device_details(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
self._test_rpc_api(rpcapi, None,
|
||||
|
@ -263,7 +278,9 @@ class RpcApiTestCase(base.BaseTestCase):
|
|||
self._test_rpc_api(rpcapi, None,
|
||||
'tunnel_sync', rpc_method='call',
|
||||
tunnel_ip='fake_tunnel_ip',
|
||||
tunnel_type=None)
|
||||
tunnel_type=None,
|
||||
host='fake_host',
|
||||
version='1.4')
|
||||
|
||||
def test_update_device_up(self):
|
||||
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
|
||||
|
|
|
@ -17,6 +17,7 @@ import mock
|
|||
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.ml2.drivers import type_gre
|
||||
from neutron.tests.unit.ml2 import test_rpcapi
|
||||
from neutron.tests.unit.ml2 import test_type_tunnel
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
|
@ -85,5 +86,12 @@ class GreTypeTest(test_type_tunnel.TunnelTypeTestMixin,
|
|||
|
||||
|
||||
class GreTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
|
||||
testlib_api.SqlTestCase):
|
||||
testlib_api.SqlTestCase):
|
||||
DRIVER_CLASS = type_gre.GreTypeDriver
|
||||
|
||||
|
||||
class GreTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
|
||||
test_rpcapi.RpcCallbacksTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
DRIVER_CLASS = type_gre.GreTypeDriver
|
||||
TYPE = p_const.TYPE_GRE
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
from six import moves
|
||||
import testtools
|
||||
from testtools import matchers
|
||||
|
@ -21,6 +23,10 @@ from neutron.common import exceptions as exc
|
|||
from neutron.db import api as db
|
||||
from neutron.plugins.ml2 import driver_api as api
|
||||
|
||||
TUNNEL_IP_ONE = "10.10.10.10"
|
||||
TUNNEL_IP_TWO = "10.10.10.20"
|
||||
HOST_ONE = 'fake_host_one'
|
||||
HOST_TWO = 'fake_host_two'
|
||||
TUN_MIN = 100
|
||||
TUN_MAX = 109
|
||||
TUNNEL_RANGES = [(TUN_MIN, TUN_MAX)]
|
||||
|
@ -219,3 +225,94 @@ class TunnelTypeMultiRangeTestMixin(object):
|
|||
self.TUN_MIN1, self.TUN_MAX1):
|
||||
alloc = self.driver.get_allocation(self.session, key)
|
||||
self.assertFalse(alloc.allocated)
|
||||
|
||||
|
||||
class TunnelRpcCallbackTestMixin(object):
|
||||
|
||||
DRIVER_CLASS = None
|
||||
TYPE = None
|
||||
|
||||
def setUp(self):
|
||||
super(TunnelRpcCallbackTestMixin, self).setUp()
|
||||
self.driver = self.DRIVER_CLASS()
|
||||
|
||||
def _test_tunnel_sync(self, kwargs, delete_tunnel=False):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.notifier, 'tunnel_update'),
|
||||
mock.patch.object(self.notifier, 'tunnel_delete')
|
||||
) as (tunnel_update, tunnel_delete):
|
||||
details = self.callbacks.tunnel_sync('fake_context', **kwargs)
|
||||
tunnels = details['tunnels']
|
||||
for tunnel in tunnels:
|
||||
self.assertEqual(kwargs['tunnel_ip'], tunnel['ip_address'])
|
||||
self.assertEqual(kwargs['host'], tunnel['host'])
|
||||
self.assertTrue(tunnel_update.called)
|
||||
if delete_tunnel:
|
||||
self.assertTrue(tunnel_delete.called)
|
||||
else:
|
||||
self.assertFalse(tunnel_delete.called)
|
||||
|
||||
def _test_tunnel_sync_raises(self, kwargs):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(self.notifier, 'tunnel_update'),
|
||||
mock.patch.object(self.notifier, 'tunnel_delete')
|
||||
) as (tunnel_update, tunnel_delete):
|
||||
self.assertRaises(exc.InvalidInput,
|
||||
self.callbacks.tunnel_sync,
|
||||
'fake_context', **kwargs)
|
||||
self.assertFalse(tunnel_update.called)
|
||||
self.assertFalse(tunnel_delete.called)
|
||||
|
||||
def test_tunnel_sync_called_without_host_passed(self):
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
|
||||
'host': None}
|
||||
self._test_tunnel_sync(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_with_host_passed_for_existing_tunnel_ip(self):
|
||||
self.driver.add_endpoint(TUNNEL_IP_ONE, None)
|
||||
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
|
||||
'host': HOST_ONE}
|
||||
self._test_tunnel_sync(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_with_host_passed(self):
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
|
||||
'host': HOST_ONE}
|
||||
self._test_tunnel_sync(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_for_existing_endpoint(self):
|
||||
self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
|
||||
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
|
||||
'host': HOST_ONE}
|
||||
self._test_tunnel_sync(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_for_existing_host_with_tunnel_ip_changed(self):
|
||||
self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
|
||||
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_TWO, 'tunnel_type': self.TYPE,
|
||||
'host': HOST_ONE}
|
||||
self._test_tunnel_sync(kwargs, True)
|
||||
|
||||
def test_tunnel_sync_called_with_used_tunnel_ip_case_one(self):
|
||||
self.driver.add_endpoint(TUNNEL_IP_ONE, HOST_ONE)
|
||||
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
|
||||
'host': HOST_TWO}
|
||||
self._test_tunnel_sync_raises(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_with_used_tunnel_ip_case_two(self):
|
||||
self.driver.add_endpoint(TUNNEL_IP_ONE, None)
|
||||
self.driver.add_endpoint(TUNNEL_IP_TWO, HOST_TWO)
|
||||
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'tunnel_type': self.TYPE,
|
||||
'host': HOST_TWO}
|
||||
self._test_tunnel_sync_raises(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_without_tunnel_ip(self):
|
||||
kwargs = {'tunnel_type': self.TYPE, 'host': None}
|
||||
self._test_tunnel_sync_raises(kwargs)
|
||||
|
||||
def test_tunnel_sync_called_without_tunnel_type(self):
|
||||
kwargs = {'tunnel_ip': TUNNEL_IP_ONE, 'host': None}
|
||||
self._test_tunnel_sync_raises(kwargs)
|
||||
|
|
|
@ -17,6 +17,7 @@ import mock
|
|||
|
||||
from neutron.plugins.common import constants as p_const
|
||||
from neutron.plugins.ml2.drivers import type_vxlan
|
||||
from neutron.tests.unit.ml2 import test_rpcapi
|
||||
from neutron.tests.unit.ml2 import test_type_tunnel
|
||||
from neutron.tests.unit import testlib_api
|
||||
|
||||
|
@ -96,3 +97,10 @@ class VxlanTypeTest(test_type_tunnel.TunnelTypeTestMixin,
|
|||
class VxlanTypeMultiRangeTest(test_type_tunnel.TunnelTypeMultiRangeTestMixin,
|
||||
testlib_api.SqlTestCase):
|
||||
DRIVER_CLASS = type_vxlan.VxlanTypeDriver
|
||||
|
||||
|
||||
class VxlanTypeRpcCallbackTest(test_type_tunnel.TunnelRpcCallbackTestMixin,
|
||||
test_rpcapi.RpcCallbacksTestCase,
|
||||
testlib_api.SqlTestCase):
|
||||
DRIVER_CLASS = type_vxlan.VxlanTypeDriver
|
||||
TYPE = p_const.TYPE_VXLAN
|
||||
|
|
|
@ -761,6 +761,7 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
|
|||
self.agent.local_ip = 'agent_ip'
|
||||
self.agent.context = 'fake_context'
|
||||
self.agent.tunnel_types = ['vxlan']
|
||||
self.agent.host = cfg.CONF.host
|
||||
with mock.patch.object(
|
||||
self.agent.plugin_rpc, 'tunnel_sync'
|
||||
) as tunnel_sync_rpc_fn:
|
||||
|
@ -768,7 +769,8 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
|
|||
tunnel_sync_rpc_fn.assert_called_once_with(
|
||||
self.agent.context,
|
||||
self.agent.local_ip,
|
||||
self.agent.tunnel_types[0])
|
||||
self.agent.tunnel_types[0],
|
||||
self.agent.host)
|
||||
|
||||
def test__get_ports(self):
|
||||
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
||||
|
|
|
@ -966,6 +966,18 @@ class TestOvsNeutronAgent(base.BaseTestCase):
|
|||
mock.call(self.agent.tun_br, 'gre-0a0a0a0a', '10.10.10.10', 'gre')]
|
||||
self.agent._setup_tunnel_port.assert_has_calls(expected_calls)
|
||||
|
||||
def test_tunnel_delete(self):
|
||||
kwargs = {'tunnel_ip': '10.10.10.10',
|
||||
'tunnel_type': 'gre'}
|
||||
self.agent.enable_tunneling = True
|
||||
self.agent.tunnel_types = ['gre']
|
||||
self.agent.tun_br_ofports = {'gre': {'10.10.10.10': '1'}}
|
||||
with mock.patch.object(
|
||||
self.agent, 'cleanup_tunnel_port'
|
||||
) as clean_tun_fn:
|
||||
self.agent.tunnel_delete(context=None, **kwargs)
|
||||
self.assertTrue(clean_tun_fn.called)
|
||||
|
||||
def test_ovs_status(self):
|
||||
reply2 = {'current': set(['tap0']),
|
||||
'added': set(['tap2']),
|
||||
|
|
Loading…
Reference in New Issue