Adds first basic tests and config options

Adds the following config options under the "hyperv" section:

- hypervisor_version (the compute nodes' hypervisor_version)
- vhd_image_ref
- vhdx_image_ref
- gen2_image_ref

Adds tests for VHD, VHDX, Generation 2 VM images.
This commit is contained in:
Claudiu Belu 2017-04-02 10:53:18 -07:00
parent caa38fbc51
commit 7f0fa003af
4 changed files with 353 additions and 2 deletions

View File

@ -13,3 +13,36 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from tempest import config
CONF = config.CONF
hyperv_group = cfg.OptGroup(name='hyperv',
title='Hyper-V Driver Tempest Options')
HyperVGroup = [
cfg.IntOpt('hypervisor_version',
help="Compute nodes' hypervisor version, used to determine "
"which tests to run. It must have following value: "
"major_version * 1000 + minor_version"
"For example, Windows / Hyper-V Server 2012 R2 would have "
"the value 6003"),
cfg.StrOpt('vhd_image_ref',
help="Valid VHD image reference to be used in tests."),
cfg.StrOpt('vhdx_image_ref',
help="Valid VHDX image reference to be used in tests."),
cfg.StrOpt('gen2_image_ref',
help="Valid Generation 2 VM VHDX image reference to be used "
"in tests."),
]
_opts = [
(hyperv_group, HyperVGroup),
]
def list_opts():
return _opts

View File

@ -30,7 +30,24 @@ class OSWinTempestPlugin(plugins.TempestPlugin):
return full_test_dir, base_path
def register_opts(self, conf):
pass
"""Add additional configuration options to tempest.
This method will be run for the plugin during the register_opts()
function in tempest.config
:param conf: The conf object that can be used to register additional
config options on.
"""
for config_opt_group, config_opts in project_config.list_opts():
config.register_opt_group(conf, config_opt_group, config_opts)
def get_opt_lists(self):
pass
"""Get a list of options for sample config generation.
:return: A list of tuples with the group name and options in that
group.
:return type: list
"""
return [(group.name, opts)
for group, opts in project_config.list_opts()]

View File

@ -0,0 +1,69 @@
# Copyright 2017 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oswin_tempest_plugin import config
from oswin_tempest_plugin.tests import test_base
CONF = config.CONF
class _BaseDiskTestMixin(object):
"""Image types / formats test suite.
This test suite will spawn instances with a configured image and will
check their network connectivity. The purpose of this test suite is to
cover different image formats and types (VHD, VHDX, Generation 2 VMs).
"""
_CONF_OPTION_NAME = ''
@classmethod
def skip_checks(cls):
super(_BaseDiskTestMixin, cls).skip_checks()
# check if the needed image ref has been configured.
if not cls._IMAGE_REF:
msg = ('The config option "%s" has not been set. Skipping.' %
cls._CONF_OPTION_NAME)
raise cls.skipException(msg)
def test_disk(self):
server_tuple = self._create_server()
self._check_server_connectivity(server_tuple)
class VhdDiskTest(test_base.TestBase, _BaseDiskTestMixin):
_IMAGE_REF = CONF.hyperv.vhd_image_ref
_CONF_OPTION_NAME = 'hyperv.vhd_image_ref'
# TODO(claudiub): validate that the images really are VHD / VHDX.
class VhdxDiskTest(test_base.TestBase, _BaseDiskTestMixin):
_IMAGE_REF = CONF.hyperv.vhdx_image_ref
_CONF_OPTION_NAME = 'hyperv.vhdx_image_ref'
class Generation2DiskTest(test_base.TestBase, _BaseDiskTestMixin):
# Generation 2 VMs have been introduced in Windows / Hyper-V Server 2012 R2
_MIN_HYPERV_VERSION = 6003
_IMAGE_REF = CONF.hyperv.gen2_image_ref
_CONF_OPTION_NAME = 'hyperv.gen2_image_ref'
# TODO(claudiub): Add validation that the given gen2_image_ref really has
# the 'hw_machine_type=hyperv-gen2' property.

View File

@ -0,0 +1,232 @@
# Copyright 2017 Cloudbase Solutions SRL
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from oslo_log import log as logging
from tempest.common import compute
from tempest.common.utils.linux import remote_client
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
import tempest.test
from oswin_tempest_plugin import config
CONF = config.CONF
LOG = logging.getLogger(__name__)
Server_tuple = collections.namedtuple(
'Server_tuple',
['server', 'floating_ip', 'keypair', 'security_groups'])
class TestBase(tempest.test.BaseTestCase):
"""Base class for tests."""
credentials = ['primary']
# Inheriting TestCases should change this version if needed.
_MIN_HYPERV_VERSION = 6002
# Inheriting TestCases should change this image ref if needed.
_IMAGE_REF = CONF.compute.image_ref
@classmethod
def skip_checks(cls):
super(TestBase, cls).skip_checks()
# check if the configured hypervisor_version is higher than
# the test's required minimum Hyper-V version.
# TODO(claudiub): instead of expecting a config option specifying
# the hypervisor version, we could check nova's compute nodes for
# their hypervisor versions.
config_vers = CONF.hyperv.hypervisor_version
if config_vers < cls._MIN_HYPERV_VERSION:
msg = ('Configured hypervisor_version (%(config_vers)s) is not '
'supported. It must be higher than %(req_vers)s.' % {
'config_vers': config_vers,
'req_vers': cls._MIN_HYPERV_VERSION})
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(TestBase, cls).setup_clients()
# Compute client
cls.compute_fips_client = (
cls.os_primary.compute_floating_ips_client)
cls.keypairs_client = cls.os_primary.keypairs_client
cls.servers_client = cls.os_primary.servers_client
# Neutron network client
cls.security_groups_client = (
cls.os_primary.security_groups_client)
cls.security_group_rules_client = (
cls.os_primary.security_group_rules_client)
def create_floating_ip(self, server):
"""Create a floating IP and associates to a server on Nova"""
pool_name = CONF.network.floating_network_name
floating_ip = (
self.compute_fips_client.create_floating_ip(pool=pool_name))
floating_ip = floating_ip['floating_ip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.compute_fips_client.delete_floating_ip,
floating_ip['id'])
self.compute_fips_client.associate_floating_ip_to_server(
floating_ip['ip'], server['id'])
return floating_ip
def create_keypair(self):
name = data_utils.rand_name(self.__class__.__name__)
body = self.keypairs_client.create_keypair(name=name)
self.addCleanup(self.keypairs_client.delete_keypair, name)
return body['keypair']
def _get_image_ref(self):
return self._IMAGE_REF
def _get_flavor_ref(self):
return CONF.compute.flavor_ref
def _create_server(self):
"""Wrapper utility that returns a test server.
This wrapper utility calls the common create test server and
returns a test server.
"""
clients = self.os_primary
name = data_utils.rand_name(self.__class__.__name__ + "-server")
image_id = self._get_image_ref()
flavor = self._get_flavor_ref()
keypair = self.create_keypair()
tenant_network = self.get_tenant_network()
security_group = self._create_security_group()
# we need to pass the security group's name to the instance.
sg_group_names = [{'name': security_group['name']}]
body, _ = compute.create_test_server(
clients, name=name,
flavor=flavor,
image_id=image_id,
key_name=keypair['name'],
tenant_network=tenant_network,
security_groups=sg_group_names,
wait_until='ACTIVE')
self.addCleanup(waiters.wait_for_server_termination,
self.servers_client, body['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.servers_client.delete_server, body['id'])
server = clients.servers_client.show_server(body['id'])['server']
floating_ip = self.create_floating_ip(server)
server_tuple = Server_tuple(
server=server,
keypair=keypair,
floating_ip=floating_ip,
security_groups=[security_group])
return server_tuple
def _create_security_group(self):
sg_name = data_utils.rand_name(self.__class__.__name__)
sg_desc = sg_name + " description"
secgroup = self.security_groups_client.create_security_group(
name=sg_name, description=sg_desc)['security_group']
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.security_groups_client.delete_security_group,
secgroup['id'])
# Add rules to the security group
self._create_loginable_secgroup_rule(secgroup)
return secgroup
def _create_loginable_secgroup_rule(self, secgroup):
"""Create loginable security group rule
This function will create:
1. egress and ingress tcp port 22 allow rule in order to allow ssh
access for ipv4.
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
"""
rulesets = [
# ssh
dict(protocol='tcp',
port_range_min=22,
port_range_max=22),
# ping
dict(protocol='icmp'),
]
for ruleset in rulesets:
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
sg_rule = self._create_security_group_rule(
secgroup, **ruleset)
def _create_security_group_rule(self, secgroup, **kwargs):
"""Create a rule from a dictionary of rule parameters.
Creates a rule in a secgroup.
:param secgroup: the security group.
:param kwargs: a dictionary containing rule parameters:
for example, to allow incoming ssh:
rule = {
direction: 'ingress'
protocol:'tcp',
port_range_min: 22,
port_range_max: 22
}
"""
ruleset = dict(security_group_id=secgroup['id'],
tenant_id=secgroup['tenant_id'])
ruleset.update(kwargs)
sec_group_rules_client = self.security_group_rules_client
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
return sg_rule
def _wait_for_server_status(self, server, status='ACTIVE'):
waiters.wait_for_server_status(self.servers_client, server['id'],
status)
def _get_server_client(self, server_tuple):
"""Get a SSH client to a remote server
:returns: RemoteClient object
"""
server = server_tuple.server
ip_address = server_tuple.floating_ip['ip']
private_key = server_tuple.keypair['private_key']
# ssh into the VM.
username = CONF.validation.image_ssh_user
linux_client = remote_client.RemoteClient(
ip_address, username, pkey=private_key, password=None,
server=server, servers_client=self.servers_client)
linux_client.validate_authentication()
return linux_client
def _check_server_connectivity(self, server_tuple):
# if server connectivity works, an SSH client can be opened.
self._get_server_client(server_tuple)