Initial Commit

This commit is contained in:
samu4924 2013-03-30 14:50:01 -05:00
commit 24bf38f024
51 changed files with 3140 additions and 0 deletions

0
AUTHORS.md Normal file
View File

0
HISTORY.md Normal file
View File

0
HISTORY.rst Normal file
View File

13
LICENSE Normal file
View File

@ -0,0 +1,13 @@
# Copyright 2013 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
include README.md LICENSE NOTICE HISTORY.md pip-requires

0
NOTICE Normal file
View File

68
README.md Normal file
View File

@ -0,0 +1,68 @@
CloudRoast, CloudCAFE Test Repo
================================
<pre>
(----) (----)--)
(--(----) (----) --)
(----) (--(----) (----)
-----------------------
\ /
\ /
\_________________/
)\ ` `(` `
( ) ` )' ) \_
( ) _) \ )
) ) (_ ) , (
( , ) ( (
( ( ) )
=== CloudRoast ===
= A CloudCAFE Test Repository =
</pre>
CloudRoast is a rich, full bodied blend of premium roasted automated test cases. CloudRoast tests are based on the expanded unittest driver in the
[Open CAFE Core](https://github.com/stackforge) and built using the [CloudCAFE Framework](https://github.com/stackforge).
CloudRoast tests support smoke, functional, integration, scenario and reliability based test cases for OpenStack. It is meant to be highly flexible
and leave the logic of the testing in the hands of the test case developer while leaving the interactions with OpenStack, various resources and
support infrastructure to CloudCAFE.
Installation
------------
CloudRoast can be [installed with pip](https://pypi.python.org/pypi/pip) from the git repository after it is cloned to a local machine.
* First follow the README instructions to install the [CloudCAFE Framework](https://github.com/stackforge)
* Clone this repository to your local machine
* CD to the root directory in your cloned repository.
* Run "pip install . --upgrade" and pip will auto install all other dependencies.
Configuration
--------------
CloudRoast runs on the [CloudCAFE Framework](https://github.com/stackforge) using the cafe-runner. It relies on the configurations installed to:
<USER_HOME>/.cloudcafe/configs/<PRODUCT> by CloudCAFE.
At this stage you will have the Open CAFE Core engine, the CloudCAFE Framework implementation and the Open Source automated test cases. You are now
ready to:
1) Execute the test cases against a deployed Open Stack.
or
2) Write entirely new tests in this repository using the CloudCAFE Framework.
Logging
-------
If tests are executed with the built-in cafe-runner, runtime logs will be output to
<USER_HOME>/.cloudcafe/logs/<PRODUCT>/<CONFIGURATION>/<TIME_STAMP>.
In addition, tests built from the built-in CAFE unittest driver will generate
csv statistics files in <USER_HOME>/.cloudcafe/logs/<PRODUCT>/<CONFIGURATION>/statistics for each and ever execution of each and every test case that
provides metrics of execution over time for elapsed time, pass/fail rates, etc...
Basic CloudRoast Package Anatomy
-------------------------------
Below is a short description of the top level CloudRoast Packages.
##test_repo
This is the root package for all automated tests. This is namespace is currently **required** by the cafe-runner for any Test Repository plug-in.
##identity
OpenStack Identity Service cafe-runner plug-in test cases.
##compute
OpenStack Compute Service cafe-runner plug-in test cases.

0
pip-requires Normal file
View File

94
setup.py Normal file
View File

@ -0,0 +1,94 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import sys
import test_repo
try:
from setuptools import setup, find_packages
except ImportError:
from distutils.core import setup, find_packages
if sys.argv[-1] == 'publish':
os.system('python setup.py sdist upload')
sys.exit()
requires = open('pip-requires').readlines()
setup(
name='test_repo',
version=test_repo.__version__,
description='CloudCAFE based automated test repository for OpenStack',
long_description='{0}\n\n{1}'.format(
open('README.md').read(),
open('HISTORY.md').read()),
author='Rackspace Cloud QE',
author_email='cloud-cafe@lists.rackspace.com',
url='http://rackspace.com',
packages=find_packages(exclude=[]),
package_data={'': ['LICENSE', 'NOTICE']},
package_dir={'test_repo': 'test_repo'},
include_package_data=True,
install_requires=requires,
license=open('LICENSE').read(),
zip_safe=False,
#https://the-hitchhikers-guide-to-packaging.readthedocs.org/en/latest/specification.html
classifiers=(
'Development Status :: 1 - Planning',
'Intended Audience :: Developers',
'Natural Language :: English',
'License :: Other/Proprietary License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
)
)
''' @todo: need to clean this up or do it with puppet/chef '''
# Default Config Options
root_dir = "{0}/.cloudcafe".format(os.path.expanduser("~"))
config_dir = "{0}/configs".format(root_dir)
# Build Default directories
if(os.path.exists("{0}/engine.config".format(config_dir)) == False):
raise Exception("Core CAFE Engine configuration not found")
else:
# Copy over the default configurations
if(os.path.exists("~install")):
os.remove("~install")
# Report
print('\n'.join(["\t\t (----) (----)--)",
"\t\t (--(----) (----) --)",
"\t\t(----) (--(----) (----)",
"\t\t-----------------------",
"\t\t\ /",
"\t\t \ /",
"\t\t \_________________/",
"\t\t )\ ` `(` `",
"\t\t ( ) ` )' ) \_",
"\t\t ( ) _) \ )",
"\t\t ) ) (_ ) , (",
"\t\t ( , ) ( (",
"\t\t ( ( ) )",
"\t\t === CloudRoast ===",
"\t\t= A CloudCAFE Test Repository ="]))
else:
# State file
temp = open("~install", "w")
temp.close()

22
test_repo/__init__.py Normal file
View File

@ -0,0 +1,22 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
__title__ = 'test_repo'
__version__ = '0.0.1'
#__build__ = 0x010100
__author__ = 'Rackspace Cloud QE'
__license__ = 'Internal Only'
__copyright__ = 'Copyright 2013 Rackspace Inc.'

BIN
test_repo/__init__.pyc Normal file

Binary file not shown.

View File

@ -0,0 +1,16 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

Binary file not shown.

View File

@ -0,0 +1,214 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
'''
@summary: Base Classes for Compute Test Suites (Collections of Test Cases)
@note: Correspondes DIRECTLY TO A unittest.TestCase
@see: http://docs.python.org/library/unittest.html#unittest.TestCase
@copyright: Copyright (c) 2012 Rackspace US, Inc.
'''
import os
from cafe.drivers.unittest.fixtures import BaseTestFixture
from cloudcafe.common.resources import ResourcePool
from cloudcafe.compute.common.exceptions import TimeoutException, \
BuildErrorException
from cloudcafe.compute.common.types import NovaServerStatusTypes as ServerStates
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.exception_handler import ExceptionHandler
from cloudcafe.compute.flavors_api.client import FlavorsClient
from cloudcafe.compute.servers_api.client import ServersClient
from cloudcafe.compute.images_api.client import ImagesClient
from cloudcafe.compute.servers_api.behaviors import ServerBehaviors
from cloudcafe.compute.images_api.behaviors import ImageBehaviors
from cloudcafe.compute.config import ComputeConfig
from cloudcafe.compute.flavors_api.config import FlavorsConfig
from cloudcafe.compute.images_api.config import ImagesConfig
from cloudcafe.compute.servers_api.config import ServersConfig
from cloudcafe.identity.v2_0.tokens_api.client import TokenAPI_Client
from cloudcafe.identity.v2_0.tokens_api.behaviors import TokenAPI_Behaviors
from cloudcafe.identity.v2_0.tokens_api.config import TokenAPI_Config
class ComputeFixture(BaseTestFixture):
'''
@summary: Fixture for an Compute test.
'''
@classmethod
def setUpClass(cls):
super(ComputeFixture, cls).setUpClass()
cls.flavors_config = FlavorsConfig()
cls.images_config = ImagesConfig()
cls.servers_config = ServersConfig()
cls.compute_config = ComputeConfig()
cls.flavor_ref = cls.flavors_config.primary_flavor
cls.flavor_ref_alt = cls.flavors_config.secondary_flavor
cls.image_ref = cls.images_config.primary_image
cls.image_ref_alt = cls.images_config.secondary_image
cls.disk_path = cls.servers_config.instance_disk_path
identity_config = TokenAPI_Config()
token_client = TokenAPI_Client(identity_config.authentication_endpoint,
'json', 'json')
token_behaviors = TokenAPI_Behaviors(token_client)
access_data = token_behaviors.get_access_data(identity_config.username,
identity_config.password,
identity_config.tenant_name)
compute_service = access_data.get_service(
cls.compute_config.compute_endpoint_name)
url = compute_service.get_endpoint(
cls.compute_config.region).public_url
cls.flavors_client = FlavorsClient(url, access_data.token.id_,
'json', 'json')
cls.servers_client = ServersClient(url, access_data.token.id_,
'json', 'json')
cls.images_client = ImagesClient(url, access_data.token.id_,
'json', 'json')
cls.server_behaviors = ServerBehaviors(cls.servers_client,
cls.servers_config,
cls.images_config,
cls.flavors_config)
cls.image_behaviors = ImageBehaviors(cls.images_client,
cls.images_config)
cls.flavors_client.add_exception_handler(ExceptionHandler())
cls.resources = ResourcePool()
@classmethod
def tearDownClass(cls):
super(ComputeFixture, cls).tearDownClass()
cls.flavors_client.delete_exception_handler(ExceptionHandler())
cls.resources.release()
@classmethod
def parse_image_id(self, image_response):
"""
@summary: Extract Image Id from Image response
@param image_response: Image response
@type image_ref: string
@return: Image id
@rtype: string
"""
image_ref = image_response.headers['location']
return image_ref.rsplit('/')[-1]
def verify_server_event_details(self, server, image, flavor, event):
'''
@summary: Verifies the common attributes for all compute events
@param event: Contains event details (actual data) to be verified.
@type event: Dictionary
'''
failure = 'Expected {0} field in event to be {1}, was {2}'
image_id = event.payload.image_ref_url.rsplit('/')[-1]
self.assertEqual(server.tenant_id, event.payload.tenant_id,
msg=failure.format('tenant id', server.tenant_id,
event.payload.tenant_id))
self.assertEqual(event.payload.user_id, server.user_id,
msg=failure.format('user id', server.user_id,
event.payload.user_id))
self.assertEqual(event.payload.instance_type_id, int(server.flavor.id),
msg=failure.format('flavor id', server.flavor.id,
event.payload.instance_type_id))
self.assertEqual(event.payload.instance_type, flavor.name,
msg=failure.format('flavor name', flavor.name,
event.payload.instance_type))
self.assertEqual(event.payload.memory_mb, flavor.ram,
msg=failure.format('RAM size', flavor.ram,
event.payload.memory_mb))
self.assertEqual(event.payload.disk_gb, flavor.disk,
msg=failure.format('disk size', flavor.disk,
event.payload.disk_gb))
self.assertEqual(event.payload.instance_id, server.id,
msg=failure.format('server id', server.id,
event.payload.instance_id))
self.assertEqual(event.payload.display_name, server.name,
msg=failure.format('server name', server.name,
event.payload.display_name))
self.assertEqual(image_id, server.image.id,
msg=failure.format('image id', server.image.id,
image_id))
class CreateServerFixture(ComputeFixture):
'''
@summary: Creates a server using defaults from the test data,
waits for active state.
'''
@classmethod
def setUpClass(cls, name=None,
imageRef=None, flavorRef=None,
personality=None, metadata=None,
diskConfig=None, networks=None):
'''
@summary:Creates a server and waits for server to reach active status
@param name: The name of the server.
@type name: String
@param image_ref: The reference to the image used to build the server.
@type image_ref: String
@param flavor_ref: The flavor used to build the server.
@type flavor_ref: String
@param meta: A dictionary of values to be used as metadata.
@type meta: Dictionary. The limit is 5 key/values.
@param personality: A list of dictionaries for files to be
injected into the server.
@type personality: List
@param disk_config: MANUAL/AUTO/None
@type disk_config: String
@param networks:The networks to which you want to attach the server
@type networks: String
'''
super(CreateServerFixture, cls).setUpClass()
if name is None:
name = rand_name('testserver')
if imageRef is None:
imageRef = cls.image_ref
if flavorRef is None:
flavorRef = cls.flavor_ref
cls.flavor_ref = flavorRef
cls.image_ref = imageRef
resp = cls.servers_client.create_server(name, imageRef,
flavorRef,
personality=personality,
metadata=metadata,
disk_config=diskConfig,
networks=networks)
cls.created_server = resp.entity
try:
wait_response = cls.server_behaviors.wait_for_server_status(
cls.created_server.id,
ServerStates.ACTIVE)
wait_response.entity.admin_pass = cls.created_server.admin_pass
except TimeoutException as exception:
cls.assertClassSetupFailure(exception.message)
except BuildErrorException as exception:
cls.assertClassSetupFailure(exception.message)
finally:
cls.resources.add(cls.created_server.id,
cls.servers_client.delete_server)
cls.server_response = wait_response
if cls.server_response.entity.status != ServerStates.ACTIVE:
cls.assertClassSetupFailure('Server %s did not reach active state',
cls.created_server.id)
@classmethod
def tearDownClass(cls):
super(CreateServerFixture, cls).tearDownClass()

