[Tempest]: Adding new cases.

Case to check admin-state up/down when vms booted under network
connected to same router.
Case to check firewall goes in error state when attach to
shared or exclusive router.
Case to check firewall state change from ACTIVE -> INACTIVE &
and back to ACTIVE once router removed and add to firewall.
Check distributed router shouldn't get updated with exclusive
or shared router.
Case to static-route on router with 0.0.0.0/0
Change-Id: Iacfddf15c94a57fa83162b2b6ffcf4c2806b2076
This commit is contained in:
Puneet Arora 2018-07-15 20:38:25 +00:00
parent 802164f2a2
commit e6e44bf0a3
3 changed files with 682 additions and 6 deletions

View File

@ -103,8 +103,12 @@ class ApplianceManager(manager.NetworkScenarioTest):
tenant_id = routers_client.tenant_id
router_name_ = constants.APPLIANCE_NAME_STARTS_WITH + router_name
name = data_utils.rand_name(router_name_)
router = routers_client.create_router(
name=name, admin_state_up=True, tenant_id=tenant_id)['router']
if CONF.network.backend == "nsxv3":
router = routers_client.create_router(
name=name, admin_state_up=True, tenant_id=tenant_id)['router']
elif CONF.network.backend == "nsxv":
router = routers_client.create_router(
name=name, **kwargs)['router']
if set_gateway is not False:
if kwargs.get("enable_snat") is not None:
public_network_info = {"external_gateway_info": dict(
@ -221,8 +225,8 @@ class ApplianceManager(manager.NetworkScenarioTest):
"""
cidr_in_use = \
self.os_admin.subnets_client.list_subnets(
tenant_id=tenant_id, cidr=cidr)['subnets']
self.os_admin.subnets_client.\
list_subnets(tenant_id=tenant_id, cidr=cidr)['subnets']
return len(cidr_in_use) != 0
if ip_version == 6:
@ -435,13 +439,19 @@ class ApplianceManager(manager.NetworkScenarioTest):
return server
def create_topology_port(self, network,
ports_client=None, **args):
ports_client=None, **args):
if not ports_client:
ports_client = self.ports_client
port = ports_client.create_port(network_id=network['id'], **args)
self.addCleanup(ports_client.delete_port, port['port']['id'])
return port
def update_topology_port(self, port_id, ports_client=None,
**kwargs):
if not ports_client:
ports_client = self.ports_client
ports_client.update_port(port_id, **kwargs)
def _list_ports(self, *args, **kwargs):
"""List ports using admin creds """
ports_list = self.os_admin.ports_client.list_ports(

View File

@ -26,6 +26,7 @@ from vmware_nsx_tempest_plugin._i18n import _
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import traffic_manager
from vmware_nsx_tempest_plugin.services import designate_base
from vmware_nsx_tempest_plugin.services import fwaas_client as FWAASC
from vmware_nsx_tempest_plugin.services.lbaas import health_monitors_client
from vmware_nsx_tempest_plugin.services.lbaas import listeners_client
from vmware_nsx_tempest_plugin.services.lbaas import load_balancers_client
@ -34,7 +35,6 @@ from vmware_nsx_tempest_plugin.services.lbaas import pools_client
from vmware_nsx_tempest_plugin.services import nsx_client
from vmware_nsx_tempest_plugin.services import openstack_network_clients
LOG = constants.log.getLogger(__name__)
CONF = config.CONF
@ -91,6 +91,7 @@ class FeatureManager(traffic_manager.IperfManager,
net_client.region,
net_client.endpoint_type,
**_params)
cls.fwaasv1_client = FWAASC.get_client(cls.manager)
cls.vpnaas_client = openstack_network_clients.VPNClient(
net_client.auth_provider,
net_client.service,
@ -175,6 +176,164 @@ class FeatureManager(traffic_manager.IperfManager,
fw_policy = self.fwaas_v2_client.show_firewall_v2_policy(policy_id)
return fw_policy
#
# FwaasV1 base class
#
def _create_firewall_rule_name(self, body):
firewall_rule_name = body['firewall_rule']['name']
firewall_rule_name = "Fwaas-" + firewall_rule_name
return firewall_rule_name
def _delete_rule_if_exists(self, rule_id):
# delete rule, if it exists
try:
self.fwaasv1_client.delete_firewall_rule(rule_id)
# if rule is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
def _delete_firewall_if_exists(self, fw_id):
# delete firewall, if it exists
try:
self.fwaasv1_client.delete_firewall(fw_id)
# if firewall is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
self.fwaasv1_client.wait_for_resource_deletion(fw_id)
def create_fw_v1_rule(self, **kwargs):
body = self.fwaasv1_client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
**kwargs)
fw_rule = body['firewall_rule']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaasv1_client.delete_firewall_rule,
fw_rule['id'])
return fw_rule
def create_fw_v1_policy(self, **kwargs):
body = self.fwaasv1_client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"),
**kwargs)
fw_policy = body['firewall_policy']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.fwaasv1_client.delete_firewall_policy,
fw_policy['id'])
return fw_policy
def insert_fw_v1_rule_in_policy(self, firewall_policy_id, firewall_rule_id,
insert_after, insert_before):
self.fwaasv1_client.insert_firewall_rule_in_policy(firewall_policy_id,
firewall_rule_id,
insert_after,
insert_before)
def delete_fw_v1_and_wait(self, firewall_id):
self.fwaasv1_client.delete_firewall(firewall_id)
self._wait_firewall_while(firewall_id, [nl_constants.PENDING_DELETE],
not_found_ok=True)
def _delete_policy_if_exists(self, policy_id):
# delete policy, if it exists
try:
self.fwaasv1_client.delete_firewall_policy(policy_id)
# if policy is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
def create_fw_v1(self, **kwargs):
body = self.fwaasv1_client.create_firewall(
name=data_utils.rand_name("fw"),
**kwargs)
fw = body['firewall']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.delete_fw_v1_and_wait,
fw['id'])
return fw
def update_fw_v1(self, firewall_id, **kwargs):
body = self.fwaasv1_client.update_firewall(firewall_id, **kwargs)
return body
def show_fw_v1(self, firewall_id):
body = self.fwaasv1_client.show_firewall(firewall_id)
return body
def _wait_fw_v1_while(self, firewall_id, statuses, not_found_ok=False):
start = int(time.time())
if not_found_ok:
expected_exceptions = (lib_exc.NotFound)
else:
expected_exceptions = ()
while True:
try:
fw = self.fwaasv1_client.show_firewall(firewall_id)
except expected_exceptions:
break
status = fw['firewall']['status']
if status not in statuses:
break
if int(time.time()) - start >= self.fwaasv1_client.build_timeout:
msg = ("Firewall %(firewall)s failed to reach "
"non PENDING status (current %(status)s)") % {
"firewall": firewall_id,
"status": status,
}
raise lib_exc.TimeoutException(msg)
time.sleep(constants.NSX_BACKEND_VERY_SMALL_TIME_INTERVAL)
def _wait_fw_v1_ready(self, firewall_id):
self._wait_firewall_while(firewall_id,
[nl_constants.PENDING_CREATE,
nl_constants.PENDING_UPDATE])
def _wait_fw_v1_until_ready(self, fw_id):
target_states = ('ACTIVE', 'CREATED')
def _wait():
firewall = self.fwaasv1_client.show_firewall(fw_id)
firewall = firewall['firewall']
return firewall['status'] in target_states
if not test_utils.call_until_true(_wait, CONF.network.build_timeout,
CONF.network.build_interval):
m = ("Timed out waiting for firewall %s to reach %s state(s)" %
(fw_id, target_states))
raise lib_exc.TimeoutException(m)
def create_fw_v1_basic_topo(self, router_type, protocol_name,
policy=None):
rtr_kwargs = {"router_type": "exclusive",
"admin_state_up": "True"}
router = self.create_topology_router("fire-1", **rtr_kwargs)
body = self.fwaasv1_client.create_fw_v1_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol=protocol_name)
fw_rule_id1 = body['firewall_rule']['id']
self._create_firewall_rule_name(body)
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
if not policy:
body = self.fwaasv1_client.create_fw_v1_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.fwaasv1_client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
else:
fw_policy_id = policy
# Create firewall
firewall_1 = self.fwaasv1_client.create_fw_v1(
name=data_utils.rand_name("firewall"),
firewall_policy_id=fw_policy_id,
router_ids=[router['id']])
created_firewall = firewall_1['firewall']
self.addCleanup(self._delete_firewall_if_exists,
created_firewall['id'])
# Wait for the firewall resource to become ready
self._wait_fw_v1_until_ready(created_firewall['id'])
#
# L2Gateway base class. To get basics of L2GW.
#

View File

@ -0,0 +1,507 @@
# Copyright 2017 VMware Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import testtools
import time
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from vmware_nsx_tempest_plugin.common import constants
from vmware_nsx_tempest_plugin.lib import feature_manager
from vmware_nsx_tempest_plugin.services import nsxv3_client
from vmware_nsx_tempest_plugin.services import nsxv_client
CONF = config.CONF
class TestNewCase(feature_manager.FeatureManager):
"""Test New Cases Scenario
"""
@classmethod
def setup_clients(cls):
super(TestNewCase, cls).setup_clients()
cls.cmgr_adm = cls.get_client_manager('admin')
cls.cmgr_alt = cls.get_client_manager('alt')
cls.cmgr_adm = cls.get_client_manager('admin')
@classmethod
def resource_setup(cls):
super(TestNewCase, cls).resource_setup()
if CONF.network.backend == "nsxv3":
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
CONF.nsxv3.nsx_user,
CONF.nsxv3.nsx_password)
elif CONF.network.backend == "nsxv":
manager_ip = re.search(r"(\d{1,3}\.){3}\d{1,3}",
CONF.nsxv.manager_uri).group(0)
cls.vsm = nsxv_client.VSMClient(
manager_ip, CONF.nsxv.user, CONF.nsxv.password)
def create_topo_single_network(self, namestart, create_instance=True,
set_gateway=True, **kwargs):
"""
Create Topo where 1 logical switches which is
connected via tier-1 router.
"""
name = data_utils.rand_name(namestart)
rtr_name = "rtr" + name
network_name = "net" + name
subnet_name = "net" + name
router_state = self.create_topology_router(rtr_name,
set_gateway=set_gateway,
**kwargs)
network_state = self.create_topology_network(network_name)
subnet_state = self.create_topology_subnet(subnet_name, network_state,
router_id=router_state["id"]
)
if create_instance:
image_id = self.get_glance_image_id(['cirros'])
image_id = u'3ed1165d-a489-4c73-a887-5061f547b723'
self.create_topology_instance(
"state_vm_1", [network_state],
create_floating_ip=True, image_id=image_id)
self.create_topology_instance(
"state_vm_2", [network_state],
create_floating_ip=True, image_id=image_id)
topology_dict = dict(router_state=router_state,
network_state=network_state,
subnet_state=subnet_state)
return topology_dict
def create_topo_across_networks(self, namestart, create_instance=True):
"""
Create Topo where 2 logical switches which are
connected via tier-1 router.
"""
name = data_utils.rand_name(namestart)
rtr_name = "rtr" + name
network_name1 = "net" + name
network_name2 = "net1" + name
subnet_name1 = "sub1" + name
subnet_name2 = "sub2" + name
router_state = self.create_topology_router(rtr_name)
network_state1 = self.create_topology_network(network_name1)
network_state2 = self.create_topology_network(network_name2)
self.create_topology_subnet(subnet_name1, network_state1,
router_id=router_state["id"])
self.create_topology_subnet(subnet_name2, network_state2,
router_id=router_state["id"],
cidr="22.0.9.0/24")
if create_instance:
image_id = self.get_glance_image_id(['cirros'])
self.create_topology_instance(
"state_vm_1", [network_state1],
create_floating_ip=True, image_id=image_id)
self.create_topology_instance(
"state_vm_2", [network_state2],
create_floating_ip=True, image_id=image_id)
topology_dict = dict(router_state=router_state,
network_state1=network_state1,
network_state2=network_state2)
return topology_dict
def verify_ping_to_fip_from_ext_vm(self, server_details):
self.using_floating_ip_check_server_and_project_network_connectivity(
server_details)
def verify_ping_own_fip(self, server):
fip = server["floating_ips"][0]["floating_ip_address"]
client = self.verify_server_ssh(server, floating_ip=fip)
ping_cmd = "ping -c 1 %s " % fip
self.exec_cmd_on_server_using_fip(ping_cmd, ssh_client=client)
@decorators.idempotent_id('1206127a-91cc-8905-b217-98844caa35b2')
def test_router_interface_port_update(self):
"""
"""
self.create_topo_single_network(
"route-port", create_instance=False)
p_client = self.ports_client
port = self.get_router_port(p_client)
kwargs = {'port_security_enabled': True}
self.assertRaises(exceptions.BadRequest,
p_client.update_port,
port, **kwargs)
@decorators.idempotent_id('1206238b-91cc-8905-b217-98844caa46c3')
@testtools.skipUnless(
[
i for i in CONF.network_feature_enabled.api_extensions
if i != "mac-learning"][0],
'Mac learning feature is not available.')
def test_port_create_mac_learning_port_security(self):
topology_dict = self.create_topo_single_network(
"route-port", create_instance=False)
network_state = topology_dict['network_state']
args = {'port_security_enabled': True,
'mac_learning_enabled': False}
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client, **args)
port = port['port']
self.assertIn("ACTIVE", port['status'])
@decorators.idempotent_id('1207349c-91cc-8905-b217-98844caa57d4')
def test_create_port_with_two_fixed_ip(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [{'subnet_id': subnet_state.get(
'id'),
'ip_address': network_cidr[0] + '.20'},
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.21'}]
args = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
self.assertRaises(exceptions.BadRequest,
self.cmgr_adm.ports_client.create_port,
**args)
@decorators.idempotent_id('1207450d-91cc-8905-b217-98844caa68e5')
def test_update_port_with_two_fixed_ip(self):
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip1 = [
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.20'}]
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client,
fixed_ips=fix_ip1)
port = port['port']
self.assertIn("ACTIVE", port['status'])
fix_ip = [{'subnet_id': subnet_state.get(
'id'),
'ip_address': network_cidr[0] + '.21'},
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.22'}]
args = {'fixed_ips': fix_ip}
self.assertRaises(exceptions.BadRequest,
self.cmgr_adm.ports_client.update_port,
port['id'], **args)
@decorators.idempotent_id('1206016a-91cc-8905-b217-98844caa24a1')
def test_router_admin_state_when_vms_hosted(self):
"""
Check router admin state should be down if vms hosted from network
which is attached to router
"""
# Create single network attached to router topo
topology_dict = self.create_topo_single_network("admin_state")
router_state = topology_dict['router_state']
network_state = topology_dict['network_state']
# Update router admin state to False
kwargs = {"admin_state_up": "False"}
self.assertRaises(exceptions.BadRequest,
self.routers_client.update_router,
router_state['id'], **kwargs)
# Verify E-W traffic
self.check_cross_network_connectivity(
network_state,
self.servers_details.get("state_vm_1").floating_ips[0],
self.servers_details.get("state_vm_1").server, should_connect=True)
self.check_cross_network_connectivity(
network_state,
self.servers_details.get("state_vm_2").floating_ips[0],
self.servers_details.get("state_vm_2").server, should_connect=True)
# Verify fip ping N-S traffic
for server, details in self.servers_details.items():
self.verify_ping_to_fip_from_ext_vm(details)
self.verify_ping_own_fip(self.topology_servers["state_vm_1"])
self.verify_ping_own_fip(self.topology_servers["state_vm_2"])
# Update router admin state to False
self.assertRaises(exceptions.BadRequest,
self.routers_client.update_router,
router_state['id'], **kwargs)
@decorators.idempotent_id('9006016a-91cc-8905-b217-98844caa2212')
def test_dhcp_port_update_with_device_owner_field(self):
"""
Check dhcp port update with device owner field doesn't let
operational down for that port.
"""
# Create single network attached to router topo
self.create_topo_single_network(
"admin_state", create_instance=False)
ports = self.ports_client.list_ports()
for port in ports['ports']:
if 'device_owner' in port:
if port['device_owner'] == "network:dhcp":
kwargs = {"device_owner": "nova:compute"}
self.assertRaises(
exceptions.BadRequest, self.ports_client.update_port,
port['id'], **kwargs)
@decorators.idempotent_id('1206016a-91cc-8905-b217-98844caa2212')
@testtools.skipUnless(
[
i for i in CONF.network_feature_enabled.api_extensions
if i != "provider-security-group"][0],
'provider-security-group feature is not available.')
def test_mac_learning_with_provider_sec_group_enabled_on_port(self):
"""
Check mac larning to be enabled on port and provider-sec group
should get disabled.
"""
self.create_topology_security_provider_group(self.cmgr_adm,
provider=True)
network_state = self.create_topology_network("pro-network")
self.create_topology_subnet("pro-sub", network_state)
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client)
port_id = port.get('port')['id']
kwargs = {"port_security_enabled": "false",
"mac_learning_enabled": "false", "security_groups": [],
"provider_security_groups": []}
self.assertRaises(exceptions.Forbidden, self.update_topology_port,
port_id, **kwargs)
network_state = self.create_topology_network(
"pro-network-admin", networks_client=self.cmgr_adm.networks_client)
self.create_topology_subnet(
"pro-sub-admin",
network_state,
subnets_client=self.cmgr_adm.subnets_client)
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client)
port_id = port.get('port')['id']
kwargs = {"port_security_enabled": "false",
"mac_learning_enabled": "false", "security_groups": [],
"provider_security_groups": []}
self.update_topology_port(
port_id,
ports_client=self.cmgr_adm.ports_client,
**kwargs)
@decorators.idempotent_id('1207561e-91cc-8905-b217-98844caa79f6')
def test_create_port_with_dhcp_port_ip(self):
topology_dict = self.create_topo_single_network(
"instance_port", deploy_instance=False)
network_state = topology_dict['network_state']
subnet_state = topology_dict['subnet_state']
network_cidr = (
CONF.network.project_network_cidr.rsplit('/')[0]).rsplit('.',
1)
fix_ip = [
{'subnet_id': subnet_state.get('id'),
'ip_address': network_cidr[0] + '.2'}]
args = {'fixed_ips': fix_ip, 'network_id': network_state['id']}
self.assertRaises(exceptions.BadRequest,
self.cmgr_adm.ports_client.create_port,
**args)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2226016a-91cc-8905-b217-12344caa24a1')
def test_dist_router_update_probhited(self):
kwargs = {"distributed": "true",
"admin_state_up": "True"}
topology_dict = self.create_topo_single_network("rtr_update",
create_instance=False,
set_gateway=False,
**kwargs)
router_state = topology_dict['router_state']
router_id = router_state['id']
kwargs = {"router_type": "exclusive"}
# Update router from distributed to exclusive should be restricted
self.assertRaises(exceptions.BadRequest, self.update_topology_router,
router_id, **kwargs)
kwargs = {"router_type": "shared"}
# Update router from distributed to shared should be restricted
self.assertRaises(exceptions.BadRequest, self.update_topology_router,
router_id, **kwargs)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2306016a-91cc-8905-b217-98844caa24a1')
def test_assign_firewall_to_shared_router_failed(self):
"""
Firewall creation with shared router should get fail
"""
# Create shared router
kwargs = {"router_type": "shared",
"admin_state_up": "True"}
router = self.create_topology_router("fire-1", **kwargs)
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
# Create firewall should fail with shared router
firewall_1 = self.create_fw_v1(
firewall_policy_id=fw_policy_id,
router_ids=[router['id']])
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("ERROR", firewall_info['firewall']['status'])
kwargs = {"router_ids": []}
self.update_fw_v1(firewall_1['id'], **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2306016a-91cc-8905-b217-98844caa24a1')
def test_assign_firewall_to_md_router_failed(self):
"""
Firewall creation with md router should get fail
"""
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
# Create firewall should fail with shared router
routers_list = self.cmgr_adm.routers_client.list_routers()
router_id = [
router for router in routers_list['routers']
if "metadata_proxy_router" in router.get('name')][0]['id']
firewall_1 = self.create_fw_v1(
firewall_policy_id=fw_policy_id,
router_ids=[router_id])
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("ERROR", firewall_info['firewall']['status'])
kwargs = {"router_ids": []}
self.fwaasv1_client.update_fw_v1(firewall_1['id'], **kwargs)
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2676016a-91cc-8905-b217-98844caa24a1')
def test_update_firewall_from_router_to_no_router(self):
"""
Firewall update should work on exclusive router
"""
kwargs = {"router_type": "exclusive",
"admin_state_up": "True"}
router = self.create_topology_router("fire-1", **kwargs)
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
# Create firewall should fail with shared router
firewall_1 = self.create_fw_v1(
firewall_policy_id=fw_policy_id,
router_ids=[router['id']])
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("ACTIVE", firewall_info['firewall']['status'])
kwargs = {"router_ids": []}
self.update_fw_v1(firewall_1['id'], **kwargs)
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("INACTIVE", firewall_info['firewall']['status'])
kwargs = {"router_ids": [router['id']]}
self.update_fw_v1(firewall_1['id'], **kwargs)
self._wait_fw_v1_until_ready(firewall_1['id'])
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("ACTIVE", firewall_info['firewall']['status'])
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2226016a-91cc-8905-b217-12344caa24a1')
def test_update_router_with_static_route_via_0_0_0_0(self):
kwargs = {"distributed": "true",
"admin_state_up": "True"}
topology_dict = self.create_topo_single_network("rtr_update",
create_instance=False,
set_gateway=False,
**kwargs)
next_hop = topology_dict['subnet_state']['allocation_pools'][0]['end']
router_state = topology_dict['router_state']
routes = [{
"destination": "0.0.0.0/0",
"nexthop": next_hop
}]
router_id = router_state['id']
self.assertRaises(exceptions.BadRequest,
self.routers_client.update_router,
router_id, routes=routes)
@decorators.attr(type='nsxt')
@decorators.idempotent_id('2227127b-91cc-8905-b217-12344cab35b2')
def test_update_router_nsxt_with_static_route_via_0_0_0_0(self):
kwargs = {"admin_state_up": "True"}
topology_dict = self.create_topo_single_network("rtr_update",
create_instance=False,
set_gateway=False,
**kwargs)
next_hop = topology_dict['subnet_state']['allocation_pools'][0]['end']
router_state = topology_dict['router_state']
routes = [{
"destination": "0.0.0.0/0",
"nexthop": next_hop
}]
router_id = router_state['id']
self.assertRaises(exceptions.BadRequest,
self.routers_client.update_router,
router_id, routes=routes)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2226016a-91cc-8905-b217-12344caa24a1')
def test_exc_to_shared_router_update_not_allowed_with_fw(self):
kwargs = {"router_type": "exclusive",
"admin_state_up": "True"}
name = "rtr-exc"
router_state = self.create_topology_router(name, set_gateway=True,
**kwargs)
router_id = router_state['id']
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
# Create firewall should fail with shared router
firewall_1 = self.create_fw_v1(
firewall_policy_id=fw_policy_id,
router_ids=[router_id])
time.sleep(constants.NSX_BACKEND_SMALL_TIME_INTERVAL)
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("ACTIVE", firewall_info['firewall']['status'])
kwargs = {"router_type": "shared"}
# Update router from distributed to shared should be restricted
self.assertRaises(exceptions.BadRequest, self.update_topology_router,
router_id, **kwargs)