Fix PRD-1299. Change cinder client usage to nova

client for verification of volume attachment

If a wrong provider location is provided, the cinder client
reports that the attachment was successful, but physically it
did not happen. To fix this, switch the verification of the
attachment from the cinder client to the nova client.
Also delete not used modules from the package:
* common.networking_common
* etc.image_vdi
* common.extended_tempest_conf
This commit is contained in:
Tatyana Leontovich 2013-08-07 13:31:23 +03:00
parent f328543b47
commit f049e25adc
7 changed files with 25 additions and 876 deletions

View File

@ -1,275 +0,0 @@
[identity]
# This section contains configuration options that a variety of Tempest
# test clients use when authenticating with different user/tenant
# combinations
# The type of endpoint for an Identity service. Unless you have a
# custom Keystone service catalog implementation, you probably want to leave
# this value as "identity"
catalog_type = identity
# Ignore SSL certificate validation failures? Use when in testing
# environments that have self-signed SSL certs.
disable_ssl_certificate_validation = False
# URL for where to find the OpenStack Identity API endpoint (Keystone)
uri = http://10.20.0.101:5000/v2.0/
# Should typically be left as keystone unless you have a non-Keystone
# authentication API service
strategy = keystone
# The identity region
region = RegionOne
# This should be the username of a user WITHOUT administrative privileges
username = demo
# The above non-administrative user's password
password = secret
# The above non-administrative user's tenant name
tenant_name = demo
# This should be the username of an alternate user WITHOUT
# administrative privileges
alt_username = demo2
# The above non-administrative user's password
alt_password = secret
# The above non-administrative user's tenant name
alt_tenant_name = demo2
# This should be the username of a user WITH administrative privileges
admin_username = admin
# The above administrative user's password
admin_password = nova
# The above administrative user's tenant name
admin_tenant_name = admin
[compute]
# This section contains configuration options used when executing tests
# against the OpenStack Compute API.
# Allows test cases to create/destroy tenants and users. This option
# enables isolated test cases and better parallel execution,
# but also requires that OpenStack Identity API admin credentials
# are known.
allow_tenant_isolation = true
# Allows test cases to create/destroy tenants and users. This option
# enables isolated test cases and better parallel execution,
# but also requires that OpenStack Identity API admin credentials
# are known.
allow_tenant_reuse = true
# Reference data for tests. The ref and ref_alt should be
# distinct images/flavors.
image_ref = {$IMAGE_ID}
image_ref_alt = {$IMAGE_ID_ALT}
flavor_ref = 1
flavor_ref_alt = 1
# Number of seconds to wait while looping to check the status of an
# instance that is building.
build_interval = 10
# Number of seconds to time out on waiting for an instance
# to build or reach an expected status
build_timeout = 600
# Run additional tests that use SSH for instance validation?
# This requires the instances be routable from the host
# executing the tests
run_ssh = false
# Name of a user used to authenticate to an instance
ssh_user = cirros
# Network id used for SSH (public, private, etc)
network_for_ssh = private
# IP version of the address used for SSH
ip_version_for_ssh = 4
# Number of seconds to wait to authenticate to an instance
ssh_timeout = 300
# Number of seconds to wait for output from ssh channel
ssh_channel_timeout = 60
# The type of endpoint for a Compute API service. Unless you have a
# custom Keystone service catalog implementation, you probably want to leave
# this value as "compute"
catalog_type = compute
# Does the Compute API support creation of images?
create_image_enabled = true
# For resize to work with libvirt/kvm, one of the following must be true:
# Single node: allow_resize_to_same_host=True must be set in nova.conf
# Cluster: the 'nova' user must have scp access between cluster nodes
resize_available = true
# Does the compute API support changing the admin password?
change_password_available=true
# Run live migration tests (requires 2 hosts)
live_migration_available = false
# Use block live migration (Otherwise, non-block migration will be
# performed, which requires XenServer pools in case of using XS)
use_block_migration_for_live_migration = false
# By default, rely on the status of the diskConfig extension to
# decide if to execute disk config tests. When set to false, tests
# are forced to skip, regardless of the extension status
disk_config_enabled_override = true
[whitebox]
# Whitebox options for compute. Whitebox options enable the
# whitebox test cases, which look at internal Nova database state,
# SSH into VMs to check instance state, etc.
# Should we run whitebox tests for Compute?
whitebox_enabled = true
# Path of nova source directory
source_dir = /opt/stack/nova
# Path of nova configuration file
config_path = /etc/nova/nova.conf
# Directory containing nova binaries such as nova-manage
bin_dir = /usr/local/bin
# Connection string to the database of Compute service
db_uri = mysql://nova:secret@localhost/nova
# Path to a private key file for SSH access to remote hosts
path_to_private_key = /home/user/.ssh/id_rsa
[compute-admin]
# This should be the username of a user WITH administrative privileges
# If not defined the admin user from the identity section will be used
username =
# The above administrative user's password
password =
# The above administrative user's tenant name
tenant_name =
[image]
# This section contains configuration options used when executing tests
# against the OpenStack Images API
# The type of endpoint for an Image API service. Unless you have a
# custom Keystone service catalog implementation, you probably want to leave
# this value as "image"
catalog_type = image
# The version of the OpenStack Images API to use
api_version = 1
[network]
# This section contains configuration options used when executing tests
# against the OpenStack Network API.
# Version of the Quantum API
api_version = v1.1
# Catalog type of the Quantum Service
catalog_type = network
# A large private cidr block from which to allocate smaller blocks for
# tenant networks.
tenant_network_cidr = 10.100.0.0/16
# The mask bits used to partition the tenant block.
tenant_network_mask_bits = 29
# If tenant networks are reachable, connectivity checks will be
# performed directly against addresses on those networks.
tenant_networks_reachable = false
# Id of the public network that provides external connectivity.
public_network_id = {$PUBLIC_NETWORK_ID}
# Id of a shared public router that provides external connectivity.
# A shared public router would commonly be used where IP namespaces
# were disabled. If namespaces are enabled, it would be preferable
# for each tenant to have their own router.
public_router_id = {$PUBLIC_ROUTER_ID}
# Whether or not quantum is expected to be available
quantum_available = false
[volume]
# This section contains the configuration options used when executing tests
# against the OpenStack Block Storage API service
# The type of endpoint for a Cinder or Block Storage API service.
# Unless you have a custom Keystone service catalog implementation, you
# probably want to leave this value as "volume"
catalog_type = volume
# Number of seconds to wait while looping to check the status of a
# volume that is being made available
build_interval = 10
# Number of seconds to time out on waiting for a volume
# to be available or reach an expected status
build_timeout = 300
[object-storage]
# This section contains configuration options used when executing tests
# against the OpenStack Object Storage API.
# You can configure the credentials in the compute section
# The type of endpoint for an Object Storage API service. Unless you have a
# custom Keystone service catalog implementation, you probably want to leave
# this value as "object-store"
catalog_type = object-store
# Number of seconds to time out on waiting for a container to container
# synchronization to complete
container_sync_timeout = 120
# Number of seconds to wait while looping to check the status of a
# container to container synchronization
container_sync_interval = 5
[boto]
# This section contains configuration options used when executing tests
# with boto.
# EC2 URL
ec2_url = http://localhost:8773/services/Cloud
# S3 URL
s3_url = http://localhost:3333
# Use keystone ec2-* command to get those values for your test user and tenant
aws_access =
aws_secret =
#Image materials for S3 upload
# ALL content of the specified directory will be uploaded to S3
s3_materials_path = /opt/stack/devstack/files/images/s3-materials/cirros-0.3.1
# The manifest.xml files, must be in the s3_materials_path directory
# Subdirectories not allowed!
# The filenames will be used as a Keys in the S3 Buckets
#ARI Ramdisk manifest. Must be in the above s3_materials_path
ari_manifest = cirros-0.3.1-x86_64-initrd.manifest.xml
#AMI Machine Image manifest. Must be in the above s3_materials_path
ami_manifest = cirros-0.3.1-x86_64-blank.img.manifest.xml
#AKI Kernel Image manifest, Must be in the above s3_materials_path
aki_manifest = cirros-0.3.1-x86_64-vmlinuz.manifest.xml
#Instance type
instance_type = m1.tiny
#TCP/IP connection timeout
http_socket_timeout = 5
#Number of retries actions on connection or 5xx error
num_retries = 1
# Status change wait timeout
build_timeout = 120
# Status change wait interval
build_interval = 1

