Merge "remove obsolete tests"

This commit is contained in:
Jenkins 2017-03-02 12:36:53 +00:00 committed by Gerrit Code Review
commit 72292a91ea
2 changed files with 0 additions and 879 deletions

View File

@ -1,289 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import subprocess
import unittest
import urlparse
from oslo_log import log as logging
from tempest.lib.common import rest_client
import testtools
from tempest.common.utils.linux import remote_client
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import test_utils
from tempest import manager
CONF = config.CONF
LOG = logging.getLogger("tempest.thirdparty.gce")
REGION_NAME = 'region-one'
class GCEConnection(rest_client.RestClient):
def __init__(self, auth_provider):
super(GCEConnection, self).__init__(auth_provider,
"gceapi", REGION_NAME)
self.service = CONF.gceapi.catalog_type
def set_zone(self, zone):
self.zone = zone
def set_region(self, region):
self.region = region
def auth_request(self, uri, **kwargs):
if 'headers' not in kwargs:
kwargs['headers'] = {}
kwargs['headers']['AUTHORIZATION'] = "a " + self.token
return self.http_obj.request(uri, **kwargs)
def _combine_uri(self, *path):
if not len(path):
return None
parts = []
gce_cfg = CONF.gceapi
if not urlparse.urlsplit(path[0]).netloc:
parts = [self.base_url, gce_cfg.api_path, self.tenant_name]
parts.extend(path)
uri = "/".join(x.strip("/") for x in parts if x)
return uri
def _add_params(self, uri, params):
if not params:
return uri
param_list = ["%s=%s" % (param, value)
for (param, value) in params.iteritems()]
return "%s?%s" % (uri, ";".join(param_list))
def _convert_response(self, response):
header = response[0]
body = (json.loads(response[1])
if header.get("content-type") == "application/json"
else None)
return (header.status, body)
def request(self, *path, **kwargs):
if (self.token is None) or (self.base_url is None):
self._set_auth()
uri = self._combine_uri(*path)
params = kwargs.pop("params", None)
uri = self._add_params(uri, params)
response = self.auth_request(uri, **kwargs)
return self._convert_response(response)
def _add_zone_path(self, *path):
return ('zones', self.zone) + path
def _add_region_path(self, *path):
return ('regions', self.region) + path
def get(self, *path):
return self.request(*path)
def zone_get(self, *path):
return self.get(*self._add_zone_path(*path))
def region_get(self, *path):
return self.get(*self._add_region_path(*path))
def post(self, *path, **kwargs):
req_args = {"method": "POST"}
if "body" in kwargs:
req_args["body"] = json.dumps(kwargs["body"])
req_args["headers"] = {"Content-Type": "application/json"}
if "params" in kwargs:
req_args["params"] = kwargs["params"]
return self.request(*path, **req_args)
def zone_post(self, *path, **kwargs):
return self.post(*self._add_zone_path(*path), **kwargs)
def region_post(self, *path, **kwargs):
return self.post(*self._add_region_path(*path), **kwargs)
def delete(self, *path):
return self.request(*path, method="DELETE")
def zone_delete(self, *path):
return self.delete(*self._add_zone_path(*path))
class GCESmokeTestCase(testtools.TestCase):
failed = False
@classmethod
def setUpClass(cls):
super(GCESmokeTestCase, cls).setUpClass()
cls.gce = GCEConnection(manager.Manager().auth_provider)
cls._trash_bin = []
@classmethod
def tearDownClass(cls):
targetLink = None
opLink = None
def check():
(status, body) = cls.gce.get(opLink)
if status == 200 and body.get("progress") == 100:
return True
return False
timeout = CONF.gceapi.operation_timeout
idle = CONF.gceapi.operation_interval
while cls._trash_bin:
targetLink = cls._trash_bin.pop()
(status, body) = cls.gce.delete(targetLink)
if status != 200:
msg = ("Delete operation fails for resource %s"
% targetLink)
LOG.error(msg)
continue
try:
opLink = body["selfLink"]
result = test_utils.call_until_true(check, timeout, idle)
if not result:
msg = ("Timed out waiting for deletion resource %s"
% targetLink)
LOG.error(msg)
except Exception as exc:
LOG.exception(exc)
@classmethod
def add_resource_cleanup(cls, resource_link):
if resource_link not in cls._trash_bin:
cls._trash_bin.append(resource_link)
@classmethod
def cancel_resource_cleanup(cls, resource_link):
if resource_link in cls._trash_bin:
cls._trash_bin.remove(resource_link)
@staticmethod
def incremental(meth):
def decorator(*args, **kwargs):
try:
meth(*args, **kwargs)
except unittest.SkipTest:
raise
except Exception:
GCESmokeTestCase.failed = True
raise
decorator.__test__ = True
return decorator
def setUp(self):
if GCESmokeTestCase.failed:
raise unittest.SkipTest("Skipped by previous exception")
super(GCESmokeTestCase, self).setUp()
self.skipTest('Not to run in gating. It is just an old example and '
'will be remove removed in future')
def wait_for_operation(self, body, operation, status):
self.assertEqual("compute#operation", body["kind"])
self.assertEqual(operation, body["operationType"])
targetLink = body["targetLink"]
opLink = body["selfLink"]
def check():
(http_status, op_body) = self.gce.get(opLink)
self.assertEqual(http_status, 200)
self.assertEqual(targetLink, op_body["targetLink"])
self.assertEqual(opLink, op_body["selfLink"])
error = op_body.get("error")
if error:
msg = "Operation(resource %s) error %s (%s)" % (targetLink,
error["errors"], op_body.get("httpErrorMessage"))
self.fail(msg)
if op_body["progress"] != 100:
return False
self.assertEqual("DONE", op_body["status"])
return True
timeout = CONF.gceapi.operation_timeout
idle = CONF.gceapi.operation_interval
result = test_utils.call_until_true(check, timeout, idle)
if not result:
message = "Timed out waiting for uri %s" % targetLink
raise exceptions.TimeoutException(message)
(http_status, body) = self.gce.get(targetLink)
# NOTE(apavlov): only for delete* we wait 404. for other 200
target_status = 404 if "delete" in operation else 200
self.assertEqual(http_status, target_status)
if status:
self.assertEqual(status, body["status"])
def verify_resource_uri(self, uri, resource_path=None, resource_name=None):
(scheme, netloc, path, query, fragment) = urlparse.urlsplit(str(uri))
self.assertEqual(self.gce.base_url.strip("/"),
scheme + "://" + netloc)
resource_parts = [CONF.gceapi.api_path.strip("/"),
self.gce.tenant_name]
if resource_path is not None:
resource_parts.append(resource_path.strip("/"))
if resource_name is not None:
resource_parts.append(resource_name)
self.assertEqual("/" + "/".join(resource_parts), path)
self.assertFalse(query)
self.assertFalse(fragment)
def verify_region_resource_uri(self, uri, resource_path, resource_name):
resource_path = "/".join(["regions", self.ctx.region["name"],
resource_path.strip("/")])
self.verify_resource_uri(uri, resource_path, resource_name)
def verify_zone_resource_uri(self, uri, resource_path, resource_name):
resource_path = "/".join(["zones", self.ctx.zone["name"],
resource_path.strip("/")])
self.verify_resource_uri(uri, resource_path, resource_name)
def _get_ip_address(self, network_interface):
if CONF.gceapi.use_floatingip:
ipaddr = network_interface["accessConfigs"][0]["natIP"]
else:
ipaddr = network_interface["networkIP"]
return ipaddr
def _ping_ip_address(self, ip_address):
cmd = ['ping', '-c1', '-w1', ip_address]
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc.wait()
return True if proc.returncode == 0 else False
result = test_utils.call_until_true(
ping, CONF.validation.ping_timeout, 1)
if result:
return
message = "Timed out waiting for ping %s" % (ip_address)
raise exceptions.TimeoutException(message)
def _check_ssh_connectivity(self, ip_address, username, pkey):
ssh = remote_client.RemoteClient(ip_address, username, pkey=pkey)
ssh.validate_authentication()

View File

@ -1,590 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import netaddr
import testtools
from oslo_log import log as logging
from tempest.common.utils import data_utils
from tempest import config
from gceapi.tests.functional import base as base_gce
CONF = config.CONF
LOG = logging.getLogger("tempest.thirdparty.gce")
PUBLIC_KEY = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCiU5kpbgCLrKxP1LYH9"
"dumtf8d6Rb+CX/6irKYyJNbsNYSX1skM9jur17TiFlXQFCjorNYXZ/A1e"
"EKbiDcZUKrINhibQfQlAJZpYP1isLUwJlUhJtGFFBW38wTuyG0MFBO+TF"
"RtAG8GQRRfGDxIXvwUxuDR8sClNuTc0MURTbLCJGPFaK2S99NElNYP7R0"
"QpzQyTHkfl492NKD9Zr7kjvnssqihuQ8dZ0dh5xE2RuF9VChdmmPmsfQG"
"qtRXS6xf1Dy0rPHilEcJpGevcUs/JcqEnUd455uugfdueHLqhOvUt3WJU"
"6mThQ28kTAe7nN17Pj3yKRyurF42bigVKNBudD GCE API testing")
PRIVATE_KEY = (
"-----BEGIN RSA PRIVATE KEY-----\n"
"MIIEogIBAAKCAQEAolOZKW4Ai6ysT9S2B/XbprX/HekW/gl/+oqymMiTW7DWEl9b\n"
"JDPY7q9e04hZV0BQo6KzWF2fwNXhCm4g3GVCqyDYYm0H0JQCWaWD9YrC1MCZVISb\n"
"RhRQVt/ME7shtDBQTvkxUbQBvBkEUXxg8SF78FMbg0fLApTbk3NDFEU2ywiRjxWi\n"
"tkvfTRJTWD+0dEKc0Mkx5H5ePdjSg/Wa+5I757LKoobkPHWdHYecRNkbhfVQoXZp\n"
"j5rH0BqrUV0usX9Q8tKzx4pRHCaRnr3FLPyXKhJ1HeOebroH3bnhy6oTr1Ld1iVO\n"
"pk4UNvJEwHu5zdez498ikcrqxeNm4oFSjQbnQwIDAQABAoIBAG0MkjlF3/H1V3Dt\n"
"6jfgz+XoH/H9E+gng6VRpfeDz5LqcnW3P6hLeHGouKCM2dAGseWsOKWlh9vpExyJ\n"
"rWPCVw5Vq2g77OMPe6Cz07mRtZ9tn9QqnZFvtiUWhae/sD23s0vKlnpX3k550+/W\n"
"Cd4T64ogmrwP7+7VB8m/xhGJCe1My2j3bziloNo/3hmmZQPjgSAVn8sDLCmRGt84\n"
"TYO/f4yY9ftGsWZEa0GhtixBGs9YviyuHz1ANyTGJg6VJ/GIwaK/sefD22MGKWTN\n"
"AMuVdPThDwGftcL6Apd44yiiIbm5ufD7w2ZS2l9/dG+0RXV+iSTXQZlNn6MHo3zw\n"
"ebc+m6ECgYEAzqImO6hyKAEnWFgXc/NgLk4xxdumEbn3FqVqeTKlJTuUGqiAVzfD\n"
"+UDswIvRKwxGnJXlNTMwkT6w9LeFC5W7xvnr4a2YFj68oDNY4Gi6CFcv3kG7/6MR\n"
"u9bLtxDh3Q4JHwhLO5cWejIdZ4P+9aG7GzUTqofPMmXEEaiiam7NonMCgYEAyRuc\n"
"J1TOm3B/zy29rYgY8BLgdEsdQN07v1JA1xcNG6A24mxBbPrkOuDrZ0kLtnFewVFG\n"
"4fLEO5MQBhsRKHK0pw3VE5azO8jHyFzccEjuObUeuYXSLxZFmK8jdcIkRirh6O7D\n"
"qCm/cEePnkxIFEIjMrctWXxa/jYEZYheRCXH4/ECgYB0PfvMK+KsZpm/tS7cZ9l/\n"
"szWE3R/7cOZzsvLG45rL60xSAuDQL+rrWX7WgtFUqj8+74RV/UohK2dZA7Sw47cT\n"
"JJ1yA7o/KWPrq3cgJ0ogTwv6uHgOQ6pCRX+sqK6nMLIo5v2LtF9Mtsyb40GW5Tjh\n"
"AWbi1CvXajB2zqsvvM2pyQKBgG6dSBt+ExH+I96BqzWaiRTrXRe6BQIbbXSDOnTU\n"
"Efqi+e06XBYkPYqBEhnCXLXhz5uHJ/S5geO+tO6Wzq4vwVutSQi4OCdm/TQgl4MP\n"
"KjEFhTvH9l694lPj6R4pRahuh8mGIooJRGnugnkwPekeo5uOk1wIAUiXz31FL4xO\n"
"N48RAoGAK7+20dPiStPo8dnFrYjQ2j5xuMO2/0+BaLFhDWTiHHjALCWOHkXJ1JtN\n"
"9LM2cIlCC79p4+7KQwUXvMBcnAx6qwTMHisGg3WfxlD8f7MuDDR+or1fd/c0byti\n"
"z1r/I9Ya6/bAXQOjruxpHkECZl5DEdVvT0E2qL/pQx0rfqnkdfE=\n"
"-----END RSA PRIVATE KEY-----\n")
class GCEScenarioContext(object):
zone = None
region = None
machine_type = None
image_name = None
image = None
boot_disk_name = None
boot_disk = None
empty_disk_name = None
empty_disk = None
network_name = None
network_cidr = None
gateway_ip = None
network = None
firewall_name = None
instance_name = None
instance = None
address_name = None
address = None
access_config_name = None
class TestGCEScenario(base_gce.GCESmokeTestCase):
@classmethod
def setUpClass(cls):
super(TestGCEScenario, cls).setUpClass()
cls.ctx = GCEScenarioContext()
@classmethod
def tearDownClass(cls):
super(TestGCEScenario, cls).tearDownClass()
def setUp(self):
super(TestGCEScenario, self).setUp()
@base_gce.GCESmokeTestCase.incremental
def test_000_get_zone(self):
(status, body) = self.gce.get("/zones")
self.assertEqual(200, status, message=body.get("message"))
self.ctx.zone = next(itertools.ifilter(lambda x: x["status"] == "UP",
body["items"]),
None)
self.assertIsNotNone(self.ctx.zone)
self.gce.set_zone(self.ctx.zone["name"])
@base_gce.GCESmokeTestCase.incremental
def test_001_get_region(self):
(status, body) = self.gce.get("/regions")
self.assertEqual(200, status, message=body.get("message"))
self.ctx.region = next(itertools.ifilter(lambda x: x["status"] == "UP",
body["items"]),
None)
self.assertIsNotNone(self.ctx.region)
self.gce.set_region(self.ctx.region["name"])
@base_gce.GCESmokeTestCase.incremental
def test_002_get_machine_type(self):
(status, body) = self.gce.zone_get("/machineTypes")
self.assertEqual(200, status, message=body.get("message"))
machine_types = body.get("items")
self.assertTrue(machine_types)
if CONF.gceapi.machine_type:
self.ctx.machine_type = next(
t for t in machine_types
if t["name"] == CONF.gceapi.machine_type)
else:
self.ctx.machine_type = min(machine_types,
key=lambda x: x["memoryMb"])
@base_gce.GCESmokeTestCase.incremental
def test_003_get_project(self):
(status, body) = self.gce.get("")
self.assertEqual(200, status, message=body.get("message"))
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
not CONF.gceapi["existing_image"],
"Skipped by config settings")
def test_010_get_image(self):
(status, body) = self.gce.get("/global/images",
CONF.gceapi.existing_image)
self.assertEqual(200, status, message=body.get("message"))
self.assertIsNotNone(body)
self.ctx.image = body
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["existing_image"],
"Skipped by config settings")
def test_011_create_image(self):
self.ctx.image_name = data_utils.rand_name("image-")
(status, body) = self.gce.post(
"/global/images",
body={
"name": self.ctx.image_name,
"rawDisk": {
"source": CONF.gceapi.http_raw_image,
},
"sourceType": "RAW",
})
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_resource_uri(body["targetLink"], "/global/images",
self.ctx.image_name)
self.wait_for_operation(body, "insert", "READY")
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["existing_image"],
"Skipped by config settings")
def test_012_check_image_info(self):
(status, body) = self.gce.get("/global/images", self.ctx.image_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.image_name, body["name"])
self.assertEqual("READY", body["status"])
self.assertEqual("RAW", body["sourceType"])
self.verify_resource_uri(body["selfLink"], "/global/images",
self.ctx.image_name)
self.ctx.image = body
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["skip_bootable_volume"],
"Skipped by config settings")
def test_020_create_bootable_disk(self):
self.ctx.boot_disk_name = data_utils.rand_name("boot-disk-")
(status, body) = self.gce.zone_post(
"/disks",
params={
"sourceImage": self.ctx.image["selfLink"],
},
body={
"name": self.ctx.boot_disk_name,
})
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_zone_resource_uri(body["targetLink"], "/disks",
self.ctx.boot_disk_name)
self.wait_for_operation(body, "insert", "READY")
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["skip_bootable_volume"],
"Skipped by config settings")
def test_021_check_bootable_disk_info(self):
(status, body) = self.gce.zone_get("/disks/", self.ctx.boot_disk_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.boot_disk_name, body["name"])
self.assertIsNone(body["description"])
self.assertEqual("READY", body["status"])
self.verify_zone_resource_uri(body["selfLink"], "/disks",
self.ctx.boot_disk_name)
self.ctx.boot_disk = body
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["skip_empty_volume"],
"Skipped by config settings")
def test_022_create_empty_disk(self):
self.ctx.empty_disk_name = data_utils.rand_name("empty-disk-")
(status, body) = self.gce.zone_post(
"/disks",
body={
"name": self.ctx.empty_disk_name,
"sizeGb": 1,
"description": "test empty volume",
})
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_zone_resource_uri(body["targetLink"], "/disks",
self.ctx.empty_disk_name)
self.wait_for_operation(body, "insert", "READY")
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["skip_empty_volume"],
"Skipped by config settings")
def test_023_check_empty_disk_info(self):
(status, body) = self.gce.zone_get("/disks/", self.ctx.empty_disk_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.empty_disk_name, body["name"])
self.assertEqual("test empty volume", body["description"])
self.assertEqual("READY", body["status"])
self.assertEqual(1, body["sizeGb"])
self.verify_zone_resource_uri(body["selfLink"], "/disks",
self.ctx.empty_disk_name)
self.ctx.empty_disk = body
@base_gce.GCESmokeTestCase.incremental
def test_030_create_network(self):
self.ctx.network_name = data_utils.rand_name("network-")
cfg = CONF.network
network_cidr = netaddr.IPNetwork(cfg.tenant_network_cidr)
subnet_cidr = next(s for s in network_cidr.
subnet(cfg.tenant_network_mask_bits))
gateway_ip = netaddr.IPAddress(subnet_cidr.last - 1)
self.ctx.network_cidr = str(subnet_cidr)
self.ctx.gateway_ip = str(gateway_ip)
(status, body) = self.gce.post(
"/global/networks",
body={
"name": self.ctx.network_name,
"IPv4Range": self.ctx.network_cidr,
"gatewayIPv4": self.ctx.gateway_ip,
})
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_resource_uri(body["targetLink"], "/global/networks",
self.ctx.network_name)
self.wait_for_operation(body, "insert", None)
@base_gce.GCESmokeTestCase.incremental
def test_031_check_network_info(self):
(status, body) = self.gce.get("/global/networks",
self.ctx.network_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.network_name, body["name"])
self.assertEqual(self.ctx.network_cidr, body["IPv4Range"])
self.assertEqual(self.ctx.gateway_ip, body["gatewayIPv4"])
self.verify_resource_uri(body["selfLink"], "/global/networks",
self.ctx.network_name)
self.ctx.network = body
@base_gce.GCESmokeTestCase.incremental
def test_040_create_firewall(self):
self.ctx.firewall_name = data_utils.rand_name("firewall-")
(status, body) = self.gce.post(
"/global/firewalls",
body={
"name": self.ctx.firewall_name,
"description": "test firewall",
"network": self.ctx.network["selfLink"],
"sourceRanges": ["0.0.0.0/0"],
"allowed": [
{"IPProtocol": "icmp"},
{
"IPProtocol": "tcp",
"ports": ["22", "80", "8080-8089"],
},
],
})
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_resource_uri(body["targetLink"], "/global/firewalls",
self.ctx.firewall_name)
self.wait_for_operation(body, "insert", None)
@base_gce.GCESmokeTestCase.incremental
def test_041_check_firewall_info(self):
(status, body) = self.gce.get("/global/firewalls",
self.ctx.firewall_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.firewall_name, body["name"])
self.assertTrue(body["description"].startswith("test firewall"))
self.assertEqual(body["network"], self.ctx.network["selfLink"])
self.assertEqual(body["sourceRanges"], ["0.0.0.0/0"])
self.assertEqual(2, len(body["allowed"]))
self.assertIn({"IPProtocol": "icmp"}, body["allowed"])
tcp_range = next((x for x in body["allowed"]
if x["IPProtocol"] == "tcp"), None)
self.assertIsNotNone(tcp_range)
self.assertItemsEqual(["22", "80", "8080-8089"], tcp_range["ports"])
self.verify_resource_uri(body["selfLink"], "/global/firewalls",
self.ctx.firewall_name)
@base_gce.GCESmokeTestCase.incremental
def test_050_run_instance(self):
self.ctx.instance_name = data_utils.rand_name("instance-")
disks = []
if not CONF.gceapi.skip_bootable_volume:
disks.append({
"kind": self.ctx.boot_disk["kind"],
"type": "PERSISTENT",
"mode": "READ_WRITE",
"source": self.ctx.boot_disk["selfLink"],
"deviceName": "vda",
"boot": True,
})
if not CONF.gceapi.skip_empty_volume:
disks.append({
"kind": self.ctx.empty_disk["kind"],
"type": "PERSISTENT",
"mode": "READ_WRITE",
"source": self.ctx.empty_disk["selfLink"],
"deviceName": "vdb",
"boot": False,
})
body = {
"name": self.ctx.instance_name,
"description": "test instance",
"machineType": self.ctx.machine_type["selfLink"],
"disks": disks,
"metadata": {
"items": [
{
"key": "sshKeys",
"value": ":".join([data_utils.rand_name("keypair-"),
PUBLIC_KEY])
},
],
},
"networkInterfaces": [{
"network": self.ctx.network["selfLink"],
}],
"tags": {
"items": [],
},
}
if CONF.gceapi.skip_bootable_volume:
body["image"] = self.ctx.image["selfLink"]
(status, body) = self.gce.zone_post("/instances", body=body)
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_zone_resource_uri(body["targetLink"], "/instances",
self.ctx.instance_name)
self.wait_for_operation(body, "insert", "RUNNING")
@base_gce.GCESmokeTestCase.incremental
def test_051_check_instance_info(self):
(status, body) = self.gce.zone_get("/instances/",
self.ctx.instance_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.instance_name, body["name"])
self.assertEqual("test instance", body["description"])
self.assertEqual("RUNNING", body["status"])
self.assertEqual("ACTIVE", body["statusMessage"])
self.assertEqual(self.ctx.machine_type["selfLink"],
body["machineType"])
nwifs = body.get("networkInterfaces")
self.assertEqual(1, len(nwifs))
nwif = nwifs[0]
try:
self.assertEqual(self.ctx.network_name, nwif["name"])
except Exception:
LOG.exception("Is nova network here?")
# NOTE(apavlov): change network name for future usage in this case
self.ctx.network_name = nwif["name"]
cidrs = netaddr.all_matching_cidrs(nwif["networkIP"],
[self.ctx.network_cidr])
try:
self.assertEqual(1, len(cidrs))
except Exception:
LOG.exception("Is nova network here?")
try:
self.assertEqual(self.ctx.network_cidr, str(cidrs[0]))
except Exception:
LOG.exception("Is nova network here?")
self.verify_zone_resource_uri(body["selfLink"], "/instances",
self.ctx.instance_name)
self.ctx.instance = body
@testtools.skipIf(
not CONF.gceapi["use_floatingip"],
"Skipped by config settings")
@base_gce.GCESmokeTestCase.incremental
def test_060_reserve_address(self):
self.ctx.address_name = data_utils.rand_name("address-")
(status, body) = self.gce.region_post(
"/addresses",
body={
"name": self.ctx.address_name,
"description": "test address",
})
self.assertEqual(200, status, message=body.get("message"))
self.add_resource_cleanup(body["targetLink"])
self.verify_region_resource_uri(body["targetLink"], "/addresses",
self.ctx.address_name)
self.wait_for_operation(body, "insert", "RESERVED")
@testtools.skipIf(
not CONF.gceapi["use_floatingip"],
"Skipped by config settings")
@base_gce.GCESmokeTestCase.incremental
def test_061_check_address_info(self):
(status, body) = self.gce.region_get("/addresses/",
self.ctx.address_name)
self.assertEqual(200, status, message=body.get("message"))
self.assertEqual(self.ctx.address_name, body["name"])
self.assertEqual("test address", body["description"])
self.assertEqual("RESERVED", body["status"])
self.assertIsNotNone(body["address"])
self.verify_region_resource_uri(body["selfLink"], "/addresses",
self.ctx.address_name)
self.ctx.address = body
@testtools.skipIf(
not CONF.gceapi["use_floatingip"],
"Skipped by config settings")
@base_gce.GCESmokeTestCase.incremental
def test_062_associate_floating_ip(self):
self.ctx.access_config_name = data_utils.rand_name("accessConfig-")
(status, body) = self.gce.zone_post(
"/instances",
self.ctx.instance_name,
"addAccessConfig",
params={
"networkInterface": self.ctx.network_name,
},
body={
"type": "ONE_TO_ONE_NAT",
"name": self.ctx.access_config_name,
"natIP": self.ctx.address["address"]
})
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "addAccessConfig", "RUNNING")
@testtools.skipIf(
not CONF.gceapi["use_floatingip"],
"Skipped by config settings")
@base_gce.GCESmokeTestCase.incremental
def test_063_get_floating_ip_info(self):
(status, body) = self.gce.zone_get("/instances/",
self.ctx.instance_name)
self.assertEqual(200, status, message=body.get("message"))
nwifs = body.get("networkInterfaces")
self.assertEqual(1, len(nwifs))
nwif = nwifs[0]
fp_infs = nwif.get("accessConfigs")
self.assertEqual(1, len(fp_infs))
fp_inf = fp_infs[0]
self.assertEqual("ONE_TO_ONE_NAT", fp_inf["type"])
self.assertEqual(self.ctx.access_config_name, fp_inf["name"])
self.assertEqual(self.ctx.address["address"], fp_inf["natIP"])
self.ctx.instance = body
@base_gce.GCESmokeTestCase.incremental
def test_064_ping_instance(self):
for network_interface in self.ctx.instance["networkInterfaces"]:
ip = self._get_ip_address(network_interface)
self._ping_ip_address(ip)
@base_gce.GCESmokeTestCase.incremental
def test_065_ssh_instance(self):
for network_interface in self.ctx.instance["networkInterfaces"]:
ip = self._get_ip_address(network_interface)
self._check_ssh_connectivity(ip,
CONF.gceapi.image_username,
PRIVATE_KEY)
@base_gce.GCESmokeTestCase.incremental
def test_100_reset_instance(self):
(status, body) = self.gce.zone_post("/instances",
self.ctx.instance_name,
"reset")
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "reset", "RUNNING")
@base_gce.GCESmokeTestCase.incremental
def test_101_ping_reseted_instance(self):
for network_interface in self.ctx.instance["networkInterfaces"]:
ip = self._get_ip_address(network_interface)
self._ping_ip_address(ip)
@base_gce.GCESmokeTestCase.incremental
def test_900_stop_instance(self):
instance_link = self.ctx.instance["selfLink"]
self.cancel_resource_cleanup(instance_link)
(status, body) = self.gce.delete(instance_link)
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "delete", None)
self.ctx.instance = None
@testtools.skipIf(
not CONF.gceapi["use_floatingip"],
"Skipped by config settings")
@base_gce.GCESmokeTestCase.incremental
def test_901_release_address(self):
address_link = self.ctx.address["selfLink"]
self.cancel_resource_cleanup(address_link)
(status, body) = self.gce.delete(address_link)
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "delete", None)
self.ctx.address = None
@base_gce.GCESmokeTestCase.incremental
def test_910_delete_network(self):
network_link = self.ctx.network["selfLink"]
self.cancel_resource_cleanup(network_link)
(status, body) = self.gce.delete(network_link)
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "delete", None)
self.ctx.network = None
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["skip_empty_volume"],
"Skipped by config settings")
def test_920_delete_empty_disk(self):
disk_link = self.ctx.empty_disk["selfLink"]
self.cancel_resource_cleanup(disk_link)
(status, body) = self.gce.delete(disk_link)
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "delete", None)
self.ctx.empty_disk = None
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["skip_bootable_volume"],
"Skipped by config settings")
def test_921_delete_bootable_disk(self):
disk_link = self.ctx.boot_disk["selfLink"]
self.cancel_resource_cleanup(disk_link)
(status, body) = self.gce.delete(disk_link)
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "delete", None)
self.ctx.boot_disk = None
@base_gce.GCESmokeTestCase.incremental
@testtools.skipIf(
CONF.gceapi["existing_image"],
"Skipped by config settings")
def test_922_delete_image(self):
image_link = self.ctx.image["selfLink"]
self.cancel_resource_cleanup(image_link)
(status, body) = self.gce.delete(image_link)
self.assertEqual(200, status, message=body.get("message"))
self.wait_for_operation(body, "delete", None)
self.ctx.image = None