Merge "Add changes to existing vpnaas testcases for policy"

This commit is contained in:
Zuul 2019-10-31 07:35:34 +00:00 committed by Gerrit Code Review
commit 214b525245
2 changed files with 317 additions and 119 deletions

View File

@ -220,6 +220,71 @@ class NSXPClient(object):
"""
return self.get_logical_resources("/transport-zones")
def get_dpd_profiles(self):
"""
Retrieve all dpd profiles
"""
return self.get_logical_resources("ipsec-vpn-dpd-profiles")
def get_ipsec_profiles(self):
"""
Retrieve all ipsec profiles
"""
endpoint = "ipsec-vpn-tunnel-profiles"
return self.get_logical_resources(endpoint)
def get_local_endpoint(self, os_name, os_uuid):
"""
Retrieve local endpoint for router
"""
router = self.get_logical_router(os_name, os_uuid)
locale_service = self.get_logical_router_local_services(os_name,
os_uuid)
vpn_service = self.get_vpn_service(os_name, os_uuid)
endpoint = "tier-1s/%s/locale-services/%s/ipsec-vpn-services/%s/ \
local-endpoints" % (router["id"],
locale_service[0].get("id"),
vpn_service[0].get("id"))
return self.get_logical_resources(endpoint)
def get_ipsec_session(self, os_name, os_uuid):
"""
Retrieve ipsec session for router
"""
router = self.get_logical_router(os_name, os_uuid)
locale_service = self.get_logical_router_local_services(os_name,
os_uuid)
vpn_service = self.get_vpn_service(os_name, os_uuid)
endpoint = "tier-1s/%s/locale-services/%s/ipsec-vpn-services/%s/ \
sessions" % (router["id"], locale_service[0].get("id"),
vpn_service[0].get("id"))
return self.get_logical_resources(endpoint)
def get_vpn_service(self, os_name, os_uuid):
"""
Retrieve vpn service for router
"""
router = self.get_logical_router(os_name, os_uuid)
locale_service = self.get_logical_router_local_services(os_name,
os_uuid)
endpoint = "tier-1s/%s/locale-services/%s/ipsec-vpn-services" % \
(router["id"], locale_service[0].get("id"))
return self.get_logical_resources(endpoint)
def get_ike_profiles(self):
"""
Retrieve ipsec ike profiles
"""
endpoint = "ipsec-vpn-ike-profiles"
return self.get_logical_resources(endpoint)
def get_tunnel_profiles(self):
"""
Retrieve tunnel profiles
"""
endpoint = "ipsec-vpn-tunnel-profiles"
return self.get_logical_resources(endpoint)
def get_logical_routers(self, tier=None):
"""
Retrieve all the logical routers based on router type. If tier

View File

@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest import config
from tempest.lib.common.utils import data_utils
@ -24,9 +24,9 @@ from tempest import test
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxp_client
from vmware_nsx_tempest_plugin.services import nsxv3_client
from oslo_log import log as logging
@ -60,6 +60,9 @@ class TestVpnOps(feature_manager.FeatureManager):
cls.nsx_client = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
cls.nsxp_client = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
def create_network_topo(self, enable_snat="False", cidr=None):
kwargs = {}
@ -185,9 +188,32 @@ class TestVpnOps(feature_manager.FeatureManager):
router_id=router['id'],
admin_state_up="True",
name="vpn")
self.assertRaises(
lib_exc.ServerFault, self.vpnaas_client.create_vpnservice, **kwargs
)
vpn_service = self.vpnaas_client.create_vpnservice(**kwargs)
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.vpnaas_client.delete_vpnservice,
vpn_service.get('vpnservice')['id'])
if CONF.network.backend == 'nsxp':
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag").encode(),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type').encode(),
'IPSecVpnService')
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual((vpn.get('tags')[0]).get('tag'),
rtr["id"])
self.assertEqual(vpn['resource_type'].encode(),
'IPSecVPNService')
break
break
@decorators.idempotent_id('a68cd562-1df1-44e6-bb8b-f1ed7a1f0e2e')
def test_vpn_basic_ops(self):
@ -275,63 +301,106 @@ class TestVpnOps(feature_manager.FeatureManager):
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
peer_endpoints = self.nsx_client.get_peer_endpoints()
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
if flag == 1:
raise Exception('rtr_id doesnt match with endpoint_id')
if 'result_count' in peer_endpoints.keys() and \
peer_endpoints.get('result_count') == 0:
pass
else:
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
if flag == 1:
raise Exception('rtr_id doesnt match with endpoint_id')
@decorators.idempotent_id('1092b98f-f006-43c0-a1f7-5926035eb2b9')
def test_local_endpoint_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.14.0.0/24")
router = network_topology["router"]
vpn_topo = self.create_vpn_basic_topo(network_topology)
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if CONF.network.backend == 'nsxp':
local_endpoint = self.nsxp_client.get_local_endpoint(
router["name"], router["id"])
if local_endpoint[0].get('tags')[0].get("tag").encode() == \
router["id"]:
self.assertIsNotNone(local_endpoint[0].get("local_address"))
self.assertIsNotNone(local_endpoint[0].get("local_id"))
flag = 1
else:
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
router["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
pass
else:
raise Exception('rtr_id doesnt match with endpoint_id')
endpoint = vpn_topo['endpoint']
vpnservice = vpn_topo['vpn_service']
self.vpnaas_client.delete_ipsec_site_connection(
endpoint.get("ipsec_site_connection")['id'])
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
raise Exception('local endpoint not deleted from backend')
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
flag = 0
if CONF.network.backend == 'nsxp':
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
try:
local_endpoints = self.nsxp_client.get_local_endpoint(
router["name"], router["id"])
self.assertNone(local_endpoints)
except IndexError:
raise Exception('local endpoint not deleted from backend')
else:
local_endpoints = self.nsx_client.get_local_endpoints()
if 'result_count' in local_endpoints.keys() and \
local_endpoints.get('result_count') == 0:
pass
else:
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
router["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
raise Exception('local endpoint not deleted from backend')
@decorators.idempotent_id('7022b98f-f006-43c0-a1f7-5926035eb212')
def test_vpn_service_delete_at_the_backend(self):
flag = 0
network_topology = self.create_network_topo(cidr="37.12.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test-delete")
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
router = network_topology["router"]
if CONF.network.backend == 'nsxp':
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag").encode(),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type').encode(),
'IPSecVpnService')
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'].encode(),
'IPSecVPNService')
break
break
vpnservice = vpn_topo['vpn_service']
@ -340,25 +409,42 @@ class TestVpnOps(feature_manager.FeatureManager):
endpoint.get("ipsec_site_connection")['id'])
self.vpnaas_client.delete_vpnservice(
vpnservice.get('vpnservice')['id'])
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
flag = 1
break
if flag == 1:
break
if CONF.network.backend == 'nsxp':
rtr_name = network_topology["router"]["name"]
rtr_id = network_topology["router"]["id"]
router = self.nsxp_client.get_logical_router(rtr_name, rtr_id)
vpn_service = self.nsxp_client.get_vpn_service(rtr_name, rtr_id)
if len(vpn_service) != 0:
flag = 1
else:
vpn_services = self.nsx_client.get_vpn_services()
if 'result_count' in vpn_services.keys() and \
vpn_services['result_count'] == 0:
pass
else:
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'],
rtr["id"])
self.assertEqual(vpn['resource_type'].encode(),
'IPSecVPNService')
flag = 1
break
if flag == 1:
break
if flag == 1:
raise Exception('vpn service not deleted from backend')
@decorators.idempotent_id('747c5864-409f-4ac4-bdbb-b74d7c618504')
def test_vpn_dpd_ike_ipsec_check_at_the_backend(self):
network_topology = self.create_network_topo(cidr="37.0.0.0/24")
network_topology = self.create_network_topo(cidr="39.0.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test")
site = vpn_topo["endpoint"]
dpd_info = self.nsx_client.get_dpd_profiles()
if CONF.network.backend == 'nsxp':
dpd_info = self.nsxp_client.get_dpd_profiles()
else:
dpd_info = self.nsx_client.get_dpd_profiles()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
@ -370,7 +456,10 @@ class TestVpnOps(feature_manager.FeatureManager):
"os-vpn-connection-id",
dpd.get("tags")[0]["scope"])
break
ike_info = self.nsx_client.get_ike_profiles()
if CONF.network.backend == 'nsxp':
ike_info = self.nsxp_client.get_ike_profiles()
else:
ike_info = self.nsx_client.get_ike_profiles()
for ike in ike_info:
if ike is not None and ike.get("tags"):
if ike.get("tags")[0]["tag"] == \
@ -384,7 +473,10 @@ class TestVpnOps(feature_manager.FeatureManager):
self.assertEqual(ike.get('ike_version'), 'IKE_V1')
self.assertEqual(ike.get('dh_groups'), [u'GROUP14'])
break
ipsec_info = self.nsx_client.get_ipsec_profiles()
if CONF.network.backend == 'nsxp':
ipsec_info = self.nsxp_client.get_ipsec_profiles()
else:
ipsec_info = self.nsx_client.get_ipsec_profiles()
for ipsec in ipsec_info:
if ipsec is not None and ipsec.get("tags"):
if ipsec.get("tags")[0]["tag"] == \
@ -422,7 +514,10 @@ class TestVpnOps(feature_manager.FeatureManager):
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
vpn_topo = self.create_vpn_basic_topo(
network_topology, "test-2", ike=ike)
ike_info = self.nsx_client.get_ike_profiles()
if CONF.network.backend == 'nsxp':
ike_info = self.nsxp_client.get_ike_profiles()
else:
ike_info = self.nsx_client.get_ike_profiles()
site = vpn_topo["endpoint"]
for ike_p in ike_info:
if ike_p is not None and ike_p.get("tags"):
@ -443,23 +538,32 @@ class TestVpnOps(feature_manager.FeatureManager):
flag = 0
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
vpn_topo = self.create_vpn_basic_topo(network_topology, "test-4")
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
router = network_topology['router']
kwargs = {}
kwargs['vpnservice'] = dict(name="vpn-new", admin_state_up='false')
self.vpnaas_client.update_vpnservice(
vpn_topo['vpn_service'].get('vpnservice')['id'],
**kwargs)
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
self.assertEqual(vpn['enabled'], True)
flag = 1
if CONF.network.backend == 'nsxp':
ipsec_session = self.nsxp_client.get_ipsec_session(router["name"],
router["id"])
self.assertEqual(ipsec_session[0].get('resource_type').encode(),
'PolicyBasedIPSecVpnSession')
self.assertEqual(ipsec_session[0].get('enabled'), False)
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_service()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
self.assertEqual(vpn['enabled'], True)
flag = 1
break
if flag == 1:
break
if flag == 1:
break
# ToDO testcase need to add
# def test_vpn_site_update_at_the_backend
@ -469,42 +573,59 @@ class TestVpnOps(feature_manager.FeatureManager):
flag = 0
network_topology = self.create_network_topo(cidr="37.1.0.0/24")
self.create_vpn_basic_topo(network_topology, "test-2")
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'], 'IPSecVPNService')
flag = 1
break
if flag == 1:
break
tunnel_profiles = self.nsx_client.get_tunnel_profiles()
for tunnel in tunnel_profiles:
if tunnel is not None and tunnel.get("tags"):
if tunnel.get("tags")[0]["tag"] == tunnel['id']:
self.assertEqual(
"IPSecVPNTunnelProfile",
tunnel.get("resource_type"))
self.assertEqual("ESP", tunnel.get("transform_protocol"))
self.assertEqual(
[u'AES_128'],
tunnel.get("encryption_algorithms"))
self.assertEqual(
"TUNNEL_MODE",
tunnel.get("encapsulation_mode"))
self.assertEqual(tunnel.get('dh_groups'), [u'GROUP14'])
router = network_topology["router"]
if CONF.network.backend == 'nsxp':
nsx_router = self.nsxp_client.get_logical_router(router["name"],
router["id"])
vpn_service = self.nsxp_client.get_vpn_service(router["name"],
router["id"])
self.assertEqual(vpn_service[0].get('tags')[0].get("tag").encode(),
nsx_router["id"])
self.assertEqual(vpn_service[0].get('resource_type').encode(),
'IPSecVpnService')
else:
routers = self.nsx_client.get_logical_routers()
vpn_services = self.nsx_client.get_vpn_services()
for rtr in routers:
for vpn in vpn_services:
if vpn['logical_router_id'] == rtr["id"]:
self.assertEqual(vpn['logical_router_id'], rtr["id"])
self.assertEqual(vpn['resource_type'],
'IPSecVPNService')
flag = 1
break
if flag == 1:
break
tunnel_profiles = self.nsx_client.get_tunnel_profiles()
for tunnel in tunnel_profiles:
if tunnel is not None and tunnel.get("tags"):
if tunnel.get("tags")[0]["tag"] == tunnel['id']:
self.assertEqual(
"IPSecVPNTunnelProfile",
tunnel.get("resource_type"))
self.assertEqual("ESP",
tunnel.get("transform_protocol"))
self.assertEqual(
[u'AES_128'],
tunnel.get("encryption_algorithms"))
self.assertEqual(
"TUNNEL_MODE",
tunnel.get("encapsulation_mode"))
self.assertEqual(tunnel.get('dh_groups'),
[u'GROUP14'])
break
@decorators.idempotent_id('f446a67a-4d09-4d5f-adff-cc497882d866')
def test_vpn_site_connection_at_the_backend(self):
flag = 1
network_topology = self.create_network_topo(cidr="37.2.0.0/24")
router = network_topology["router"]
vpn_topo = self.create_vpn_basic_topo(network_topology)
site = vpn_topo["endpoint"]
dpd_info = self.nsx_client.get_dpd_profiles()
peer_endpoints = self.nsx_client.get_peer_endpoints()
if CONF.network.backend == 'nsxp':
dpd_info = self.nsxp_client.get_dpd_profiles()
else:
dpd_info = self.nsx_client.get_dpd_profiles()
for dpd in dpd_info:
if dpd is not None and dpd.get("tags"):
if dpd.get("tags")[0]["tag"] == \
@ -512,30 +633,42 @@ class TestVpnOps(feature_manager.FeatureManager):
dpd_profile = dpd["id"]
break
continue
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(
"IPSecVPNPeerEndpoint",
end.get("resource_type"))
if CONF.network.backend == 'nsxv3':
peer_endpoints = self.nsx_client.get_peer_endpoints()
for end in peer_endpoints:
if end.get("tags")[0]["tag"] == \
site.get('ipsec_site_connection')['id']:
if end['dpd_profile_id'] == dpd_profile:
flag = 1
self.assertEqual(end['peer_id'], '172.24.4.12')
self.assertEqual(end.get("resource_type"),
"IPSecVPNPeerEndpoint")
break
if flag == 0:
raise Exception('dpd_profile_id doesnt match \
with endpoint_id')
if flag == 1:
break
if flag == 0:
raise Exception('dpd_profile_id doesnt match with endpoint_id')
if flag == 1:
break
flag = 0
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if CONF.network.backend == 'nsxp':
local_endpoint = self.nsxp_client.get_local_endpoint(
router["name"], router["id"])
if local_endpoint[0].get('tags')[0].get("tag").encode() == \
router["id"]:
self.assertIsNotNone(local_endpoint[0].get("local_address"))
self.assertIsNotNone(local_endpoint[0].get("local_id"))
flag = 1
else:
local_endpoints = self.nsx_client.get_local_endpoints()
for local in local_endpoints:
if local is not None and local.get("tags"):
if local.get("tags")[0]["tag"] == \
network_topology["router"]["id"]:
self.assertIsNotNone(local["local_address"])
self.assertIsNotNone(local["local_id"])
flag = 1
break
if flag == 1:
pass
else: