test_instances and test_networks refactoring aka ec2 style
Change-Id: I096192c447e75ecbcfe400f4008a3b391fa71bd5
This commit is contained in:
parent
aa787e0fd6
commit
668848c03a
|
@ -56,6 +56,9 @@ gce_opts = [
|
|||
default=500,
|
||||
help='Default new volume size if sizeGb, sourceSnapshot and '
|
||||
'sourceImage are not provided'),
|
||||
cfg.StrOpt('default_network_ip_range',
|
||||
default='10.240.0.0/16',
|
||||
help='Default new network ip range if it is not provided'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
|
|
@ -86,7 +86,7 @@ class API(base_api.API):
|
|||
client.delete_network(network["id"])
|
||||
|
||||
def add_item(self, context, name, body, scope=None):
|
||||
ip_range = body['IPv4Range']
|
||||
ip_range = body.get('IPv4Range', CONF.default_network_ip_range)
|
||||
gateway = body.get('gatewayIPv4')
|
||||
if gateway is None:
|
||||
network_cidr = netaddr.IPNetwork(ip_range)
|
||||
|
@ -117,7 +117,8 @@ class API(base_api.API):
|
|||
result_data = client.create_subnet(subnet_body)
|
||||
subnet_id = result_data["subnet"]["id"]
|
||||
network = self._prepare_network(client, network)
|
||||
network["description"] = body.get("description")
|
||||
if 'description' in body:
|
||||
network["description"] = body["description"]
|
||||
network = self._add_db_item(context, network)
|
||||
self._process_callbacks(
|
||||
context, base_api._callback_reasons.post_add,
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import netaddr
|
||||
from oslo_config import cfg
|
||||
|
||||
from gceapi.api import base_api
|
||||
from gceapi.api import clients
|
||||
|
@ -22,6 +23,9 @@ from gceapi import exception
|
|||
from gceapi.i18n import _
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class API(base_api.API):
|
||||
"""GCE Network API - nova-network implementation."""
|
||||
|
||||
|
@ -68,7 +72,7 @@ class API(base_api.API):
|
|||
client.networks.delete(network["id"])
|
||||
|
||||
def add_item(self, context, name, body, scope=None):
|
||||
ip_range = body['IPv4Range']
|
||||
ip_range = body.get('IPv4Range', CONF.default_network_ip_range)
|
||||
gateway = body.get('gatewayIPv4')
|
||||
if gateway is None:
|
||||
network_cidr = netaddr.IPNetwork(ip_range)
|
||||
|
|
|
@ -30,8 +30,9 @@ class Controller(gce_common.Controller):
|
|||
"gatewayIPv4": network.get("gatewayIPv4", ""),
|
||||
"creationTimestamp": network.get("creationTimestamp", ""),
|
||||
}
|
||||
if "description" in network:
|
||||
result_dict["description"] = network["description"]
|
||||
description = network.get("description")
|
||||
if description is not None:
|
||||
result_dict["description"] = description
|
||||
|
||||
return self._format_item(request, result_dict, scope)
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
|
|||
networking="neutron"
|
||||
net_id=$(neutron net-create --tenant-id $project_id "default" | grep ' id ' | awk '{print $4}')
|
||||
[[ -n "$net_id" ]] || { echo "net-create failed"; exit 1; }
|
||||
subnet_id=$(neutron subnet-create --tenant-id $project_id --ip_version 4 --gateway 10.240.0.1 --name "private_subnet" $net_id 10.240.0.0/24 | grep ' id ' | awk '{print $4}')
|
||||
subnet_id=$(neutron subnet-create --tenant-id $project_id --ip_version 4 --gateway 10.240.0.1 --name "private_subnet" $net_id 10.240.0.0/16 | grep ' id ' | awk '{print $4}')
|
||||
[[ -n "$subnet_id" ]] || { echo "subnet-create failed"; exit 1; }
|
||||
router_id=$(neutron router-create --tenant-id $project_id "private_router" | grep ' id ' | awk '{print $4}')
|
||||
[[ -n "$router_id" ]] || { echo "router-create failed"; exit 1; }
|
||||
|
@ -77,7 +77,7 @@ if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
|
|||
else
|
||||
# nova networking
|
||||
networking="nova-network"
|
||||
nova network-create "default" --fixed-range-v4 10.240.0.0/24 --gateway 10.240.0.1
|
||||
nova network-create "default" --fixed-range-v4 10.240.0.0/16 --gateway 10.240.0.1
|
||||
fi
|
||||
|
||||
#create image in raw format
|
||||
|
|
|
@ -145,9 +145,7 @@ class TestAddressess(TestAddressesBase):
|
|||
self._delete_address(name)
|
||||
|
||||
def test_list_addresses_by_filter_name(self):
|
||||
# Goole's free evaluation account quote is 1 external IP
|
||||
count = 3 if not self.is_real_gce else 1
|
||||
names = [self._rand_name('testaddr') for _ in range(0, count)]
|
||||
names = [self._rand_name('testaddr') for _ in range(0, 3)]
|
||||
# prepare resources
|
||||
addresses = dict()
|
||||
for name in names:
|
||||
|
|
|
@ -59,7 +59,7 @@ class TestSnapshotsBase(test_base.GCETestCase):
|
|||
self.api.validate_schema(value=result, schema_name='Snapshot')
|
||||
return result
|
||||
|
||||
def _get_expected_snapshot_fields(self, disk_name, options):
|
||||
def _get_expected_snapshot(self, disk_name, options):
|
||||
snapshot = copy.deepcopy(options)
|
||||
# fill defaults if needed
|
||||
if 'kind' not in snapshot:
|
||||
|
@ -76,7 +76,7 @@ class TestSnapshotsBase(test_base.GCETestCase):
|
|||
|
||||
def _ensure_snapshot_created(self, disk_name, options):
|
||||
name = options['name']
|
||||
snapshot = self._get_expected_snapshot_fields(disk_name, options)
|
||||
snapshot = self._get_expected_snapshot(disk_name, options)
|
||||
# get object from server and check properties
|
||||
result = self._get_snapshot(name)
|
||||
self.assertObject(snapshot, result)
|
||||
|
@ -149,7 +149,7 @@ class TestDiskBase(TestSnapshotsBase):
|
|||
self.api.validate_schema(value=result, schema_name='Disk')
|
||||
return result
|
||||
|
||||
def _get_expected_disk_fields(self, options, source_image=None):
|
||||
def _get_expected_disk(self, options, source_image=None):
|
||||
disk = copy.deepcopy(options)
|
||||
# fill defaults if needed
|
||||
if 'kind' not in disk:
|
||||
|
@ -175,7 +175,7 @@ class TestDiskBase(TestSnapshotsBase):
|
|||
|
||||
def _ensure_disk_created(self, options, source_image=None):
|
||||
name = options['name']
|
||||
disk = self._get_expected_disk_fields(options, source_image)
|
||||
disk = self._get_expected_disk(options, source_image)
|
||||
# get object from server and check properties
|
||||
result = self._get_disk(name)
|
||||
self.assertObject(disk, result)
|
||||
|
|
|
@ -14,59 +14,13 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from gceapi.tests.functional import test_base
|
||||
from gceapi.tests.functional.api import test_addresses
|
||||
from gceapi.tests.functional.api import test_disks
|
||||
|
||||
|
||||
CREATE_INSTANCE_TEMPLATE = {
|
||||
"name": "${name}",
|
||||
"description": "Testing instance",
|
||||
"machineType": "zones/${zone}/machineTypes/${machine_type}",
|
||||
"disks": [
|
||||
{
|
||||
"boot": True,
|
||||
"autoDelete": True,
|
||||
"initializeParams": {
|
||||
"sourceImage": "projects/${image}",
|
||||
}
|
||||
}
|
||||
],
|
||||
"networkInterfaces": [
|
||||
{
|
||||
"network": "global/networks/${network}",
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"items": [
|
||||
{
|
||||
"key": "test_metadata_key",
|
||||
"value": "test_metadata_value"
|
||||
},
|
||||
{
|
||||
"key": "startup-script",
|
||||
"value": "echo Test startup script"
|
||||
}
|
||||
]
|
||||
},
|
||||
"serviceAccounts": [
|
||||
{
|
||||
"email": "default",
|
||||
"scopes": [
|
||||
"https://www.googleapis.com/auth/cloud.useraccounts.readonly",
|
||||
"https://www.googleapis.com/auth/devstorage.read_only",
|
||||
"https://www.googleapis.com/auth/logging.write",
|
||||
"https://www.googleapis.com/auth/monitoring.write"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def _prepare_instance_insert_parameters(**kwargs):
|
||||
return test_base.insert_json_parameters(CREATE_INSTANCE_TEMPLATE, **kwargs)
|
||||
|
||||
|
||||
class TestInstancesBase(test_base.GCETestCase):
|
||||
class TestInstancesBase(test_disks.TestDiskBase):
|
||||
@property
|
||||
def instances(self):
|
||||
res = self.api.compute.instances()
|
||||
|
@ -79,12 +33,11 @@ class TestInstancesBase(test_base.GCETestCase):
|
|||
cfg = self.cfg
|
||||
project_id = cfg.project_id
|
||||
zone = cfg.zone
|
||||
config = _prepare_instance_insert_parameters(**options)
|
||||
self.trace('Crete instance with options {}'.format(config))
|
||||
self.trace('Crete instance with options {}'.format(options))
|
||||
request = self.instances.insert(
|
||||
project=project_id,
|
||||
zone=zone,
|
||||
body=config)
|
||||
body=options)
|
||||
self._add_cleanup(self._delete_instance, options['name'])
|
||||
self._execute_async_request(request, project_id, zone=zone)
|
||||
|
||||
|
@ -101,12 +54,15 @@ class TestInstancesBase(test_base.GCETestCase):
|
|||
self._execute_async_request(request, project_id, zone=zone)
|
||||
self._remove_cleanup(self._delete_instance, name)
|
||||
|
||||
def _list_instances(self):
|
||||
def _list_instances(self, filter=None):
|
||||
project_id = self.cfg.project_id
|
||||
zone = self.cfg.zone
|
||||
self.trace('List instances: project_id={} zone={}'.
|
||||
format(project_id, zone))
|
||||
request = self.instances.list(project=project_id, zone=zone)
|
||||
request = self.instances.list(
|
||||
project=project_id,
|
||||
zone=zone,
|
||||
filter=filter)
|
||||
self.trace_request(request)
|
||||
result = request.execute()
|
||||
self.trace('Instances: {}'.format(result))
|
||||
|
@ -127,38 +83,140 @@ class TestInstancesBase(test_base.GCETestCase):
|
|||
self.api.validate_schema(value=result, schema_name='Instance')
|
||||
return result
|
||||
|
||||
def _get_expected_attached_disk(self, options, instance_name):
|
||||
disk = copy.deepcopy(options)
|
||||
source = disk.get('source', 'disks/{}'.format(instance_name))
|
||||
disk['source'] = self.api.get_zone_url(source)
|
||||
disk.setdefault('kind', u'compute#attachedDisk')
|
||||
disk.setdefault('mode', u'READ_WRITE')
|
||||
# TODO(alexey-mr): OS GCE return vda
|
||||
# if disk.setdefault('type', u'PERSISTENT') == u'PERSISTENT':
|
||||
# disk.setdefault('deviceName', 'persistent-disk-[0-9]+')
|
||||
is_boot = disk['boot'] = disk.get('boot', False)
|
||||
disk.setdefault('index', 0 if is_boot else '[0-9]+')
|
||||
disk.setdefault('autoDelete', False)
|
||||
# TODO(alexey-mr): OS gce api doesn't return interface
|
||||
# disk.setdefault('interface', u'SCSI')
|
||||
# remove input only parameters
|
||||
disk.pop('initializeParams', None)
|
||||
return disk
|
||||
|
||||
class TestInstancesCRUD(TestInstancesBase):
|
||||
def setUp(self):
|
||||
super(TestInstancesCRUD, self).setUp()
|
||||
self._instance_name = self._rand_name('testinst')
|
||||
@staticmethod
|
||||
def _get_expected_access_config(options):
|
||||
ac = copy.deepcopy(options)
|
||||
ac.setdefault('kind', u'compute#accessConfig')
|
||||
ac.setdefault('type', u'ONE_TO_ONE_NAT')
|
||||
return ac
|
||||
|
||||
def _create(self):
|
||||
def _get_expected_nic(self, options):
|
||||
nic = copy.deepcopy(options)
|
||||
nic['network'] = self.api.get_project_url(nic['network'])
|
||||
nic.setdefault('networkIP', test_addresses.IPV4_PATTERN)
|
||||
# TODO(alexey-mr): OS GCE returns network name aka 'default'
|
||||
# nic.setdefault('name', 'nic[0-9]+')
|
||||
access_configs = nic.get('accessConfigs')
|
||||
if access_configs:
|
||||
acs = []
|
||||
for ac in access_configs:
|
||||
acs.append(self._get_expected_access_config(ac))
|
||||
nic['accessConfigs'] = acs
|
||||
return nic
|
||||
|
||||
def _get_expected_instance(self, options):
|
||||
instance = copy.deepcopy(options)
|
||||
name = instance['name']
|
||||
# expected that machine_type here is in form of ralative zone url
|
||||
# aka 'zones/zone/machineTypes/machine_type or full absolute url
|
||||
machine_type_url = self.api.get_project_url(instance['machineType'])
|
||||
instance['machineType'] = machine_type_url
|
||||
attached_disks = list()
|
||||
for disk in instance['disks']:
|
||||
attached_disks.append(self._get_expected_attached_disk(disk, name))
|
||||
instance['disks'] = attached_disks
|
||||
network_interfaces = list()
|
||||
for nic in instance['networkInterfaces']:
|
||||
network_interfaces.append(self._get_expected_nic(nic))
|
||||
instance['networkInterfaces'] = network_interfaces
|
||||
instance.setdefault('kind', u'compute#instance')
|
||||
instance.setdefault('status', u'RUNNING')
|
||||
self_link = 'instances/{}'.format(name)
|
||||
instance.setdefault('selfLink', self.api.get_zone_url(self_link))
|
||||
instance.setdefault('zone', self.api.get_zone_url())
|
||||
# TODO(alexey-mr): OS gce api doesn't return canIpForward
|
||||
# instance.setdefault('canIpForward', False)
|
||||
# TODO(alexey-mr): OS gce api doesn't return scheduling
|
||||
# instance.setdefault(
|
||||
# 'scheduling',
|
||||
# {
|
||||
# 'automaticRestart': True,
|
||||
# 'preemptible': False,
|
||||
# 'onHostMaintenance': u'MIGRATE'
|
||||
# })
|
||||
return instance
|
||||
|
||||
def _ensure_instance_created(self, options):
|
||||
expected_instance = self._get_expected_instance(options)
|
||||
instance = self._get_instance(options['name'])
|
||||
self.assertObject(expected_instance, instance)
|
||||
return instance
|
||||
|
||||
def _get_create_instance_default_options(self, name):
|
||||
cfg = self.cfg
|
||||
machine_type = 'zones/{}/machineTypes/{}'.format(cfg.zone,
|
||||
cfg.machine_type)
|
||||
image = 'projects/{}'.format(cfg.image)
|
||||
options = {
|
||||
'zone': cfg.zone,
|
||||
'name': self._instance_name,
|
||||
'machine_type': cfg.machine_type,
|
||||
'image': cfg.image,
|
||||
'network': 'default',
|
||||
'name': name,
|
||||
'machineType': machine_type,
|
||||
'disks': [
|
||||
{
|
||||
'boot': True,
|
||||
'autoDelete': True,
|
||||
'initializeParams': {
|
||||
'sourceImage': image
|
||||
}
|
||||
}
|
||||
],
|
||||
'networkInterfaces': [
|
||||
{
|
||||
'network': 'global/networks/default',
|
||||
}
|
||||
],
|
||||
}
|
||||
return options
|
||||
|
||||
|
||||
class TestInstances(TestInstancesBase):
|
||||
def test_create_delete_instance_default(self):
|
||||
name = self._rand_name('testinstance')
|
||||
options = self._get_create_instance_default_options(name)
|
||||
self._create_instance(options)
|
||||
self._ensure_instance_created(options)
|
||||
self._delete_instance(name)
|
||||
|
||||
def _read(self):
|
||||
result = self._get_instance(self._instance_name)
|
||||
self.assertEqual(self._instance_name, result['name'])
|
||||
def test_list_instances(self):
|
||||
name = self._rand_name('testinstance')
|
||||
options = self._get_create_instance_default_options(name)
|
||||
self._create_instance(options)
|
||||
instance = self._ensure_instance_created(options)
|
||||
result = self._list_instances()
|
||||
self.assertFind(self._instance_name, result)
|
||||
result = self.assertFind(name, result)
|
||||
self.assertObject(instance, result)
|
||||
self._delete_instance(name)
|
||||
|
||||
def _update(self):
|
||||
# TODO(alexey-mr): to impl simple update cases
|
||||
pass
|
||||
|
||||
def _delete(self):
|
||||
self._delete_instance(self._instance_name)
|
||||
|
||||
def test_crud(self):
|
||||
self._create()
|
||||
self._read()
|
||||
self._update()
|
||||
self._delete()
|
||||
def test_list_instances_by_filter_name(self):
|
||||
names = [self._rand_name('testinstance') for _ in range(0, 3)]
|
||||
# prepare resources
|
||||
instances = dict()
|
||||
for name in names:
|
||||
options = self._get_create_instance_default_options(name)
|
||||
self._create_instance(options)
|
||||
instances[name] = self._ensure_instance_created(options)
|
||||
# do list by filter test
|
||||
for name in names:
|
||||
result = self._list_instances(filter='name eq {}'.format(name))
|
||||
self.assertEqual(1, len(result['items']))
|
||||
self.assertObject(instances[name], result['items'][0])
|
||||
# delete resources
|
||||
for name in names:
|
||||
self._delete_instance(name)
|
||||
|
|
|
@ -14,19 +14,13 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from gceapi.tests.functional import test_base
|
||||
|
||||
|
||||
CREATE_NETWORK_TEMPLATE = {
|
||||
"name": "${name}",
|
||||
"IPv4Range": "${ip_range}",
|
||||
"description": "testing network ${name}",
|
||||
"gatewayIPv4": "${gateway}"
|
||||
}
|
||||
|
||||
|
||||
def _prepare_network_create_parameters(**kwargs):
|
||||
return test_base.insert_json_parameters(CREATE_NETWORK_TEMPLATE, **kwargs)
|
||||
def ip_to_re_pattern(ip):
|
||||
return ip.replace('.', '\.')
|
||||
|
||||
|
||||
class TestNetworksBase(test_base.GCETestCase):
|
||||
|
@ -40,11 +34,10 @@ class TestNetworksBase(test_base.GCETestCase):
|
|||
|
||||
def _create_network(self, options):
|
||||
project_id = self.cfg.project_id
|
||||
config = _prepare_network_create_parameters(**options)
|
||||
self.trace('Crete network with options {}'.format(config))
|
||||
self.trace('Crete network with options {}'.format(options))
|
||||
request = self.networks.insert(
|
||||
project=project_id,
|
||||
body=config)
|
||||
body=options)
|
||||
self._add_cleanup(self._delete_network, options['name'])
|
||||
self._execute_async_request(request, project_id)
|
||||
|
||||
|
@ -59,10 +52,10 @@ class TestNetworksBase(test_base.GCETestCase):
|
|||
self._execute_async_request(request, project_id)
|
||||
self._remove_cleanup(self._delete_network, name)
|
||||
|
||||
def _list_networks(self):
|
||||
def _list_networks(self, filter=None):
|
||||
project_id = self.cfg.project_id
|
||||
self.trace('List networks: project_id={}'.format(project_id))
|
||||
request = self.networks.list(project=project_id)
|
||||
request = self.networks.list(project=project_id, filter=filter)
|
||||
self.trace_request(request)
|
||||
result = request.execute()
|
||||
self.trace('Networks: {}'.format(result))
|
||||
|
@ -81,52 +74,113 @@ class TestNetworksBase(test_base.GCETestCase):
|
|||
self.api.validate_schema(value=result, schema_name='Network')
|
||||
return result
|
||||
|
||||
def _get_expected_network(self, options):
|
||||
network = copy.deepcopy(options)
|
||||
network.setdefault('kind', u'compute#network')
|
||||
self_link = 'global/networks/{}'.format(network['name'])
|
||||
network.setdefault('selfLink', self.api.get_project_url(self_link))
|
||||
ip_range = network.get('IPv4Range', u'10.240.0.0/16')
|
||||
network['IPv4Range'] = ip_to_re_pattern(ip_range)
|
||||
gateway = network.get('gatewayIPv4', u'10.240.0.1')
|
||||
network['gatewayIPv4'] = ip_to_re_pattern(gateway)
|
||||
return network
|
||||
|
||||
class TestReadDefaultNetwork(TestNetworksBase):
|
||||
def setUp(self):
|
||||
super(TestReadDefaultNetwork, self).setUp()
|
||||
self._network_name = 'default'
|
||||
def _ensure_network_created(self, options):
|
||||
network = self._get_network(options['name'])
|
||||
expected_network = self._get_expected_network(options)
|
||||
self.assertObject(expected_network, network)
|
||||
return network
|
||||
|
||||
def test_get(self):
|
||||
self._get_network(self._network_name)
|
||||
|
||||
def test_list(self):
|
||||
class TestNetworks(TestNetworksBase):
|
||||
@property
|
||||
def _is_nova_network(self):
|
||||
return self.cfg.networking == 'nova-network'
|
||||
|
||||
def test_get_default_network(self):
|
||||
name = 'default'
|
||||
network = self._get_network(name)
|
||||
options = {
|
||||
'name': name
|
||||
}
|
||||
expected = self._get_expected_network(options)
|
||||
self.assertObject(expected, network)
|
||||
|
||||
def test_list_default_network(self):
|
||||
name = 'default'
|
||||
result = self._list_networks()
|
||||
self.assertFind(self._network_name, result)
|
||||
result = self.assertFind(name, result)
|
||||
options = {
|
||||
'name': name
|
||||
}
|
||||
expected = self._get_expected_network(options)
|
||||
self.assertObject(expected, result)
|
||||
|
||||
def test_list_default_network_by_filter(self):
|
||||
name = 'default'
|
||||
result = self._list_networks(filter='name eq {}'.format(name))
|
||||
result = self.assertFind(name, result)
|
||||
options = {
|
||||
'name': name
|
||||
}
|
||||
expected = self._get_expected_network(options)
|
||||
self.assertObject(expected, result)
|
||||
|
||||
class TestNetworksCRUD(TestNetworksBase):
|
||||
def setUp(self):
|
||||
if self.cfg.networking == 'nova-network':
|
||||
def test_create_network_default(self):
|
||||
if self._is_nova_network:
|
||||
self.skipTest('Skip network because of nova-network')
|
||||
return
|
||||
super(TestNetworksCRUD, self).setUp()
|
||||
self._network_name = self._rand_name('network')
|
||||
|
||||
def _create(self):
|
||||
name = self._rand_name('testnetwork')
|
||||
options = {
|
||||
'name': self._network_name,
|
||||
'ip_range': '10.240.0.0/16',
|
||||
'gateway': '10.240.0.1'
|
||||
'name': name,
|
||||
}
|
||||
# TODO(alexey-mr): gateway is optional, so add case with absent one
|
||||
self._create_network(options)
|
||||
self._ensure_network_created(options)
|
||||
self._delete_network(name)
|
||||
|
||||
def _read(self):
|
||||
result = self._get_network(self._network_name)
|
||||
self.assertEqual(self._network_name, result['name'])
|
||||
result = self._list_networks()
|
||||
self.assertFind(self._network_name, result)
|
||||
def test_create_network_with_ip_range(self):
|
||||
if self._is_nova_network:
|
||||
self.skipTest('Skip network because of nova-network')
|
||||
return
|
||||
name = self._rand_name('testnetwork')
|
||||
options = {
|
||||
'name': name,
|
||||
'IPv4Range': '10.241.0.0/16',
|
||||
}
|
||||
self._create_network(options)
|
||||
options['gatewayIPv4'] = '10.241.0.1'
|
||||
self._ensure_network_created(options)
|
||||
self._delete_network(name)
|
||||
|
||||
def _update(self):
|
||||
# TODO(alexey-mr): to be implemented
|
||||
pass
|
||||
def test_create_network_with_gateway(self):
|
||||
if self._is_nova_network:
|
||||
self.skipTest('Skip network because of nova-network')
|
||||
return
|
||||
name = self._rand_name('testnetwork')
|
||||
options = {
|
||||
'name': name,
|
||||
'IPv4Range': '10.242.0.0/16',
|
||||
'gatewayIPv4': '10.242.0.1'
|
||||
}
|
||||
self._create_network(options)
|
||||
self._ensure_network_created(options)
|
||||
self._delete_network(name)
|
||||
|
||||
def _delete(self):
|
||||
self._delete_network(self._network_name)
|
||||
|
||||
def test_crud(self):
|
||||
self._create()
|
||||
self._read()
|
||||
self._update()
|
||||
self._delete()
|
||||
def test_list_networks_by_filter_name(self):
|
||||
if self._is_nova_network:
|
||||
self.skipTest('Skip network because of nova-network')
|
||||
return
|
||||
names = [self._rand_name('testnetwork') for _ in range(0, 3)]
|
||||
networks = dict()
|
||||
for name in names:
|
||||
options = {
|
||||
'name': name,
|
||||
}
|
||||
self._create_network(options)
|
||||
networks[name] = self._ensure_network_created(options)
|
||||
for name in names:
|
||||
result = self._list_networks(filter='name eq {}'.format(name))
|
||||
network = self.assertFind(name, result)
|
||||
self.assertObject(networks[name], network)
|
||||
for name in names:
|
||||
self._delete_network(name)
|
||||
|
|
|
@ -212,26 +212,62 @@ class GCETestCase(base.BaseTestCase):
|
|||
self.fail(
|
||||
'There is no required item {} in the list {}'.format(item, items))
|
||||
|
||||
def assertObject(self, expected, actual):
|
||||
self.trace('Validate object: \n\texpected: {}\n\tactual: {}'.
|
||||
format(expected, actual))
|
||||
# aka self.assertDictContainsSubset(expected, observed) but
|
||||
# with regexp matching instead of '=='
|
||||
def _match_values(self, key, expected, actual):
|
||||
missing = []
|
||||
mismatched = []
|
||||
if isinstance(expected, dict) and isinstance(actual, dict):
|
||||
missing, mismatched = self._match_objects(expected,
|
||||
actual,
|
||||
root_key=key)
|
||||
elif isinstance(expected, list) and isinstance(actual, list):
|
||||
expected.sort()
|
||||
actual.sort()
|
||||
if len(expected) > len(actual):
|
||||
_missing = [str(i) for i in expected[len(actual):]]
|
||||
msg = 'key={}: subitems: {}'.format(key, ', '.join(_missing))
|
||||
missing.append(msg)
|
||||
for e, a in zip(expected, actual):
|
||||
_missing, _mismatched = self._match_values(key, e, a)
|
||||
missing.extend(_missing)
|
||||
mismatched.extend(_mismatched)
|
||||
elif isinstance(expected, (str, unicode, buffer)):
|
||||
if not re.compile(expected).match(str(actual)):
|
||||
msg = 'key={}: actual={}: expected_regexp={}'
|
||||
mismatched.append(msg.format(key, actual, expected))
|
||||
elif type(expected) == type(actual):
|
||||
if expected != actual:
|
||||
msg = 'key={}: actual={}: expected={}'
|
||||
mismatched.append(msg.format(key, actual, expected))
|
||||
else:
|
||||
msg = 'key={}: mismatched object types: actual={}: expected={}'
|
||||
mismatched.append(msg.format(key, type(actual), type(expected)))
|
||||
return missing, mismatched
|
||||
|
||||
def _match_objects(self, expected, actual, root_key=None):
|
||||
missing = []
|
||||
mismatched = []
|
||||
for key, value in expected.items():
|
||||
if key not in actual:
|
||||
missing.append(key)
|
||||
elif not re.compile(value).match(actual[key]):
|
||||
msg = 'key {}: actual={} is not match to expected={}'
|
||||
mismatched.append(msg.format(key, actual[key], value))
|
||||
else:
|
||||
_key = '{}/{}'.format(root_key, key) if root_key else key
|
||||
_missing, _mismatched = self._match_values(_key,
|
||||
value, actual[key])
|
||||
missing.extend(_missing)
|
||||
mismatched.extend(_mismatched)
|
||||
return missing, mismatched
|
||||
|
||||
def assertObject(self, expected, actual):
|
||||
self.trace('Validate object: \n\texpected: {}\n\tactual: {}'.
|
||||
format(expected, actual))
|
||||
missing, mismatched = self._match_objects(expected, actual)
|
||||
err = ''
|
||||
if missing:
|
||||
err = 'Missing: {}'.format(','.join(m for m in missing))
|
||||
err = 'Missing: {}'.format(', '.join(m for m in missing))
|
||||
if mismatched:
|
||||
if err:
|
||||
err += '; '
|
||||
err += 'Mismatched values: {}'.format(','.join(mismatched))
|
||||
err += 'Mismatched values: {}'.format(', '.join(mismatched))
|
||||
if err:
|
||||
self.fail(err)
|
||||
|
||||
|
|
Loading…
Reference in New Issue