Merge "Add a scenario test for internal dns_name"

This commit is contained in:
Zuul 2018-07-18 12:06:34 +00:00 committed by Gerrit Code Review
commit e80584e81b
2 changed files with 111 additions and 20 deletions

View File

@ -15,6 +15,8 @@
import subprocess
import netaddr
from neutron_lib.api import validators
from neutron_lib import constants as neutron_lib_constants
from oslo_log import log
from tempest.common.utils import net_utils
from tempest.common import waiters
@ -165,7 +167,8 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
self.floating_ips.append(fip)
return fip
def setup_network_and_server(self, router=None, **kwargs):
def setup_network_and_server(
self, router=None, server_name=None, **kwargs):
"""Create network resources and a server.
Creating a network, subnet, router, keypair, security group
@ -187,12 +190,18 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
self.keypair = self.create_keypair()
self.create_loginable_secgroup_rule(
secgroup_id=secgroup['security_group']['id'])
self.server = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'uuid': self.network['id']}],
security_groups=[{'name': secgroup['security_group']['name']}])
server_kwargs = {
'flavor_ref': CONF.compute.flavor_ref,
'image_ref': CONF.compute.image_ref,
'key_name': self.keypair['name'],
'networks': [{'uuid': self.network['id']}],
'security_groups': [{'name': secgroup['security_group']['name']}],
}
if server_name is not None:
server_kwargs['name'] = server_name
self.server = self.create_server(**server_kwargs)
self.wait_for_server_active(self.server['server'])
self.port = self.client.list_ports(network_id=self.network['id'],
device_id=self.server[
@ -227,7 +236,8 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
"for the console log", server['id'])
def _check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True):
nic=None, mtu=None, fragmentation=True,
timeout=None):
"""check ping server via source ssh connection
:param source: RemoteClient: an ssh connection from which to ping
@ -242,15 +252,21 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
def ping_host(source, host, count=CONF.validation.ping_count,
size=CONF.validation.ping_size, nic=None, mtu=None,
fragmentation=True):
addr = netaddr.IPAddress(host)
cmd = 'ping6' if addr.version == 6 else 'ping'
IP_VERSION_4 = neutron_lib_constants.IP_VERSION_4
IP_VERSION_6 = neutron_lib_constants.IP_VERSION_6
# Use 'ping6' for IPv6 addresses, 'ping' for IPv4 and hostnames
ip_version = (
IP_VERSION_6 if netaddr.valid_ipv6(host) else IP_VERSION_4)
cmd = (
'ping6' if ip_version == IP_VERSION_6 else 'ping')
if nic:
cmd = 'sudo {cmd} -I {nic}'.format(cmd=cmd, nic=nic)
if mtu:
if not fragmentation:
cmd += ' -M do'
size = str(net_utils.get_ping_payload_size(
mtu=mtu, ip_version=addr.version))
mtu=mtu, ip_version=ip_version))
cmd += ' -c{0} -w{0} -s{1} {2}'.format(count, size, host)
return source.exec_command(cmd)
@ -264,22 +280,24 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
'from: %s.', dest, source.host)
return not should_succeed
LOG.debug('ping result: %s', result)
# Assert that the return traffic was from the correct
# source address.
from_source = 'from %s' % dest
self.assertIn(from_source, result)
if validators.validate_ip_address(dest) is None:
# Assert that the return traffic was from the correct
# source address.
from_source = 'from %s' % dest
self.assertIn(from_source, result)
return should_succeed
return test_utils.call_until_true(ping_remote,
CONF.validation.ping_timeout,
1)
return test_utils.call_until_true(
ping_remote, timeout or CONF.validation.ping_timeout, 1)
def check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True,
servers=None):
servers=None, timeout=None):
try:
self.assertTrue(self._check_remote_connectivity(
source, dest, should_succeed, nic, mtu, fragmentation))
source, dest, should_succeed, nic, mtu, fragmentation,
timeout=timeout))
except lib_exc.SSHTimeout as ssh_e:
LOG.debug(ssh_e)
self._log_console_output(servers)

View File

@ -0,0 +1,73 @@
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import utils
from tempest.lib import decorators
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
CONF = config.CONF
class InternalDNSTest(base.BaseTempestTestCase):
@utils.requires_ext(extension="dns-integration", service="network")
@decorators.idempotent_id('988347de-07af-471a-abfa-65aea9f452a6')
def test_dns_name(self):
"""Test the ability to ping a VM's hostname from another VM.
1) Create two VMs on the same network, giving each a name
2) SSH in to the first VM:
2.1) ping the other VM's internal IP
2.2) ping the otheR VM's hostname
"""
self.setup_network_and_server(server_name='luke')
self.create_pingable_secgroup_rule(
secgroup_id=self.security_groups[-1]['id'])
self.check_connectivity(self.fip['floating_ip_address'],
CONF.validation.image_ssh_user,
self.keypair['private_key'])
leia = self.create_server(
flavor_ref=CONF.compute.flavor_ref,
image_ref=CONF.compute.image_ref,
key_name=self.keypair['name'],
networks=[{'uuid': self.network['id']}],
security_groups=[
{'name': self.security_groups[-1]['name']}],
name='leia')
self.wait_for_server_active(leia['server'])
ssh_client = ssh.Client(
self.fip['floating_ip_address'],
CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
self.assertIn('luke', ssh_client.exec_command('hostname'))
leia_port = self.client.list_ports(
network_id=self.network['id'],
device_id=leia['server']['id'])['ports'][0]
# Ping with a higher timeout because spawning 2 VMs in some
# environment can put significant load on the deployment, resulting
# in very long boot times.
self.check_remote_connectivity(
ssh_client, leia_port['fixed_ips'][0]['ip_address'],
timeout=CONF.validation.ping_timeout * 10)
self.check_remote_connectivity(ssh_client, 'leia')