View File

@ -1,77 +0,0 @@
class AttributeDict(dict):
    """Dictionary whose values are also reachable as attributes (d.key)."""

    def __getattr__(self, name):
        """Look the name up as a dict key first, then fall back to the
        normal attribute machinery (which raises AttributeError)."""
        try:
            return self[name]
        except KeyError:
            return super(AttributeDict, self).__getattribute__(name)
class DeletableResource(AttributeDict):
    """
    Support deletion of quantum resources (networks, subnets) via a
    delete() method, as is supported by keystone and nova resources.
    """

    def __init__(self, *args, **kwargs):
        # API client used to issue the delete call; optional so a resource
        # can still be wrapped from plain response data without a client.
        self.client = kwargs.pop('client', None)
        super(DeletableResource, self).__init__(*args, **kwargs)

    def __str__(self):
        # Relies on 'id' and 'name' keys being present in the dict payload.
        return '<%s id="%s" name="%s">' % (self.__class__.__name__,
                                           self.id, self.name)

    def delete(self):
        # Bug fix: the original `raise NotImplemented()` raised a TypeError,
        # because NotImplemented is a constant, not an exception class.
        # Subclasses must override delete().
        raise NotImplementedError()
class DeletableNetwork(DeletableResource):
    """Quantum network that knows how to remove itself via its client."""

    def delete(self):
        """Delete this network through the quantum client."""
        self.client.delete_network(self.id)