Binary file not shown.

View File

@ -0,0 +1,16 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

Binary file not shown.

View File

@ -0,0 +1,17 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
__author__ = 'dwalleck'

Binary file not shown.

View File

@ -0,0 +1,236 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.exceptions import BadRequest, ItemNotFound
from test_repo.compute.fixtures import ComputeFixture
class FlavorsTest(ComputeFixture):
def test_list_flavors(self):
""" List of all flavors should contain the expected flavor """
response = self.flavors_client.list_flavors()
flavors = response.entity
self.assertTrue(len(flavors) > 0)
response = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor = response.entity
for each in flavors:
if flavor.id == each.id:
return
self.fail("The expected flavor: %s not found in the flavor list." % flavor.id)
def test_list_flavors_with_detail(self):
""" Detailed list of all flavors should contain the expected flavor """
response = self.flavors_client.list_flavors_with_detail()
flavors = response.entity
self.assertTrue(len(flavors) > 0)
response = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor = response.entity
self.assertIn(flavor, flavors, "The expected flavor: %s not found in the flavor list." % flavor.id)
def test_get_flavor(self):
""" The expected flavor details should be returned """
response = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor = response.entity
self.assertEqual(self.flavor_ref, flavor.id, "Could not retrieve the expected flavor.")
def test_get_non_existent_flavor(self):
"""flavor details are not returned for non existent flavors"""
try:
self.flavors_client.get_flavor_details(999)
self.fail('No exception thrown for a non-existent flavor id')
except ItemNotFound:
pass
def test_list_flavors_limit_results(self):
"""Only the expected number of flavors should be returned"""
response = self.flavors_client.list_flavors(limit=1)
flavors = response.entity
self.assertEqual(1, len(flavors),
"The length of flavor list was %s instead of 1" % len(flavors))
def test_list_flavors_detailed_limit_results(self):
"""Only the expected number of flavors (detailed) should be returned"""
response = self.flavors_client.list_flavors_with_detail(limit=1)
flavors = response.entity
self.assertEqual(1, len(flavors),
"The length of flavor list was %s instead of 1" % len(flavors))
def test_list_flavors_using_marker(self):
"""The list of flavors should start from the provided marker"""
response = self.flavors_client.list_flavors()
flavors = response.entity
flavors.sort(key=lambda k: k.id)
# Filter out any flavors of the same size
filter_criteria = lambda x: x.id > flavors[1].id
expected_flavors = filter(filter_criteria, flavors)
response = self.flavors_client.list_flavors(marker=flavors[1].id)
actual_flavors = response.entity
actual_flavors.sort(key=lambda k: k.id)
expected_flavors.sort(key=lambda k: k.id)
self.assertEqual(actual_flavors, expected_flavors,
msg='Filtered flavor was incorrectly \
included in the list of returned flavors')
def test_list_flavors_detailed_using_marker(self):
"""The list of flavors should start from the provided marker"""
response = self.flavors_client.list_flavors_with_detail()
flavors = response.entity
flavors.sort(key=lambda k: k.id)
# Filter out any flavors of the same size
filter_criteria = lambda x: x.id > flavors[1].id
expected_flavors = filter(filter_criteria, flavors)
response = self.flavors_client.list_flavors_with_detail(marker=flavors[1].id)
actual_flavors = response.entity
actual_flavors.sort(key=lambda k: k.id)
expected_flavors.sort(key=lambda k: k.id)
self.assertEqual(actual_flavors, expected_flavors,
msg='Filtered flavors list does not begin at provided marker')
def test_list_flavors_detailed_filter_by_min_disk(self):
"""The detailed list of flavors should be filtered by disk space"""
response = self.flavors_client.list_flavors_with_detail()
flavors = response.entity
flavors.sort(key=lambda k: int(k.disk))
# Filter out any flavors of the same size
filter_criteria = lambda x: int(x.disk) >= int(flavors[1].disk)
expected_flavors = filter(filter_criteria, flavors)
response = self.flavors_client.list_flavors_with_detail(min_disk=flavors[1].disk)
actual_flavors = response.entity
actual_flavors.sort(key=lambda k: k.id)
expected_flavors.sort(key=lambda k: k.id)
self.assertEqual(actual_flavors, expected_flavors,
msg="A flavor with min_disk lower than %s was returned" % (flavors[1].disk))
def test_list_flavors_detailed_filter_by_min_ram(self):
"""The detailed list of flavors should be filtered by RAM"""
response = self.flavors_client.list_flavors_with_detail()
flavors = response.entity
flavors.sort(key=lambda k: int(k.ram))
# Filter out any flavors of the same size
filter_criteria = lambda x: int(x.ram) >= int(flavors[1].ram)
expected_flavors = filter(filter_criteria, flavors)
response = self.flavors_client.list_flavors_with_detail(min_ram=flavors[1].ram)
actual_flavors = response.entity
actual_flavors.sort(key=lambda k: k.id)
expected_flavors.sort(key=lambda k: k.id)
self.assertEqual(actual_flavors, expected_flavors,
msg="A flavor with min_ram lower than %s was returned" % (flavors[1].ram))
def test_list_flavors_filter_by_min_disk(self):
"""The list of flavors should be filtered by disk space"""
response = self.flavors_client.list_flavors_with_detail()
flavors = response.entity
flavors.sort(key=lambda k: int(k.disk))
# Filter out any flavors of the same size
filter_criteria = lambda x: int(x.disk) >= int(flavors[1].disk)
expected_flavors = filter(filter_criteria, flavors)
response = self.flavors_client.list_flavors(min_disk=flavors[1].disk)
actual_flavors = response.entity
actual_flavors.sort(key=lambda k: k.id)
expected_flavors.sort(key=lambda k: k.id)
self.assertEqual(actual_flavors, expected_flavors,
msg="A flavor with min_disk lower than %s was returned" % (flavors[1].disk))
def test_list_flavors_filter_by_min_ram(self):
"""The list of flavors should be filtered by RAM"""
response = self.flavors_client.list_flavors_with_detail()
flavors = response.entity
flavors.sort(key=lambda k: int(k.ram))
# Filter out any flavors of the same size
filter_criteria = lambda x: int(x.ram) >= int(flavors[1].ram)
expected_flavors = filter(filter_criteria, flavors)
response = self.flavors_client.list_flavors(min_ram=flavors[1].ram)
actual_flavors = response.entity
actual_flavors.sort(key=lambda k: k.id)
expected_flavors.sort(key=lambda k: k.id)
self.assertEqual(actual_flavors, expected_flavors,
msg="A flavor with min_disk lower than %s was returned" % flavors[1].ram)
def test_list_flavors_detailed_filter_by_invalid_min_disk(self):
"""The detailed list of flavors should be filtered by disk space"""
with self.assertRaises(BadRequest):
response = self.flavors_client.list_flavors_with_detail(min_disk='invalid_disk')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for \
an invalid min disk value")
def test_list_flavors_detailed_filter_by_invalid_min_ram(self):
"""The detailed list of flavors should be filtered by RAM"""
with self.assertRaises(BadRequest):
response = self.flavors_client.list_flavors_with_detail(min_ram='invalid_ram')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for \
an invalid min RAM value")
def test_list_flavors_filter_by_invalid_min_disk(self):
"""The detailed list of flavors should be filtered by disk space"""
with self.assertRaises(BadRequest):
response = self.flavors_client.list_flavors(min_disk='invalid_disk')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for an \
invalid min disk value")
def test_list_flavors_filter_by_invalid_min_ram(self):
"""The detailed list of flavors should be filtered by RAM"""
with self.assertRaises(BadRequest):
response = self.flavors_client.list_flavors(min_ram='invalid_ram')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for \
an invalid min RAM value")
def test_list_flavors_detailed_filter_min_disk_value_greater_than_max_flavor_disk(self):
"""The detailed list of flavors should be filtered by disk space"""
response = self.flavors_client.list_flavors_with_detail(min_disk='99999')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for the value \
of min disk greater then max flavor disk size.")
def test_list_flavors_detailed_filter_min_ram_value_greater_than_max_flavor_ram(self):
"""The detailed list of flavors should be filtered by RAM"""
response = self.flavors_client.list_flavors_with_detail(min_ram='99999')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for the value \
of min RAM greater then max flavor RAM size.")
def test_list_flavors_filter_min_disk_value_greater_than_max_flavor_disk(self):
"""The detailed list of flavors should be filtered by disk space"""
response = self.flavors_client.list_flavors(min_disk='99999')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for the value \
of min disk greater then max flavor disk size.")
def test_list_flavors_filter_min_disk_value_greater_than_max_flavor_ram(self):
"""The detailed list of flavors should be filtered by RAM"""
response = self.flavors_client.list_flavors(min_ram='99999')
flavors = response.entity
self.assertTrue(len(flavors) == 0,
msg="The list of flavors is not empty for the value \
of min RAM greater then max flavor RAM size.")

Binary file not shown.

View File

@ -0,0 +1,16 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

Binary file not shown.

View File

@ -0,0 +1,56 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.types import NovaImageStatusTypes
from test_repo.compute.fixtures import CreateServerFixture
class ImagesTest(CreateServerFixture):
@classmethod
def setUpClass(cls):
super(ImagesTest, cls).setUpClass()
cls.name = rand_name('testserver')
cls.server = cls.server_response.entity
@classmethod
def tearDownClass(cls):
super(ImagesTest, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_create_delete_image(self):
"""An image for the provided server should be created"""
name = rand_name('testimage')
server_id = self.server.id
image_response = self.servers_client.create_image(server_id, name)
image_id = self.parse_image_id(image_response)
self.image_behaviors.wait_for_image_status(image_id,
NovaImageStatusTypes.ACTIVE)
# Delete image and wait for image to be deleted
self.image_behaviors.wait_for_image_to_be_deleted(image_id)
@tags(type='smoke', net='no')
def test_get_image(self):
'''The expected image should be returned'''
image_response = self.images_client.get_image(self.image_ref)
image = image_response.entity
self.assertEqual(self.image_ref, image.id,
"Could not retrieve the expected image with id %s" %
(image.id))

Binary file not shown.

View File

@ -0,0 +1,100 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest2 as unittest
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.exceptions import ItemNotFound
from cloudcafe.compute.common.types import NovaImageStatusTypes
from test_repo.compute.fixtures import ComputeFixture
class ImagesMetadataTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ImagesMetadataTest, cls).setUpClass()
def setUp(self):
super(ImagesMetadataTest, self).setUp()
self.server_resp = self.server_behaviors.create_active_server()
self.server_id = self.server_resp.entity.id
self.resources.add(self.server_id, self.servers_client.delete_server)
meta = {'key1': 'value1', 'key2': 'value2'}
name = rand_name('testimage')
image_resp = self.servers_client.create_image(self.server_id, name, meta)
self.image_id = self.parse_image_id(image_resp)
self.resources.add(self.image_id, self.images_client.delete_image)
self.image_behaviors.wait_for_image_resp_code(self.image_id, 200)
self.image_behaviors.wait_for_image_status(self.image_id, NovaImageStatusTypes.ACTIVE)
self.image = self.images_client.get_image(self.image_id)
@classmethod
def tearDownClass(cls):
super(ImagesMetadataTest, cls).tearDownClass()
@tags(type='negative', net='no')
def test_delete_nonexistant_image_metadata_item(self):
"""User should not be able to delete a metadata which does not exist"""
with self.assertRaises(ItemNotFound):
self.images_client.delete_image_metadata_item(self.image_id,
'meta_key_5')
@tags(type='negative', net='no')
def test_get_nonexistent_image_metadata_item(self):
"""User should not be able to perform a get on an image metadata which does not exist"""
with self.assertRaises(ItemNotFound):
self.images_client.get_image_metadata_item(self.image_id,
'meta_key_5')
@tags(type='positive', net='no')
def test_list_image_metadata(self):
"""All metadata key/value pairs for an image should be returned"""
image_metadata = self.images_client.list_image_metadata(self.image_id)
self.assertEqual('value1', image_metadata.entity.key1,
"The metadata is not same as expected.")
self.assertEqual('value2', image_metadata.entity.key2,
"The metadata is not same as expected.")
@tags(type='positive', net='no')
def test_set_image_metadata(self):
"""Test user should be able to set the metadata of an image"""
meta = {'key3': 'meta3', 'key4': 'meta4'}
self.images_client.set_image_metadata(self.image_id, meta)
image_metadata = self.images_client.list_image_metadata(self.image_id)
self.assertEqual('meta3', image_metadata.entity.key3,
"The metadata is not same as expected.")
self.assertEqual('meta4', image_metadata.entity.key4,
"The metadata is not same as expected.")
self.assertFalse(hasattr(image_metadata.entity, 'key1'))
self.assertFalse(hasattr(image_metadata.entity, 'key2'))
@tags(type='positive', net='no')
def test_get_image_metadata_item(self):
"""The value for a specific metadata key should be returned"""
meta_resp = self.images_client.get_image_metadata_item(self.image_id, 'key2')
self.assertTrue('value2', meta_resp.text)
@tags(type='positive', net='no')
def test_delete_image_metadata_item(self):
"""The metadata value/key pair should be deleted from the image"""
self.images_client.delete_image_metadata_item(self.image_id, 'key1')
metadata_resp = self.images_client.list_image_metadata(self.image_id)
self.assertFalse(hasattr(metadata_resp.entity, 'key1'),
msg="The metadata did not get deleted.")

