diff --git a/neutron/cmd/sanity/checks.py b/neutron/cmd/sanity/checks.py index 819d00c23e2..39c97066e5a 100644 --- a/neutron/cmd/sanity/checks.py +++ b/neutron/cmd/sanity/checks.py @@ -134,6 +134,17 @@ def arp_header_match_supported(): actions="NORMAL") +def icmpv6_header_match_supported(): + return ofctl_arg_supported(cmd='add-flow', + table=ovs_const.ARP_SPOOF_TABLE, + priority=1, + dl_type=n_consts.ETHERTYPE_IPV6, + nw_proto=n_consts.PROTO_NUM_ICMP_V6, + icmp_type=n_consts.ICMPV6_TYPE_NA, + nd_target='fdf8:f53b:82e4::10', + actions="NORMAL") + + def vf_management_supported(): is_supported = True required_caps = ( diff --git a/neutron/cmd/sanity_check.py b/neutron/cmd/sanity_check.py index 0cf80a10389..9eca181fa70 100644 --- a/neutron/cmd/sanity_check.py +++ b/neutron/cmd/sanity_check.py @@ -165,6 +165,16 @@ def check_arp_header_match(): return result +def check_icmpv6_header_match(): + result = checks.icmpv6_header_match_supported() + if not result: + LOG.error(_LE('Check for Open vSwitch support of ICMPv6 header ' + 'matching failed. ICMPv6 Neighbor Advt spoofing (part ' + 'of arp spoofing) suppression will not work. A newer ' + 'version of OVS is required.')) + return result + + def check_vf_management(): result = checks.vf_management_supported() if not result: @@ -206,6 +216,8 @@ OPTS = [ help=_('Check for ARP responder support')), BoolOptCallback('arp_header_match', check_arp_header_match, help=_('Check for ARP header match support')), + BoolOptCallback('icmpv6_header_match', check_icmpv6_header_match, + help=_('Check for ICMPv6 header match support')), BoolOptCallback('vf_management', check_vf_management, help=_('Check for VF management support')), BoolOptCallback('read_netns', check_read_netns, @@ -247,6 +259,7 @@ def enable_tests_from_config(): cfg.CONF.set_override('arp_responder', True) if cfg.CONF.AGENT.prevent_arp_spoofing: cfg.CONF.set_override('arp_header_match', True) + cfg.CONF.set_override('icmpv6_header_match', True) if cfg.CONF.ml2_sriov.agent_required: cfg.CONF.set_override('vf_management', True) if not cfg.CONF.AGENT.use_helper_for_ns_read: diff --git a/neutron/common/constants.py b/neutron/common/constants.py index 9a4ada150a5..f15dd3641b5 100644 --- a/neutron/common/constants.py +++ b/neutron/common/constants.py @@ -112,6 +112,8 @@ L3_DISTRIBUTED_EXT_ALIAS = 'dvr' L3_HA_MODE_EXT_ALIAS = 'l3-ha' SUBNET_ALLOCATION_EXT_ALIAS = 'subnet_allocation' +ETHERTYPE_IPV6 = 0x86DD + # Protocol names and numbers for Security Groups/Firewalls PROTO_NAME_TCP = 'tcp' PROTO_NAME_ICMP = 'icmp' @@ -130,6 +132,7 @@ PROTO_NUM_UDP = 17 # Neighbor Advertisement (136) ICMPV6_ALLOWED_TYPES = [130, 131, 132, 135, 136] ICMPV6_TYPE_RA = 134 +ICMPV6_TYPE_NA = 136 DHCPV6_STATEFUL = 'dhcpv6-stateful' DHCPV6_STATELESS = 'dhcpv6-stateless' diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py index 76eaf86014b..ce4b790285c 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/native/br_int.py @@ -21,6 +21,8 @@ from oslo_log import log as logging from ryu.lib.packet import ether_types +from ryu.lib.packet import icmpv6 +from ryu.lib.packet import in_proto from neutron.i18n import _LE from neutron.plugins.common import constants as p_const @@ -146,6 +148,34 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): return ofpp.OFPMatch(in_port=port, eth_type=ether_types.ETH_TYPE_ARP) + @staticmethod + def _icmpv6_reply_match(ofp, ofpp, port): + return ofpp.OFPMatch(in_port=port, + eth_type=ether_types.ETH_TYPE_IPV6, + ip_proto=in_proto.IPPROTO_ICMPV6, + icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT) + + def install_icmpv6_na_spoofing_protection(self, port, ip_addresses): + # Allow neighbor advertisements as long as they match addresses + # that actually belong to the port. + for ip in ip_addresses: + masked_ip = self._cidr_to_ryu(ip) + self.install_normal( + table_id=constants.ARP_SPOOF_TABLE, priority=2, + eth_type=ether_types.ETH_TYPE_IPV6, + ip_proto=in_proto.IPPROTO_ICMPV6, + icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT, + ipv6_nd_target=masked_ip, in_port=port) + + # Now that the rules are ready, direct icmpv6 neighbor advertisement + # traffic from the port into the anti-spoof table. + (_dp, ofp, ofpp) = self._get_dp() + match = self._icmpv6_reply_match(ofp, ofpp, port=port) + self.install_goto(table_id=constants.LOCAL_SWITCHING, + priority=10, + match=match, + dest_table_id=constants.ARP_SPOOF_TABLE) + def install_arp_spoofing_protection(self, port, ip_addresses): # allow ARP replies as long as they match addresses that actually # belong to the port. @@ -171,6 +201,9 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): def delete_arp_spoofing_protection(self, port): (_dp, ofp, ofpp) = self._get_dp() match = self._arp_reply_match(ofp, ofpp, port=port) + self.delete_flows(table_id=constants.LOCAL_SWITCHING, + match=match) + match = self._icmpv6_reply_match(ofp, ofpp, port=port) self.delete_flows(table_id=constants.LOCAL_SWITCHING, match=match) self.delete_flows(table_id=constants.ARP_SPOOF_TABLE, diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py index 952513e7176..ef232cd0d14 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py @@ -18,7 +18,7 @@ * references ** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic """ - +from neutron.common import constants as const from neutron.plugins.common import constants as p_const from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \ @@ -110,6 +110,23 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): self.delete_flows(table_id=constants.LOCAL_SWITCHING, in_port=port, eth_src=mac) + def install_icmpv6_na_spoofing_protection(self, port, ip_addresses): + # Allow neighbor advertisements as long as they match addresses + # that actually belong to the port. + for ip in ip_addresses: + self.install_normal( + table_id=constants.ARP_SPOOF_TABLE, priority=2, + dl_type=const.ETHERTYPE_IPV6, nw_proto=const.PROTO_NUM_ICMP_V6, + icmp_type=const.ICMPV6_TYPE_NA, nd_target=ip, in_port=port) + + # Now that the rules are ready, direct icmpv6 neighbor advertisement + # traffic from the port into the anti-spoof table. + self.add_flow(table=constants.LOCAL_SWITCHING, + priority=10, dl_type=const.ETHERTYPE_IPV6, + nw_proto=const.PROTO_NUM_ICMP_V6, + icmp_type=const.ICMPV6_TYPE_NA, in_port=port, + actions=("resubmit(,%s)" % constants.ARP_SPOOF_TABLE)) + def install_arp_spoofing_protection(self, port, ip_addresses): # allow ARPs as long as they match addresses that actually # belong to the port. @@ -129,5 +146,8 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge): def delete_arp_spoofing_protection(self, port): self.delete_flows(table_id=constants.LOCAL_SWITCHING, in_port=port, proto='arp') + self.delete_flows(table_id=constants.LOCAL_SWITCHING, + in_port=port, nw_proto=const.PROTO_NUM_ICMP_V6, + icmp_type=const.ICMPV6_TYPE_NA) self.delete_flows(table_id=constants.ARP_SPOOF_TABLE, in_port=port) diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 76f0b1dbf11..636da492570 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -40,6 +40,7 @@ from neutron.api.rpc.handlers import dvr_rpc from neutron.common import config from neutron.common import constants as n_const from neutron.common import exceptions +from neutron.common import ipv6_utils as ipv6 from neutron.common import topics from neutron.common import utils as n_utils from neutron import context @@ -96,6 +97,10 @@ class OVSPluginApi(agent_rpc.PluginApi): pass +def has_zero_prefixlen_address(ip_addresses): + return any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in ip_addresses) + + class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, l2population_rpc.L2populationRpcCallBackTunnelMixin, dvr_rpc.DVRAgentRpcCallbackMixin): @@ -867,19 +872,35 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, return # collect all of the addresses and cidrs that belong to the port addresses = {f['ip_address'] for f in port_details['fixed_ips']} + mac_addresses = {vif.vif_mac} if port_details.get('allowed_address_pairs'): addresses |= {p['ip_address'] for p in port_details['allowed_address_pairs']} + mac_addresses |= {p['mac_address'] + for p in port_details['allowed_address_pairs'] + if p.get('mac_address')} - addresses = {ip for ip in addresses - if netaddr.IPNetwork(ip).version == 4} - if any(netaddr.IPNetwork(ip).prefixlen == 0 for ip in addresses): - # don't try to install protection because a /0 prefix allows any - # address anyway and the ARP_SPA can only match on /1 or more. - return + ipv6_addresses = {ip for ip in addresses + if netaddr.IPNetwork(ip).version == 6} + # Allow neighbor advertisements for LLA address. + ipv6_addresses |= {str(ipv6.get_ipv6_addr_by_EUI64( + n_const.IPV6_LLA_PREFIX, mac)) + for mac in mac_addresses} + if not has_zero_prefixlen_address(ipv6_addresses): + # Install protection only when prefix is not zero because a /0 + # prefix allows any address anyway and the nd_target can only + # match on /1 or more. + bridge.install_icmpv6_na_spoofing_protection(port=vif.ofport, + ip_addresses=ipv6_addresses) - bridge.install_arp_spoofing_protection(port=vif.ofport, - ip_addresses=addresses) + ipv4_addresses = {ip for ip in addresses + if netaddr.IPNetwork(ip).version == 4} + if not has_zero_prefixlen_address(ipv4_addresses): + # Install protection only when prefix is not zero because a /0 + # prefix allows any address anyway and the ARP_SPA can only + # match on /1 or more. + bridge.install_arp_spoofing_protection(port=vif.ofport, + ip_addresses=ipv4_addresses) def port_unbound(self, vif_id, net_uuid=None): '''Unbind port. diff --git a/neutron/tests/functional/agent/test_ovs_flows.py b/neutron/tests/functional/agent/test_ovs_flows.py index e0ddbb7102d..94ebb642943 100644 --- a/neutron/tests/functional/agent/test_ovs_flows.py +++ b/neutron/tests/functional/agent/test_ovs_flows.py @@ -162,6 +162,21 @@ class _ARPSpoofTestCase(object): self.dst_p.addr.add('%s/24' % self.dst_addr) net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_blocks_icmpv6_neigh_advt(self): + self.src_addr = '2000::1' + self.dst_addr = '2000::2' + # this will prevent the destination from responding (i.e., icmpv6 + # neighbour advertisement) to the icmpv6 neighbour solicitation + # request for it's own address (2000::2) as spoofing rules added + # below only allow '2000::3'. + self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3']) + self.src_p.addr.add('%s/64' % self.src_addr) + self.dst_p.addr.add('%s/64' % self.dst_addr) + # make sure the IPv6 addresses are ready before pinging + self.src_p.addr.wait_until_address_ready(self.src_addr) + self.dst_p.addr.wait_until_address_ready(self.dst_addr) + net_helpers.assert_no_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_blocks_request(self): # this will prevent the source from sending an ARP # request with its own address @@ -184,6 +199,18 @@ class _ARPSpoofTestCase(object): self.dst_p.addr.add('%s/24' % self.dst_addr) net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_icmpv6_neigh_advt_allowed_address_pairs(self): + self.src_addr = '2000::1' + self.dst_addr = '2000::2' + self._setup_arp_spoof_for_port(self.dst_p.name, ['2000::3', + self.dst_addr]) + self.src_p.addr.add('%s/64' % self.src_addr) + self.dst_p.addr.add('%s/64' % self.dst_addr) + # make sure the IPv6 addresses are ready before pinging + self.src_p.addr.wait_until_address_ready(self.src_addr) + self.dst_p.addr.wait_until_address_ready(self.dst_addr) + net_helpers.assert_ping(self.src_namespace, self.dst_addr, count=2) + def test_arp_spoof_allowed_address_pairs_0cidr(self): self._setup_arp_spoof_for_port(self.dst_p.name, ['9.9.9.9/0', '1.2.3.4']) diff --git a/neutron/tests/functional/sanity/test_sanity.py b/neutron/tests/functional/sanity/test_sanity.py index a47bb4e2759..88846907120 100644 --- a/neutron/tests/functional/sanity/test_sanity.py +++ b/neutron/tests/functional/sanity/test_sanity.py @@ -65,6 +65,9 @@ class SanityTestCaseRoot(functional_base.BaseSudoTestCase): def test_arp_header_match_runs(self): checks.arp_header_match_supported() + def test_icmpv6_header_match_runs(self): + checks.icmpv6_header_match_supported() + def test_vf_management_runs(self): checks.vf_management_supported() diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py index fab1f247e09..17a865a5566 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/native/test_br_int.py @@ -283,6 +283,60 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): ] self.assertEqual(expected, self.mock.mock_calls) + def test_install_icmpv6_na_spoofing_protection(self): + port = 8888 + ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128'] + self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses) + (dp, ofp, ofpp) = self._get_dp() + expected = [ + call._send_msg(ofpp.OFPFlowMod(dp, + cookie=0, + instructions=[ + ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [ + ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), + ]), + ], + match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + ip_proto=self.in_proto.IPPROTO_ICMPV6, + ipv6_nd_target='2001:db8::1', + in_port=8888, + ), + priority=2, + table_id=24)), + call._send_msg(ofpp.OFPFlowMod(dp, + cookie=0, + instructions=[ + ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [ + ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), + ]), + ], + match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + ip_proto=self.in_proto.IPPROTO_ICMPV6, + ipv6_nd_target='fdf8:f53b:82e4::1', + in_port=8888, + ), + priority=2, + table_id=24)), + call._send_msg(ofpp.OFPFlowMod(dp, + cookie=0, + instructions=[ + ofpp.OFPInstructionGotoTable(table_id=24), + ], + match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + ip_proto=self.in_proto.IPPROTO_ICMPV6, + in_port=8888, + ), + priority=10, + table_id=0)), + ] + self.assertEqual(expected, self.mock.mock_calls) + def test_install_arp_spoofing_protection(self): port = 8888 ip_addresses = ['192.0.2.1', '192.0.2.2/32'] @@ -339,6 +393,11 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): call.delete_flows(table_id=0, match=ofpp.OFPMatch( eth_type=self.ether_types.ETH_TYPE_ARP, in_port=8888)), + call.delete_flows(table_id=0, match=ofpp.OFPMatch( + eth_type=self.ether_types.ETH_TYPE_IPV6, + icmpv6_type=self.icmpv6.ND_NEIGHBOR_ADVERT, + in_port=8888, + ip_proto=self.in_proto.IPPROTO_ICMPV6)), call.delete_flows(table_id=24, in_port=port), ] self.assertEqual(expected, self.mock.mock_calls) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py index 9bb3c8f2346..8c77e185ceb 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py @@ -16,6 +16,7 @@ import mock +from neutron.common import constants as const from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent.\ openflow.ovs_ofctl import ovs_bridge_test_base @@ -186,6 +187,29 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): ] self.assertEqual(expected, self.mock.mock_calls) + def test_install_icmpv6_na_spoofing_protection(self): + port = 8888 + ip_addresses = ['2001:db8::1', 'fdf8:f53b:82e4::1/128'] + self.br.install_icmpv6_na_spoofing_protection(port, ip_addresses) + expected = [ + call.add_flow(dl_type=const.ETHERTYPE_IPV6, actions='normal', + icmp_type=const.ICMPV6_TYPE_NA, + nw_proto=const.PROTO_NUM_ICMP_V6, + nd_target='2001:db8::1', + priority=2, table=24, in_port=8888), + call.add_flow(dl_type=const.ETHERTYPE_IPV6, actions='normal', + icmp_type=const.ICMPV6_TYPE_NA, + nw_proto=const.PROTO_NUM_ICMP_V6, + nd_target='fdf8:f53b:82e4::1/128', + priority=2, table=24, in_port=8888), + call.add_flow(dl_type=const.ETHERTYPE_IPV6, + icmp_type=const.ICMPV6_TYPE_NA, + nw_proto=const.PROTO_NUM_ICMP_V6, + priority=10, table=0, in_port=8888, + actions='resubmit(,24)') + ] + self.assertEqual(expected, self.mock.mock_calls) + def test_install_arp_spoofing_protection(self): port = 8888 ip_addresses = ['192.0.2.1', '192.0.2.2/32'] @@ -207,6 +231,8 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase): self.br.delete_arp_spoofing_protection(port) expected = [ call.delete_flows(table_id=0, in_port=8888, proto='arp'), + call.delete_flows(table_id=0, in_port=8888, icmp_type=136, + nw_proto=58), call.delete_flows(table_id=24, in_port=8888), ] self.assertEqual(expected, self.mock.mock_calls) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 1280b10aacb..54a1b55c82c 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -54,6 +54,7 @@ TEST_NETWORK_ID2 = 'net-id-2' class FakeVif(object): ofport = 99 port_name = 'name' + vif_mac = 'aa:bb:cc:11:22:33' class MockFixedIntervalLoopingCall(object): @@ -1387,6 +1388,18 @@ class TestOvsNeutronAgent(object): [mock.call(ip_addresses=set(), port=vif.ofport)], int_br.install_arp_spoofing_protection.mock_calls) + def test_arp_spoofing_basic_rule_setup_fixed_ipv6(self): + vif = FakeVif() + fake_details = {'fixed_ips': [{'ip_address': 'fdf8:f53b:82e4::1'}], + 'device_owner': 'nobody'} + self.agent.prevent_arp_spoofing = True + br = mock.create_autospec(self.agent.int_br) + self.agent.setup_arp_spoofing_protection(br, vif, fake_details) + self.assertEqual( + [mock.call(port=vif.ofport)], + br.delete_arp_spoofing_protection.mock_calls) + self.assertTrue(br.install_icmpv6_na_spoofing_protection.called) + def test_arp_spoofing_fixed_and_allowed_addresses(self): vif = FakeVif() fake_details = { @@ -1406,6 +1419,25 @@ class TestOvsNeutronAgent(object): [mock.call(port=vif.ofport, ip_addresses=addresses)], int_br.install_arp_spoofing_protection.mock_calls) + def test_arp_spoofing_fixed_and_allowed_addresses_ipv6(self): + vif = FakeVif() + fake_details = { + 'device_owner': 'nobody', + 'fixed_ips': [{'ip_address': '2001:db8::1'}, + {'ip_address': '2001:db8::2'}], + 'allowed_address_pairs': [{'ip_address': '2001:db8::200', + 'mac_address': 'aa:22:33:44:55:66'}] + } + self.agent.prevent_arp_spoofing = True + int_br = mock.create_autospec(self.agent.int_br) + self.agent.setup_arp_spoofing_protection(int_br, vif, fake_details) + # make sure all addresses are allowed including ipv6 LLAs + addresses = {'2001:db8::1', '2001:db8::2', '2001:db8::200', + 'fe80::a822:33ff:fe44:5566', 'fe80::a8bb:ccff:fe11:2233'} + self.assertEqual( + [mock.call(port=vif.ofport, ip_addresses=addresses)], + int_br.install_icmpv6_na_spoofing_protection.mock_calls) + def test__get_ofport_moves(self): previous = {'port1': 1, 'port2': 2} current = {'port1': 5, 'port2': 2}