class DeletableSubnet(DeletableResource):
    """Quantum subnet that records the routers it was attached to so that
    delete() can detach those interfaces before removing the subnet."""

    def __init__(self, *args, **kwargs):
        super(DeletableSubnet, self).__init__(*args, **kwargs)
        # Bug fix: _router_ids used to be a class-level set shared by every
        # DeletableSubnet instance, so deleting one subnet would detach
        # router interfaces recorded by other subnets. Keep it per-instance.
        self._router_ids = set()

    def add_to_router(self, router_id):
        """Attach this subnet to router_id and remember the attachment."""
        self._router_ids.add(router_id)
        body = dict(subnet_id=self.id)
        self.client.add_interface_router(router_id, body=body)

    def delete(self):
        """Detach from every recorded router, then delete the subnet."""
        for router_id in self._router_ids.copy():
            body = dict(subnet_id=self.id)
            self.client.remove_interface_router(router_id, body=body)
            self._router_ids.remove(router_id)
        self.client.delete_subnet(self.id)
class DeletableRouter(DeletableResource):
    """Quantum router that can attach an external gateway and delete itself."""

    def add_gateway(self, network_id):
        """Set network_id as this router's external gateway network."""
        self.client.add_gateway_router(self.id,
                                       body=dict(network_id=network_id))

    def delete(self):
        """Drop the external gateway first, then remove the router."""
        self.client.remove_gateway_router(self.id)
        self.client.delete_router(self.id)
class DeletableFloatingIp(DeletableResource):
    """Floating IP that deletes itself through the quantum client."""

    def delete(self):
        """Release this floating IP."""
        self.client.delete_floatingip(self.id)
class DeletablePort(DeletableResource):
    """Quantum port that deletes itself through the quantum client."""

    def delete(self):
        """Remove this port."""
        self.client.delete_port(self.id)

View File

@ -1,510 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import hashlib
import httplib2
import json
import re
import time
from fuel_health.common import log as logging
from fuel_health import exceptions
# redrive rate limited calls at most twice
MAX_RECURSION_DEPTH = 2

