242 lines
9.9 KiB
Python
242 lines
9.9 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
# Copyright 2013 IBM Corp.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_log import log
|
|
|
|
from tempest.common import image as common_image
|
|
from tempest.common import waiters
|
|
from tempest import config
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
from tempest.lib import exceptions as lib_exc
|
|
from tempest.scenario import manager
|
|
|
|
CONF = config.CONF
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
# we inherit from NetworkScenarioTest since some test cases need access to
|
|
# check_*_connectivity methods to validate instances are up and accessible
|
|
class ScenarioTest(manager.NetworkScenarioTest):
|
|
"""Base class for scenario tests. Uses tempest own clients. """
|
|
|
|
@classmethod
|
|
def setup_clients(cls):
|
|
super(ScenarioTest, cls).setup_clients()
|
|
# Clients (in alphabetical order)
|
|
cls.flavors_client = cls.os_primary.flavors_client
|
|
if CONF.service_available.glance:
|
|
# Check if glance v1 is available to determine which client to use.
|
|
if CONF.image_feature_enabled.api_v1:
|
|
cls.image_client = cls.os_primary.image_client
|
|
elif CONF.image_feature_enabled.api_v2:
|
|
cls.image_client = cls.os_primary.image_client_v2
|
|
else:
|
|
raise lib_exc.InvalidConfiguration(
|
|
'Either api_v1 or api_v2 must be True in '
|
|
'[image-feature-enabled].')
|
|
# Compute image client
|
|
cls.compute_images_client = cls.os_primary.compute_images_client
|
|
cls.keypairs_client = cls.os_primary.keypairs_client
|
|
# Nova security groups client
|
|
cls.compute_security_groups_client = (
|
|
cls.os_primary.compute_security_groups_client)
|
|
cls.compute_security_group_rules_client = (
|
|
cls.os_primary.compute_security_group_rules_client)
|
|
cls.servers_client = cls.os_primary.servers_client
|
|
# Neutron network client
|
|
cls.networks_client = cls.os_primary.networks_client
|
|
cls.ports_client = cls.os_primary.ports_client
|
|
cls.routers_client = cls.os_primary.routers_client
|
|
cls.subnets_client = cls.os_primary.subnets_client
|
|
cls.floating_ips_client = cls.os_primary.floating_ips_client
|
|
cls.security_groups_client = cls.os_primary.security_groups_client
|
|
cls.security_group_rules_client = (
|
|
cls.os_primary.security_group_rules_client)
|
|
|
|
cls.volumes_client = cls.os_primary.volumes_client_latest
|
|
cls.snapshots_client = cls.os_primary.snapshots_client_latest
|
|
|
|
# ## Test functions library
|
|
#
|
|
# The create_[resource] functions only return body and discard the
|
|
# resp part which is not used in scenario tests
|
|
|
|
def _image_create(self, name, fmt, path,
|
|
disk_format=None, properties=None):
|
|
if properties is None:
|
|
properties = {}
|
|
name = data_utils.rand_name('%s-' % name)
|
|
params = {
|
|
'name': name,
|
|
'container_format': fmt,
|
|
'disk_format': disk_format or fmt,
|
|
}
|
|
if CONF.image_feature_enabled.api_v1:
|
|
params['is_public'] = 'False'
|
|
params['properties'] = properties
|
|
params = {'headers': common_image.image_meta_to_headers(**params)}
|
|
else:
|
|
params['visibility'] = 'private'
|
|
# Additional properties are flattened out in the v2 API.
|
|
params.update(properties)
|
|
body = self.image_client.create_image(**params)
|
|
image = body['image'] if 'image' in body else body
|
|
self.addCleanup(self.image_client.delete_image, image['id'])
|
|
self.assertEqual("queued", image['status'])
|
|
with open(path, 'rb') as image_file:
|
|
if CONF.image_feature_enabled.api_v1:
|
|
self.image_client.update_image(image['id'], data=image_file)
|
|
else:
|
|
self.image_client.store_image_file(image['id'], image_file)
|
|
|
|
if CONF.image_feature_enabled.import_image:
|
|
available_stores = []
|
|
try:
|
|
available_stores = self.image_client.info_stores()['stores']
|
|
except lib_exc.NotFound:
|
|
pass
|
|
available_import_methods = self.image_client.info_import()[
|
|
'import-methods']['value']
|
|
if ('copy-image' in available_import_methods and
|
|
len(available_stores) > 1):
|
|
self.image_client.image_import(image['id'],
|
|
method='copy-image',
|
|
all_stores=True,
|
|
all_stores_must_succeed=False)
|
|
failed_stores = waiters.wait_for_image_copied_to_stores(
|
|
self.image_client, image['id'])
|
|
self.assertEqual(0, len(failed_stores),
|
|
"Failed to copy the following stores: %s" %
|
|
str(failed_stores))
|
|
|
|
return image['id']
|
|
|
|
def _default_security_group(self, client=None, tenant_id=None):
|
|
"""Get default secgroup for given tenant_id.
|
|
|
|
:returns: default secgroup for given tenant
|
|
"""
|
|
if client is None:
|
|
client = self.security_groups_client
|
|
if not tenant_id:
|
|
tenant_id = client.tenant_id
|
|
sgs = [
|
|
sg for sg in list(client.list_security_groups().values())[0]
|
|
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
|
|
]
|
|
msg = "No default security group for tenant %s." % (tenant_id)
|
|
self.assertGreater(len(sgs), 0, msg)
|
|
return sgs[0]
|
|
|
|
def _create_security_group(self):
|
|
# Create security group
|
|
sg_name = data_utils.rand_name(self.__class__.__name__)
|
|
sg_desc = sg_name + " description"
|
|
secgroup = self.compute_security_groups_client.create_security_group(
|
|
name=sg_name, description=sg_desc)['security_group']
|
|
self.assertEqual(secgroup['name'], sg_name)
|
|
self.assertEqual(secgroup['description'], sg_desc)
|
|
self.addCleanup(
|
|
test_utils.call_and_ignore_notfound_exc,
|
|
self.compute_security_groups_client.delete_security_group,
|
|
secgroup['id'])
|
|
|
|
# Add rules to the security group
|
|
self._create_loginable_secgroup_rule(secgroup['id'])
|
|
|
|
return secgroup
|
|
|
|
def _create_loginable_secgroup_rule(self, secgroup_id=None):
|
|
_client = self.compute_security_groups_client
|
|
_client_rules = self.compute_security_group_rules_client
|
|
if secgroup_id is None:
|
|
sgs = _client.list_security_groups()['security_groups']
|
|
for sg in sgs:
|
|
if sg['name'] == 'default':
|
|
secgroup_id = sg['id']
|
|
|
|
# These rules are intended to permit inbound ssh and icmp
|
|
# traffic from all sources, so no group_id is provided.
|
|
# Setting a group_id would only permit traffic from ports
|
|
# belonging to the same security group.
|
|
rulesets = [
|
|
{
|
|
# ssh
|
|
'ip_protocol': 'tcp',
|
|
'from_port': 22,
|
|
'to_port': 22,
|
|
'cidr': '0.0.0.0/0',
|
|
},
|
|
{
|
|
# ping
|
|
'ip_protocol': 'icmp',
|
|
'from_port': -1,
|
|
'to_port': -1,
|
|
'cidr': '0.0.0.0/0',
|
|
}
|
|
]
|
|
rules = list()
|
|
for ruleset in rulesets:
|
|
sg_rule = _client_rules.create_security_group_rule(
|
|
parent_group_id=secgroup_id, **ruleset)['security_group_rule']
|
|
rules.append(sg_rule)
|
|
return rules
|
|
|
|
def _create_security_group_rule(self, secgroup=None,
|
|
sec_group_rules_client=None,
|
|
tenant_id=None,
|
|
security_groups_client=None, **kwargs):
|
|
"""Create a rule from a dictionary of rule parameters.
|
|
|
|
Create a rule in a secgroup. if secgroup not defined will search for
|
|
default secgroup in tenant_id.
|
|
|
|
:param secgroup: the security group.
|
|
:param tenant_id: if secgroup not passed -- the tenant in which to
|
|
search for default secgroup
|
|
:param kwargs: a dictionary containing rule parameters:
|
|
for example, to allow incoming ssh:
|
|
rule = {
|
|
direction: 'ingress'
|
|
protocol:'tcp',
|
|
port_range_min: 22,
|
|
port_range_max: 22
|
|
}
|
|
"""
|
|
if sec_group_rules_client is None:
|
|
sec_group_rules_client = self.security_group_rules_client
|
|
if security_groups_client is None:
|
|
security_groups_client = self.security_groups_client
|
|
if not tenant_id:
|
|
tenant_id = security_groups_client.tenant_id
|
|
if secgroup is None:
|
|
secgroup = self._default_security_group(
|
|
client=security_groups_client, tenant_id=tenant_id)
|
|
|
|
ruleset = dict(security_group_id=secgroup['id'],
|
|
tenant_id=secgroup['tenant_id'])
|
|
ruleset.update(kwargs)
|
|
|
|
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
|
|
sg_rule = sg_rule['security_group_rule']
|
|
|
|
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
|
|
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
|
|
|
|
return sg_rule
|