#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import testtools
import time

from neutron.agent.common import utils
import os_ken.lib.packet
from os_ken.ofproto import inet
from oslo_log import log

from dragonflow.controller.common import constants
from dragonflow.tests.common import app_testing_objects
from dragonflow.tests.common import constants as const
from dragonflow.tests.fullstack import apps
from dragonflow.tests.fullstack import test_base

LOG = log.getLogger(__name__)

_CONTROLLER_RECONNECT_TIMEOUT = 10


class TestArpResponder(test_base.DFTestBase):
    """Fullstack test: an ARP request sent from a port is answered."""

    def setUp(self):
        """Create a two-port subnet and the policy driving the ARP test.

        The policy sends an ARP request from port1 asking for port2, stops
        the simulation when an ARP reply arrives on port1, ignores IPv6
        packets, and raises on anything else seen on port1.
        """
        super(TestArpResponder, self).setUp()
        self.topology = None
        self.policy = None
        self.topology = app_testing_objects.Topology(
            self.neutron,
            self.nb_api)
        self.addCleanup(self.topology.close)
        subnet1 = self.topology.create_subnet(cidr='192.168.10.0/24')
        port1 = subnet1.create_port()
        port2 = subnet1.create_port()
        # Give the freshly created resources time to become ready.
        time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
        # Create policy
        arp_packet = self._create_arp_request(
            src_port=port1.port.get_logical_port(),
            dst_port=port2.port.get_logical_port(),
        )
        send_arp_request = app_testing_objects.SendAction(
            subnet1.subnet_id,
            port1.port_id,
            arp_packet,
        )
        ignore_action = app_testing_objects.IgnoreAction()
        log_action = app_testing_objects.LogAction()
        key1 = (subnet1.subnet_id, port1.port_id)
        port_policies = {
            key1: app_testing_objects.PortPolicy(
                rules=[
                    app_testing_objects.PortPolicyRule(
                        # Detect arp replies
                        app_testing_objects.OsKenARPReplyFilter(),
                        actions=[
                            log_action,
                            app_testing_objects.StopSimulationAction()
                        ]
                    ),
                    app_testing_objects.PortPolicyRule(
                        # Ignore IPv6 packets
                        app_testing_objects.OsKenIPv6Filter(),
                        actions=[
                            ignore_action
                        ]
                    ),
                ],
                default_action=app_testing_objects.RaiseAction(
                    "Unexpected packet"
                )
            ),
        }
        self.policy = app_testing_objects.Policy(
            initial_actions=[send_arp_request],
            port_policies=port_policies,
            unknown_port_action=ignore_action
        )
        self.addCleanup(self.policy.close)

    def _create_arp_request(self, src_port, dst_port):
        """Return the serialized bytes of a broadcast ARP request.

        :param src_port: logical port whose ``mac``/``ip`` are the sender
        :param dst_port: logical port whose ``ip`` is being resolved
        :returns: the ``data`` of the serialized os_ken packet
        """
        ethernet = os_ken.lib.packet.ethernet.ethernet(
            src=str(src_port.mac),
            dst=str(constants.BROADCAST_MAC),
            ethertype=os_ken.lib.packet.ethernet.ether.ETH_TYPE_ARP,
        )
        arp = os_ken.lib.packet.arp.arp_ip(
            opcode=os_ken.lib.packet.arp.ARP_REQUEST,
            src_mac=str(src_port.mac),
            src_ip=str(src_port.ip),
            dst_mac='00:00:00:00:00:00',
            dst_ip=str(dst_port.ip),
        )
        result = os_ken.lib.packet.packet.Packet()
        result.add_protocol(ethernet)
        result.add_protocol(arp)
        result.serialize()
        return result.data

    def tearDown(self):
        """Run the base tearDown, then close the policy and the topology."""
        super(TestArpResponder, self).tearDown()
        self.policy.close()
        self.topology.close()

    def test_simple_response(self):
        """
        2 ports on 1 subnet. 1 port asks for MAC of other.
        Policy:
            port1:
                Send ARP request
                Receive ARP response
            port2:
                Do nothing
        """
        apps.start_policy(self.policy, self.topology,
                          const.DEFAULT_RESOURCE_READY_TIMEOUT)


class TestNeighborAdvertiser(test_base.DFTestBase):
    """Fullstack test: a Neighbor Solicitation gets an advertisement back."""

    def _override_sysctl(self, key, value):
        """Set sysctl *key* to *value*, restoring the old value on cleanup.

        The restore is registered with ``addCleanup`` rather than done in
        ``tearDown`` so that it still runs when ``setUp`` fails part-way or
        when an earlier tearDown step raises.

        :param key: sysctl name, e.g. ``net.ipv6.conf.default.accept_dad``
        :param value: value to apply for the duration of the test
        :returns: the value that was configured before the override
        """
        # Strip surrounding whitespace (presumably the trailing newline
        # printed by ``sysctl -n``) so the restore argument is clean.
        previous = utils.execute(['sysctl', '-n', key]).strip()
        utils.execute(['sysctl', '-w', '{}={}'.format(key, value)],
                      run_as_root=True)
        # Registered only after the write succeeded: nothing to undo
        # otherwise.
        self.addCleanup(
            utils.execute,
            ['sysctl', '-w', '{}={}'.format(key, previous)],
            run_as_root=True)
        return previous

    def setUp(self):
        """Create a two-port IPv6 subnet and the policy driving the test.

        The policy sends a Neighbor Solicitation from port1 for port2, stops
        the simulation when a Neighbor Advertisement arrives on port1,
        ignores IPv6 multicast packets, and raises on anything else seen on
        port1.
        """
        super(TestNeighborAdvertiser, self).setUp()
        self.topology = None
        self.policy = None
        # Disable Duplicate Address Detection requests from the interface
        self.dad_conf = self._override_sysctl(
            'net.ipv6.conf.default.accept_dad', 0)
        # Disable Router Solicitation requests from the interface
        self.router_solicit_conf = self._override_sysctl(
            'net.ipv6.conf.default.router_solicitations', 0)
        self.topology = app_testing_objects.Topology(
            self.neutron,
            self.nb_api)
        self.addCleanup(self.topology.close)
        subnet1 = self.topology.create_subnet(cidr='1111:1111:1111::/64')
        port1 = subnet1.create_port()
        port2 = subnet1.create_port()
        # Give the freshly created resources time to become ready.
        time.sleep(const.DEFAULT_RESOURCE_READY_TIMEOUT)
        # Create Neighbor Solicitation packet
        ns_packet = self._create_ns_request(
            src_port=port1.port.get_logical_port(),
            dst_port=port2.port.get_logical_port(),
        )
        send_ns_request = app_testing_objects.SendAction(
            subnet1.subnet_id,
            port1.port_id,
            ns_packet,
        )
        ignore_action = app_testing_objects.IgnoreAction()
        log_action = app_testing_objects.LogAction()
        key1 = (subnet1.subnet_id, port1.port_id)
        adv_filter = app_testing_objects.OsKenNeighborAdvertisementFilter()
        port_policies = {
            key1: app_testing_objects.PortPolicy(
                rules=[
                    app_testing_objects.PortPolicyRule(
                        # Detect advertisements
                        adv_filter,
                        actions=[
                            log_action,
                            app_testing_objects.StopSimulationAction()
                        ]
                    ),
                    app_testing_objects.PortPolicyRule(
                        # Filter local VM's Multicast requests
                        app_testing_objects.OsKenIpv6MulticastFilter(),
                        actions=[ignore_action]
                    )
                ],
                default_action=app_testing_objects.RaiseAction(
                    "Unexpected packet"
                )
            ),
        }
        self.policy = app_testing_objects.Policy(
            initial_actions=[send_ns_request],
            port_policies=port_policies,
            unknown_port_action=ignore_action
        )
        self.addCleanup(self.policy.close)

    def _create_ns_request(self, src_port, dst_port):
        """Return the serialized bytes of a Neighbor Solicitation packet.

        :param src_port: logical port whose ``mac``/``ip`` are the sender
        :param dst_port: logical port whose ``ip`` is being solicited
        :returns: the ``data`` of the serialized os_ken packet
        """
        ethernet = os_ken.lib.packet.ethernet.ethernet(
            src=str(src_port.mac),
            dst=str(constants.BROADCAST_MAC),
            ethertype=os_ken.lib.packet.ethernet.ether.ETH_TYPE_IPV6,
        )
        ipv6 = os_ken.lib.packet.ipv6.ipv6(
            src=str(src_port.ip),
            dst=str(dst_port.ip),
            nxt=inet.IPPROTO_ICMPV6
        )
        icmpv6 = os_ken.lib.packet.icmpv6.icmpv6(
            type_=os_ken.lib.packet.icmpv6.ND_NEIGHBOR_SOLICIT,
            data=os_ken.lib.packet.icmpv6.nd_neighbor(
                dst=str(dst_port.ip)
            )
        )
        result = os_ken.lib.packet.packet.Packet()
        result.add_protocol(ethernet)
        result.add_protocol(ipv6)
        result.add_protocol(icmpv6)
        result.serialize()
        return result.data

    def tearDown(self):
        """Run the base tearDown, then close the topology and the policy.

        The sysctl values changed in ``setUp`` are restored by cleanups
        registered there, which run after this method returns.
        """
        super(TestNeighborAdvertiser, self).tearDown()
        self.topology.close()
        self.policy.close()

    @testtools.skip('bug/1820977')
    def test_simple_response(self):
        """
        2 ports on 1 subnet. 1 port asks for MAC of other.
        Policy:
            port1:
                Send Neighbor Solicitation request
                Receive Neighbor Advertisement
            port2:
                Do nothing
        """
        apps.start_policy(self.policy, self.topology,
                          const.DEFAULT_RESOURCE_READY_TIMEOUT)