# Shape of a valid keystone token (base64-ish alphabet); used to decide
# whether an X-Auth-Token header should be redacted from request logs.
TOKEN_CHARS_RE = re.compile('^[-A-Za-z0-9+/=]*$')
class RestClient(object):
    """Generic JSON-over-HTTP client for OpenStack service APIs.

    Handles authentication (HTTP basic, or Keystone v2/v3), lazy token
    caching, request/response logging, automatic retry on rate limiting
    (HTTP 413) and translation of HTTP error statuses into fuel_health
    exceptions.

    NOTE(review): this module uses Python 2 only syntax
    (``except Exception, e`` and ``print`` statements below).
    """

    TYPE = "json"
    LOG = logging.getLogger(__name__)

    def __init__(self, config, user, password, auth_url, tenant_name=None,
                 auth_version='v2'):
        self.config = config
        self.user = user
        self.password = password
        self.auth_url = auth_url
        self.tenant_name = tenant_name
        self.auth_version = auth_version

        # Auth state; populated lazily on first request (see _set_auth()).
        self.service = None
        self.token = None
        self.base_url = None
        # Region is only configured for the compute service here.
        self.region = {'compute': self.config.identity.region}
        self.endpoint_url = 'publicURL'
        self.strategy = self.config.identity.strategy
        self.headers = {'Content-Type': 'application/%s' % self.TYPE,
                        'Accept': 'application/%s' % self.TYPE}
        self.build_interval = config.compute.build_interval
        self.build_timeout = config.compute.build_timeout
        # Lower-cased general/response header names (RFC 2616); used by
        # response_checker() to detect entity headers that must not appear
        # in a 205 response.
        self.general_header_lc = set(('cache-control', 'connection',
                                      'date', 'pragma', 'trailer',
                                      'transfer-encoding', 'via',
                                      'warning'))
        self.response_header_lc = set(('accept-ranges', 'age', 'etag',
                                       'location', 'proxy-authenticate',
                                       'retry-after', 'server',
                                       'vary', 'www-authenticate'))
        dscv = self.config.identity.disable_ssl_certificate_validation
        self.http_obj = httplib2.Http(disable_ssl_certificate_validation=dscv)

    def _set_auth(self):
        """
        Sets the token and base_url used in requests based on the strategy type
        """
        if self.strategy == 'keystone':
            if self.auth_version == 'v3':
                auth_func = self.identity_auth_v3
            else:
                auth_func = self.keystone_auth
            self.token, self.base_url = (
                auth_func(self.user, self.password, self.auth_url,
                          self.service, self.tenant_name))
        else:
            # Any non-keystone strategy falls back to HTTP basic auth.
            self.token, self.base_url = self.basic_auth(self.user,
                                                        self.password,
                                                        self.auth_url)

    def clear_auth(self):
        """
        Can be called to clear the token and base_url so that the next request
        will fetch a new token and base_url.
        """
        self.token = None
        self.base_url = None

    def get_auth(self):
        """Returns the token of the current request or sets the token if
        none.
        """
        if not self.token:
            self._set_auth()
        return self.token

    def basic_auth(self, user, password, auth_url):
        """
        Provides authentication for the target API.

        Returns (token, management URL) taken from the response headers.
        """
        params = {}
        params['headers'] = {'User-Agent': 'Test-Client', 'X-Auth-User': user,
                             'X-Auth-Key': password}
        resp, body = self.http_obj.request(auth_url, 'GET', **params)
        try:
            return resp['x-auth-token'], resp['x-server-management-url']
        except Exception:
            raise

    def keystone_auth(self, user, password, auth_url, service, tenant_name):
        """
        Provides authentication via Keystone using v2 identity API.

        Returns (token, management URL for *service*). Raises
        AuthenticationFailure on 401, EndpointNotFound when the service is
        missing from the catalog, IdentityError on any other status.
        """
        # Normalize URI to ensure /tokens is in it.
        if 'tokens' not in auth_url:
            auth_url = auth_url.rstrip('/') + '/tokens'

        creds = {
            'auth': {
                'passwordCredentials': {
                    'username': user,
                    'password': password,
                },
                'tenantName': tenant_name,
            }
        }

        headers = {'Content-Type': 'application/json'}
        body = json.dumps(creds)
        self._log_request('POST', auth_url, headers, body)
        resp, resp_body = self.http_obj.request(auth_url, 'POST',
                                                headers=headers, body=body)
        self._log_response(resp, resp_body)

        if resp.status == 200:
            try:
                auth_data = json.loads(resp_body)['access']
                token = auth_data['token']['id']
            except Exception, e:
                print "Failed to obtain token for user: %s" % e
                raise

            mgmt_url = None
            for ep in auth_data['serviceCatalog']:
                if ep["type"] == service:
                    for _ep in ep['endpoints']:
                        if service in self.region and \
                                _ep['region'] == self.region[service]:
                            mgmt_url = _ep[self.endpoint_url]
                    if not mgmt_url:
                        # No endpoint matched the configured region; fall
                        # back to the first endpoint for this service.
                        mgmt_url = ep['endpoints'][0][self.endpoint_url]
                    break

            if mgmt_url is None:
                raise exceptions.EndpointNotFound(service)

            return token, mgmt_url
        elif resp.status == 401:
            raise exceptions.AuthenticationFailure(user=user,
                                                   password=password)
        raise exceptions.IdentityError('Unexpected status code {0}'.format(
            resp.status))

    def identity_auth_v3(self, user, password, auth_url, service,
                         project_name, domain_id='default'):
        """Provides authentication using Identity API v3.

        Returns (token, management URL) like keystone_auth(), but the
        token comes from the X-Subject-Token response header (201).
        """
        req_url = auth_url.rstrip('/') + '/auth/tokens'

        creds = {
            "auth": {
                "identity": {
                    "methods": ["password"],
                    "password": {
                        "user": {
                            "name": user, "password": password,
                            "domain": {"id": domain_id}
                        }
                    }
                },
                "scope": {
                    "project": {
                        "domain": {"id": domain_id},
                        "name": project_name
                    }
                }
            }
        }

        headers = {'Content-Type': 'application/json'}
        body = json.dumps(creds)
        resp, body = self.http_obj.request(req_url, 'POST',
                                           headers=headers, body=body)

        if resp.status == 201:
            try:
                token = resp['x-subject-token']
            except Exception:
                self.LOG.exception("Failed to obtain token using V3"
                                   " authentication (auth URL is '%s')" %
                                   req_url)
                raise

            catalog = json.loads(body)['token']['catalog']

            mgmt_url = None
            for service_info in catalog:
                if service_info['type'] != service:
                    continue  # this isn't the entry for us.
                endpoints = service_info['endpoints']

                # Look for an endpoint in the region if configured.
                if service in self.region:
                    region = self.region[service]
                    for ep in endpoints:
                        if ep['region'] != region:
                            continue
                        mgmt_url = ep['url']
                        # FIXME(blk-u): this isn't handling endpoint type
                        # (public, internal, admin).
                        break
                if not mgmt_url:
                    # Didn't find endpoint for region, use the first.
                    ep = endpoints[0]
                    mgmt_url = ep['url']
                    # FIXME(blk-u): this isn't handling endpoint type
                    # (public, internal, admin).
                break

            return token, mgmt_url
        elif resp.status == 401:
            raise exceptions.AuthenticationFailure(user=user,
                                                   password=password)
        else:
            self.LOG.error("Failed to obtain token using V3 authentication"
                           " (auth URL is '%s'), the response status is %s" %
                           (req_url, resp.status))
            raise exceptions.AuthenticationFailure(user=user,
                                                   password=password)

    # --- Thin authenticated wrappers around request(), one per HTTP verb ---

    def post(self, url, body, headers):
        """Issue an authenticated POST request."""
        return self.request('POST', url, headers, body)

    def get(self, url, headers=None):
        """Issue an authenticated GET request."""
        return self.request('GET', url, headers)

    def delete(self, url, headers=None):
        """Issue an authenticated DELETE request."""
        return self.request('DELETE', url, headers)

    def patch(self, url, body, headers):
        """Issue an authenticated PATCH request."""
        return self.request('PATCH', url, headers, body)

    def put(self, url, body, headers):
        """Issue an authenticated PUT request."""
        return self.request('PUT', url, headers, body)

    def head(self, url, headers=None):
        """Issue an authenticated HEAD request."""
        return self.request('HEAD', url, headers)

    def copy(self, url, headers=None):
        """Issue an authenticated COPY request (used by Swift)."""
        return self.request('COPY', url, headers)

    def get_versions(self):
        """Return (response, list of API version ids) from the service root."""
        resp, body = self.get('')
        body = self._parse_resp(body)
        body = body['versions']
        versions = map(lambda x: x['id'], body)
        return resp, versions

    def _log_request(self, method, req_url, headers, body):
        """Log an outgoing request; redact long tokens, truncate big bodies."""
        self.LOG.info('Request: ' + method + ' ' + req_url)
        if headers:
            print_headers = headers
            if 'X-Auth-Token' in headers and headers['X-Auth-Token']:
                token = headers['X-Auth-Token']
                # Only redact values that actually look like keystone tokens.
                if len(token) > 64 and TOKEN_CHARS_RE.match(token):
                    print_headers = headers.copy()
                    print_headers['X-Auth-Token'] = "<Token omitted>"
            self.LOG.debug('Request Headers: ' + str(print_headers))
        if body:
            str_body = str(body)
            length = len(str_body)
            self.LOG.debug('Request Body: ' + str_body[:2048])
            if length >= 2048:
                # Body was truncated in the log; record an md5 so the full
                # payload can still be identified.
                self.LOG.debug("Large body (%d) md5 summary: %s", length,
                               hashlib.md5(str_body).hexdigest())

    def _log_response(self, resp, resp_body):
        """Log a response's status, headers and (truncated) body."""
        status = resp['status']
        self.LOG.info("Response Status: " + status)
        headers = resp.copy()
        del headers['status']
        if len(headers):
            self.LOG.debug('Response Headers: ' + str(headers))
        if resp_body:
            str_body = str(resp_body)
            length = len(str_body)
            self.LOG.debug('Response Body: ' + str_body[:2048])
            if length >= 2048:
                self.LOG.debug("Large body (%d) md5 summary: %s", length,
                               hashlib.md5(str_body).hexdigest())

    def _parse_resp(self, body):
        """Decode a JSON response body."""
        return json.loads(body)

    def response_checker(self, method, url, headers, body, resp, resp_body):
        """Sanity-check a response against HTTP rules (body-less statuses,
        entity headers on 205, empty error bodies)."""
        if (resp.status in set((204, 205, 304)) or resp.status < 200 or
                method.upper() == 'HEAD') and resp_body:
            raise exceptions.ResponseWithNonEmptyBody(status=resp.status)
        #NOTE(afazekas):
        # If the HTTP Status Code is 205
        #   'The response MUST NOT include an entity.'
        # A HTTP entity has an entity-body and an 'entity-header'.
        # In the HTTP response specification (Section 6) the 'entity-header'
        # 'generic-header' and 'response-header' are in OR relation.
        # All headers not in the above two group are considered as entity
        # header in every interpretation.
        if (resp.status == 205 and
            0 != len(set(resp.keys()) - set(('status',)) -
                     self.response_header_lc - self.general_header_lc)):
            raise exceptions.ResponseWithEntity()
        #NOTE(afazekas)
        # Now the swift sometimes (delete not empty container)
        # returns with non json error response, we can create new rest class
        # for swift.
        # Usually RFC2616 says error responses SHOULD contain an explanation.
        # The warning is normal for SHOULD/SHOULD NOT case
        # Likely it will cause an error
        if not resp_body and resp.status >= 400:
            self.LOG.warning("status >= 400 response with empty body")

    def _request(self, method, url,
                 headers=None, body=None):
        """A simple HTTP request interface."""
        req_url = "%s/%s" % (self.base_url, url)
        self._log_request(method, req_url, headers, body)
        resp, resp_body = self.http_obj.request(req_url, method,
                                                headers=headers, body=body)
        self._log_response(resp, resp_body)
        self.response_checker(method, url, headers, body, resp, resp_body)

        return resp, resp_body

    def request(self, method, url,
                headers=None, body=None):
        """Issue an authenticated request, retrying on rate limiting.

        Acquires a token lazily, retries 413 responses that carry a
        Retry-After delay (up to MAX_RECURSION_DEPTH times), then runs the
        error checker which raises on any >= 400 status.
        """
        retry = 0
        if (self.token is None) or (self.base_url is None):
            self._set_auth()

        if headers is None:
            headers = {}
        headers['X-Auth-Token'] = self.token
        resp, resp_body = self._request(method, url,
                                        headers=headers, body=body)

        # 413 with retry-after and a non-absolute limit means "slow down";
        # wait and redrive the call a bounded number of times.
        while (resp.status == 413 and
               'retry-after' in resp and
                not self.is_absolute_limit(
                    resp, self._parse_resp(resp_body)) and
                retry < MAX_RECURSION_DEPTH):
            retry += 1
            delay = int(resp['retry-after'])
            time.sleep(delay)
            resp, resp_body = self._request(method, url,
                                            headers=headers, body=body)
        self._error_checker(method, url, headers, body,
                            resp, resp_body)
        return resp, resp_body

    def _error_checker(self, method, url,
                       headers, body, resp, resp_body):
        """Map HTTP error statuses to fuel_health exceptions.

        No-op for statuses below 400; otherwise parses the body (when the
        content type allows it) and raises the matching exception type.
        """

        # NOTE(mtreinish): Check for httplib response from glance_http. The
        # object can't be used here because importing httplib breaks httplib2.
        # If another object from a class not imported were passed here as
        # resp this could possibly fail
        if str(type(resp)) == "<type 'instance'>":
            ctype = resp.getheader('content-type')
        else:
            try:
                ctype = resp['content-type']
            # NOTE(mtreinish): Keystone delete user responses doesn't have a
            # content-type header. (They don't have a body) So just pretend it
            # is set.
            except KeyError:
                ctype = 'application/json'

        # It is not an error response
        if resp.status < 400:
            return

        JSON_ENC = ['application/json; charset=UTF-8', 'application/json',
                    'application/json; charset=utf-8']
        # NOTE(mtreinish): This is for compatibility with Glance and swift
        # APIs. These are the return content types that Glance api v1
        # (and occasionally swift) are using.
        TXT_ENC = ['text/plain; charset=UTF-8', 'text/html; charset=UTF-8',
                   'text/plain; charset=utf-8']
        XML_ENC = ['application/xml', 'application/xml; charset=UTF-8']

        if ctype in JSON_ENC or ctype in XML_ENC:
            parse_resp = True
        elif ctype in TXT_ENC:
            parse_resp = False
        else:
            raise exceptions.RestClientException(str(resp.status))

        if resp.status == 401 or resp.status == 403:
            raise exceptions.Unauthorized()

        if resp.status == 404:
            raise exceptions.NotFound(resp_body)

        if resp.status == 400:
            if parse_resp:
                resp_body = self._parse_resp(resp_body)
            raise exceptions.BadRequest(resp_body)

        if resp.status == 409:
            if parse_resp:
                resp_body = self._parse_resp(resp_body)
            raise exceptions.Duplicate(resp_body)

        if resp.status == 413:
            if parse_resp:
                resp_body = self._parse_resp(resp_body)
            if self.is_absolute_limit(resp, resp_body):
                raise exceptions.OverLimit(resp_body)
            else:
                raise exceptions.RateLimitExceeded(resp_body)

        if resp.status == 422:
            if parse_resp:
                resp_body = self._parse_resp(resp_body)
            raise exceptions.UnprocessableEntity(resp_body)

        if resp.status in (500, 501):
            message = resp_body
            if parse_resp:
                resp_body = self._parse_resp(resp_body)
                #I'm seeing both computeFault and cloudServersFault come back.
                #Will file a bug to fix, but leave as is for now.
                if 'cloudServersFault' in resp_body:
                    message = resp_body['cloudServersFault']['message']
                elif 'computeFault' in resp_body:
                    message = resp_body['computeFault']['message']
                elif 'error' in resp_body:  # Keystone errors
                    message = resp_body['error']['message']
                    raise exceptions.IdentityError(message)
                elif 'message' in resp_body:
                    message = resp_body['message']

            raise exceptions.ComputeFault(message)

        if resp.status >= 400:
            if parse_resp:
                resp_body = self._parse_resp(resp_body)
            raise exceptions.RestClientException(str(resp.status))

    def is_absolute_limit(self, resp, resp_body):
        """Return True when a 413 means a hard quota (no point retrying),
        False when it is transient rate limiting."""
        if (not isinstance(resp_body, collections.Mapping) or
                'retry-after' not in resp):
            return True
        over_limit = resp_body.get('overLimit', None)
        if not over_limit:
            return True
        return 'exceed' in over_limit.get('message', 'blabla')

    def wait_for_resource_deletion(self, id):
        """Waits for a resource to be deleted."""
        start_time = int(time.time())
        while True:
            if self.is_resource_deleted(id):
                return
            if int(time.time()) - start_time >= self.build_timeout:
                raise exceptions.TimeoutException
            time.sleep(self.build_interval)

    def is_resource_deleted(self, id):
        """
        Subclasses override with specific deletion detection.
        """
        message = ('"%s" does not implement is_resource_deleted'
                   % self.__class__.__name__)
        raise NotImplementedError(message)