View File

@ -0,0 +1,51 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.exceptions import ItemNotFound
from test_repo.compute.fixtures import ComputeFixture
class ImagesMetadataNegativeTest(ComputeFixture):
@tags(type='negative', net='no')
def test_list_image_metadata_for_nonexistent_image(self):
"""List on nonexistent image metadata should fail"""
with self.assertRaises(ItemNotFound):
self.images_client.list_image_metadata(999)
@tags(type='negative', net='no')
def test_get_image_metadata_item_for_nonexistent_image(self):
"""Get metadata of a nonexistent image should fail"""
with self.assertRaises(ItemNotFound):
self.images_client.get_image_metadata_item(999, 'key2')
@tags(type='negative', net='no')
def test_set_image_metadata_item_for_nonexistent_image(self):
""""Metadata item should not be set for a nonexistent image"""
meta = {'meta_key_1': 'meta_value_1'}
with self.assertRaises(ItemNotFound):
self.images_client.set_image_metadata_item(999, 'meta_key_1',
'meta_value_1')
@tags(type='negative', net='no')
def test_delete_image_metadata_item_for_nonexistent_image(self):
"""Should not be able to delete metadata item of nonexistent image"""
try:
self.images_client.delete_image_metadata_item(999, 'key1')
self.fail("No exception thrown for delete image metadata for non existent image")
except:
pass

View File

@ -0,0 +1,59 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.exceptions import ItemNotFound
from cloudcafe.compute.common.types import NovaImageStatusTypes
from test_repo.compute.fixtures import CreateServerFixture
class ImagesTest(CreateServerFixture):
@classmethod
def setUpClass(cls):
super(ImagesTest, cls).setUpClass()
cls.server = cls.server_response.entity
@classmethod
def tearDownClass(cls):
super(ImagesTest, cls).tearDownClass()
@tags(type='negative', net='no')
def test_create_image_invalid_server_id(self):
"""Image creation should fail if the server id does not exist"""
with self.assertRaises(ItemNotFound):
self.servers_client.create_image(999, 'test_neg')
@tags(type='negative', net='no')
def test_delete_image_invalid_id(self):
"""Image deletion should fail if the image id does not exist"""
with self.assertRaises(ItemNotFound):
self.images_client.delete_image(999)
@tags(type='negative', net='no')
def test_create_image_invalid_server_name(self):
"""Image creation should fail if the image name is blank"""
try:
image_resp = self.servers_client.create_image(self.server.id, '')
except:
pass
else:
image_id = self.parse_image_id(image_resp)
self.image_behaviors.wait_for_image_resp_code(image_id, 200)
self.image_behaviors.wait_for_image_status(image_id,
NovaImageStatusTypes.ACTIVE)
self.images_client.delete_image(image_id)
self.fail('The create request should have failed since the name was blank.')

View File

@ -0,0 +1,257 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.types import NovaImageStatusTypes
from test_repo.compute.fixtures import ComputeFixture
class ImageListTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ImageListTest, cls).setUpClass()
cls.server1 = cls.server_behaviors.create_active_server()
cls.server2 = cls.server_behaviors.create_active_server()
cls.server1_id = cls.server1.entity.id
cls.server2_id = cls.server2.entity.id
cls.resources.add(cls.server1_id, cls.servers_client.delete_server)
cls.resources.add(cls.server2_id, cls.servers_client.delete_server)
image1_name = rand_name('testimage')
image1_resp = cls.servers_client.create_image(cls.server1_id,
image1_name)
assert image1_resp.status_code == 202
cls.image1_id = cls.parse_image_id(image1_resp)
cls.image_behaviors.wait_for_image_status(cls.image1_id, NovaImageStatusTypes.ACTIVE)
image2_name = rand_name('testimage')
image2_resp = cls.servers_client.create_image(cls.server2_id,
image2_name)
assert image2_resp.status_code == 202
cls.image2_id = cls.parse_image_id(image2_resp)
cls.image_behaviors.wait_for_image_status(cls.image2_id, NovaImageStatusTypes.ACTIVE)
cls.image_1 = cls.images_client.get_image(cls.image1_id).entity
cls.image_2 = cls.images_client.get_image(cls.image2_id).entity
cls.resources.add(cls.image1_id, cls.images_client.delete_image)
cls.resources.add(cls.image2_id, cls.images_client.delete_image)
@classmethod
def tearDownClass(cls):
super(ImageListTest, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_list_images_with_detail(self):
"""Detailed list of all images should contain the expected images"""
images = self.images_client.list_images_with_detail()
images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id in images,
msg="Image %s should have been in the list of images." % self.image_1.id)
self.assertTrue(self.image_2.id in images,
msg="Image %s should have been in the list of images." % self.image_2.id)
@tags(type='positive', net='no')
def test_list_images_with_detail_filter_by_status(self):
"""Detailed list of all images should contain the expected images filtered by status"""
image_status = 'ACTIVE'
images = self.images_client.list_images_with_detail(status=image_status)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id in filtered_images,
msg="Image %s should have been in the list of images." % self.image_1.id)
self.assertTrue(self.image_2.id in filtered_images,
msg="Image %s should have been in the list of images." % self.image_2.id)
@tags(type='positive', net='no')
def test_list_images_with_detail_filter_by_name(self):
"""Detailed list of all images should contain the expected images filtered by name"""
image_name = self.image_1.name
images = self.images_client.list_images_with_detail(image_name=image_name)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id in filtered_images,
msg="Image %s should have been in the list of images." % self.image_1.id)
self.assertTrue(self.image_2.id not in filtered_images,
msg="Image %s should have been in the list of images." % self.image_2.id)
@tags(type='positive', net='no')
def test_list_images_with_detail_filter_by_server_ref(self):
"""Detailed list of servers should be filtered by server_id of the image"""
server_ref = self.image_2.server.links.self
images = self.images_client.list_images_with_detail(server_ref=server_ref)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id not in filtered_images,
msg="The image %s is found in the image list." % self.image_1.id)
self.assertTrue(self.image_2.id in filtered_images,
msg="The image %s is not found in the image list." % self.image_2.id)
@tags(type='positive', net='no')
def test_list_images_with_detail_filter_by_server_id(self):
"""Detailed list of servers should be filtered by server id from which the image was created"""
server_id = self.server2_id
images = self.images_client.list_images_with_detail(server_ref=server_id)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id not in filtered_images,
msg="The image %s is found in the image list." % self.image_1.id)
self.assertTrue(self.image_2.id in filtered_images,
msg="The image %s is not found in the image list." % self.image_2.id)
@tags(type='positive', net='no')
def test_list_images_with_detail_filter_by_type(self):
"""The detailed list of servers should be filtered by image type"""
type = self.image_2.metadata.image_type
images = self.images_client.list_images_with_detail(image_type=type)
image_3 = self.images_client.get_image(self.image_ref)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id in filtered_images,
msg="The image %s is not found in the image list." % self.image_1.id)
self.assertTrue(self.image_2.id in filtered_images,
msg="The image %s is not found in the image list." % self.image_2.id)
self.assertTrue(image_3.id not in filtered_images,
msg="The image %s is found in the image list." % image_3.id)
@tags(type='positive', net='no')
def test_list_images_with_detail_limit_results(self):
"""Verify only the expected number of results (with full details) are returned"""
limit = 1
images = self.images_client.list_images_with_detail(limit=1)
self.assertEquals(len(images.entity), limit,
msg="The image list length does not match the expected limit.")
@tags(type='positive', net='no')
def test_list_images_with_detail_filter_by_changes_since(self):
"""Verify an update image is returned"""
#Becoming ACTIVE will modify the updated time
#Filter by the image's created time
changes_since = self.image_1.created
images = self.images_client.list_images_with_detail(changes_since=changes_since)
found = any([i for i in images.entity if i.id == self.image1_id])
self.assertTrue(found, msg="The images are not listed according to the changes since date.")
@tags(type='positive', net='no')
def test_list_images_with_detail_using_marker(self):
"""The detailed list of images should start from the provided marker"""
# Verify that the original image is not in the new list
marker = self.image1_id
images = self.images_client.list_images_with_detail(marker=marker)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id not in filtered_images,
msg="The marker image %s is found in the image list." % self.image_1.id)
@tags(type='positive', net='no')
def test_list_images(self):
"""The list of all images should contain the expected images"""
images = self.images_client.list_images()
images = [image.id for image in images.entity]
self.assertTrue(self.image_1.id in images,
msg="The image %s is not found in the image list." % self.image_1.id)
self.assertTrue(self.image_2.id in images,
msg="The image %s is not found in the image list." % self.image_2.id)
@tags(type='positive', net='no')
def test_list_images_limit_results(self):
"""Verify only the expected number of results are returned"""
limit = 1
images = self.images_client.list_images(limit=1)
self.assertEquals(len(images.entity), limit,
msg="The image list length does not match the expected limit.")
@tags(type='positive', net='no')
def test_list_images_filter_by_changes_since(self):
"""Verify only updated images are returned in the detailed list"""
#Becoming ACTIVE will modify the updated time
#Filter by the image's created time
changes_since = self.image_2.created
images = self.images_client.list_images(changes_since=changes_since)
filtered_images = [image.id for image in images.entity]
found = any([i for i in filtered_images if i == self.image2_id])
self.assertTrue(found, msg="The images are not listed according to the changes since.")
@tags(type='positive', net='no')
def test_list_images_using_marker(self):
"""The list of images should start from the provided marker"""
marker = self.image2_id
images = self.images_client.list_images(marker=marker)
filtered_images = [image.id for image in images.entity]
#Verify the image does not exist in the filtered list
self.assertTrue(self.image2_id not in filtered_images,
msg="The marker image %s is found in the image list." % self.image1_id)
@tags(type='positive', net='no')
def test_list_images_filter_by_status(self):
"""List of all images should contain the expected images filtered by image status"""
imageStatus = NovaImageStatusTypes.ACTIVE
images = self.images_client.list_images(status=imageStatus)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image1_id in filtered_images,
msg="The image %s is not found in the image list." % self.image1_id)
self.assertTrue(self.image2_id in filtered_images,
msg="The image %s is not found in the image list." % self.image2_id)
@tags(type='positive', net='no')
def test_list_images_filter_by_name(self):
"""List of all images should contain the expected images filtered by name"""
imageName = self.image_1.name
images = self.images_client.list_images(image_name=imageName)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image1_id in filtered_images,
msg="The image %s is not found in the image list." % self.image1_id)
self.assertTrue(self.image2_id not in filtered_images,
msg="The image %s is found in the image list." % self.image2_id)
@tags(type='positive', net='no')
def test_list_images_filter_by_server_ref(self):
"""List of all images should contain the expected images filtered by server_id of the image"""
server_ref = self.image_1.server.links.self
images = self.images_client.list_images(server_ref=server_ref)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image1_id in filtered_images,
msg="The image %s is not found in the image list." % self.image1_id)
self.assertTrue(self.image2_id not in filtered_images,
msg="The image %s is found in the image list." % self.image2_id)
@tags(type='positive', net='no')
def test_list_images_filter_by_server_id(self):
"""List of all images should contain the expected images filtered by server id from which the image was created"""
server_id = self.server1_id
images = self.images_client.list_images(server_ref=server_id)
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image1_id in filtered_images,
msg="The image %s is not found in the image list." % self.image1_id)
self.assertTrue(self.image2_id not in filtered_images,
msg="The image %s is found in the image list." % self.image2_id)
@tags(type='positive', net='no')
def test_list_images_filter_by_type(self):
"""The list of images should be filtered by image type"""
type = 'snapshot'
images = self.images_client.list_images(image_type=type)
image3_id = self.image_ref
filtered_images = [image.id for image in images.entity]
self.assertTrue(self.image1_id in filtered_images,
msg="The image %s is not found in the image list." % self.image1_id)
self.assertTrue(self.image2_id in filtered_images,
msg="The image %s is not found in the image list." % self.image2_id)
self.assertTrue(image3_id not in filtered_images,
msg="The image %s is found in the image list." % image3_id)

View File

@ -0,0 +1,83 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.exceptions import BadRequest
from test_repo.compute.fixtures import ComputeFixture
class ImagesListTestNegative(ComputeFixture):
@tags(type='negative', net='no')
def test_list_images_filter_by_nonexistent_server_id(self):
"""Negative Test: Images should not get listed with invalid server ID"""
server_id = 'sjlfdlkjfldjlkdjfldjf'
images = self.images_client.list_images(server_ref=server_id)
self.assertEqual(200, images.status_code,
"The response code is not 200")
self.assertEqual(0, len(images.entity),
"The list of images is not empty.")
@tags(type='negative', net='no')
def test_list_images_filter_by_nonexistent_image_name(self):
"""Images should not get listed when filtered with invalid image name"""
image_name = 'aljsdjfsjkljlkjdfkjs999'
images = self.images_client.list_images(image_name=image_name)
self.assertEqual(200, images.status_code,
"The response code is not 200.")
self.assertEqual(0, len(images.entity),
"The list of images is not empty.")
@tags(type='negative', net='no')
def test_list_images_filter_by_invalid_image_status(self):
"""Images should not get listed when filtered with invalid status"""
image_status = 'INVALID'
images = self.images_client.list_images(status=image_status)
self.assertEqual(200, images.status_code,
"The response code is not 200.")
self.assertEqual(0, len(images.entity),
"The list of images is not empty.")
@tags(type='negative', net='no')
def test_list_images_filter_by_invalid_marker(self):
"""Images should not get listed when filtered with invalid marker"""
marker = 999
with self.assertRaises(BadRequest):
self.images_client.list_images(marker=marker)
@tags(type='negative', net='no')
def test_list_images_filter_by_invalid_type(self):
"""Images should not get listed when filtered with invalid type"""
type = 'INVALID'
images = self.images_client.list_images(image_type=type)
self.assertEqual(200, images.status_code,
"The response code is not 200.")
self.assertEqual(0, len(images.entity),
"The list of images is not empty")
@tags(type='negative', net='no')
def test_list_images_filter_by_invalid_changes_since(self):
"""Images should not get listed with invalid changes since"""
changes_since = '2012-02-22T'
with self.assertRaises(BadRequest):
self.images_client.list_images(changes_since=changes_since)
@tags(type='negative', net='no')
def test_list_images_filter_by_invalid_limit(self):
"""Images should not get listed with invalid limit"""
limit = -3
with self.assertRaises(BadRequest):
self.images_client.list_images(limit=limit)

View File

@ -0,0 +1,16 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

Binary file not shown.

View File

@ -0,0 +1,16 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,53 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from test_repo.compute.fixtures import CreateServerFixture
class ChangeServerPasswordTests(CreateServerFixture):
@classmethod
def setUpClass(cls):
super(ChangeServerPasswordTests, cls).setUpClass()
cls.server = cls.server_response.entity
cls.new_password = "newslice129690TuG72Bgj2"
# Change password and wait for server to return to active state
cls.compute_provider.change_password_and_await(cls.server.id,
cls.new_password)
@classmethod
def tearDownClass(cls):
super(ChangeServerPasswordTests, cls).tearDownClass()
@tags(type='smoke', net='yes')
def test_can_log_in_with_new_password(self):
'''Verify the admin user can log in with the new password'''
'''Get server details '''
response = self.servers_client.get_server(self.server.id)
self.server = response.entity
'''Set the server's adminPass attribute to the new password,vas this field is not set in getServer'''
self.server.adminPass = self.new_password
public_address = self.compute_provider.get_public_ip_address(self.server)
'''Get an instance of the remote client '''
remote_client = self.compute_provider.get_remote_instance_client(self.server, public_address)
self.assertTrue(remote_client.can_connect_to_public_ip(),
"Could not connect to server (%s) using new admin password %s" %
(public_address, self.new_password))

View File

@ -0,0 +1,50 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.types import NovaServerRebootTypes
from test_repo.compute.fixtures import ComputeFixture
class RebootServerHardTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(RebootServerHardTests, cls).setUpClass()
response = cls.compute_provider.create_active_server()
cls.server = response.entity
cls.resources.add(cls.server.id, cls.servers_client.delete_server)
@classmethod
def tearDownClass(cls):
super(RebootServerHardTests, cls).tearDownClass()
@tags(type='smoke', net='yes')
def test_reboot_server_hard(self):
""" The server should be power cycled """
public_address = self.compute_provider.get_public_ip_address(self.server)
remote_instance = self.compute_provider.get_remote_instance_client(self.server, public_address)
uptime_start = remote_instance.get_uptime()
start = time.time()
self.compute_provider.reboot_and_await(self.server.id, NovaServerRebootTypes.HARD)
remote_client = self.compute_provider.get_remote_instance_client(self.server, public_address)
finish = time.time()
uptime_post_reboot = remote_client.get_uptime()
self.assertLess(uptime_post_reboot, (uptime_start + (finish - start)))

View File

@ -0,0 +1,49 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.types import NovaServerRebootTypes
from test_repo.compute.fixtures import ComputeFixture
class RebootServerSoftTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(RebootServerSoftTests, cls).setUpClass()
response = cls.compute_provider.create_active_server()
cls.server = response.entity
cls.resources.add(cls.server.id, cls.servers_client.delete_server)
@classmethod
def tearDownClass(cls):
super(RebootServerSoftTests, cls).tearDownClass()
@tags(type='smoke', net='yes')
def test_reboot_server_soft(self):
""" The server should be signaled to reboot gracefully """
public_address = self.compute_provider.get_public_ip_address(self.server)
remote_instance = self.compute_provider.get_remote_instance_client(self.server, public_address)
uptime_start = remote_instance.get_uptime()
start = time.time()
self.compute_provider.reboot_and_await(self.server.id, NovaServerRebootTypes.SOFT)
remote_client = self.compute_provider.get_remote_instance_client(self.server, public_address)
finish = time.time()
uptime_post_reboot = remote_client.get_uptime()
self.assertLess(uptime_post_reboot, (uptime_start + (finish - start)))

View File

@ -0,0 +1,115 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import unittest2 as unittest
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.types import NovaServerStatusTypes
from cloudcafe.compute.common.datagen import rand_name
from test_repo.compute.fixtures import ComputeFixture
class RebuildServerTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(RebuildServerTests, cls).setUpClass()
response = cls.compute_provider.create_active_server()
cls.server = response.entity
response = cls.flavors_client.get_flavor_details(cls.flavor_ref)
cls.flavor = response.entity
cls.resources.add(cls.server.id, cls.servers_client.delete_server)
cls.metadata = {'key': 'value'}
cls.name = rand_name('testserver')
file_contents = 'Test server rebuild.'
personality = [{'path': '/etc/rebuild.txt',
'contents': base64.b64encode(file_contents)}]
cls.password = 'rebuild'
rebuilt_server_response = cls.servers_client.rebuild(cls.server.id,
cls.image_ref_alt,
name=cls.name,
metadata=cls.metadata,
personality=personality,
admin_pass=cls.password)
cls.rebuilt_server_response = cls.compute_provider.wait_for_server_status(cls.server.id,
NovaServerStatusTypes.ACTIVE)
@classmethod
def tearDownClass(cls):
super(RebuildServerTests, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_verify_rebuild_server_response(self):
#Verify the properties in the initial response are correct
rebuilt_server = self.rebuilt_server_response.entity
if rebuilt_server.addresses.public is not None:
v4_address = rebuilt_server.addresses.public.ipv4
v6_address = rebuilt_server.addresses.public.ipv6
self.assertEqual(v4_address, self.server.accessIPv4,
msg="AccessIPv4 did not match")
self.assertEqual(v6_address, self.server.accessIPv6,
msg="AccessIPv6 did not match")
self.assertEquals(rebuilt_server.tenant_id, self.config.compute_api.tenant_id,
msg="Tenant id did not match")
self.assertEqual(rebuilt_server.name, self.name,
msg="Server name did not match")
self.assertTrue(rebuilt_server.hostId is not None,
msg="Host id was not set")
self.assertEqual(rebuilt_server.image.id, self.image_ref_alt,
msg="Image id did not match")
self.assertEqual(rebuilt_server.flavor.id, self.flavor_ref,
msg="Flavor id did not match")
self.assertEqual(rebuilt_server.id, self.server.id, msg="Server id did not match")
self.assertEqual(rebuilt_server.links.bookmark, self.server.links.bookmark, msg="Bookmark links do not match")
self.assertEqual(self.server.links.self, self.rebuilt_server_response['location'],
msg="Location url did not match a valid link for the server")
self.assertEqual(rebuilt_server.metadata.key, 'value')
self.assertEqual(rebuilt_server.created, self.server.created,
msg="Server Created date changed after rebuild")
self.assertTrue(rebuilt_server.updated != self.server.updated,
msg="Server Updated date not changed after rebuild")
self.assertEquals(rebuilt_server.addresses, self.server.addresses,
msg="Server IP addresses changed after rebuild")
@tags(type='positive', net='yes')
@unittest.skip('V1 Bug:I-04125')
def test_server_hostname_after_rebuild(self):
server = self.rebuilt_server_response.entity
rebuilt_server = self.rebuilt_server_response.entity
public_address = self.compute_provider.get_public_ip_address(rebuilt_server)
server.adminPass = self.password
remote_instance = self.compute_provider.get_remote_instance_client(server, public_address)
# Verify that the server hostname is set to the new server name
hostname = remote_instance.get_hostname()
self.assertEqual(hostname, server.name,
msg="The hostname was not same as the server name after rebuild")
@tags(type='smoke', net='yes')
def test_can_log_into_server_after_rebuild(self):
server = self.rebuilt_server_response.entity
rebuilt_server = self.rebuilt_server_response.entity
public_address = self.compute_provider.get_public_ip_address(rebuilt_server)
server.adminPass = self.password
remote_instance = self.compute_provider.get_remote_instance_client(server, public_address)
self.assertTrue(remote_instance.can_connect_to_public_ip(),
msg="Could not connect to server (%s) using new admin password %s" % (public_address, server.adminPass))

View File

@ -0,0 +1,68 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from test_repo.compute.fixtures import ComputeFixture
class ServerRescueTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ServerRescueTests, cls).setUpClass()
server_response = cls.compute_provider.create_active_server()
cls.server = server_response.entity
cls.resources.add(cls.server.id, cls.servers_client.delete_server)
flavor_response = cls.flavors_client.get_flavor_details(cls.flavor_ref)
cls.flavor = flavor_response.entity
@classmethod
def tearDownClass(cls):
super(ServerRescueTests, cls).tearDownClass()
@tags(type='smoke', net='yes')
def test_rescue_and_unrescue_server_test(self):
"""Verify that a server can enter and exit rescue mode"""
rescue_response = self.servers_client.rescue(self.server.id)
changed_password = rescue_response.entity.adminPass
self.assertTrue(rescue_response.status_code is 200,
msg="The response code while rescuing a server is %s instead of 200" % rescue_response.status_code)
self.assertTrue(self.server.adminPass is not changed_password,
msg="The password did not change after Rescue.")
#Enter rescue mode
rescue_server_response = self.compute_provider.wait_for_server_status(self.server.id, 'RESCUE')
rescue_server = rescue_server_response.entity
rescue_server.adminPass = changed_password
remote_client = self.compute_provider.get_remote_instance_client(rescue_server)
#Verify if hard drives are attached
remote_client = self.compute_provider.get_remote_instance_client(rescue_server)
partitions = remote_client.get_partition_details()
self.assertEqual(3, len(partitions))
#Exit rescue mode
unrescue_response = self.servers_client.unrescue(self.server.id)
self.assertTrue(unrescue_response.status_code == 202,
msg="The response code while unrescuing a server is %s instead of 202" % rescue_response.status_code)
self.compute_provider.wait_for_server_status(self.server.id, 'ACTIVE')
remote_client = self.compute_provider.get_remote_instance_client(self.server)
partitions = remote_client.get_partition_details()
self.assertEqual(2, len(partitions), msg="The number of partitions after unrescue were not two.")
result, message = remote_client.verify_partitions(self.flavor.disk, self.flavor.swap, 'active', partitions)
self.assertTrue(result, msg=message)

View File

@ -0,0 +1,131 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.types import NovaServerStatusTypes
from cloudcafe.compute.common.equality_tools import EqualityTools
from test_repo.compute.fixtures import ComputeFixture
class ResizeServerUpConfirmTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ResizeServerUpConfirmTests, cls).setUpClass()
server_response = cls.compute_provider.create_active_server()
server_to_resize = server_response.entity
cls.resources.add(server_to_resize.id, cls.servers_client.delete_server)
# resize server and confirm
cls.servers_client.resize(server_to_resize.id, cls.flavor_ref_alt)
cls.compute_provider.wait_for_server_status(server_to_resize.id,
NovaServerStatusTypes.VERIFY_RESIZE)
cls.servers_client.confirm_resize(server_to_resize.id)
cls.compute_provider.wait_for_server_status(server_to_resize.id,
NovaServerStatusTypes.ACTIVE)
resized_server_response = cls.servers_client.get_server(server_to_resize.id)
cls.resized_server = resized_server_response.entity
cls.resized_server.adminPass = server_to_resize.adminPass
@classmethod
def tearDownClass(cls):
super(ResizeServerUpConfirmTests, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_verify_confirm_resize_response(self):
pass
@tags(type='smoke', net='no')
def test_server_properties_after_resize(self):
self.assertEqual(self.flavor_ref_alt, self.resized_server.flavor.id)
@tags(type='smoke', net='yes')
def test_ram_and_disk_size_on_resize_up_server_confirm_test(self):
"""
The server's RAM and disk space should be modified to that of
the provided flavor
"""
new_flavor = self.flavors_client.get_flavor_details(self.flavor_ref_alt).entity
public_address = self.compute_provider.get_public_ip_address(self.resized_server)
remote_instance = self.compute_provider.get_remote_instance_client(self.resized_server, public_address)
lower_limit = int(new_flavor.ram) - (int(new_flavor.ram) * .1)
server_ram_size = int(remote_instance.get_ram_size_in_mb())
server_swap_size = int(remote_instance.get_swap_size_in_mb())
self.assertTrue(int(new_flavor.ram) == server_ram_size or lower_limit <= server_ram_size,
msg="Ram size after confirm-resize did not match. Expected ram size : %s, Actual ram size : %s" % (new_flavor.ram, server_ram_size))
self.assertEquals(int(new_flavor.swap), server_swap_size,
msg="Swap size after confirm-resize did not match. Expected swap size : %s, Actual swap size : %s" % (new_flavor.swap, server_swap_size))
self.assertTrue(EqualityTools.are_sizes_equal(new_flavor.disk, remote_instance.get_disk_size_in_gb(), 0.5),
msg="Disk size %s after confirm-resize did not match size %s" % (remote_instance.get_disk_size_in_gb(), new_flavor.disk))
class ResizeServerDownConfirmTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ResizeServerDownConfirmTests, cls).setUpClass()
server_response = cls.compute_provider.create_active_server(flavor_ref=cls.flavor_ref_alt)
server_to_resize = server_response.entity
cls.resources.add(server_to_resize.id, cls.servers_client.delete_server)
# resize server and confirm
cls.servers_client.resize(server_to_resize.id, cls.flavor_ref)
cls.compute_provider.wait_for_server_status(server_to_resize.id,
NovaServerStatusTypes.VERIFY_RESIZE)
cls.servers_client.confirm_resize(server_to_resize.id)
cls.compute_provider.wait_for_server_status(server_to_resize.id,
NovaServerStatusTypes.ACTIVE)
resized_server_response = cls.servers_client.get_server(server_to_resize.id)
cls.resized_server = resized_server_response.entity
cls.resized_server.adminPass = server_to_resize.adminPass
@classmethod
def tearDownClass(cls):
super(ResizeServerDownConfirmTests, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_verify_confirm_resize_response(self):
pass
@tags(type='smoke', net='no')
def test_server_properties_after_resize(self):
self.assertEqual(self.flavor_ref, self.resized_server.flavor.id)
@tags(type='smoke', net='yes')
def test_ram_and_disk_size_on_resize_up_server_confirm_test(self):
"""
The server's RAM and disk space should be modified to that of
the provided flavor
"""
new_flavor = self.flavors_client.get_flavor_details(self.flavor_ref).entity
public_address = self.compute_provider.get_public_ip_address(self.resized_server)
remote_instance = self.compute_provider.get_remote_instance_client(self.resized_server, public_address)
lower_limit = int(new_flavor.ram) - (int(new_flavor.ram) * .1)
server_ram_size = int(remote_instance.get_ram_size_in_mb())
server_swap_size = int(remote_instance.get_swap_size_in_mb())
self.assertTrue(int(new_flavor.ram) == server_ram_size or lower_limit <= server_ram_size,
msg="Ram size after confirm-resize did not match. Expected ram size : %s, Actual ram size : %s" % (new_flavor.ram, server_ram_size))
self.assertEquals(int(new_flavor.swap), server_swap_size,
msg="Swap size after confirm-resize did not match. Expected swap size: %s, Actual swap size: %s" % (new_flavor.swap, server_swap_size))
self.assertTrue(EqualityTools.are_sizes_equal(new_flavor.disk, remote_instance.get_disk_size_in_gb(), 0.5),
msg="Disk size %s after confirm-resize did not match size %s" % (remote_instance.get_disk_size_in_gb(), new_flavor.disk))

View File

@ -0,0 +1,101 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.types import NovaServerStatusTypes
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.equality_tools import EqualityTools
from test_repo.compute.fixtures import ComputeFixture
class ResizeServerUpRevertTests(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ResizeServerUpRevertTests, cls).setUpClass()
response = cls.compute_provider.create_active_server()
cls.server = response.entity
cls.remote_instance = cls.compute_provider.get_remote_instance_client(cls.server)
file_name = rand_name('file') + '.txt'
file_content = 'This is a test file'
cls.file_details = cls. remote_instance.create_file(file_name, file_content)
response = cls.flavors_client.get_flavor_details(cls.flavor_ref)
cls.flavor = response.entity
cls.resources.add(cls.server.id, cls.servers_client.delete_server)
@classmethod
def tearDownClass(cls):
super(ResizeServerUpRevertTests, cls).tearDownClass()
@tags(type='smoke', net='yes')
def test_ram_and_disk_size_on_resize_up_server_revert(self):
"""
The server's RAM and disk space should return to its original
values after a resize is reverted
"""
server_response = self.compute_provider.create_active_server()
server_to_resize = server_response.entity
self.resources.add(server_to_resize.id, self.servers_client.delete_server)
remote_instance = self.compute_provider.get_remote_instance_client(server_to_resize)
file_name = rand_name('file') + '.txt'
file_content = 'This is a test file'
file_details = remote_instance.create_file(file_name, file_content)
#resize server and revert
self.servers_client.resize(server_to_resize.id, self.flavor_ref_alt)
self.compute_provider.wait_for_server_status(server_to_resize.id, NovaServerStatusTypes.VERIFY_RESIZE)
self.servers_client.revert_resize(server_to_resize.id)
reverted_server_response = self.compute_provider.wait_for_server_status(server_to_resize.id, NovaServerStatusTypes.ACTIVE)
reverted_server = reverted_server_response.entity
flavor_response = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor = flavor_response.entity
'''Verify that the server resize was reverted '''
public_address = self.compute_provider.get_public_ip_address(reverted_server)
reverted_server.adminPass = server_to_resize.adminPass
remote_instance = self.compute_provider.get_remote_instance_client(reverted_server, public_address)
actual_file_content = remote_instance.get_file_details(file_details.name)
'''Verify that the file content does not change after resize revert'''
self.assertEqual(actual_file_content, file_details, msg="file changed after resize revert")
self.assertEqual(self.flavor_ref, reverted_server.flavor.id,
msg="Flavor id not reverted")
lower_limit = int(flavor.ram) - (int(flavor.ram) * .1)
server_ram_size = int(remote_instance.get_ram_size_in_mb())
self.assertTrue(int(flavor.ram) == server_ram_size or lower_limit <= server_ram_size,
msg="Ram size after revert did not match.Expected ram size : %s, Actual ram size : %s" % (flavor.ram, server_ram_size))
self.assertTrue(EqualityTools.are_sizes_equal(flavor.disk, remote_instance.get_disk_size_in_gb(), 0.5),
msg="Disk size %s after revert did not match %s" % (remote_instance.get_disk_size_in_gb(), flavor.disk))
def _assert_server_details(self, server, expected_name, expected_accessIPv4, expected_accessIPv6, expected_id, expected_image_ref):
self.assertEqual(expected_accessIPv4, server.accessIPv4,
msg="AccessIPv4 did not match")
self.assertEqual(expected_accessIPv6, server.accessIPv6,
msg="AccessIPv6 did not match")
self.assertEquals(self.config.nova.tenant_id, server.tenant_id,
msg="Tenant id did not match")
self.assertEqual(expected_name, server.name,
msg="Server name did not match")
self.assertTrue(server.host_id is not None,
msg="Host id was not set")
self.assertEqual(expected_image_ref, server.image.id,
msg="Image id did not match")
self.assertEqual(self.flavor_ref, server.flavor.id,
msg="Flavor id did not match")
self.assertEqual(expected_id, server.id, msg="Server id did not match")

View File

@ -0,0 +1,96 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.types import NovaServerStatusTypes
from cloudcafe.compute.common.datagen import rand_name
from test_repo.compute.fixtures import ComputeFixture
from cafe.engine.clients.remote_instance.instance_client import InstanceClientFactory
class CreateServerTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(CreateServerTest, cls).setUpClass()
cls.name = rand_name("cctestserver")
cls.metadata = {'meta_key_1': 'meta_value_1',
'meta_key_2': 'meta_value_2'}
cls.create_resp = cls.servers_client.create_server(cls.name, cls.image_ref, cls.flavor_ref,
metadata=cls.metadata)
created_server = cls.create_resp.entity
wait_response = cls.server_behaviors.wait_for_server_status(created_server.id,
NovaServerStatusTypes.ACTIVE)
wait_response.entity.admin_pass = created_server.admin_pass
cls.image = cls.images_client.get_image(cls.image_ref).entity
cls.flavor = cls.flavors_client.get_flavor_details(cls.flavor_ref).entity
cls.server = wait_response.entity
@classmethod
def tearDownClass(cls):
super(CreateServerTest, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_create_server_response(self):
"""Verify the parameters are correct in the initial response"""
self.assertTrue(self.server.id is not None,
msg="Server id was not set in the response")
self.assertTrue(self.server.admin_pass is not None,
msg="Admin password was not set in the response")
self.assertTrue(self.server.links is not None,
msg="Server links were not set in the response")
@tags(type='smoke', net='no')
def test_created_server_fields(self):
"""Verify that a created server has all expected fields"""
message = "Expected {0} to be {1}, was {2}."
self.assertEqual(self.server.name, self.name,
msg=message.format('server name', self.server.name,
self.name))
self.assertEqual(self.image_ref, self.server.image.id,
msg=message.format('image id', self.image_ref,
self.server.image.id))
self.assertEqual(self.server.flavor.id, self.flavor_ref,
msg=message.format('flavor id', self.flavor_ref,
self.server.flavor.id))
self.assertTrue(self.server.created is not None,
msg="Expected server created date to be set, was null.")
self.assertTrue(self.server.updated is not None,
msg="Expected server updated date to be set, was null.")
self.assertGreaterEqual(self.server.updated, self.server.created,
msg="Expected server updated date to be before the created date.")
@tags(type='smoke', net='no')
def test_server_access_addresses(self):
"""If the server has public addresses, the access IP addresses should be same as the public addresses"""
addresses = self.server.addresses
if addresses.public is not None:
self.assertTrue(addresses.public.ipv4 is not None,
msg="Expected server to have a public IPv4 address set.")
self.assertTrue(addresses.public.ipv6 is not None,
msg="Expected server to have a public IPv6 address set.")
self.assertTrue(addresses.private.ipv4 is not None,
msg="Expected server to have a private IPv4 address set.")
self.assertEqual(addresses.public.ipv4, self.server.accessIPv4,
msg="Expected access IPv4 address to be {0}, was {1}.".format(
addresses.public.ipv4, self.server.accessIPv4))
self.assertEqual(addresses.public.ipv6, self.server.accessIPv6,
msg="Expected access IPv6 address to be {0}, was {1}.".format(
addresses.public.ipv6, self.server.accessIPv6))

View File

@ -0,0 +1,282 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from test_repo.compute.fixtures import ComputeFixture
class ServerListTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ServerListTest, cls).setUpClass()
# Creation of 3 servers needed for the tests
active_server_response = cls.server_behaviors.create_active_server()
cls.server = active_server_response.entity
active_server_response = cls.server_behaviors.create_active_server()
cls.server_second = active_server_response.entity
active_server_response = cls.server_behaviors.create_active_server(
image_ref=cls.image_ref_alt,
flavor_ref=cls.flavor_ref_alt)
cls.server_third = active_server_response.entity
cls.resources.add(cls.server.id,
cls.servers_client.delete_server)
cls.resources.add(cls.server_second.id,
cls.servers_client.delete_server)
cls.resources.add(cls.server_third.id,
cls.servers_client.delete_server)
@classmethod
def tearDownClass(cls):
super(ServerListTest, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_get_server(self):
"""Return the full details of a single server"""
server_info_response = self.servers_client.get_server(self.server.id)
server_info = server_info_response.entity
self.assertEqual(200, server_info_response.status_code)
self.assertEqual(self.server.name, server_info.name,
msg="Server name did not match")
self.assertEqual(self.image_ref, server_info.image.id,
msg="Image id did not match")
self.assertEqual(self.flavor_ref, server_info.flavor.id,
msg="Flavor id did not match")
@tags(type='smoke', net='no')
def test_list_servers(self):
"""All servers should be returned"""
list_servers_response = self.servers_client.list_servers()
list_servers = list_servers_response.entity
self.assertEqual(200, list_servers_response.status_code)
self.assertTrue(self.server.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server_third.id)
@tags(type='smoke', net='no')
def test_list_servers_with_detail(self):
"""Return a detailed list of all servers"""
list_servers_detail_response = self.servers_client.list_servers_with_detail()
list_servers_detail = list_servers_detail_response.entity
self.assertEqual(200, list_servers_detail_response.status_code)
servers_lists = []
for i in list_servers_detail:
servers_lists.append(i.id)
self.assertTrue(self.server.id in servers_lists,
msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second.id in servers_lists,
msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.id in servers_lists,
msg="Server with id %s was not found in the list" % self.server_third.id)
@tags(type='positive', net='no')
def test_list_server_details_using_marker(self):
"""The list of servers should start from the provided marker"""
list_server_detail_response = self.servers_client.list_servers_with_detail()
list_server_detail = list_server_detail_response.entity
first_server = list_server_detail[0]
# Verify the list of servers doesn't contain the server used as a marker
params = first_server.id
filtered_servers = self.servers_client.list_servers_with_detail(marker=params)
self.assertEqual(200, filtered_servers.status_code)
self.assertTrue(first_server not in filtered_servers.entity,
msg="The server id used as marker found in the server list")
@tags(type='positive', net='no')
def test_list_servers_using_marker(self):
"""The list of servers should start from the provided marker"""
list_server_info_response = self.servers_client.list_servers()
list_server_info = list_server_info_response.entity
first_server = list_server_info[0]
# Verify the list of servers doesn't contain the server used as a marker
params = first_server.id
filtered_servers = self.servers_client.list_servers(marker=params)
self.assertEqual(200, filtered_servers.status_code)
self.assertTrue(first_server not in filtered_servers.entity,
msg="The server id used as marker found in the server list")
@tags(type='positive', net='no')
def test_list_server_with_detail_limit_results(self):
"""Verify only the expected number of results (with full details) are returned"""
limit = 1
params = limit
server_with_limit = self.servers_client.list_servers_with_detail(limit=params)
self.assertEqual(limit, len(server_with_limit.entity),
msg="The number of servers returned (%s) was more than the limit (%s)" % (len(server_with_limit.entity), limit))
@tags(type='positive', net='no')
def test_list_servers_filter_by_image(self):
"""Filter the list of servers by image"""
params = self.image_ref
list_servers_response = self.servers_client.list_servers(image=params)
list_servers = list_servers_response.entity
self.assertEqual(200, list_servers_response.status_code)
self.assertTrue(self.server.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server.image.id)
self.assertTrue(self.server_second.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server_second.image.id)
self.assertTrue(self.server_third.min_details() not in
list_servers, msg="Server with id %s and image id %s was found in the list filtered by image id %s" % (self.server_third.id, self.server_third.image.id, self.image_ref))
@tags(type='positive', net='no')
def test_list_servers_filter_by_flavor(self):
"""Filter the list of servers by flavor"""
params = self.flavor_ref_alt
list_servers_response = self.servers_client.list_servers(flavor=params)
list_servers = list_servers_response.entity
self.assertEqual(200, list_servers_response.status_code)
self.assertTrue(self.server.min_details() not in list_servers,
msg="Server with id %s and flavor id %s was found in the list filtered by flavor id %s" % (self.server.id, self.server.flavor.id, self.flavor_ref_alt))
self.assertTrue(self.server_second.min_details() not in list_servers,
msg="Server with id %s and flavor id %s was found in the list filtered by flavor id %s" % (self.server_second.id, self.server_second.flavor.id, self.flavor_ref_alt))
self.assertTrue(self.server_third.min_details() in list_servers,
msg="Server with id %s was not found in the list" % self.server_third.id)
@tags(type='positive', net='no')
def test_list_servers_filter_by_server_name(self):
""" Filter the list of servers by name """
params = self.server.name
list_servers_response = self.servers_client.list_servers(name=params)
list_servers = list_servers_response.entity
self.assertEqual(200, list_servers_response.status_code)
self.assertTrue(self.server.min_details() in list_servers,
msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second.min_details() not in list_servers,
msg="Server with id %s and name %s was found in the list filtered by name %s" % (self.server.id, self.server_second.name, self.server.name))
self.assertTrue(self.server_third.min_details() not in list_servers,
msg="Server with id %s and name %s was found in the list filtered by name %s" % (self.server_third.id, self.server_third.name, self.server.name))
@tags(type='positive', net='no')
def test_list_servers_filter_by_server_status(self):
""" Filter the list of servers by server status """
params = 'active'
list_servers_response = self.servers_client.list_servers(status=params)
list_servers = list_servers_response.entity
self.assertEqual(200, list_servers_response.status_code)
self.assertTrue(self.server.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.min_details() in
list_servers, msg="Server with id %s was not found in the list" % self.server_third.id)
@tags(type='positive', net='no')
def test_list_servers_filter_by_changes_since(self):
"""Filter the list of servers by changes-since"""
change_time = self.server_second.created
params = change_time
servers = self.servers_client.list_servers(changes_since=params)
self.assertEqual(200, servers.status_code)
servers_ids_list = []
for i in servers.entity:
servers_ids_list.append(i.id)
self.assertTrue(self.server.id not in servers_ids_list,
msg="Server with id %s was found in the list" % self.server.id)
self.assertTrue(self.server_second.id in servers_ids_list,
msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.id in servers_ids_list,
msg="Server with id %s was not found in the list" % self.server_third.id)
@tags(type='positive', net='no')
def test_list_servers_detailed_filter_by_image(self):
"""Filter the detailed list of servers by image"""
params = self.image_ref
servers_response = self.servers_client.list_servers_with_detail(image=params)
self.assertEqual(200, servers_response.status_code)
servers_list = []
for i in servers_response.entity:
servers_list.append(i.id)
self.assertTrue(self.server.id in servers_list,
msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second.id in servers_list,
msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.id not in servers_list,
msg="Server with id %s and image id %s was found in the list filtered by image id %s" % (self.server_third.id, self.server_third.image.id, self.image_ref))
@tags(type='positive', net='no')
def test_list_servers_detailed_filter_by_flavor(self):
"""Filter the detailed list of servers by flavor"""
params = self.flavor_ref_alt
filtered_servers_response = self.servers_client.list_servers_with_detail(flavor=params)
filtered_servers = filtered_servers_response.entity
self.assertEqual(200, filtered_servers_response.status_code)
self.assertTrue(self.server not in filtered_servers,
msg="Server with id %s and flavor id %s was found in the list filtered by flavor id %s" % (self.server.id, self.server.flavor.id, self.flavor_ref_alt))
self.assertTrue(self.server_second not in filtered_servers,
msg="Server with id %s and flavor id %s was found in the list filtered by flavor id %s" % (self.server_second.id, self.server.flavor.id, self.flavor_ref_alt))
self.assertTrue(self.server_third in filtered_servers,
msg="Server with id %s was not found in the list" % self.server_third.id,)
@tags(type='positive', net='no')
def test_list_servers_detailed_filter_by_server_name(self):
"""Filter the detailed list of servers by server name"""
params = self.server.name
filtered_servers_response = self.servers_client.list_servers_with_detail(name=params)
filtered_servers = filtered_servers_response.entity
self.assertEqual(200, filtered_servers_response.status_code)
self.assertTrue(self.server in filtered_servers,
msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second not in filtered_servers,
msg="Server with id %s and name %s was found in the list filtered by name %s" % (self.server_second.id, self.server_second.name, self.server.name))
self.assertTrue(self.server_third not in filtered_servers,
msg="Server with id %s and name %s was found in the list filtered by name %s" % (self.server_third.id, self.server_third.name, self.server.name))
@tags(type='positive', net='no')
def test_list_servers_detailed_filter_by_server_status(self):
"""Filter the detailed list of servers by server status"""
params = 'active'
filtered_servers_response = self.servers_client.list_servers_with_detail(status=params)
filtered_servers = filtered_servers_response.entity
self.assertEqual(200, filtered_servers_response.status_code)
servers_list = []
for i in filtered_servers:
servers_list.append(i.id)
self.assertTrue(self.server.id in servers_list,
msg="Server with id %s was not found in the list" % self.server.id)
self.assertTrue(self.server_second.id in servers_list,
msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.id in servers_list,
msg="Server with id %s was not found in the list" % self.server_third.id)
@tags(type='positive', net='no')
def test_list_servers_detailed_filter_by_changes_since(self):
"""Create a filter for the server with the second server created date"""
change_time = self.server_second.created
params = change_time
# Filter the detailed list of servers by changes-since
filtered_servers_response = self.servers_client.list_servers_with_detail(changes_since=params)
filtered_servers = filtered_servers_response.entity
self.assertEqual(200, filtered_servers_response.status_code)
servers_list = []
for i in filtered_servers:
servers_list.append(i.id)
self.assertTrue(self.server.id not in servers_list,
msg="Server with id %s was found in the list" % self.server.id)
self.assertTrue(self.server_second.id in servers_list,
msg="Server with id %s was not found in the list" % self.server_second.id)
self.assertTrue(self.server_third.id in servers_list,
msg="Server with id %s was not found in the list" % self.server_third.id)

View File

@ -0,0 +1,194 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest2 as unittest
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.exceptions import ItemNotFound
from test_repo.compute.fixtures import ComputeFixture
class ServerMetadataTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ServerMetadataTest, cls).setUpClass()
server_response = cls.server_behaviors.create_active_server()
cls.server = server_response.entity
cls.resources.add(cls.server.id, cls.servers_client.delete_server)
@classmethod
def tearDownClass(cls):
super(ServerMetadataTest, cls).tearDownClass()
def setUp(self):
super(ServerMetadataTest, self).setUp()
self.meta = {'meta_key_1': 'meta_value_1', 'meta_key_2': 'meta_value_2'}
self.servers_client.set_server_metadata(self.server.id, self.meta)
@tags(type='positive', net='no')
def test_list_server_metadata(self):
"""All metadata key/value pairs for a server should be returned"""
metadata_response = self.servers_client.list_server_metadata(self.server.id)
metadata = metadata_response.entity
self.assertEqual(200, metadata_response.status_code,
"List server metadata call response was: %s" % (metadata_response.status_code))
self.assertEqual('meta_value_1', metadata.meta_key_1,
"Metadata Item not found on server %s. Expected Item Key : meta_key_1, Value : meta_value_1" % self.server.id)
self.assertEqual('meta_value_2', metadata.meta_key_2,
"Metadata Item not found on server %s. Expected Item Key : meta_key_2, Value : meta_value_2" % self.server.id)
@tags(type='positive', net='no')
def test_set_server_metadata(self):
"""The server's metadata should be replaced with the provided values"""
meta = {'meta1': 'data1'}
server_response = self.server_behaviors.create_active_server(metadata=meta)
server = server_response.entity
self.resources.add(server.id, self.servers_client.delete_server)
new_meta = {'meta2': 'data2', 'meta3': 'data3'}
metadata_response = self.servers_client.set_server_metadata(server.id,
new_meta)
metadata = metadata_response.entity
self.assertEqual(200, metadata_response.status_code,
"Set server metadata call response was: %s" % (metadata_response.status_code))
self.assertEqual('data2', metadata.meta2,
"Metadata Item not found on server %s. Expected Item Key : meta2, Value : data2" % server.id)
self.assertEqual('data3', metadata.meta3,
"Metadata Item not found on server %s. Expected Item Key : meta3, Value : data3" % server.id)
self.assertFalse(hasattr(metadata, 'meta1'),
"The already existing metadata(Key : meta1) is not removed during Set metadata")
actual_metadata_response = self.servers_client.list_server_metadata(server.id)
actual_metadata = actual_metadata_response.entity
self.assertEqual('data2', actual_metadata.meta2,
"Metadata Item not found on server %s. Expected Item Key : meta2, Value : data2" % server.id)
self.assertEqual('data3', actual_metadata.meta3,
"Metadata Item not found on server %s. Expected Item Key : meta3, Value : data3" % server.id)
self.assertFalse(hasattr(actual_metadata, 'meta1'),
"The already existing metadata(Key : meta1) is not removed during Set metadata")
@tags(type='positive', net='no')
def test_update_server_metadata(self):
"""The server's metadata values should be updated to the provided values"""
meta = {'key1': 'alt1', 'key2': 'alt2', 'meta_key_1': 'alt3'}
metadata_response = self.servers_client.update_server_metadata(self.server.id, meta)
metadata = metadata_response.entity
self.assertEqual(200, metadata_response.status_code,
"Update server metadata call response was: %s" % (metadata_response.status_code))
self.assertEqual('alt1', metadata.key1,
"Metadata Item not found on server %s. Expected Item Key : key1, Value : alt1" % self.server.id)
self.assertEqual('alt2', metadata.key2,
"Metadata Item not found on server %s. Expected Item Key : key2, Value : alt2" % self.server.id)
self.assertEqual('alt3', metadata.meta_key_1,
"Metadata Item not found on server %s. Expected Item Key : meta_key_1, Value : alt3" % self.server.id)
self.assertEqual('meta_value_2', metadata.meta_key_2,
"Metadata Item not found on server %s. Expected Item Key : meta_key_2, Value : meta_value_2" % self.server.id)
#Verify the values have been updated to the proper values
actual_metadata_response = self.servers_client.list_server_metadata(self.server.id)
actual_metadata = actual_metadata_response.entity
self.assertEqual('alt1', actual_metadata.key1,
"Metadata Item not found on server %s. Expected Item Key : key1, Value : alt1" % self.server.id)
self.assertEqual('alt2', actual_metadata.key2,
"Metadata Item not found on server %s. Expected Item Key : key2, Value : alt2" % self.server.id)
self.assertEqual('alt3', actual_metadata.meta_key_1,
"Metadata Item not found on server %s. Expected Item Key : meta_key_1, Value : alt3" % self.server.id)
self.assertEqual('meta_value_2', actual_metadata.meta_key_2,
"Metadata Item not found on server %s. Expected Item Key : meta_key_2, Value : meta_value_2" % self.server.id)
@tags(type='positive', net='no')
def test_get_server_metadata_item(self):
"""The value for a specific metadata key should be returned"""
metadata_response = self.servers_client.get_server_metadata_item(self.server.id,
'meta_key_1')
metadata = metadata_response.entity
self.assertEqual('meta_value_1', metadata.meta_key_1,
msg="Metadata Item not found on server %s. Expected Item Key : meta_key_1, Value : meta_value_1" % self.server.id)
@tags(type='positive', net='no')
def test_set_server_metadata_item(self):
"""The value provided for the given meta item should be set for the server"""
meta = {'meta_key_2': 'nova'}
metadata_response = self.servers_client.set_server_metadata_item(self.server.id,
'meta_key_2', 'nova')
metadata = metadata_response.entity
self.assertEqual(200, metadata_response.status_code,
"Set server metadata item call response was: %s" % (metadata_response.status_code))
self.assertEqual('nova', metadata.meta_key_2,
"Metadata Item not found on server %s. Expected Item Key : meta_key_2, Value : nova" % self.server.id)
actual_metadata_response = self.servers_client.list_server_metadata(self.server.id)
actual_metadata = actual_metadata_response.entity
self.assertEqual('nova', actual_metadata.meta_key_2,
"Metadata Item not found on server %s. Expected Item Key : meta_key_2, Value : nova" % self.server.id)
self.assertEqual('meta_value_1', actual_metadata.meta_key_1,
"Metadata Item not found on server %s. Expected Item Key : meta_key_1, Value : meta_value_1" % self.server.id)
@tags(type='positive', net='no')
def test_add_new_server_metadata_item(self):
""" The metadata item should be added to the server"""
meta = {'meta_key_3': 'meta_value_3'}
metadata_response = self.servers_client.set_server_metadata_item(self.server.id,
'meta_key_3', 'meta_value_3')
metadata = metadata_response.entity
self.assertEqual(200, metadata_response.status_code, "Add server metadata item call response was: %s" % (metadata_response.status_code))
self.assertEqual('meta_value_3', metadata.meta_key_3,
"Metadata Item not found on server %s. Expected Item Key : meta_key_3, Value : meta_value_3" % self.server.id)
actual_metadata_response = self.servers_client.list_server_metadata(self.server.id)
actual_metadata = actual_metadata_response.entity
self.assertEqual('meta_value_3', actual_metadata.meta_key_3,
"Metadata Item not found on server %s. Expected Item Key : meta_key_3, Value : meta_value_3" % self.server.id)
self.assertEqual('meta_value_2', actual_metadata.meta_key_2,
"Metadata Item not found on server %s. Expected Item Key : meta_key_2, Value : meta_value_2" % self.server.id)
self.assertEqual('meta_value_1', actual_metadata.meta_key_1,
"Metadata Item not found on server %s. Expected Item Key : meta_key_1, Value : meta_value_1" % self.server.id)
@tags(type='positive', net='no')
def test_delete_server_metadata_item(self):
"""The metadata value/key pair should be deleted from the server"""
response = self.servers_client.delete_server_metadata_item(self.server.id,
'meta_key_1')
self.assertEqual(204, response.status_code, "Delete server metadata item call response was: %s" % (response.status_code))
metadata_response = self.servers_client.list_server_metadata(self.server.id)
metadata = metadata_response.entity
self.assertFalse(hasattr(metadata, 'meta_key_1'), "The server\
metadata item (Key: meta_key_1) is not deleted")
@tags(type='negative', net='no')
def test_delete_nonexistent_server_metadata_item(self):
with self.assertRaises(ItemNotFound):
self.servers_client.delete_server_metadata_item(self.server.id,
'meta_key_5')
@tags(type='negative', net='no')
def test_get_nonexistent_server_metadata_item(self):
with self.assertRaises(ItemNotFound):
self.servers_client.get_server_metadata_item(self.server.id,
'meta_key_5')
@tags(type='negative', net='no')
@unittest.skip('Failing, under review')
def test_set_blank_metadata_dict(self):
blank_meta = {'': ''}
create_server_response = self.servers_client.create_server(rand_name('testserver'), self.image_ref, self.flavor_ref, metadata=blank_meta)
server_response = self.servers_client.get_server(create_server_response.entity.id)
server = server_response.entity
self.assertEqual("", server.metadata[''])

View File

@ -0,0 +1,330 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import time
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.models.metadata import Metadata
from cloudcafe.compute.common.datagen import rand_name
from cloudcafe.compute.common.types import NovaServerStatusTypes
from test_repo.compute.fixtures import ComputeFixture
class ServersTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ServersTest, cls).setUpClass()
cls.file_contents = 'This is a test file.'
cls.personality = [{'path': '/root/.csivh',
'contents': base64.b64encode(cls.file_contents)}]
cls.meta = {'meta_key_1': 'meta_value_1', 'meta_key_2': 'meta_value_2'}
cls.accessIPv4 = '192.168.32.16'
cls.accessIPv6 = '3ffe:1900:4545:3:200:f8ff:fe21:67cf'
@classmethod
def tearDownClass(cls):
super(ServersTest, cls).tearDownClass()
@tags(type='smoke', net='no')
def test_update_server_details(self):
"""The server name and access ip addresses should be changed to the provided values"""
active_server_response = self.server_behaviors.create_active_server()
active_server = active_server_response.entity
updated_server_response = self.servers_client.update_server(active_server.id,
name='newname',
accessIPv4=self.accessIPv4,
accessIPv6=self.accessIPv6)
updated_server = updated_server_response.entity
updated_server_details_response = self.servers_client.get_server(active_server.id)
updated_server_details = updated_server_details_response.entity
self.server_behaviors.wait_for_server_status(active_server.id,
NovaServerStatusTypes.ACTIVE)
# Verify the name and access IPs of the server have changed
self.assertEqual('newname', updated_server.name,
msg="The name was not updated")
self.assertEqual(self.accessIPv4, updated_server.accessIPv4,
msg="AccessIPv4 was not updated")
self.assertEqual(self.accessIPv6, updated_server.accessIPv6,
msg="AccessIPv6 was not updated")
self.assertEqual(active_server.created, updated_server.created,
msg="The creation date was updated")
# Verify details changed on get updated server call
self.assertEqual('newname', updated_server_details.name,
msg="The name was not updated")
self.assertEqual(self.accessIPv4, updated_server_details.accessIPv4,
msg="AccessIPv4 was not updated")
self.assertEqual(self.accessIPv6, updated_server_details.accessIPv6,
msg="AccessIPv6 was not updated")
self.assertEqual(active_server.created, updated_server_details.created,
msg="The creation date was updated")
self.assertNotEqual(active_server.updated, updated_server_details.updated,
msg="Server %s updated time did not change after a modification to the server." % updated_server_details.id)
#Teardown
self.servers_client.delete_server(updated_server_details.id)
def _assert_server_details(self, server, expected_name,
expected_accessIPv4, expected_accessIPv6,
expected_flavor):
self.assertEqual(expected_accessIPv4, server.accessIPv4,
msg="AccessIPv4 did not match")
self.assertEqual(expected_accessIPv6, server.accessIPv6,
msg="AccessIPv6 did not match")
self.assertEqual(expected_name, server.name,
msg="Server name did not match")
self.assertEqual(self.image_ref, server.image.id,
msg="Image id did not match")
self.assertEqual(expected_flavor, server.flavor.id,
msg="Flavor id did not match")
self.assertTrue(server.created is not None,
msg="Server created date was not set")
self.assertTrue(server.updated is not None,
msg="Server updated date was not set")
self.assertGreaterEqual(server.updated, server.created,
msg="Server updated date was before the created date")
self.assertEqual(self.meta, Metadata._obj_to_dict(server.metadata),
msg="Metadata did not match")
def _assert_public_address_is_valid(self, addresses):
self.assertTrue(len(addresses.public.addresses) == 2,
msg="Server does not have a Public IPs set")
ipv4_public_address = None
ipv6_public_address = None
if self.config.misc.serializer == "xml":
for address in addresses.public.addresses:
if address.version == '4':
ipv4_public_address = address
else:
ipv6_public_address = address
self.assertTrue(ipv4_public_address.version == '4' and
ipv4_public_address.addr is not None,
msg="Server does not have a Public IPv4 set")
self.assertTrue(ipv6_public_address.version == '6' and
ipv6_public_address.addr is not None,
msg="Server does not have a Public IPv6 set")
else:
for address in addresses.public.addresses:
if address.version == 4:
ipv4_public_address = address
else:
ipv6_public_address = address
self.assertTrue(ipv4_public_address.version == 4 and
ipv4_public_address.addr is not None,
msg="Server does not have a Public IPv4 set")
self.assertTrue(ipv6_public_address.version == 6 and
ipv6_public_address.addr is not None,
msg="Server does not have a Public IPv6 set")
@tags(type='positive', net='no')
def test_delete_server(self):
"""A server should be built using the specified image and flavor"""
active_server_response = self.server_behaviors.create_active_server()
active_server = active_server_response.entity
deleted_server_response = self.servers_client.delete_server(active_server.id)
self.assertEqual(204, deleted_server_response.status_code,
msg='The delete call response was: %s'
% (deleted_server_response.status_code))
self.server_behaviors.wait_for_server_status(active_server.id,
NovaServerStatusTypes.DELETED)
# Verify the server is now in deleted status
parameter = str(active_server.created)
list_servers = self.servers_client.list_servers_with_detail(changes_since=parameter)
found = False
for server in list_servers.entity:
if server.id == active_server.id:
deleted_server = server
found = True
self.assertTrue(found,
msg="The server which was deleted was not found in the server list")
self.assertEqual('DELETED', deleted_server.status,
msg="The server which was deleted was not in DELETED status")
@tags(type='positive', net='yes')
def test_create_server_with_admin_password(self):
"""
If an admin password is provided on server creation, the server's root
password should be set to that password.
"""
name = rand_name("testserver")
admin_password = 'oldslice129690TuG72Bgj2'
create_server_response = self.servers_client.create_server(name, self.image_ref, self.flavor_ref, admin_pass=admin_password)
created_server = create_server_response.entity
self.assertEqual(admin_password, created_server.admin_pass,
msg='Verify that given adminPass equals with actual one')
active_server = self.server_behaviors.wait_for_server_status(created_server.id, NovaServerStatusTypes.ACTIVE)
@tags(type='positive', net='no')
def test_create_server_with_image_and_flavor_self_link(self):
"""Create a server using image and flavor self links"""
name = rand_name("testserver")
image = self.images_client.get_image(self.image_ref)
image_self_link = image.entity.id
flavor = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor_self_link = flavor.entity.id
create_server_response = self.servers_client.create_server(name,
image_self_link,
flavor_self_link)
created_server = create_server_response.entity
#Verify the parameters are correct in the initial response
self.assertTrue(created_server.id is not None,
msg="The server id was not set in response")
self.assertTrue(created_server.admin_pass is not None,
msg="Admin password was not set in response")
self.assertTrue(created_server.links is not None,
msg="Server links were not set in response")
'''Wait for the server to become active'''
active_server_response = self.server_behaviors.wait_for_server_status(created_server.id,
NovaServerStatusTypes.ACTIVE)
active_server = active_server_response.entity
get_server_info_response = self.servers_client.get_server(created_server.id)
get_server_info = get_server_info_response.entity
'''Verify that the image Id of the image ref link which is used to create server
is same as the image id of the created server'''
self.assertEqual(get_server_info.image.id, self._parse_link_to_retrieve_id(image_self_link),
msg="The image does not match to the image mentioned during create")
'''Verify that the flavor Id of the flavor ref link which is used to create server
is same as the flavor id of the created server'''
self.assertEqual(get_server_info.flavor.id, self._parse_link_to_retrieve_id(flavor_self_link),
msg="The flavor does not match to the flavor mentioned during create")
self.servers_client.delete_server(active_server.id)
def _parse_link_to_retrieve_id(self, ref):
temp = ref.rsplit('/')
#Return the last item, which is the image id
return temp[len(temp) - 1]
@tags(type='positive', net='no')
def test_create_server_with_image_and_flavor_bookmark_link(self):
"""Create a server using image and flavor bookmark links"""
name = rand_name("testserver")
image = self.images_client.get_image(self.image_ref)
image_bookmark_link = image.entity.links.links.get('bookmark')
flavor = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor_bookmark_link = flavor.entity.links.links.get('bookmark')
create_server_response = self.servers_client.create_server(name,
image_bookmark_link,
flavor_bookmark_link)
created_server = create_server_response.entity
'''Verify the parameters are correct in the initial response'''
self.assertTrue(created_server.id is not None,
msg="The server id was not set in the response")
self.assertTrue(created_server.admin_pass is not None,
msg="Admin password was not set in the response")
self.assertTrue(created_server.links is not None,
msg="Server links were not set in the response")
'''Wait for the server to become active'''
active_server_response = self.server_behaviors.wait_for_server_status(created_server.id,
NovaServerStatusTypes.ACTIVE)
active_server = active_server_response.entity
get_server_info_response = self.servers_client.get_server(created_server.id)
get_server_info = get_server_info_response.entity
'''Verify that the correct image and flavor refs were used'''
self.assertEqual(get_server_info.image.id, self._parse_link_to_retrieve_id(image_bookmark_link),
msg="The image does not match to the image mentioned during create")
self.assertEqual(get_server_info.flavor.id, self._parse_link_to_retrieve_id(flavor_bookmark_link),
msg="The flavor does not match to the flavor mentioned during create")
self.servers_client.delete_server(active_server.id)
@tags(type='positive', net='no')
def test_update_server_using_server_self_link(self):
"""Update a server using the server self link"""
name = rand_name("testserver")
stored_name = name
'''Create an active server'''
active_server_response = self.server_behaviors.create_active_server()
active_server = active_server_response.entity
'''Need to ensure there is atleast one second gap between creating and
updating a server. The test failed once without the sleep.'''
time.sleep(1)
'''Some processing'''
link = str(active_server.links.self)
link_list = link.split('/')
server_id = link_list[6]
'''Use server self link to update the server'''
updated_server_response = self.servers_client.update_server(server_id,
name,
accessIPv4=self.accessIPv4,
accessIPv6=self.accessIPv6)
updated_server = updated_server_response.entity
self.server_behaviors.wait_for_server_status(updated_server.id, NovaServerStatusTypes.ACTIVE)
'''Verify the name and access ips of the server have changed'''
get_server_info = self.servers_client.get_server(updated_server.id)
self.assertEqual(stored_name, get_server_info.entity.name,
msg="The name was not updated")
self.assertEqual(self.accessIPv4, get_server_info.entity.accessIPv4,
msg="AccessIPv4 was not updated")
self.assertEqual(self.accessIPv6, get_server_info.entity.accessIPv6,
msg="AccessIPv6 was not updated")
self.assertEqual(active_server.created, get_server_info.entity.created,
msg="The creation date was updated")
self.assertTrue(active_server.updated != get_server_info.entity.updated,
msg="Server %s updated time did not change after a modification to the server." % updated_server.id)
self.servers_client.delete_server(get_server_info.entity.id)
@tags(type='positive', net='no')
def test_update_server_using_server_bookmark_link(self):
"""Update a server using the server bookmark link"""
name = rand_name("testserver")
stored_name = name
# Create an active server
active_server_response = self.server_behaviors.create_active_server()
active_server = active_server_response.entity
'''Need to ensure there is atleast one second gap between creating
and updating a server. The test failed once without the sleep.'''
time.sleep(1)
#Some processing
link = str(active_server.links.bookmark)
link_list = link.split('/')
server_id = link_list[5]
'''Use server bookmark's link to update the server'''
updated_server_response = self.servers_client.update_server(server_id,
name,
accessIPv4=self.accessIPv4,
accessIPv6=self.accessIPv6)
updated_server = updated_server_response.entity
self.server_behaviors.wait_for_server_status(updated_server.id, NovaServerStatusTypes.ACTIVE)
'''Verify the name and access ips of the server have changed'''
get_server_info = self.servers_client.get_server(updated_server.id)
self.assertEqual(stored_name, get_server_info.entity.name,
msg="The name was not updated")
self.assertEqual(self.accessIPv4, get_server_info.entity.accessIPv4,
msg="AccessIPv4 was not updated")
self.assertEqual(self.accessIPv6, get_server_info.entity.accessIPv6,
msg="AccessIPv6 was not updated")
self.assertEqual(active_server.created, get_server_info.entity.created,
msg="The creation date was updated")
self.assertTrue(active_server.updated != get_server_info.entity.updated,
msg="Server %s updated time did not change after a modification to the server." % updated_server.id)
self.servers_client.delete_server(get_server_info.entity.id)

View File

@ -0,0 +1,163 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cafe.drivers.unittest.decorators import tags
from cloudcafe.compute.common.exceptions import BadRequest, ItemNotFound
from cloudcafe.compute.common.datagen import rand_name
from test_repo.compute.fixtures import ComputeFixture
class ServersNegativeTest(ComputeFixture):
@classmethod
def setUpClass(cls):
super(ServersNegativeTest, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(ServersNegativeTest, cls).tearDownClass()
@tags(type='negative', net='no')
def test_server_name_blank(self):
with self.assertRaises(BadRequest):
self.servers_client.create_server('', self.image_ref, self.flavor_ref)
@tags(type='negative', net='no')
def test_personality_file_contents_not_encoded(self):
"""Server should not get created with a personality file whose content is not encoded"""
file_contents = 'This is a test file.'
personality = [{'path': '/etc/testfile.txt',
'contents': file_contents}]
with self.assertRaises(BadRequest):
server = self.servers_client.create_server('blankfile',
self.image_ref, self.flavor_ref,
personality=personality)
@tags(type='negative', net='no')
def test_invalid_ip_v4_access_address(self):
"""Negative test: Server should not get created with invalid ipv4 address"""
accessIPv4 = '1.1.1.1.1.1'
name = rand_name("testserver")
with self.assertRaises(BadRequest):
server_response = self.servers_client.create_server(name,
self.image_ref,
self.flavor_ref,
accessIPv4=accessIPv4)
@tags(type='negative', net='no')
def test_invalid_ip_v6_access_address(self):
"""Negative test: Server should not get created with invalid ipv6 address"""
accessIPv6 = '2.2.2.2'
name = rand_name("testserver")
with self.assertRaises(BadRequest):
server_response = self.servers_client.create_server(name,
self.image_ref,
self.flavor_ref,
accessIPv6=accessIPv6)
@tags(type='negative', net='no')
def test_server_metadata_item_nonexistent_server(self):
"""Negative test: GET on nonexistent server should not succeed"""
with self.assertRaises(ItemNotFound):
self.servers_client.get_server_metadata_item(999, 'test2')
@tags(type='negative', net='no')
def test_list_server_metadata_nonexistent_server(self):
"""List metadata on a non existent server should not succeed"""
with self.assertRaises(ItemNotFound):
self.servers_client.list_server_metadata(999)
@tags(type='negative', net='no')
def test_set_server_metadata_nonexistent_server(self):
"""Set metadata on a non existent server should not succeed"""
meta = {'meta1': 'data1'}
with self.assertRaises(ItemNotFound):
metadata_response = self.servers_client.set_server_metadata(999, meta)
@tags(type='negative', net='no')
def test_update_server_metadata_nonexistent_server(self):
"""An update should not happen for a non existent image"""
meta = {'key1': 'value1', 'key2': 'value2'}
with self.assertRaises(ItemNotFound):
self.servers_client.update_server_metadata(999, meta)
@tags(type='negative', net='no')
def test_delete_server_metadata_item_nonexistent_server(self):
"""Should not be able to delete metadata item from a non existent server"""
with self.assertRaises(ItemNotFound):
self.servers_client.delete_server_metadata_item(999, 'delkey')
@tags(type='negative', net='no')
def test_create_server_with_unknown_flavor(self):
"""Server creation with a flavor ID which does not exist, should not be allowed"""
with self.assertRaises(BadRequest):
self.servers_client.create_server('testserver', self.image_ref, 999)
@tags(type='negative', net='no')
def test_create_server_with_unknown_image(self):
"""Server creation with an image ID which does not exist,should not be allowed"""
with self.assertRaises(BadRequest):
self.servers_client.create_server('testserver', 999, self.flavor_ref)
@tags(type='negative', net='no')
def test_get_nonexistent_server_fails(self):
"""Making a get request for a server that does not exist should cause a 404"""
with self.assertRaises(ItemNotFound):
self.servers_client.get_server(999)
@tags(type='negative', net='no')
def test_delete_nonexistent_server_fails(self):
"""Making a delete request for a server that does not exist should cause a 404"""
with self.assertRaises(ItemNotFound):
self.servers_client.delete_server(999)
@tags(type='negative', net='no')
def test_list_addresses_for_nonexistant_server_fails(self):
"""Making a list address request for a server that does not exist should cause a 404"""
with self.assertRaises(ItemNotFound):
self.servers_client.list_addresses(999)
@tags(type='negative', net='no')
def test_list_addresses_for_invalid_network_id_fails(self):
"""Making a list address request for a server that does not exist should cause a 404"""
server_response = self.server_behaviors.create_active_server()
server = server_response.entity
with self.assertRaises(ItemNotFound):
self.servers_client.list_addresses_by_network(server.id, 999)
@tags(type='negative', net='no')
def test_list_addresses_by_network_for_nonexistant_server_fails(self):
"""Making a list address by network request for a server that does not exist should cause a 404"""
with self.assertRaises(ItemNotFound):
self.servers_client.list_addresses_by_network(999, 'public')
@tags(type='negative', net='no')
def test_cannot_get_deleted_server(self):
"""A 400 response should be returned when you get a server which is deleted"""
server = self.server_behaviors.create_active_server()
delete_resp = self.servers_client.delete_server(server.entity.id)
self.assertEqual(204, delete_resp.status_code)
self.server_behaviors.wait_for_server_to_be_deleted(server.entity.id)
with self.assertRaises(ItemNotFound):
self.servers_client.get_server(server.entity.id)
@tags(type='negative', net='no')
def test_create_server_with_invalid_name(self):
"""Server creation with a blank/invalid name should not be allowed"""
with self.assertRaises(BadRequest):
self.servers_client.create_server('', self.image_ref, self.flavor_ref)

View File

@ -0,0 +1,118 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import re
import unittest2 as unittest
from urlparse import urlparse
from cafe.drivers.unittest.decorators import tags
from test_repo.compute.fixtures import ComputeFixture
class TestLinks(ComputeFixture):
@classmethod
def setUpClass(cls):
super(TestLinks, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(TestLinks, cls).tearDownClass()
@tags(type='positive', net='no')
def test_verify_server_self_link(self):
"""Verify that a server self link has the correct format"""
self.server_resp = self.server_behaviors.create_active_server()
self.server_id = self.server_resp.entity.id
self.resources.add(self.server_id, self.servers_client.delete_server)
server_self_link = self.server_resp.entity.links.self
self.assertTrue(self._has_version(server_self_link))
get_server_resp = self.servers_client.get_server(server_self_link)
self.assertEqual(self.server_id, get_server_resp.entity.id)
@tags(type='positive', net='no')
@unittest.skip('V1 Bug:D-03447')
def test_verify_server_bookmark_link(self):
"""Verify that server bookmark link is a link with no version number"""
self.server_resp = self.server_behaviors.create_active_server()
self.server_id = self.server_resp.entity.id
self.resources.add(self.server_id, self.servers_client.delete_server)
server_bookmark_link = self.server_resp.entity.links.bookmark
self.assertFalse(self._has_version(server_bookmark_link))
retrieved_server = self.servers_client.get_server(server_bookmark_link)
self.assertEqual(self.server_id, retrieved_server.entity.id)
@tags(type='positive', net='no')
def test_verify_flavor_self_link(self):
"""Verify that flavor self link is a full url with a version number"""
flavor_resp = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor_self_link = flavor_resp.entity.links.self
self.assertTrue(self._has_version(flavor_self_link))
# Verify that the same flavor can be retrieved using the flavor self link
retrieved_flavor_resp = self.flavors_client.get_flavor_details(str(flavor_self_link))
self.assertEqual(retrieved_flavor_resp.entity.id, flavor_resp.entity.id)
self.assertEqual(retrieved_flavor_resp.entity.ram, flavor_resp.entity.ram)
self.assertEqual(retrieved_flavor_resp.entity.swap, flavor_resp.entity.swap)
@tags(type='positive', net='no')
@unittest.skip('V1 Bug:D-03447')
def test_verify_flavor_bookmark_link(self):
"""Verify that flavor bookmark link is a permanent link with no version number"""
flavor_resp = self.flavors_client.get_flavor_details(self.flavor_ref)
flavor_bookmark_link = flavor_resp.entity.links.bookmark
self.assertFalse(self._has_version(flavor_bookmark_link))
retrieved_flavor_resp = self.flavors_client.get_flavor_details(flavor_bookmark_link)
self.assertEqual(retrieved_flavor_resp.entity.id, flavor_resp.entity.id)
self.assertEqual(retrieved_flavor_resp.entity.ram, flavor_resp.entity.ram)
self.assertEqual(retrieved_flavor_resp.entity.swap, flavor_resp.entity.swap)
@tags(type='positive', net='no')
def test_image_self_link_during_get_image(self):
"""Verify that image self link is a full url with a version number"""
image_resp = self.images_client.get_image(self.image_ref)
image_self_link = image_resp.entity.links.self
self.assertTrue(self._has_version(image_self_link))
# Verify that the same image can be retrieved using the image self link
retrieved_image_with_self_link = self.images_client.get_image(image_self_link)
self.assertEqual(retrieved_image_with_self_link.entity.id, image_resp.entity.id)
@tags(type='positive', net='no')
@unittest.skip('V1 Bug:D-03447')
def test_image_bookmark_link_during_get_image(self):
"""Verify that image bookmark link is a permanent link with no version number"""
image_resp = self.images_client.get_image(self.image_ref)
image_bookmark_link = image_resp.entity.links.bookmark
self.assertFalse(self._has_version(image_bookmark_link))
# Verify that the same image can be retrieved using the image bookmark link
retrieved_image_with_bookmark_link = self.images_client.get_image(image_bookmark_link)
self.assertEqual(retrieved_image_with_bookmark_link.entity.id, image_resp.entity.id)
def _has_version(self, link):
return re.search('^/v+\d', urlparse(link).path) is not None
def _parse_ref_to_retrieve_id(self, ref):
temp = ref.rsplit('/')
#Return the last item, which is the image id
return temp[len(temp) - 1]

View File

@ -0,0 +1,16 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

View File

@ -0,0 +1,23 @@
"""
Copyright 2013 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from cloudcafe.identity.v2_0.tokens_api.provider import TokenAPI_Provider
tokens_api = TokenAPI_Provider()
auth_token = tokens_api.behaviors.get_token_by_password()
assert auth_token is not None, 'Auth token returned as None'
assert auth_token != '', 'Auth token returned empty'