Merge "[Stateless SG] Add IPv6 and DHCP related tests"
This commit is contained in:
commit
75ce4eb4ad
|
@ -17,6 +17,7 @@ import netaddr
|
|||
from neutron_lib import constants
|
||||
import testtools
|
||||
|
||||
from oslo_log import log
|
||||
from tempest.common import utils as tempest_utils
|
||||
from tempest.common import waiters
|
||||
from tempest.lib.common.utils import data_utils
|
||||
|
@ -32,7 +33,7 @@ from neutron_tempest_plugin.scenario import base
|
|||
from neutron_tempest_plugin.scenario import constants as const
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
EPHEMERAL_PORT_RANGE = {'min': 32768, 'max': 65535}
|
||||
|
||||
|
||||
|
@ -96,10 +97,18 @@ class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
|
|||
def resource_setup(cls):
|
||||
super(BaseNetworkSecGroupTest, cls).resource_setup()
|
||||
# setup basic topology for servers we can log into it
|
||||
cls.reserve_external_subnet_cidrs()
|
||||
cls.network = cls.create_network()
|
||||
cls.subnet = cls.create_subnet(cls.network)
|
||||
cls.router = cls.create_router_by_client()
|
||||
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
|
||||
if cls.ipv6_mode:
|
||||
cls.subnet_v6 = cls.create_subnet(
|
||||
cls.network,
|
||||
ip_version=constants.IP_VERSION_6,
|
||||
ipv6_ra_mode=cls.ipv6_mode,
|
||||
ipv6_address_mode=cls.ipv6_mode)
|
||||
cls.create_router_interface(cls.router['id'], cls.subnet_v6['id'])
|
||||
cls.keypair = cls.create_keypair()
|
||||
|
||||
def setUp(self):
|
||||
|
@ -332,6 +341,7 @@ class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
|
|||
self.assertTrue(ext_net_ip)
|
||||
self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip,
|
||||
servers=servers)
|
||||
return server_ssh_clients, fips, servers
|
||||
|
||||
def _test_protocol_number_rule(self):
|
||||
# protocol number is added instead of str in security rule creation
|
||||
|
@ -739,6 +749,7 @@ class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
|
|||
|
||||
class StatefulNetworkSecGroupTest(BaseNetworkSecGroupTest):
|
||||
stateless_sg = False
|
||||
ipv6_mode = None
|
||||
|
||||
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
|
||||
def test_default_sec_grp_scenarios(self):
|
||||
|
@ -922,9 +933,10 @@ class StatefulNetworkSecGroupTest(BaseNetworkSecGroupTest):
|
|||
CONF.neutron_plugin_options.firewall_driver in ['openvswitch', 'None'],
|
||||
"Firewall driver other than 'openvswitch' is required to use "
|
||||
"stateless security groups.")
|
||||
class StatelessNetworkSecGroupTest(BaseNetworkSecGroupTest):
|
||||
class StatelessNetworkSecGroupIPv4Test(BaseNetworkSecGroupTest):
|
||||
required_extensions = ['security-group', 'stateful-security-group']
|
||||
stateless_sg = True
|
||||
ipv6_mode = None
|
||||
|
||||
@decorators.idempotent_id('9e193e3f-56f2-4f4e-886c-988a147958ef')
|
||||
def test_default_sec_grp_scenarios(self):
|
||||
|
@ -1123,3 +1135,57 @@ class StatelessNetworkSecGroupTest(BaseNetworkSecGroupTest):
|
|||
ssh_clients['client'], fips['server']['fixed_ip_address'],
|
||||
mtu=self.network['mtu'] + 1, fragmentation=True,
|
||||
should_succeed=True)
|
||||
|
||||
|
||||
class StatelessSecGroupDualStackSlaacTest(BaseNetworkSecGroupTest):
|
||||
required_extensions = ['security-group', 'stateful-security-group']
|
||||
stateless_sg = True
|
||||
ipv6_mode = 'slaac'
|
||||
|
||||
def _get_port_cidrs(self, port):
|
||||
ips = []
|
||||
subnet_cidrs = {}
|
||||
for fixed_ip in port['fixed_ips']:
|
||||
subnet_id = fixed_ip['subnet_id']
|
||||
subnet_cidr = subnet_cidrs.get('subnet_id')
|
||||
if not subnet_cidr:
|
||||
subnet = self.client.show_subnet(subnet_id)['subnet']
|
||||
subnet_cidr = netaddr.IPNetwork(subnet['cidr'])
|
||||
subnet_cidrs[subnet_id] = subnet_cidr
|
||||
ips.append(
|
||||
netaddr.IPNetwork(
|
||||
"%s/%s" % (fixed_ip['ip_address'], subnet_cidr.prefixlen)))
|
||||
LOG.debug("On port %s found IP cidrs: %s", port['id'], ips)
|
||||
return ips
|
||||
|
||||
def _test_default_sec_grp_scenarios(self):
|
||||
# Make "regular" test like for IPv4 case
|
||||
server_ssh_clients, _, servers = (
|
||||
super()._test_default_sec_grp_scenarios())
|
||||
|
||||
# And additionally ensure that IPv6 addresses are configured properly
|
||||
# in the VM
|
||||
for ssh_client, server in zip(server_ssh_clients, servers):
|
||||
ip_cmd = ip.IPCommand(ssh_client=ssh_client)
|
||||
ports = self.client.list_ports(
|
||||
device_id=server['server']['id'])['ports']
|
||||
for port in ports:
|
||||
configured_cidrs = [ip.network for ip in
|
||||
ip_cmd.list_addresses(port=port)]
|
||||
for port_cidr in self._get_port_cidrs(port):
|
||||
self.assertIn(port_cidr, configured_cidrs)
|
||||
|
||||
@decorators.idempotent_id('e7d64384-ea6a-40aa-b454-854f0990153c')
|
||||
def test_default_sec_grp_scenarios(self):
|
||||
self._test_default_sec_grp_scenarios()
|
||||
|
||||
|
||||
class StatelessSecGroupDualStackDHCPv6StatelessTest(
|
||||
StatelessSecGroupDualStackSlaacTest):
|
||||
required_extensions = ['security-group', 'stateful-security-group']
|
||||
stateless_sg = True
|
||||
ipv6_mode = 'dhcpv6-stateless'
|
||||
|
||||
@decorators.idempotent_id('c61c127c-e08f-4ddf-87a3-58b3c86e5476')
|
||||
def test_default_sec_grp_scenarios(self):
|
||||
self._test_default_sec_grp_scenarios()
|
||||
|
|
Loading…
Reference in New Issue