diff --git a/neutron/tests/fullstack/test_securitygroup.py b/neutron/tests/fullstack/test_securitygroup.py index e363d68d15e..5cdc2f01bd8 100644 --- a/neutron/tests/fullstack/test_securitygroup.py +++ b/neutron/tests/fullstack/test_securitygroup.py @@ -100,6 +100,8 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest): 'l2_agent_type': constants.AGENT_TYPE_LINUXBRIDGE, 'num_hosts': 2})] + index_to_sg = [0, 0, 1, 2] + # NOTE(toshii): As a firewall_driver can interfere with others, # the recommended way to add test is to expand this method, not # adding another. @@ -116,50 +118,13 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest): 8. test other protocol functionality by using SCTP protocol 9. test two vms with same mac on the same host in different networks + 10. test using multiple security groups """ - index_to_sg = [0, 0, 1, 2] - if self.firewall_driver == 'iptables_hybrid': - # The iptables_hybrid driver lacks isolation between agents - index_to_host = [0] * 4 - else: - index_to_host = [0, 1, 1, 0] tenant_uuid = uuidutils.generate_uuid() - - network = self.safe_client.create_network(tenant_uuid) - self.safe_client.create_subnet( - tenant_uuid, network['id'], '20.0.0.0/24') - - sgs = [self.safe_client.create_security_group(tenant_uuid) - for i in range(3)] - ports = [ - self.safe_client.create_port(tenant_uuid, network['id'], - self.environment.hosts[host].hostname, - security_groups=[], - port_security_enabled=False) - for host in index_to_host] - - self.safe_client.create_security_group_rule( - tenant_uuid, sgs[0]['id'], - remote_group_id=sgs[0]['id'], direction='ingress', - ethertype=constants.IPv4, - protocol=constants.PROTO_NAME_TCP, - port_range_min=3333, port_range_max=3333) - - vms = [ - self.useFixture( - machine.FakeFullstackMachine( - self.environment.hosts[host], - network['id'], - tenant_uuid, - self.safe_client, - neutron_port=ports[port], - use_dhcp=True)) - for port, host in enumerate(index_to_host)] - - for vm in vms: - vm.block_until_boot() - vm.block_until_dhcp_config_done() + subnet_cidr = '20.0.0.0/24' + vms, ports, sgs, network, index_to_host = self._create_resources( + tenant_uuid, subnet_cidr) # 0. check that traffic is allowed when port security is disabled self.assert_connection( @@ -173,7 +138,7 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest): net_helpers.assert_ping(vms[1].namespace, vms[2].ip) # Apply security groups to the ports - for port, sg in zip(ports, index_to_sg): + for port, sg in zip(ports, self.index_to_sg): self.safe_client.client.update_port( port['id'], body={'port': {'port_security_enabled': True, @@ -232,7 +197,7 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest): # 6. check if an established connection stops by deleting # the supporting SG rule. index_to_host.append(index_to_host[2]) - index_to_sg.append(1) + self.index_to_sg.append(1) ports.append( self.safe_client.create_port(tenant_uuid, network['id'], self.environment.hosts[ @@ -306,6 +271,148 @@ class TestSecurityGroupsSameNetwork(BaseSecurityGroupsSameNetworkTest): # 9. test two vms with same mac on the same host in different networks self._test_overlapping_mac_addresses() + # 10. Check using multiple security groups + self._test_using_multiple_security_groups() + + def _test_using_multiple_security_groups(self): + """Test using multiple security groups. + + This test will do following things: + 1. Create three vms with two security groups. vm0, vm1 in sg0; + vm2 in sg1. + 2. Add SSH and ICMP rules in sg0. vm0 and vm1 can ping and ssh + for each other, but can not access between vm0 and vm2. + 3. Using multiple security groups(sg0, sg1) for vm0, and sg1 + have rules allowed sg0 access(ICMP), so vm0 and vm1 can + ping vm2. + 4. Then remove sg0 from vm0, we removed ICMP and SSH rules. + vm0 and vm1 can not ping and ssh for each other. + """ + + tenant_uuid = uuidutils.generate_uuid() + subnet_cidr = '30.0.0.0/24' + vms, ports, sgs, _, _ = self._create_resources(tenant_uuid, + subnet_cidr) + + # Apply security groups to the ports + for port, sg in zip(ports, self.index_to_sg): + self.safe_client.client.update_port( + port['id'], + body={'port': {'port_security_enabled': True, + 'security_groups': [sgs[sg]['id']]}}) + + # Traffic not explicitly allowed (eg. SSH, ICMP) is blocked + self.verify_no_connectivity_between_vms( + vms[1], vms[0], net_helpers.NetcatTester.TCP, 22) + + net_helpers.assert_no_ping(vms[0].namespace, vms[1].ip) + net_helpers.assert_no_ping(vms[0].namespace, vms[2].ip) + net_helpers.assert_no_ping(vms[1].namespace, vms[2].ip) + + # Add SSH and ICMP allowed in the same security group + self.safe_client.create_security_group_rule( + tenant_uuid, sgs[0]['id'], + remote_group_id=sgs[0]['id'], direction='ingress', + ethertype=constants.IPv4, + protocol=constants.PROTO_NAME_TCP, + port_range_min=22, port_range_max=22) + + self.verify_connectivity_between_vms( + vms[1], vms[0], net_helpers.NetcatTester.TCP, 22) + + self.verify_no_connectivity_between_vms( + vms[2], vms[0], net_helpers.NetcatTester.TCP, 22) + + self.safe_client.create_security_group_rule( + tenant_uuid, sgs[0]['id'], + remote_group_id=sgs[0]['id'], direction='ingress', + ethertype=constants.IPv4, + protocol=constants.PROTO_NAME_ICMP) + + net_helpers.assert_ping(vms[1].namespace, vms[0].ip) + net_helpers.assert_no_ping(vms[2].namespace, vms[0].ip) + + # Update vm0 to use two security groups + # Add security group rules(ICMP) in another security group + self.safe_client.client.update_port( + ports[0]['id'], + body={'port': {'security_groups': [sgs[0]['id'], + sgs[1]['id']]}}) + + self.safe_client.create_security_group_rule( + tenant_uuid, sgs[1]['id'], + remote_group_id=sgs[0]['id'], direction='ingress', + ethertype=constants.IPv4, + protocol=constants.PROTO_NAME_ICMP) + + net_helpers.assert_ping(vms[0].namespace, vms[2].ip) + net_helpers.assert_ping(vms[1].namespace, vms[2].ip) + net_helpers.assert_no_ping(vms[2].namespace, vms[0].ip) + net_helpers.assert_no_ping(vms[2].namespace, vms[1].ip) + + self.verify_connectivity_between_vms( + vms[1], vms[0], net_helpers.NetcatTester.TCP, 22) + + self.verify_no_connectivity_between_vms( + vms[2], vms[0], net_helpers.NetcatTester.TCP, 22) + + # Remove first security group from port + self.safe_client.client.update_port( + ports[0]['id'], + body={'port': {'security_groups': [sgs[1]['id']]}}) + + net_helpers.assert_ping(vms[0].namespace, vms[2].ip) + net_helpers.assert_ping(vms[1].namespace, vms[2].ip) + net_helpers.assert_no_ping(vms[2].namespace, vms[0].ip) + net_helpers.assert_no_ping(vms[2].namespace, vms[1].ip) + + self.verify_no_connectivity_between_vms( + vms[1], vms[0], net_helpers.NetcatTester.TCP, 22) + + # NOTE: This can be used after refactor other tests to + # one scenario one test. + def _create_resources(self, tenant_uuid, subnet_cidr): + if self.firewall_driver == 'iptables_hybrid': + # The iptables_hybrid driver lacks isolation between agents + index_to_host = [0] * 4 + else: + index_to_host = [0, 1, 1, 0] + + network = self.safe_client.create_network(tenant_uuid) + self.safe_client.create_subnet( + tenant_uuid, network['id'], subnet_cidr) + + sgs = [self.safe_client.create_security_group(tenant_uuid) + for i in range(3)] + ports = [ + self.safe_client.create_port(tenant_uuid, network['id'], + self.environment.hosts[host].hostname, + security_groups=[], + port_security_enabled=False) + for host in index_to_host] + + self.safe_client.create_security_group_rule( + tenant_uuid, sgs[0]['id'], + remote_group_id=sgs[0]['id'], direction='ingress', + ethertype=constants.IPv4, + protocol=constants.PROTO_NAME_TCP, + port_range_min=3333, port_range_max=3333) + + vms = [ + self.useFixture( + machine.FakeFullstackMachine( + self.environment.hosts[host], + network['id'], + tenant_uuid, + self.safe_client, + neutron_port=ports[port], + use_dhcp=True)) + for port, host in enumerate(index_to_host)] + map(lambda vm: vm.block_until_boot(), vms) + map(lambda vm: vm.block_until_dhcp_config_done(), vms) + + return vms, ports, sgs, network, index_to_host + def _create_vm_on_host( self, project_id, network_id, sg_id, host, mac_address=None): if mac_address: