Merge "Add tempest integration tests"

This commit is contained in:
Jenkins 2014-02-17 11:03:19 +00:00 committed by Gerrit Code Review
commit 60e22d0211
8 changed files with 3220 additions and 0 deletions

View File

@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,705 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import socket
import requests
import os
import novaclient.v1_1.client as nvclient
import tempest.test
import tempest.api.murano.config as cfg
from tempest import clients
from tempest.common import rest_client
class MuranoTest(tempest.test.BaseTestCase):
@classmethod
def setUpClass(cls):
"""
This method allows to initialize authentication before
each test case and define parameters of Murano API Service
This method also create environment for all tests
"""
super(MuranoTest, cls).setUpClass()
if not cfg.murano.service_available:
raise cls.skipException("Murano tests are disabled")
user = cls.config.identity.admin_username
password = cls.config.identity.admin_password
tenant = cls.config.identity.admin_tenant_name
auth_url = cls.config.identity.uri
client_args = (cls.config, user, password, auth_url, tenant)
cls.client = rest_client.RestClient(*client_args)
cls.client.service = 'identity'
cls.token = cls.client.get_auth()
cls.client.base_url = cfg.murano.murano_url
cls.environments = []
cls.inst_wth_fl_ip = []
cls.linux = 'linux'
cls.windows = 'windows'
cls.demo = 'demo'
def tearDown(self):
"""
This method allows to clean up after each test.
The main task for this method - delete environment after
PASSED and FAILED tests.
"""
super(MuranoTest, self).tearDown()
for environment in self.environments:
try:
self.delete_environment(environment['id'])
except Exception:
pass
for inst in self.inst_wth_fl_ip:
try:
self.remove_floating_ip(inst)
except Exception:
pass
def create_environment(self, name):
"""
This method allows to create environment.
Input parameters:
name - Name of new environment
Returns response and new environment.
"""
post_body = '{"name": "%s"}' % name
resp, body = self.client.post('environments', post_body,
self.client.headers)
return resp, json.loads(body)
def delete_environment(self, environment_id):
"""
This method allows to delete environment
Input parameters:
environment_id - ID of deleting environment
"""
self.client.delete('environments/' + str(environment_id),
self.client.headers)
def update_environment(self, environment_id, environment_name):
"""
This method allows to update environment instance
Input parameters:
environment_id - ID of updating environment
environment_name - name of updating environment
"""
post_body = '{"name": "%s"}' % (environment_name + "-changed")
resp, body = self.client.put('environments/' + str(environment_id),
post_body, self.client.headers)
return resp, json.loads(body)
def get_list_environments(self):
"""
This method allows to get a list of existing environments
Returns response and list of environments
"""
resp, body = self.client.get('environments',
self.client.headers)
return resp, json.loads(body)
def get_environment_by_id(self, environment_id):
"""
This method allows to get environment's info by id
Input parameters:
environment_id - ID of needed environment
Returns response and environment's info
"""
resp, body = self.client.get('environments/' + str(environment_id),
self.client.headers)
return resp, json.loads(body)
def nova_auth(self):
user = self.config.identity.admin_username
password = self.config.identity.admin_password
tenant = self.config.identity.admin_tenant_name
auth_url = self.config.identity.uri
nova = nvclient.Client(user, password, tenant, auth_url,
service_type="compute")
return nova
def search_instances(self, environment_id, hostname):
nova = self.nova_auth()
somelist = []
for i in nova.servers.list():
if ((str(environment_id) in str(
i.name)) and (str(hostname) in str(i.name))):
somelist.append(i.id)
return somelist
def add_floating_ip(self, instance_id):
nova = self.nova_auth()
pool = nova.floating_ip_pools.list()[0].name
ip = nova.floating_ips.create(pool)
nova.servers.get(instance_id).add_floating_ip(ip)
return ip.ip
def remove_floating_ip(self, instance_id):
nova = self.nova_auth()
fl_ips = nova.floating_ips.findall(instance_id=instance_id)
for fl_ip in fl_ips:
nova.floating_ips.delete(fl_ip.id)
return None
def socket_check(self, ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((str(ip), port))
sock.close()
return result
def create_session(self, environment_id):
"""
This method allow to create session
Input parameters:
environment_id - ID of environment
where session should be created
"""
post_body = None
resp, body = self.client.post('environments/' + str(environment_id) +
'/configure',
post_body, self.client.headers)
return resp, json.loads(body)
def get_session_info(self, environment_id, session_id):
"""
This method allow to get session's info
Input parameters:
environment_id - ID of environment
where needed session was created
session_id - ID of needed session
Return response and session's info
"""
resp, body = self.client.get('environments/' + str(environment_id) +
'/sessions/' + str(session_id),
self.client.headers)
return resp, json.loads(body)
def delete_session(self, environment_id, session_id):
"""
This method allow to delete session
Input parameters:
environment_id - ID of environment
where needed session was created
session_id - ID of needed session
"""
self.client.delete('environments/' + str(environment_id) +
'/sessions/' + str(session_id), self.client.headers)
def create_AD(self, environment_id, session_id):
"""
This method allow to add AD
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
post_body = {"type": "activeDirectory", "name": "ad.local",
"adminPassword": "P@ssw0rd", "domain": "ad.local",
"availabilityZone": "nova",
"unitNamingPattern": "adinstance",
"flavor": "m1.medium", "osImage":
{"type": "ws-2012-std", "name": self.windows, "title":
"Windows Server 2012 Standard"},
"configuration": "standalone",
"units": [{"isMaster": True,
"recoveryPassword": "P@ssw0rd",
"location": "west-dc"}]}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_IIS(self, environment_id, session_id, domain_name=""):
"""
This method allow to add IIS
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
iis_name = "IISSERVICE"
creds = {'username': 'Administrator',
'password': 'P@ssw0rd'}
post_body = {"type": "webServer", "domain": domain_name,
"availabilityZone": "nova", "name": iis_name,
"adminPassword": "P@ssw0rd",
"unitNamingPattern": "iisinstance",
"osImage": {"type": "ws-2012-std", "name": self.windows,
"title": "Windows Server 2012 Standard"},
"units": [{}], "credentials": creds,
"flavor": "m1.medium"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_apsnet(self, environment_id, session_id, domain_name=""):
"""
This method allow to add apsnet
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
creds = {'username': 'Administrator',
'password': 'P@ssw0rd'}
post_body = {"type": "aspNetApp", "domain": domain_name,
"availabilityZone": "nova",
"name": "someasp", "repository":
"git://github.com/Mirantis/murano-mvc-demo.git",
"adminPassword": "P@ssw0rd",
"unitNamingPattern": "aspnetinstance",
"osImage": {"type": "ws-2012-std", "name": self.windows,
"title": "Windows Server 2012 Standard"},
"units": [{}], "credentials": creds,
"flavor": "m1.medium"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_IIS_farm(self, environment_id, session_id, domain_name=""):
"""
This method allow to add IIS farm
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
creds = {'username': 'Administrator', 'password': 'P@ssw0rd'}
post_body = {"type": "webServerFarm", "domain": domain_name,
"availabilityZone": "nova", "name": "someIISFARM",
"adminPassword": "P@ssw0rd", "loadBalancerPort": 80,
"unitNamingPattern": "",
"osImage": {"type": "ws-2012-std", "name": self.windows,
"title": "Windows Server 2012 Standard"},
"units": [{}, {}],
"credentials": creds, "flavor": "m1.medium"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_apsnet_farm(self, environment_id, session_id, domain_name=""):
"""
This method allow to add apsnet farm
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
creds = {'username': 'Administrator', 'password': 'P@ssw0rd'}
post_body = {"type": "aspNetAppFarm", "domain": domain_name,
"availabilityZone": "nova", "name": "SomeApsFarm",
"repository":
"git://github.com/Mirantis/murano-mvc-demo.git",
"adminPassword": "P@ssw0rd", "loadBalancerPort": 80,
"unitNamingPattern": "",
"osImage": {"type": "ws-2012-std", "name": self.windows,
"title": "Windows Server 2012 Standard"},
"units": [{}, {}],
"credentials": creds, "flavor": "m1.medium"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_SQL(self, environment_id, session_id, domain_name=""):
"""
This method allow to add SQL
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
post_body = {"type": "msSqlServer", "domain": domain_name,
"availabilityZone": "nova", "name": "SQLSERVER",
"adminPassword": "P@ssw0rd",
"unitNamingPattern": "sqlinstance",
"saPassword": "P@ssw0rd", "mixedModeAuth": True,
"osImage": {"type": "ws-2012-std", "name": self.windows,
"title": "Windows Server 2012 Standard"},
"units": [{}],
"credentials": {"username": "Administrator",
"password": "P@ssw0rd"},
"flavor": "m1.medium"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_SQL_cluster(self, environment_id, session_id, domain_name=""):
"""
This method allow to add SQL cluster
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
AG = cfg.murano.agListnerIP
clIP = cfg.murano.clusterIP
post_body = {"domain": domain_name, "domainAdminPassword": "P@ssw0rd",
"externalAD": False,
"sqlServiceUserName": "Administrator",
"sqlServicePassword": "P@ssw0rd",
"osImage": {"type": "ws-2012-std", "name": self.windows,
"title": "Windows Server 2012 Standard"},
"agListenerName": "SomeSQL_AGListner",
"flavor": "m1.medium",
"agGroupName": "SomeSQL_AG",
"domainAdminUserName": "Administrator",
"agListenerIP": AG,
"clusterIP": clIP,
"type": "msSqlClusterServer", "availabilityZone": "nova",
"adminPassword": "P@ssw0rd",
"clusterName": "SomeSQL", "mixedModeAuth": True,
"unitNamingPattern": "",
"units": [{"isMaster": True, "name": "node1",
"isSync": True},
{"isMaster": False, "name": "node2",
"isSync": True}],
"name": "Sqlname", "saPassword": "P@ssw0rd",
"databases": ['NewDB']}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_linux_telnet(self, environment_id, session_id):
post_body = {"availabilityZone": "nova", "name": "LinuxTelnet",
"deployTelnet": True, "unitNamingPattern": "telnet",
"keyPair": "murano-lb-key",
"osImage": {"type": "linux", "name": self.linux,
"title": "Linux Image"},
"units": [{}],
"flavor": "m1.small", "type": "linuxTelnetService"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_linux_apache(self, environment_id, session_id):
post_body = {"availabilityZone": "nova", "name": "LinuxApache",
"deployApachePHP": True, "unitNamingPattern": "apache",
"keyPair": "murano-lb-key",
"instanceCount": [{}],
"osImage": {"type": "linux", "name": self.linux,
"title": "Linux Image"},
"units": [{}],
"flavor": "m1.small", "type": "linuxApacheService"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def create_demo_service(self, environment_id, session_id):
post_body = {"availabilityZone": "nova", "name": "demo",
"unitNamingPattern": "host",
"osImage": {"type": "cirros.demo", "name": self.demo,
"title": "Demo"},
"units": [{}], "flavor": "m1.small",
"configuration": "standalone", "type": "demoService"}
post_body = json.dumps(post_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.post('environments/' + str(environment_id) +
'/services', post_body,
self.client.headers)
return resp, json.loads(body)
def delete_service(self, environment_id, session_id, service_id):
"""
This method allow to delete service
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
service_id - ID of needed service
"""
self.client.headers.update({'X-Configuration-Session': session_id})
self.client.delete('environments/' + str(environment_id)
+ '/services/' + str(service_id),
self.client.headers)
def get_list_services(self, environment_id, session_id):
"""
This method allow to get list of services
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.get('environments/' + str(environment_id) +
'/services',
self.client.headers)
return resp, json.loads(body)
def get_service_info(self, environment_id, session_id, service_id):
"""
This method allow to get detailed service info
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
service_id - ID of needed service
"""
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.get('environments/' + str(environment_id) +
'/services/' + str(service_id),
self.client.headers)
return resp, json.loads(body)
def update_service(self, environment_id, session_id, service_id, s_body):
"""
This method allows to update service
Input parameters:
environment_id - env's id
session_id - session_id where service is attach
service_id - service id of updating service
s_body - json obj
"""
s_body['flavor'] = "m1.small"
post_body = json.dumps(s_body)
self.client.headers.update({'X-Configuration-Session': session_id})
resp, body = self.client.put('environments/' + str(environment_id) +
'/services/' + str(service_id),
post_body, self.client.headers)
return resp, json.loads(body)
def deploy_session(self, environment_id, session_id):
"""
This method allow to send environment on deploy
Input parameters:
environment_id - ID of current environment
session_id - ID of current session
"""
post_body = None
resp = self.client.post('environments/' + str(environment_id) +
'/sessions/' + str(session_id) +
'/deploy', post_body, self.client.headers)
return resp
def get_deployments_list(self, environment_id):
"""
This method allow to get list of deployments
Input parameters:
environment_id - ID of current environment
"""
resp, body = self.client.get('environments/' + str(environment_id) +
'/deployments', self.client.headers)
return resp, json.loads(body)
def get_deployment_info(self, environment_id, deployment_id):
"""
This method allow to get detailed info about deployment
Input parameters:
environment_id - ID of current environment
deployment_id - ID of needed deployment
"""
resp, body = self.client.get('environments/' + str(environment_id) +
'/deployments/' + str(deployment_id),
self.client.headers)
return resp, json.loads(body)
class MuranoMeta(tempest.test.BaseTestCase):
@classmethod
def setUpClass(cls):
super(MuranoMeta, cls).setUpClass()
if not cfg.murano.service_available:
raise cls.skipException("Murano tests is disabled")
user = cls.config.identity.admin_username
password = cls.config.identity.admin_password
tenant = cls.config.identity.admin_tenant_name
auth_url = cls.config.identity.uri
client_args = (cls.config, user, password, auth_url, tenant)
cls.client = rest_client.RestClient(*client_args)
cls.client.service = 'identity'
cls.token = cls.client.get_auth()
cls.client.base_url = cfg.murano.metadata_url
cls.objs = []
cls.services = []
def tearDown(self):
super(MuranoMeta, self).tearDown()
for obj in self.objs:
try:
self.delete_metadata_obj_or_folder(obj)
except Exception:
pass
for service in self.services:
try:
self.delete_service(service)
except Exception:
pass
def get_ui_definitions(self):
resp, body = self.client.get('v1/client/ui', self.client.headers)
return resp, body
def get_conductor_metadata(self):
resp, body = self.client.get('v1/client/conductor',
self.client.headers)
return resp, body
def get_list_metadata_objects(self, path):
resp, body = self.client.get('v1/admin/' + path, self.client.headers)
return resp, body
def get_metadata_object(self, object):
resp, body = self.client.get('v1/admin/' + object, self.client.headers)
return resp, body
def upload_metadata_object(self, path, filename='testfile.txt'):
with open(filename, 'w') as f:
f.write("It's a test file")
files = {'file': open(filename, 'rb')}
headers = {'X-Auth-Token': self.token}
resp = requests.post('%s/v1/admin/%s' % (self.client.base_url, path),
files=files, headers=headers)
os.remove(filename)
return resp
def create_directory(self, path, name):
post_body = None
resp, body = self.client.put('v1/admin/' + path + name, post_body,
self.client.headers)
return resp, json.loads(body)
def delete_metadata_obj_or_folder(self, object):
resp, body = self.client.delete('v1/admin/' + object,
self.client.headers)
return resp, json.loads(body)
def create_new_service(self, name):
post_body = {"name": name, "version": "0.1",
"full_service_name": name,
"service_display_name": name}
post_body = json.dumps(post_body)
resp, body = self.client.put('v1/admin/services/' + name, post_body,
self.client.headers)
return resp, body
def update_new_service(self, name):
post_body = {"name": name + "1", "version": "0.1",
"full_service_name": name,
"service_display_name": name + "1"}
post_body = json.dumps(post_body)
resp, body = self.client.put('v1/admin/services/' + name,
post_body, self.client.headers)
return resp, body
def delete_service(self, name):
resp, body = self.client.delete('v1/admin/services/' + name,
self.client.headers)
return resp, body
def create_complex_service(self, name):
post_body = {"name": name, "version": "0.1",
"full_service_name": name,
"service_display_name": name,
"agent": ["CreatePrimaryDC.template",
"LeaveDomain.template",
"SetPassword.template",
"CreateSecondaryDC.template",
"AskDnsIp.template",
"JoinDomain.template"],
"heat": ["RouterInterface.template",
"Windows.template",
"Network.template",
"NNSecurity.template",
"Param.template",
"Subnet.template",
"InstancePortWSubnet.template",
"InstancePort.template"],
"scripts": ["Install-RoleSecondaryDomainController.ps1",
"Install-RolePrimaryDomainController.ps1",
"Join-Domain.ps1",
"ImportCoreFunctions.ps1",
"Get-DnsListeningIpAddress.ps1",
"Set-LocalUserPassword.ps1"],
"ui": ["ActiveDirectory.yaml"],
"workflows": ["AD.xml",
"Networking.xml",
"Common.xml"]}
post_body = json.dumps(post_body)
resp, body = self.client.put('v1/admin/services/' + name, post_body,
self.client.headers)
return resp, body, json.loads(post_body)
def switch_service_parameter(self, service):
post_body = None
resp, body = self.client.post('v1/admin/services/%s/toggle_enabled' %
service, post_body, self.client.headers)
return resp, json.loads(body)
def reset_cache(self):
post_body = None
resp, body = self.client.post('v1/admin/reset_caches', post_body,
self.client.headers)
return resp, json.loads(body)
def get_list_of_meta_information_about_service(self, service):
resp, body = self.client.get('v1/admin/services/%s/info' % service,
self.client.headers)
return resp, json.loads(body)

View File

@ -0,0 +1,6 @@
[murano]
murano_url = http://127.0.0.1:8082
metadata_url = http://127.0.0.1:8084
agListnerIP = 10.0.0.155
clusterIP = 10.0.0.150
service_available = True

View File

@ -0,0 +1,37 @@
import os
from oslo.config import cfg
murano_group = cfg.OptGroup(name='murano', title='murano')
MuranoGroup = [
cfg.StrOpt('murano_url',
default='http://127.0.0.1:8082',
help="murano url"),
cfg.StrOpt('metadata_url',
default='http://127.0.0.1:8084',
help="murano metadata repository url"),
cfg.StrOpt('agListnerIP',
default='10.0.0.155',
help="agListnerIP"),
cfg.StrOpt('clusterIP',
default='10.0.0.150',
help="clusterIP"),
cfg.BoolOpt('service_available',
default='True',
help='mistral available')
]
def register_config(config, config_group, config_opts):
config.register_group(config_group)
config.register_opts(config_opts, config_group)
path = os.path.join("%s/config.conf" % os.getcwd())
if os.path.exists(path):
cfg.CONF([], project='muranointegration', default_config_files=[path])
register_config(cfg.CONF, murano_group, MuranoGroup)
mistral = cfg.CONF.murano

View File

@ -0,0 +1,184 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.murano import base
from tempest.test import attr
from tempest import exceptions
class SanityMuranoTest(base.MuranoTest):
@attr(type='smoke')
def test_create_and_delete_environment(self):
resp, env = self.create_environment('test')
self.environments.append(env)
self.assertEqual(resp['status'], '200')
resp, infa = self.get_environment_by_id(env['id'])
self.assertEqual(resp['status'], '200')
self.assertEqual(infa['name'], 'test')
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='smoke')
def test_get_environment(self):
"""
Get environment by id
Test create environment, afterthat test try to get
environment's info, using environment's id,
and finally delete this environment
Target component: Murano
Scenario:
1. Send request to create environment.
2. Send request to get environment
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, infa = self.get_environment_by_id(env['id'])
self.assertEqual(resp['status'], '200')
self.assertEqual(infa['name'], 'test')
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='smoke')
def test_get_list_environments(self):
"""
Get list of existing environments
Test try to get list of existing environments
Target component: Murano
Scenario:
1. Send request to get list of enviroments
"""
_, env1 = self.create_environment('test1')
self.environments.append(env1)
resp, infa = self.get_list_environments()
self.assertEqual(resp['status'], '200')
self.assertEqual(len(infa['environments']) == 1)
self.delete_environment(env1['id'])
self.environments.pop(self.environments.index(env1))
@attr(type='smoke')
def test_update_environment(self):
"""
Update environment instance
Test try to update environment instance
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to update enviroment instance
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, infa = self.update_environment(env['id'], env['name'])
self.assertEqual(resp['status'], '200')
self.assertEqual(infa['name'], 'test-changed')
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_update_env_with_wrong_env_id(self):
"""
Try to update environment using uncorrect env_id
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to update enviroment instance(with uncorrrect env_id)
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
self.assertRaises(exceptions.NotFound, self.update_environment, None,
env['name'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='smoke')
def test_update_env_after_begin_of_deploy(self):
"""
Try to update environment after begin of deploy
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to create session
3. Send request to deploy session
4. Send request to update enviroment
5. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.deploy_session(env['id'], sess['id'])
resp, infa = self.update_environment(env['id'], env['name'])
self.assertEqual(resp['status'], '200')
self.assertEqual(infa['name'], 'test-changed')
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_delete_env_by_uncorrect_env_id(self):
"""
Try to delete environment using uncorrect env_id
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to delete environment(with uncorrrect env_id)
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
self.assertRaises(exceptions.NotFound, self.delete_environment,
None)
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_double_delete_environment(self):
"""
Try to delete environment twice
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to delete environment
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
self.assertRaises(exceptions.NotFound, self.delete_environment,
env['id'])
@attr(type='negative')
def test_delete_env_and_get_env(self):
_, env = self.create_environment('test')
self.environments.append(env)
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
self.assertRaises(exceptions.NotFound, self.get_environment_by_id,
env['id'])
@attr(type='negative')
def test_get_environment_wo_env_id(self):
self.assertRaises(exceptions.NotFound, self.get_environment_by_id,
None)

View File

@ -0,0 +1,523 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import json
from tempest.test import attr
from tempest.api.murano import base
from tempest import exceptions
class SanityMuranoTest(base.MuranoMeta):
@attr(type='smoke')
def test_get_list_metadata_objects_ui(self):
resp, body = self.get_list_metadata_objects("ui")
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_get_list_metadata_objects_workflows(self):
resp, body = self.get_list_metadata_objects("workflows")
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_get_list_metadata_objects_heat(self):
resp, body = self.get_list_metadata_objects("heat")
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_get_list_metadata_objects_agent(self):
resp, body = self.get_list_metadata_objects("agent")
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_get_list_metadata_objects_scripts(self):
resp, body = self.get_list_metadata_objects("scripts")
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_get_list_metadata_objects_manifests(self):
resp, body = self.get_list_metadata_objects("manifests")
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='negative')
def test_get_list_metadata_objects_incorrect_type(self):
self.assertRaises(exceptions.NotFound, self.get_list_metadata_objects,
'someth')
@attr(type='smoke')
def test_get_ui_definitions(self):
resp, body = self.get_ui_definitions()
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_get_conductor_metadata(self):
resp, body = self.get_conductor_metadata()
self.assertIsNotNone(body)
self.assertEqual(resp['status'], '200')
@attr(type='smoke')
def test_create_directory_and_delete_workflows(self):
resp, body = self.create_directory("workflows/", "testdir")
self.objs.append("workflows/testdir")
resp1, body1 = self.delete_metadata_obj_or_folder("workflows/testdir")
self.assertEqual(resp['status'], '200')
self.assertEqual(resp1['status'], '200')
self.assertEqual(body['result'], 'success')
self.assertEqual(body1['result'], 'success')
self.objs.pop(self.objs.index("workflows/testdir"))
@attr(type='smoke')
def test_create_directory_and_delete_ui(self):
resp, body = self.create_directory("ui/", "testdir")
self.objs.append("ui/testdir")
resp1, body1 = self.delete_metadata_obj_or_folder("ui/testdir")
self.assertEqual(resp['status'], '200')
self.assertEqual(resp1['status'], '200')
self.assertEqual(body['result'], 'success')
self.assertEqual(body1['result'], 'success')
self.objs.pop(self.objs.index("ui/testdir"))
@attr(type='smoke')
def test_create_directory_and_delete_heat(self):
resp, body = self.create_directory("heat/", "testdir")
self.objs.append("heat/testdir")
resp1, body1 = self.delete_metadata_obj_or_folder("heat/testdir")
self.assertEqual(resp['status'], '200')
self.assertEqual(resp1['status'], '200')
self.assertEqual(body['result'], 'success')
self.assertEqual(body1['result'], 'success')
self.objs.pop(self.objs.index("heat/testdir"))
@attr(type='smoke')
def test_create_directory_and_delete_agent(self):
resp, body = self.create_directory("agent/", "testdir")
self.objs.append("agent/testdir")
resp1, body1 = self.delete_metadata_obj_or_folder("agent/testdir")
self.assertEqual(resp['status'], '200')
self.assertEqual(resp1['status'], '200')
self.assertEqual(body['result'], 'success')
self.assertEqual(body1['result'], 'success')
self.objs.pop(self.objs.index("agent/testdir"))
@attr(type='smoke')
def test_create_directory_and_delete_scripts(self):
resp, body = self.create_directory("scripts/", "testdir")
self.objs.append("scripts/testdir")
resp1, body1 = self.delete_metadata_obj_or_folder("scripts/testdir")
self.assertEqual(resp['status'], '200')
self.assertEqual(resp1['status'], '200')
self.assertEqual(body['result'], 'success')
self.assertEqual(body1['result'], 'success')
self.objs.pop(self.objs.index("scripts/testdir"))
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1268934')
@attr(type='negative')
def test_create_directory_manifests(self):
self.assertRaises(exceptions.Unauthorized, self.create_directory,
"manifests/", "testdir")
@attr(type='negative')
def test_create_directory_incorrect_type(self):
self.assertRaises(exceptions.NotFound, self.create_directory,
"someth/", "testdir")
@attr(type='smoke')
def test_double_create_directory(self):
self.create_directory("workflows/", "testdir")
self.objs.append("workflows/testdir")
resp, body = self.create_directory("workflows/", "testdir")
self.assertEqual(resp['status'], '200')
self.assertEqual(body['result'], 'success')
self.delete_metadata_obj_or_folder("workflows/testdir")
self.objs.pop(self.objs.index("workflows/testdir"))
@attr(type='negative')
def test_delete_nonexistent_object(self):
self.assertRaises(exceptions.NotFound,
self.delete_metadata_obj_or_folder,
"somth/blabla")
@attr(type='negative')
def test_delete_basic_folder(self):
self.assertRaises(exceptions.MethodNotAllowed,
self.delete_metadata_obj_or_folder,
"workflows")
@attr(type='negative')
def test_create_basic_folder(self):
self.assertRaises(exceptions.MethodNotAllowed, self.create_directory,
"", "somth")
@attr(type='negative')
def test_double_upload_file(self):
self.upload_metadata_object(path="workflows")
self.objs.append("workflows/testfile.txt")
resp = self.upload_metadata_object(path="workflows")
self.assertEqual(resp.status_code, 403)
self.delete_metadata_obj_or_folder("workflows/testfile.txt")
self.objs.pop(self.objs.index("workflows/testfile.txt"))
@attr(type='negative')
def test_upload_file_incorrect(self):
resp = self.upload_metadata_object(path="workflows/testfil")
self.assertEqual(resp.status_code, 404)
@attr(type='smoke')
def test_upload_file_and_delete_workflows(self):
resp = self.upload_metadata_object(path="workflows")
self.objs.append("workflows/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("workflows")
self.delete_metadata_obj_or_folder("workflows/testfile.txt")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
self.objs.pop(self.objs.index("workflows/testfile.txt"))
@attr(type='smoke')
def test_upload_file_and_delete_ui(self):
resp = self.upload_metadata_object(path="ui")
self.objs.append("ui/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("ui")
self.delete_metadata_obj_or_folder("ui/testfile.txt")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
self.objs.pop(self.objs.index("ui/testfile.txt"))
@attr(type='smoke')
def test_upload_file_and_delete_heat(self):
resp = self.upload_metadata_object(path="heat")
self.objs.append("heat/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("heat")
self.delete_metadata_obj_or_folder("heat/testfile.txt")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
self.objs.pop(self.objs.index("heat/testfile.txt"))
@attr(type='smoke')
def test_upload_file_and_delete_agent(self):
resp = self.upload_metadata_object(path="agent")
self.objs.append("agent/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("agent")
self.delete_metadata_obj_or_folder("agent/testfile.txt")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
self.objs.pop(self.objs.index("agent/testfile.txt"))
@attr(type='smoke')
def test_upload_file_and_delete_scripts(self):
resp = self.upload_metadata_object(path="scripts")
self.objs.append("scripts/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("scripts")
self.delete_metadata_obj_or_folder("scripts/testfile.txt")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
self.objs.pop(self.objs.index("scripts/testfile.txt"))
@attr(type='smoke')
def test_upload_file_and_delete_manifests(self):
resp = self.upload_metadata_object(path="manifests",
filename='testfile-manifest.yaml')
self.objs.append("manifests/testfile-manifest.yaml")
resp1, body1 = self.get_list_metadata_objects("manifests")
self.delete_metadata_obj_or_folder("manifests/testfile-manifest.yaml")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile-manifest.yaml', body1)
self.objs.pop(self.objs.index("manifests/testfile-manifest.yaml"))
@attr(type='smoke')
def test_get_metadata_object(self):
self.upload_metadata_object(path="workflows")
self.objs.append("workflows/testfile.txt")
resp1, body1 = self.get_metadata_object("workflows/testfile.txt")
self.delete_metadata_obj_or_folder("workflows/testfile.txt")
self.assertEqual(resp1['status'], '200')
self.assertIsNotNone(body1)
self.objs.pop(self.objs.index("workflows/testfile.txt"))
@attr(type='negative')
def test_get_nonexistent_metadata_object(self):
self.assertRaises(exceptions.NotFound, self.get_metadata_object,
"somth/blabla")
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1249303')
@attr(type='negative')
def test_delete_nonempty_folder_in_workflows(self):
self.create_directory("workflows/", "testdir")
self.objs.append("workflows/testdir")
self.upload_metadata_object(path="workflows/testdir")
self.objs.append("workflows/testdir/testfile.txt")
self.assertRaises(Exception, self.delete_metadata_obj_or_folder,
"workflows/testdir")
self.delete_metadata_obj_or_folder("workflows/testdir/testfile.txt")
self.delete_metadata_obj_or_folder("workflows/testdir")
self.objs.pop(self.objs.index("workflows/testdir"))
self.objs.pop(self.objs.index("workflows/testdir/testfile.txt"))
@attr(type='positive')
def test_create_folder_and_upload_file_workflows(self):
self.create_directory("workflows/", "testdir")
self.objs.append("workflows/testdir")
resp = self.upload_metadata_object(path="workflows/testdir")
self.objs.append("workflows/testdir/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("workflows/testdir")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
resp, _ =\
self.delete_metadata_obj_or_folder(
"workflows/testdir/testfile.txt")
self.assertEqual(resp['status'], '200')
resp, _ = self.delete_metadata_obj_or_folder("workflows/testdir")
self.assertEqual(resp['status'], '200')
resp, body = self.get_list_metadata_objects("workflows")
self.assertEqual(resp['status'], '200')
self.assertNotIn('testfile.txt', body)
self.objs.pop(self.objs.index("workflows/testdir"))
self.objs.pop(self.objs.index("workflows/testdir/testfile.txt"))
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1249303')
@attr(type='negative')
def test_delete_nonempty_folder_in_ui(self):
self.create_directory("ui/", "testdir")
self.objs.append("ui/testdir")
self.upload_metadata_object(path="ui/testdir")
self.objs.append("ui/testdir/testfile.txt")
self.assertRaises(Exception, self.delete_metadata_obj_or_folder,
"ui/testdir")
self.delete_metadata_obj_or_folder("ui/testdir/testfile.txt")
self.delete_metadata_obj_or_folder("ui/testdir")
self.objs.pop(self.objs.index("ui/testdir"))
self.objs.pop(self.objs.index("ui/testdir/testfile.txt"))
@attr(type='positive')
def test_create_folder_and_upload_file_ui(self):
self.create_directory("ui/", "testdir")
self.objs.append("ui/testdir")
resp = self.upload_metadata_object(path="ui/testdir")
self.objs.append("ui/testdir/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("ui/testdir")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
resp, _ =\
self.delete_metadata_obj_or_folder("ui/testdir/testfile.txt")
self.assertEqual(resp['status'], '200')
resp, _ = self.delete_metadata_obj_or_folder("ui/testdir")
self.assertEqual(resp['status'], '200')
resp, body = self.get_list_metadata_objects("ui")
self.assertEqual(resp['status'], '200')
self.assertNotIn('testfile.txt', body)
self.objs.pop(self.objs.index("ui/testdir"))
self.objs.pop(self.objs.index("ui/testdir/testfile.txt"))
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1249303')
@attr(type='negative')
def test_delete_nonempty_folder_in_heat(self):
self.create_directory("heat/", "testdir")
self.objs.append("heat/testdir")
self.upload_metadata_object(path="heat/testdir")
self.objs.append("heat/testdir/testfile.txt")
self.assertRaises(Exception, self.delete_metadata_obj_or_folder,
"heat/testdir")
self.delete_metadata_obj_or_folder("heat/testdir/testfile.txt")
self.delete_metadata_obj_or_folder("heat/testdir")
self.objs.pop(self.objs.index("heat/testdir"))
self.objs.pop(self.objs.index("heat/testdir/testfile.txt"))
@attr(type='positive')
def test_create_folder_and_upload_file_heat(self):
self.create_directory("heat/", "testdir")
self.objs.append("heat/testdir")
resp = self.upload_metadata_object(path="heat/testdir")
self.objs.append("heat/testdir/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("heat/testdir")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
resp, _ =\
self.delete_metadata_obj_or_folder("heat/testdir/testfile.txt")
self.assertEqual(resp['status'], '200')
resp, _ = self.delete_metadata_obj_or_folder("heat/testdir")
self.assertEqual(resp['status'], '200')
resp, body = self.get_list_metadata_objects("heat")
self.assertEqual(resp['status'], '200')
self.assertNotIn('testfile.txt', body)
self.objs.pop(self.objs.index("heat/testdir"))
self.objs.pop(self.objs.index("heat/testdir/testfile.txt"))
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1249303')
@attr(type='negative')
def test_delete_nonempty_folder_in_agent(self):
self.create_directory("agent/", "testdir")
self.objs.append("agent/testdir")
self.upload_metadata_object(path="agent/testdir")
self.objs.append("agent/testdir/testfile.txt")
self.assertRaises(Exception, self.delete_metadata_obj_or_folder,
"agent/testdir")
self.delete_metadata_obj_or_folder("agent/testdir/testfile.txt")
self.delete_metadata_obj_or_folder("agent/testdir")
self.objs.pop(self.objs.index("agent/testdir"))
self.objs.pop(self.objs.index("agent/testdir/testfile.txt"))
@attr(type='positive')
def test_create_folder_and_upload_file_agent(self):
self.create_directory("agent/", "testdir")
self.objs.append("agent/testdir")
resp = self.upload_metadata_object(path="agent/testdir")
self.objs.append("agent/testdir/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("agent/testdir")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
resp, _ =\
self.delete_metadata_obj_or_folder("agent/testdir/testfile.txt")
self.assertEqual(resp['status'], '200')
resp, _ = self.delete_metadata_obj_or_folder("agent/testdir")
self.assertEqual(resp['status'], '200')
resp, body = self.get_list_metadata_objects("agent")
self.assertEqual(resp['status'], '200')
self.assertNotIn('testfile.txt', body)
self.objs.pop(self.objs.index("agent/testdir"))
self.objs.pop(self.objs.index("agent/testdir/testfile.txt"))
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1249303')
@attr(type='negative')
def test_delete_nonempty_folder_in_scripts(self):
self.create_directory("scripts/", "testdir")
self.objs.append("scripts/testdir")
self.upload_metadata_object(path="scripts/testdir")
self.objs.append("scripts/testdir/testfile.txt")
self.assertRaises(Exception, self.delete_metadata_obj_or_folder,
"scripts/testdir")
self.delete_metadata_obj_or_folder("scripts/testdir/testfile.txt")
self.delete_metadata_obj_or_folder("scripts/testdir")
self.objs.pop(self.objs.index("scripts/testdir"))
self.objs.pop(self.objs.index("scripts/testdir/testfile.txt"))
@attr(type='positive')
def test_create_folder_and_upload_file_scripts(self):
self.create_directory("scripts/", "testdir")
self.objs.append("scripts/testdir")
resp = self.upload_metadata_object(path="scripts/testdir")
self.objs.append("scripts/testdir/testfile.txt")
resp1, body1 = self.get_list_metadata_objects("scripts/testdir")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp1['status'], '200')
self.assertIn('testfile.txt', body1)
resp, _ =\
self.delete_metadata_obj_or_folder("scripts/testdir/testfile.txt")
self.assertEqual(resp['status'], '200')
resp, _ = self.delete_metadata_obj_or_folder("scripts/testdir")
self.assertEqual(resp['status'], '200')
resp, body = self.get_list_metadata_objects("scripts")
self.assertEqual(resp['status'], '200')
self.assertNotIn('testfile.txt', body)
self.objs.pop(self.objs.index("scripts/testdir"))
self.objs.pop(self.objs.index("scripts/testdir/testfile.txt"))
@attr(type='smoke')
def test_create_and_delete_new_service(self):
resp, body = self.create_new_service('test')
self.services.append('test')
self.assertEqual(resp['status'], '200')
self.assertIn('success', body)
resp, body = self.delete_service('test')
self.assertEqual(resp['status'], '200')
self.assertIn('success', body)
self.services.pop(self.services.index('test'))
@attr(type='smoke')
def test_update_created_service(self):
self.create_new_service('test')
self.services.append('test')
resp, body = self.update_new_service('test')
self.assertEqual(resp['status'], '200')
self.assertIn('success', body)
self.delete_service('test')
self.services.pop(self.services.index('test'))
@attr(type='positive')
def test_create_complex_service(self):
resp, body, post_body = self.create_complex_service('test')
self.services.append('test')
self.assertEqual(resp['status'], '200')
self.assertIn('success', body)
resp, body = self.get_metadata_object('services/test')
self.delete_service('test')
self.assertEqual(resp['status'], '200')
for k in post_body.values():
if isinstance(k, list):
for j in k:
self.assertIn(j, body)
self.services.pop(self.services.index('test'))
@attr(type='smoke')
def test_get_list_all_services(self):
resp, body = self.get_list_metadata_objects('services')
self.assertEqual(resp['status'], '200')
self.assertIsNotNone(body)
@attr(type='smoke')
def test_switch_service_parameter(self):
self.create_complex_service('test')
self.services.append('test')
resp, body = self.switch_service_parameter('test')
self.assertEqual(resp['status'], '200')
self.assertEqual(body['result'], 'success')
self.delete_service('test')
self.services.pop(self.services.index('test'))
@testtools.skip('Bug https://bugs.launchpad.net/murano/+bug/1268976')
@attr(type='negative')
def test_switch_parameter_none_existing_service(self):
self.assertRaises(exceptions.NotFound, self.switch_service_parameter,
'hupj')
@attr(type='positive')
def test_reset_cache(self):
resp, body = self.reset_cache()
self.assertEqual(resp['status'], '200')
self.assertEqual(body['result'], 'success')
@attr(type='smoke')
def test_get_meta_info_about_service(self):
self.create_new_service('test')
self.services.append('test')
resp, body = self.get_list_of_meta_information_about_service('test')
self.delete_service('test')
self.assertEqual(resp['status'], '200')
self.assertEqual(body['name'], 'test')
self.assertEqual(body['version'], '0.1')
self.services.pop(self.services.index('test'))
@attr(type='negative')
def test_get_meta_info_about_nonexistent_service(self):
self.assertRaises(exceptions.NotFound,
self.get_list_of_meta_information_about_service,
"hupj")

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,202 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.murano import base
from tempest.test import attr
from tempest import exceptions
class SanityMuranoTest(base.MuranoTest):
@attr(type='smoke')
def test_create_session(self):
"""
Create session
Test create environment, create session
and delete session
Target component: Murano
Scenario:
1. Send request to create environment.
2. Send request to create session
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.assertEqual(resp['status'], '200')
resp, infa = self.get_session_info(env['id'], sess['id'])
self.assertEqual(resp['status'], '200')
self.assertEqual(infa['environment_id'], env['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_create_session_before_env(self):
"""
Try to create session without environment
Target component: Murano
Scenario:
1. Send request to create session
"""
self.assertRaises(exceptions.NotFound, self.create_session,
None)
@attr(type='negative')
def test_delete_session_with_wrong_env_id(self):
"""
Try to delete session using uncorrect env_id
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to create session
3. Send request to delete session using uncorrect env_id
4. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.assertRaises(exceptions.NotFound, self.delete_session, None,
sess['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_create_session_with_wrong_env_id(self):
"""
Try to create session using uncorrect env_id
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to create session using uncorrect env_id
3. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
self.assertRaises(exceptions.NotFound, self.create_session,
None)
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_get_session_info_wo_env_id(self):
"""
Try to get session info without env_id
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to create session
3. Send request to get info about session using wrong id
4. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.assertRaises(exceptions.NotFound, self.get_session_info,
None, sess['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_get_session_info_after_delete_env(self):
"""
Try to get session info after delete current environment
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to create session
3. Send request to delete environment
4. Send request to get info about session
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
self.assertRaises(exceptions.NotFound, self.get_session_info,
env['id'], sess['id'])
@attr(type='smoke')
def test_get_session_info(self):
"""
Create session
Test create environment, create session, get info about this session
and delete session
Target component: Murano
Scenario:
1. Send request to create environment.
2. Send request to create session
3. Send request to get info about created session
4. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
resp, infa = self.get_session_info(env['id'], sess['id'])
self.assertEqual(resp['status'], '200')
self.assertEqual(infa['environment_id'], env['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='smoke')
def test_delete_session(self):
"""
Create session
Test create environment, create session, delete created session
and delete session
Target component: Murano
Scenario:
1. Send request to create environment.
2. Send request to create session
3. Send request to delete created session
4. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.delete_session(env['id'], sess['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))
@attr(type='negative')
def test_double_delete_session(self):
"""
Try to double delete session
Target component: Murano
Scenario:
1. Send request to create environment
2. Send request to create session
3. Send request to delete session
4. Send request to delete session
5. Send request to delete environment
"""
resp, env = self.create_environment('test')
self.environments.append(env)
resp, sess = self.create_session(env['id'])
self.delete_session(env['id'], sess['id'])
self.assertRaises(exceptions.NotFound, self.delete_session, env['id'],
sess['id'])
self.delete_environment(env['id'])
self.environments.pop(self.environments.index(env))