[Tempest]: Added vpnaas tempest cases

Change-Id: Ifa537ff524281c41026ecc23bfea44ff6b81921c
This commit is contained in:
Puneet Arora 2018-05-16 17:48:21 +00:00
parent 8f1490b59f
commit a7a36a20d0
6 changed files with 872 additions and 33 deletions

View File

@ -77,3 +77,14 @@ REJECT = "REJECT"
# ZONE Designate
ZONE_WAIT_TIME = 120
#VPN
PEER_ID = "172.24.4.12"
PFS = "group14"
ENCRYPTION_ALGO = "aes-128"
ENCRYPTION_ALGO_256 = "aes-256"
AUTH_ALGO = "sha1"
AUTH_ALGO_256 = "sha256"
LIFETIME = {"units": "seconds", "value": 21600}
PEER_ADDRESS = "172.24.4.12"
SITE_CONNECTION_STATE = 'True'
PSK = "secret"

View File

@ -105,12 +105,17 @@ class ApplianceManager(manager.NetworkScenarioTest):
router = routers_client.create_router(
name=name, admin_state_up=True, tenant_id=tenant_id)['router']
if set_gateway is not False:
public_network_info = {"external_gateway_info": dict(
network_id=self.topology_public_network_id)}
if kwargs["enable_snat"] is not None:
public_network_info = {"external_gateway_info": dict(
network_id=self.topology_public_network_id,
enable_snat=kwargs["enable_snat"])}
else:
public_network_info = {"external_gateway_info": dict(
network_id=self.topology_public_network_id)}
routers_client.update_router(router['id'], **public_network_info)
self.topology_routers[router_name] = router
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.delete_router, router['id'])
routers_client.delete_router, router['id'])
return router
def update_topology_router(
@ -118,7 +123,7 @@ class ApplianceManager(manager.NetworkScenarioTest):
if not routers_client:
routers_client = self.routers_client
result = routers_client.update_router(router_id,
**update_kwargs)
**update_kwargs)
return result
def delete_topology_router(
@ -157,7 +162,7 @@ class ApplianceManager(manager.NetworkScenarioTest):
if not networks_client:
networks_client = self.networks_client
result = networks_client.update_network(network_id,
**update_kwargs)
**update_kwargs)
return result
def delete_topology_network(
@ -270,8 +275,8 @@ class ApplianceManager(manager.NetworkScenarioTest):
def update_topology_security_group(self, sg_id, client=None,
**updated_kwargs):
sg = self.security_groups_client.update_security_group(sg_id,
**updated_kwargs)
sg = self.security_groups_client.\
update_security_group(sg_id, **updated_kwargs)
return sg
def delete_topology_security_group(self, sg_id, client=None):
@ -333,12 +338,6 @@ class ApplianceManager(manager.NetworkScenarioTest):
floating_ip['id'])
return floating_ip
def delete_floatingip(self, floating_ip, client=None):
"""Delete floating IP associated to a resource/port on Neutron"""
if not client:
client = self.os_admin.floating_ips_client
client.delete_floatingip(floating_ip['id'])
def create_topology_instance(
self, server_name, networks, security_groups=None,
config_drive=None, keypair=None, image_id=None,
@ -403,11 +402,6 @@ class ApplianceManager(manager.NetworkScenarioTest):
self.topology_servers[server_name] = server
return server
def delete_topology_instance(self, server, servers_client=None):
if not servers_client:
servers_client = self.os_admin.servers_client
servers_client.delete_server(server['id'])
def _list_ports(self, *args, **kwargs):
"""List ports using admin creds """
ports_list = self.os_admin.ports_client.list_ports(
@ -462,11 +456,3 @@ class ApplianceManager(manager.NetworkScenarioTest):
user_id = self.security_groups_client.user_id
tenant_id = self.security_groups_client.tenant_id
return user_id, tenant_id
def create_topology_port(self, network,
ports_client=None, **args):
if not ports_client:
ports_client = self.ports_client
port = ports_client.create_port(network_id=network['id'], **args)
self.addCleanup(ports_client.delete_port, port['port']['id'])
return port

View File

@ -25,6 +25,7 @@ from tempest.lib import exceptions as lib_exc
from vmware_nsx_tempest._i18n import _
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import traffic_manager
from vmware_nsx_tempest.services import designate_base
from vmware_nsx_tempest.services.lbaas import health_monitors_client
from vmware_nsx_tempest.services.lbaas import listeners_client
from vmware_nsx_tempest.services.lbaas import load_balancers_client
@ -43,7 +44,8 @@ RULE_TYPE_DSCP_MARK = "dscp_marking"
# It includes feature related function such CRUD Mdproxy, L2GW or QoS
class FeatureManager(traffic_manager.IperfManager):
class FeatureManager(traffic_manager.IperfManager,
designate_base.DnsClientBase):
@classmethod
def setup_clients(cls):
"""Create various client connections. Such as NSXv3 and L2 Gateway.
@ -89,6 +91,12 @@ class FeatureManager(traffic_manager.IperfManager):
net_client.region,
net_client.endpoint_type,
**_params)
cls.vpnaas_client = openstack_network_clients.VPNClient(
net_client.auth_provider,
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)
cls.qos_policy_client = openstack_network_clients.QosPoliciesClient(
net_client.auth_provider,
net_client.service,
@ -107,9 +115,10 @@ class FeatureManager(traffic_manager.IperfManager):
net_client.region,
net_client.endpoint_type,
**_params)
net_client.service = 'dns'
cls.zones_v2_client = openstack_network_clients.ZonesV2Client(
net_client.auth_provider,
'dns',
net_client.service,
net_client.region,
net_client.endpoint_type,
**_params)

View File

@ -508,6 +508,34 @@ class NSXV3Client(object):
dhcp_servers = self.get_logical_dhcp_servers()
return self.get_nsx_resource_by_name(dhcp_servers, nsx_name)
def get_dpd_profiles(self):
endpoint = "/vpn/ipsec/dpd-profiles"
return self.get_logical_resources(endpoint)
def get_ike_profiles(self):
endpoint = "/vpn/ipsec/ike-profiles"
return self.get_logical_resources(endpoint)
def get_ipsec_profiles(self):
endpoint = "/vpn/ipsec/sessions"
return self.get_logical_resources(endpoint)
def get_vpn_services(self):
endpoint = "/vpn/ipsec/services"
return self.get_logical_resources(endpoint)
def get_tunnel_profiles(self):
endpoint = "/vpn/ipsec/tunnel-profiles"
return self.get_logical_resources(endpoint)
def get_peer_endpoints(self):
endpoint = "/vpn/ipsec/peer-endpoints"
return self.get_logical_resources(endpoint)
def get_local_endpoints(self):
endpoint = "/vpn/ipsec/local-endpoints"
return self.get_logical_resources(endpoint)
def get_dhcp_server_static_bindings(self, dhcp_server):
"""
Get all DHCP static bindings of a logical DHCP server

View File

@ -104,6 +104,133 @@ class L2GatewayConnectionClient(base.BaseNetworkClient):
return self.list_resources(uri, **filters)
class VPNClient(base.BaseNetworkClient):
"""
Request resources via API for VPNaaS
vpn service reate request
vpn service update request
vpn ike policy create request
vpn ike policy update request
vpn ipsec policy create request
vpn ipsec policy update request
vpn site conection create request
vpn site connection update request
l2 gateway connection list all request
"""
endpoint_groups_path = "/vpn/endpoint-groups"
endpoint_group_path = "/vpn/endpoint-groups/%s"
vpnservices_path = "/vpn/vpnservices"
vpnservice_path = "/vpn/vpnservices/%s"
ipsecpolicies_path = "/vpn/ipsecpolicies"
ipsecpolicy_path = "/vpn/ipsecpolicies/%s"
ikepolicies_path = "/vpn/ikepolicies"
ikepolicy_path = "/vpn/ikepolicies/%s"
ipsec_site_connections_path = "/vpn/ipsec-site-connections"
ipsec_site_connection_path = "/vpn/ipsec-site-connections/%s"
def list_vpnservices(self, **filters):
"""Fetches a list of all configured VPNServices for a tenant."""
return self.list_resources(self.vpnservices_path, **filters)
def create_vpnservice(self, **kwargs):
"""Creates a new VPNService."""
return self.create_resource(self.vpnservices_path, kwargs)
def update_vpnservice(self, vpnservice_id, **kwargs):
"""Updates a VPNService."""
uri = self.vpnservice_path % vpnservice_id
return self.update_resource(uri, kwargs)
def update_ipsec_site_connections(self, endpoint_id, **kwargs):
"""Updates a VPN endpoint group."""
uri = self.ipsec_site_connection_path % endpoint_id
return self.update_resource(uri, kwargs)
def update_ikepolicy(self, ikepolicy_id, **kwargs):
"""Updates an IKEPolicy."""
uri = self.ikepolicy_path % ikepolicy_id
return self.update_resource(uri, kwargs)
def update_ipsecpolicy(self, ipsecpolicy_id, **kwargs):
uri = self.ipsecpolicy_path % ipsecpolicy_id
return self.update_resource(uri, kwargs)
def show_ikepolicy(self, ikepolicy_id):
"""Fetches information of a specific IKEPolicy."""
uri = self.ikepolicy_path % ikepolicy_id
return self.show_resource(uri)
def show_vpnservice(self, vpnservice_id):
"""Fetches information of a specific VPNService."""
uri = self.vpnservice_path % (vpnservice_id)
return self.show_resource(uri)
def show_ipsecpolicy(self, ipsecpolicy_id):
uri = self.ipsecpolicy_path % ipsecpolicy_id
return self.show_resource(uri)
def show_ipsec_site_connections(self, endpoint_id):
"""Updates a VPN endpoint group."""
uri = self.ipsec_site_connection_path % endpoint_id
return self.show_resource(uri)
def delete_vpnservice(self, vpnservice_id):
"""Deletes the specified VPNService."""
uri = self.vpnservice_path % (vpnservice_id)
self.delete_resource(uri)
def delete_ikepolicy(self, ikepolicy_id):
"""Deletes the specified IKEPolicy."""
uri = self.ikepolicy_path % ikepolicy_id
self.delete_resource(uri)
def delete_ipsecpolicy(self, ipsecpolicy_id):
"""Deletes the specified IPsecPolicy."""
uri = self.ipsecpolicy_path % (ipsecpolicy_id)
self.delete_resource(uri)
def list_ipsec_site_connections(self, retrieve_all=True, **_params):
"""Fetches all configured IPsecSiteConnections for a tenant."""
return self.list('ipsec_site_connections',
self.ipsec_site_connections_path,
retrieve_all,
**_params)
def show_ipsec_site_connection(self, ipsecsite_conn, **_params):
"""Fetches information of a specific IPsecSiteConnection."""
return self.get(
self.ipsec_site_connection_path % (ipsecsite_conn), params=_params
)
def delete_ipsec_site_connection(self, ipsecsite_conn):
"""Deletes the specified IPsecSiteConnection."""
uri = self.ipsec_site_connection_path % (ipsecsite_conn)
return self.delete_resource(uri)
def list_ikepolicies(self, retrieve_all=True, **_params):
"""Fetches a list of all configured IKEPolicies for a tenant."""
return self.list('ikepolicies', self.ikepolicies_path, retrieve_all,
**_params)
def create_ikepolicy(self, **kwargs):
"""Creates a new VPNService."""
return self.create_resource(self.ikepolicies_path, kwargs)
def create_ipsecpolicy(self, **kwargs):
return self.create_resource(self.ipsecpolicies_path, kwargs)
def create_ipsec_site_connection(self, **kwargs):
"""Creates a new VPN endpoint group."""
return self.create_resource(self.ipsec_site_connections_path, kwargs)
def list_ipsecpolicies(self, retrieve_all=True, **_params):
"""Fetches a list of all configured IPsecPolicies for a tenant."""
return self.list('ipsecpolicies',
self.ipsecpolicies_path,
retrieve_all,
**_params)
class FwaasV2Client(base.BaseNetworkClient):
"""
Request resources via API for FwaasV2Client
@ -311,6 +438,8 @@ class ZonesV2Client(designate_base.DnsClientBase):
zonesv2 show zone
zonesv2 list zones
"""
resource = 'zone'
resource_plural = 'policies'
path = 'zones'
resource_base_path = '/v2/%s' % path
@ -341,8 +470,3 @@ class ZonesV2Client(designate_base.DnsClientBase):
def list_zones(self):
return self._list_request(self.resource_base_path)
def list_recordset_zone(self, zone_id):
request = self.resource_base_path + '/' + zone_id + '/recordsets'
resp, body = self._list_request(request)
return resp, body

View File

@ -0,0 +1,681 @@
# Copyright 2018 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest.lib import exceptions as lib_exc
from tempest import test
from vmware_nsx_tempest.common import constants
from vmware_nsx_tempest.lib import feature_manager
from vmware_nsx_tempest.services import nsxv3_client
from oslo_log import log as logging
CONF = config.CONF
CONF.validation.auth_method = 'None'
LOG = logging.getLogger(__name__)
class TestVpnOps(feature_manager.FeatureManager):
@classmethod
def skip_checks(cls):
super(TestVpnOps, cls).skip_checks()
if not test.is_extension_enabled('vpnaas', 'network'):
msg = "Extension provider-security-group is not enabled."
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
cls.set_network_resources()
cls.admin_mgr = cls.get_client_manager('admin')
super(TestVpnOps, cls).setup_credentials()
@classmethod
def setup_clients(cls):
"""
Create various client connections. Such as NSX.
"""
super(TestVpnOps, cls).setup_clients()
cls.nsx_client = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def create_network_topo(self, enable_snat="False", cidr=None):
kwargs = {}
network = \
self.create_topology_network(
network_name="vpn-network",
networks_client=self.admin_mgr.networks_client)
router_name = 'vpn-router'
# Create router topo
kwargs["enable_snat"] = enable_snat
routers_client = self.admin_mgr.routers_client
router = self.create_topology_router(
router_name, routers_client=routers_client, **kwargs)
subnet_name = 'vpn-subnet'
# Create subnet topo
subnets_client = self.admin_mgr.subnets_client
if cidr is None:
subnet = self.create_topology_subnet(subnet_name, network,
router_id=router['id'],
subnets_client=subnets_client,
routers_client=routers_client
)
else:
subnet = self.create_topology_subnet(subnet_name, network,
router_id=router['id'],
subnets_client=subnets_client,
routers_client=routers_client,
cidr=cidr
)
return dict(network=network, subnet=subnet, router=router)
def create_vpn_basic_topo(
self, network_topology, name=None, ike=None, pfs=constants.PFS,
encryption_algorithm=constants.ENCRYPTION_ALGO,
lifetime=constants.LIFETIME,
peer_address=constants.PEER_ADDRESS,
peer_id=constants.PEER_ID,
site_connection_state=constants.SITE_CONNECTION_STATE):
# Create network topo
kwargs = {}
subnet = network_topology['subnet']
router = network_topology['router']
kwargs['vpnservice'] = dict(subnet_id=subnet['id'],
router_id=router['id'],
admin_state_up=site_connection_state,
name="vpn")
vpn_service = self.vpnaas_client.create_vpnservice(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
self.vpnaas_client.list_vpnservices()
if ike is None:
kwargs = {}
if lifetime is not None:
kwargs[
'ikepolicy'] = \
dict(name=data_utils.rand_name("ike-policy-"), pfs=pfs,
encryption_algorithm=encryption_algorithm,
lifetime=lifetime)
ike = self.vpnaas_client.create_ikepolicy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ikepolicy,
ike.get('ikepolicy')['id'])
kwargs = {}
kwargs[
'ipsecpolicy'] = dict(name=data_utils.rand_name("ipsec-policy-"),
pfs=pfs)
ipsec = self.vpnaas_client.create_ipsecpolicy(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ipsecpolicy,
ipsec.get('ipsecpolicy')['id'])
kwargs = {}
if name is not None:
name = "site-conn-" + name
else:
name = "site-conn"
kwargs[
"ipsec_site_connection"] = \
dict(vpnservice_id=vpn_service.get('vpnservice')['id'],
psk="secret",
admin_state_up=site_connection_state, peer_cidrs=[
"10.0.1.0/24"],
ikepolicy_id=ike.get(
'ikepolicy')['id'],
ipsecpolicy_id=ipsec.get(
'ipsecpolicy')['id'],
peer_address=peer_address,
peer_id=peer_id, name=name)
endpoint = self.vpnaas_client.create_ipsec_site_connection(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_ipsec_site_connection,
endpoint.get("ipsec_site_connection")['id'])
return dict(endpoint=endpoint, vpn_service=vpn_service,
ike=ike, ipsec=ipsec)
@decorators.idempotent_id('7022b98f-f006-43c0-a1f7-5926035eb2b9')
def test_create_vpnservice_long_description(self):
description = 'x' * 256
network_topology = self.create_network_topo()
subnet = network_topology['subnet']
router = network_topology['router']
kwargs = {}
kwargs['vpnservice'] = dict(subnet_id=subnet['id'],
router_id=router['id'],
admin_state_up="True",
description=description,
name="vpn")
self.assertRaises(
lib_exc.BadRequest, self.vpnaas_client.create_vpnservice, **kwargs)
@decorators.idempotent_id('a4b0112d-2ab5-4b02-b0ab-562ae2cd4078')
def test_create_vpnservice_with_router_enable_snat(self):
network_topology = self.create_network_topo(enable_snat="True")
subnet = network_topology['subnet']
router = network_topology['router']
kwargs = {}
kwargs['vpnservice'] = dict(subnet_id=subnet['id'],
router_id=router['id'],
admin_state_up="True",
name="vpn")
self.assertRaises(
lib_exc.ServerFault, self.vpnaas_client.create_vpnservice, **kwargs
)
@decorators.idempotent_id('a68cd562-1df1-44e6-bb8b-f1ed7a1f0e2e')
def test_vpn_basic_ops(self):
"""
Test vpnaasv2 api to create icmp rule/policy/group and update it and
verifying its values
"""
network_topology = self.create_network_topo()
self.create_vpn_basic_topo(network_topology)
@decorators.idempotent_id('5802b98f-f006-43c0-a1f7-5926035eb2b9')
def test_try_to_delete_vpn_service_when_site_connection_active(self):
network_topology = self.create_network_topo(cidr="37.5.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
vpn_service = vpn_topo['vpn_service']
self.assertRaises(
lib_exc.Conflict, self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
@decorators.idempotent_id('4602b98f-f006-43c0-a1f7-5926035eb2b9')
def test_try_to_delete_ike_when_site_connection_active(self):
network_topology = self.create_network_topo(cidr="37.6.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
ike = vpn_topo['ike']
self.assertRaises(
lib_exc.Conflict, self.vpnaas_client.delete_ikepolicy,
ike.get('ikepolicy')['id'])
@decorators.idempotent_id('4502b98f-f006-43c0-a1f7-5926035eb2b9')
def test_try_to_delete_ipsec_when_site_connection_active(self):
network_topology = self.create_network_topo(cidr="37.9.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
ipsec = vpn_topo['ipsec']
self.assertRaises(
lib_exc.Conflict, self.vpnaas_client.delete_ipsecpolicy,
ipsec.get('ipsecpolicy')['id'])
@decorators.idempotent_id('1902b98f-f006-43c0-a1f7-5926035eb2b9')
def test_delete_vpn_ops(self):
network_topology = self.create_network_topo(cidr="37.10.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
ipsecpolicy = vpn_topo['ipsec']
vpnservice = vpn_topo['vpn_service']
ikepolicy = vpn_topo['ike']
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
self.vpnaas_client.delete_ikepolicy(ikepolicy.get('ikepolicy')['id'])
self.vpnaas_client.delete_ipsecpolicy(
ipsecpolicy.get('ipsecpolicy')['id'])
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
@decorators.idempotent_id('2022b98f-f006-43c0-a1f7-5926035eb2b9')
def test_peer_endpoint_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.2.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology)
site = vpn_topo["endpoint"]
dpd_info = self.nsx_client.get_dpd_profiles()
peer_endpoints = self.nsx_client.get_peer_endpoints()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
dpd_profile = dpd["id"]
break
continue
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
break
if flag == 0:
raise Exception('dpd_profile_id doesnt match with endpoint_id')
if flag == 1:
break
flag = 0
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
peer_endpoints = self.nsx_client.get_peer_endpoints()
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
if flag == 1:
raise Exception('rtr_id doesnt match with endpoint_id')
@decorators.idempotent_id('1092b98f-f006-43c0-a1f7-5926035eb2b9')
def test_local_endpoint_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.14.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology)
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
pass
else:
raise Exception('rtr_id doesnt match with endpoint_id')
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
raise Exception('local endpoint not deleted from backend')
@decorators.idempotent_id('7022b98f-f006-43c0-a1f7-5926035eb212')
def test_vpn_service_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.12.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test-delete")
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
break
break
vpnservice = vpn_topo['vpn_service']
endpoint = vpn_topo['endpoint']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
flag = 1
break
if flag == 1:
break
if flag == 1:
raise Exception('vpn service not deleted from backend')
@decorators.idempotent_id('747c5864-409f-4ac4-bdbb-b74d7c618504')
def test_vpn_dpd_ike_ipsec_check_at_the_backend(self):
network_topology = self.create_network_topo(cidr="37.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
site = vpn_topo["endpoint"]
dpd_info = self.nsx_client.get_dpd_profiles()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertIn(
"site-conn-test-dpd-profile",
dpd["display_name"])
self.assertEqual(
"os-vpn-connection-id",
dpd.get("tags")[0]["scope"])
break
ike_info = self.nsx_client.get_ike_profiles()
for ike in ike_info:
if ike is not None and ike.get("tags"):
if ike.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertEqual(
ike.get('resource_type'),
"IPSecVPNIKEProfile")
self.assertEqual(
ike.get('encryption_algorithms'),
[u'AES_128'])
self.assertEqual(ike.get('ike_version'), 'IKE_V1')
self.assertEqual(ike.get('dh_groups'), [u'GROUP14'])
break
ipsec_info = self.nsx_client.get_ipsec_profiles()
for ipsec in ipsec_info:
if ipsec is not None and ipsec.get("tags"):
if ipsec.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertEqual(
ipsec.get('resource_type'),
"PolicyBasedIPSecVPNSession")
cidr = vpn_topo['endpoint'].get(
'ipsec_site_connection')['peer_cidrs']
peer_cidr = [{u'subnet': u'%s' % cidr[0]}]
self.assertEqual(
ipsec.get('policy_rules')[0]['destinations'],
peer_cidr)
self.assertEqual(
"os-vpn-connection-id",
ipsec.get("tags")[0]["scope"])
break
@decorators.idempotent_id('cdb7333a-94c0-487f-9602-3bd990128a0f')
def test_vpn_dpd_ike_ipsec_update_at_the_backend(self):
kwargs = {}
kwargs[
'ikepolicy'] = dict(name=data_utils.rand_name("ike-policy-"),
pfs=constants.PFS,
encryption_algorithm=constants.ENCRYPTION_ALGO,
lifetime=constants.LIFETIME)
ike = self.vpnaas_client.create_ikepolicy(**kwargs)
kwargs['ikepolicy'] = \
dict(name="ike-new", ike_version="v2",
encryption_algorithm=constants.ENCRYPTION_ALGO_256,
auth_algorithm=constants.AUTH_ALGO_256)
self.vpnaas_client.update_ikepolicy(ike['ikepolicy']['id'],
**kwargs)
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
vpn_topo = self.create_vpn_basic_topo(
network_topology, "test-2", ike=ike)
ike_info = self.nsx_client.get_ike_profiles()
site = vpn_topo["endpoint"]
for ike_p in ike_info:
if ike_p is not None and ike_p.get("tags"):
if ike_p.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
self.assertEqual(ike_p.get('display_name'), "ike-new")
self.assertEqual(ike_p.get('ike_version'), "IKE_V2")
self.assertEqual(
ike_p.get('encryption_algorithms'),
[u'AES_256'])
self.assertEqual(
ike_p.get('digest_algorithms'),
[u'SHA2_256'])
break
@decorators.idempotent_id('a0a87543-fb0a-4c7a-897f-b5cd835de843')
def test_vpn_service_update_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test-4")
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
kwargs = {}
kwargs['vpnservice'] = dict(name="vpn-new", admin_state_up='false')
self.vpnaas_client.update_vpnservice(
vpn_topo['vpn_service'].get('vpnservice')['id'],
**kwargs)
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
self.assertEqual(vpn['enabled'], True)
flag = 1
break
if flag == 1:
break
# ToDO testcase need to add
# def test_vpn_site_update_at_the_backend
@decorators.idempotent_id('00c8679d-68cd-49a9-b8b7-0dba1b675298')
def test_vpn_service_check_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
self.create_vpn_basic_topo(network_topology, "test-2")
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
flag = 1
break
if flag == 1:
break
tunnel_profiles = self.nsx_client.get_tunnel_profiles()
for tunnel in tunnel_profiles:
if tunnel is not None and tunnel.get("tags"):
if tunnel.get("tags")[0]["tag"] == tunnel['id']:
self.assertEqual(
"IPSecVPNTunnelProfile",
tunnel.get("resource_type"))
self.assertEqual("ESP", tunnel.get("transform_protocol"))
self.assertEqual(
[u'AES_128'],
tunnel.get("encryption_algorithms"))
self.assertEqual(
"TUNNEL_MODE",
tunnel.get("encapsulation_mode"))
self.assertEqual(tunnel.get('dh_groups'), [u'GROUP14'])
break
@decorators.idempotent_id('f446a67a-4d09-4d5f-adff-cc497882d866')
def test_vpn_site_connection_at_the_backend(self):
flag = 1
network_topology = self.create_network_topo(cidr="37.2.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology)
site = vpn_topo["endpoint"]
dpd_info = self.nsx_client.get_dpd_profiles()
peer_endpoints = self.nsx_client.get_peer_endpoints()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
dpd_profile = dpd["id"]
break
continue
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
break
if flag == 0:
raise Exception('dpd_profile_id doesnt match with endpoint_id')
if flag == 1:
break
flag = 0
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
pass
else:
raise Exception('rtr_id doesnt match with endpoint_id')
@decorators.idempotent_id('eb953c67-8d8a-4ac5-b7c3-3c18270b50ce')
def test_vpn_basic_invalid_pfs(self):
network_topology = self.create_network_topo()
try:
self.create_vpn_basic_topo(network_topology, pfs="group5")
except exceptions.ServerFault:
LOG.info(
"Invalid VPN configuration: Unsupported pfs: "
" group5 in IKE policy.")
pass
@decorators.idempotent_id('1c034ea4-d8e6-41d0-b963-33dd5053476b')
def test_vpn_basic_algo_aes256(self):
network_topology = self.create_network_topo()
try:
self.create_vpn_basic_topo(
network_topology,
encryption_algorithm="aes-256")
except exceptions.ServerFault:
LOG.info(
"Invalid VPN configuration: Unsupported algo: aes-256 "
" is not supported.")
pass
@decorators.idempotent_id('885e1cda-b21d-4dec-8751-2a1f4e91773e')
def test_vpn_basic_invalid_algo(self):
kwargs = {}
try:
kwargs[
'ipsecpolicy'] = \
dict(name=data_utils.rand_name("ipsec-policy-"),
pfs="group14", encryption_algorithm="aes-512")
self.vpnaas_client.create_ipsecpolicy(**kwargs)
except exceptions.BadRequest:
pass
@decorators.idempotent_id('1e6f2f25-de83-4ee2-a30f-0f833da0c741')
def test_vpn_basic_invalid_pfs_value(self):
kwargs = {}
try:
kwargs[
'ipsecpolicy'] = \
dict(name=data_utils.rand_name("ipsec-policy-"),
pfs="group-14")
self.vpnaas_client.create_ipsecpolicy(**kwargs)
except exceptions.BadRequest:
pass
@decorators.idempotent_id('f4bea30b-76df-4dc2-a624-20621b8e0ef7')
def test_vpn_site_conenction_update_ops(self):
network_topo = self.create_network_topo(cidr="34.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topo)
site = vpn_topo['endpoint']
kwargs = {}
kwargs["ipsec_site_connection"] = dict(psk="new-secret")
self.vpnaas_client.update_ipsec_site_connections(
site.get('ipsec_site_connection')['id'],
**kwargs)
site_data = self.vpnaas_client.show_ipsec_site_connections(
endpoint_id=site.get('ipsec_site_connection')['id'])
self.assertEqual(
"new-secret",
site_data['ipsec_site_connection']['psk'])
kwargs = {}
kwargs["ipsec_site_connection"] = dict(
admin_state_up='False',
description="New Vpn site")
self.vpnaas_client.update_ipsec_site_connections(
site.get('ipsec_site_connection')['id'],
**kwargs)
site_data = self.vpnaas_client.show_ipsec_site_connections(
endpoint_id=site.get('ipsec_site_connection')['id'])
self.assertEqual(
False,
site_data[
'ipsec_site_connection'][
'admin_state_up'])
self.assertEqual(
"New Vpn site",
site_data[
'ipsec_site_connection'][
'description'])
@decorators.idempotent_id('8fbf9280-d154-425f-ad26-0a1250e0dd91')
def test_vpn_site_conenction_wrong_dpd_info(self):
network_topo = self.create_network_topo(cidr="35.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topo)
site = vpn_topo['endpoint']
kwargs = {}
kwargs["ipsec_site_connection"] = dict(action="disabled", timeout=1)
self.assertRaises(
lib_exc.BadRequest,
self.vpnaas_client.update_ipsec_site_connections,
site.get('ipsec_site_connection')['id'], **kwargs
)
kwargs["ipsec_site_connection"] = dict(action="restart-by-peer")
self.assertRaises(
lib_exc.BadRequest,
self.vpnaas_client.update_ipsec_site_connections,
site.get('ipsec_site_connection')['id'], **kwargs
)
kwargs["ipsec_site_connection"] = \
{"dpd": {"action": "hold", "timeout": 300}}
site_data = self.vpnaas_client.update_ipsec_site_connections(
site.get('ipsec_site_connection')['id'], **kwargs)
self.assertEqual(
"hold",
site_data['ipsec_site_connection'].get("dpd")["action"])
@decorators.idempotent_id('ea4fedae-4727-4524-a74a-0078d7fbfdd9')
def test_vpn_basic_update_ops(self):
network_topo = self.create_network_topo()
vpn_topo = self.create_vpn_basic_topo(network_topo)
vpn_service = vpn_topo['vpn_service']
kwargs = {}
kwargs['vpnservice'] = dict(admin_state_up='false')
self.vpnaas_client.update_vpnservice(
vpnservice_id=vpn_service.get('vpnservice')['id'],
**kwargs)
kwargs['vpnservice'] = dict(admin_state_up='true', description="vpn")
self.vpnaas_client.update_vpnservice(
vpnservice_id=vpn_service.get('vpnservice')['id'],
**kwargs)
@decorators.idempotent_id('d576c487-e7d5-4698-8a17-ea5521607675')
def test_vpn_ike_policy_update(self):
network_topo = self.create_network_topo(cidr="36.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topo)
try:
kwargs = {}
kwargs['ikepolicy'] = dict(pfs="group5")
self.vpnaas_client.update_ikepolicy(
vpn_topo['ike'].get('ikepolicy')['id'],
**kwargs)
except exceptions.Conflict:
LOG.info(
"IKEPolicy is in use by existing IPsecSiteConnection and "
" can't be updated or deleted")