View File

@ -1,5 +1,18 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import signal
import time
@ -8,6 +21,7 @@ from fuel_health.common import log as logging
LOG = logging.getLogger(__name__)
class FuelTestAssertMixin(object):
"""
Mixin class with a set of assert methods created to abstract
@ -55,16 +69,16 @@ class FuelTestAssertMixin(object):
status_msg = human_readable_statuses[status].format(
status=status, appl=appl)
else:
status_msg = human_readable_status_groups.get(status / 100,
unknown_msg).format(status=status, appl=appl)
status_msg = human_readable_status_groups.get(
status / 100, unknown_msg).format(status=status, appl=appl)
failed_step_msg = ''
if failed_step:
failed_step_msg = ('Step %s failed: ' % str(failed_step))
self.fail(''.join((failed_step_msg +
'Status - {status} '.format(status=status),
status_msg, '\n', msg)))
'Status - {status} '.format(
status=status), status_msg, '\n', msg)))
def verify_response_body(self, body, content='', msg='', failed_step=''):
"""
@ -142,7 +156,6 @@ class FuelTestAssertMixin(object):
return result
class TimeOutError(Exception):
def __init__(self):
Exception.__init__(self)
@ -173,10 +186,6 @@ class timeout(object):
if exc_type is not TimeOutError:
return False # never swallow other exceptions
else:
msg = "Time limit exceeded while waiting" \
" for {call} to finish."\
.format(call=self.action)
msg = ("Time limit exceeded while waiting for {call} to "
"finish.").format(call=self.action)
raise AssertionError(msg)

Binary file not shown.

View File

@ -717,7 +717,8 @@ class SmokeChecksTest(OfficialClientTest):
def _attach_volume_to_instance(self, volume, instance):
device = '/dev/vdb'
attached_volume = self.volume_client.volumes.attach(volume, instance, mountpoint=device)
attached_volume = self.compute_client.volumes.create_server_volume(
volume_id=volume.id, server_id=instance, device=device)
return attached_volume
def _detach_volume(self, client, volume):

View File

@ -28,6 +28,7 @@ from fuel_health.common.test_mixins import FuelTestAssertMixin
LOG = logging.getLogger(__name__)
class BaseTestCase(unittest2.TestCase,
testresources.ResourcedTestCase,
FuelTestAssertMixin):
@ -126,4 +127,4 @@ class TestCase(BaseTestCase):
conf.compute.build_timeout,
conf.compute.build_interval):
self.fail("Timed out waiting to become %s"
% (expected_status))
% expected_status)