[Tempest]: Added new cases related to

Lbaas
Mac-learning
security-groups

Change-Id: I937a75ef1869c380f7505b95bffddc91e4cb0286
This commit is contained in:
shubhamk 2018-08-02 14:25:44 +00:00
parent e6e44bf0a3
commit 0dfcc7f315
5 changed files with 268 additions and 65 deletions

View File

@ -75,7 +75,7 @@ SLEEP_BETWEEN_VIRTUAL_SEREVRS_OPEARTIONS = 120
REDIRECT_TO_POOL = "REDIRECT_TO_POOL"
REJECT = "REJECT"
#AUDIT LOG WAIT TIME
# AUDIT LOG WAIT TIME
AUDIT_WAIT_TIME = 300
# ZONE Designate
ZONE_WAIT_TIME = 120
@ -90,3 +90,4 @@ LIFETIME = {"units": "seconds", "value": 21600}
PEER_ADDRESS = "172.24.4.12"
SITE_CONNECTION_STATE = 'True'
PSK = "secret"
CIDR = "22.0.9.0/24"

View File

@ -328,8 +328,8 @@ class ApplianceManager(manager.NetworkScenarioTest):
port_map = [(p["id"], fxip["ip_address"])
for p in ports
for fxip in p["fixed_ips"]
if netutils.is_valid_ipv4(fxip["ip_address"])
and p['status'] in p_status]
if netutils.is_valid_ipv4(fxip["ip_address"]) and
p['status'] in p_status]
inactive = [p for p in ports if p['status'] != 'ACTIVE']
if inactive:
LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
@ -355,19 +355,24 @@ class ApplianceManager(manager.NetworkScenarioTest):
subnet = result['subnet']
return subnet
def create_floatingip(self, thing, port_id, external_network_id=None,
def create_floatingip(self, thing=None, port_id=None,
external_network_id=None,
ip4=None, client=None):
"""Create a floating IP and associates to a resource/port on Neutron"""
if not external_network_id:
external_network_id = self.topology_public_network_id
if not client:
client = self.floating_ips_client
result = client.create_floatingip(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=thing['tenant_id'],
fixed_ip_address=ip4
)
if thing is None and port_id is None:
result = client.create_floatingip(
floating_network_id=external_network_id)
else:
result = client.create_floatingip(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=thing['tenant_id'],
fixed_ip_address=ip4
)
floating_ip = result['floatingip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_floatingip,
@ -375,9 +380,9 @@ class ApplianceManager(manager.NetworkScenarioTest):
return floating_ip
def create_topology_instance(
self, server_name, networks, security_groups=None,
self, server_name, networks=None, security_groups=None,
config_drive=None, keypair=None, image_id=None,
clients=None, create_floating_ip=True, **kwargs):
clients=None, create_floating_ip=True, port=None, **kwargs):
# Define security group for server.
if CONF.nsxv3.ens is not True:
if security_groups:
@ -407,9 +412,14 @@ class ApplianceManager(manager.NetworkScenarioTest):
server_name_ = constants.APPLIANCE_NAME_STARTS_WITH + server_name
# Collect all the networks for server.
networks_ = []
for net in networks:
net_ = {"uuid": net["id"]}
networks_.append(net_)
if networks is not None:
for net in networks:
net_ = {"uuid": net["id"]}
networks_.append(net_)
# Deploy instance with port
if port is not None:
port_ = {"port": port['id']}
networks_.append(port_)
# Deploy server with all the args.
server = self.create_server(
name=server_name_, networks=networks_, clients=clients, **kwargs)

View File

@ -46,6 +46,7 @@ RULE_TYPE_DSCP_MARK = "dscp_marking"
# It includes feature related function such CRUD Mdproxy, L2GW or QoS
class FeatureManager(traffic_manager.IperfManager,
designate_base.DnsClientBase):
@classmethod
def setup_clients(cls):
"""Create various client connections. Such as NSXv3 and L2 Gateway.
@ -477,6 +478,15 @@ class FeatureManager(traffic_manager.IperfManager,
lbs = lb_client.list_load_balancers()['loadbalancers']
self.assertEqual(0, len(lbs))
def delete_lb_pool_healthmonitor(self, pool):
"""Deletion of lb health pool and monitor.
"""
test_utils.call_and_ignore_notfound_exc(
self.health_monitors_client.delete_health_monitor,
pool.get('pool')['healthmonitor_id'])
test_utils.call_and_ignore_notfound_exc(
self.pools_client.delete_pool, pool.get('pool')['id'])
def delete_lb_pool_resources(self, lb_id, pool):
"""Deletion of lbaas pool resources.
@ -683,7 +693,11 @@ class FeatureManager(traffic_manager.IperfManager,
self.create_floatingip(self.loadbalancer,
port_id=self.loadbalancer['vip_port_id'])
self.vip_ip_address = vip_fip['floating_ip_address']
return self.vip_ip_address
pools = self.pools_client.show_pool(
self.pool['id'])
return dict(lb_id=lb_id, pool=pools,
vip_port=self.loadbalancer['vip_port_id'],
vip_ip=self.vip_ip_address)
def get_router_port(self, client):
"""List ports using admin creds """

View File

@ -21,6 +21,7 @@ from vmware_nsx_tempest_plugin.lib import feature_manager
from tempest.common import waiters
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
@ -29,6 +30,7 @@ CONF = config.CONF
class LBaasRoundRobinBaseTest(feature_manager.FeatureManager):
"""Base class to support LBaaS ROUND-ROBIN test.
It provides the methods to create loadbalancer network, and
@ -113,13 +115,15 @@ class LBaasRoundRobinBaseTest(feature_manager.FeatureManager):
port_range_min=443, port_range_max=443, )]
for rule in lbaas_rules:
self.add_security_group_rule(self.sg, rule)
self.create_topology_subnet(
subnet_lbaas = self.create_topology_subnet(
"subnet_lbaas_1", network_lbaas_1, router_id=router_lbaas["id"])
for instance in range(0, no_of_servers):
self.create_topology_instance(
"server_lbaas_%s" % instance, [network_lbaas_1],
security_groups=[{'name': self.sg['name']}],
image_id=image_id)
return dict(router=router_lbaas, subnet=subnet_lbaas,
network=network_lbaas_1)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('c5ac853b-6867-4b7a-8704-3844b11b1a34')
@ -249,3 +253,35 @@ class LBaasRoundRobinBaseTest(feature_manager.FeatureManager):
self.create_addtional_lbaas_members(constants.HTTP_PORT)
time.sleep(constants.SLEEP_BETWEEN_VIRTUAL_SEREVRS_OPEARTIONS)
self.check_project_lbaas(constants.NO_OF_VMS_4)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('1839d22d-4da2-460e-9c5b-bd8ddc1d35b6')
def test_user_not_able_to_update_lb_port(self):
"""
Admin user shouldn't be able to update Lb internal ports
"""
self.deploy_lbaas_topology()
lb = self.create_project_lbaas(
protocol_type="HTTP", protocol_port="80",
lb_algorithm="ROUND_ROBIN", hm_type='PING')
kwargs = {"admin_state_up": True}
self.assertRaises(exceptions.BadRequest,
self.manager.ports_client.update_port(lb['vip_port'],
**kwargs))
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('98e1d22d-4da2-460e-9c5b-bd8ddc1d35b6')
def test_delete_router_attaching_to_lb(self):
"""
Delete tier-1 router when Lb is attached to it
"""
lb_topo = self.deploy_lbaas_topology()
if not CONF.nsxv3.ens:
self.start_web_servers(constants.HTTP_PORT)
lb = self.create_project_lbaas(
protocol_type="HTTP", protocol_port="80",
lb_algorithm="ROUND_ROBIN", hm_type='PING')
self.delete_lb_pool_healthmonitor(lb['pool'])
self.assertRaises(exceptions.BadRequest, self.remove_router_interface,
lb_topo.get('router')['id'],
lb_topo.get('subnet')['id'])

View File

@ -60,10 +60,9 @@ class TestNewCase(feature_manager.FeatureManager):
Create Topo where 1 logical switches which is
connected via tier-1 router.
"""
name = data_utils.rand_name(namestart)
rtr_name = "rtr" + name
network_name = "net" + name
subnet_name = "net" + name
rtr_name = data_utils.rand_name(name='tempest-router')
network_name = data_utils.rand_name(name='tempest-net')
subnet_name = data_utils.rand_name(name='tempest-subnet')
router_state = self.create_topology_router(rtr_name,
set_gateway=set_gateway,
**kwargs)
@ -72,8 +71,7 @@ class TestNewCase(feature_manager.FeatureManager):
router_id=router_state["id"]
)
if create_instance:
image_id = self.get_glance_image_id(['cirros'])
image_id = u'3ed1165d-a489-4c73-a887-5061f547b723'
image_id = self.get_glance_image_id(["cirros", "esx"])
self.create_topology_instance(
"state_vm_1", [network_state],
create_floating_ip=True, image_id=image_id)
@ -90,12 +88,11 @@ class TestNewCase(feature_manager.FeatureManager):
Create Topo where 2 logical switches which are
connected via tier-1 router.
"""
name = data_utils.rand_name(namestart)
rtr_name = "rtr" + name
network_name1 = "net" + name
network_name2 = "net1" + name
subnet_name1 = "sub1" + name
subnet_name2 = "sub2" + name
rtr_name = data_utils.rand_name(name='tempest-router')
network_name1 = data_utils.rand_name(name='tempest-net')
network_name2 = data_utils.rand_name(name='tempest-net')
subnet_name1 = data_utils.rand_name(name='tempest-subnet')
subnet_name2 = data_utils.rand_name(name='tempest-subnet')
router_state = self.create_topology_router(rtr_name)
network_state1 = self.create_topology_network(network_name1)
network_state2 = self.create_topology_network(network_name2)
@ -103,9 +100,9 @@ class TestNewCase(feature_manager.FeatureManager):
router_id=router_state["id"])
self.create_topology_subnet(subnet_name2, network_state2,
router_id=router_state["id"],
cidr="22.0.9.0/24")
cidr=constants.CIDR)
if create_instance:
image_id = self.get_glance_image_id(['cirros'])
image_id = self.get_glance_image_id(['cirros', "esx"])
self.create_topology_instance(
"state_vm_1", [network_state1],
create_floating_ip=True, image_id=image_id)
@ -130,6 +127,8 @@ class TestNewCase(feature_manager.FeatureManager):
@decorators.idempotent_id('1206127a-91cc-8905-b217-98844caa35b2')
def test_router_interface_port_update(self):
"""
Check it should not allow to update
port security of router port
"""
self.create_topo_single_network(
"route-port", create_instance=False)
@ -142,11 +141,14 @@ class TestNewCase(feature_manager.FeatureManager):
@decorators.idempotent_id('1206238b-91cc-8905-b217-98844caa46c3')
@testtools.skipUnless(
[
i for i in CONF.network_feature_enabled.api_extensions
[i for i in CONF.network_feature_enabled.api_extensions
if i != "mac-learning"][0],
'Mac learning feature is not available.')
def test_port_create_mac_learning_port_security(self):
"""
Check it should create port with port security enabled
and mac learning disabled.
"""
topology_dict = self.create_topo_single_network(
"route-port", create_instance=False)
network_state = topology_dict['network_state']
@ -155,10 +157,17 @@ class TestNewCase(feature_manager.FeatureManager):
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client, **args)
port = port['port']
self.assertIn("ACTIVE", port['status'])
self.assertEqual(True, port['port_security_enabled'])
if 'mac_learning_enabled' in port:
raise Exception("Mac learning is enabled")
self.assertEqual("ACTIVE", port['status'])
@decorators.idempotent_id('1207349c-91cc-8905-b217-98844caa57d4')
def test_create_port_with_two_fixed_ip(self):
"""
Check it should not allow to create port with two
fixed ips.
"""
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
@ -178,6 +187,10 @@ class TestNewCase(feature_manager.FeatureManager):
@decorators.idempotent_id('1207450d-91cc-8905-b217-98844caa68e5')
def test_update_port_with_two_fixed_ip(self):
"""
Check it should not update port with two
fixed ips.
"""
topology_dict = self.create_topo_single_network(
"instance_port", create_instance=False)
network_state = topology_dict['network_state']
@ -192,7 +205,7 @@ class TestNewCase(feature_manager.FeatureManager):
network_state, ports_client=self.cmgr_adm.ports_client,
fixed_ips=fix_ip1)
port = port['port']
self.assertIn("ACTIVE", port['status'])
self.assertEqual("ACTIVE", port['status'])
fix_ip = [{'subnet_id': subnet_state.get(
'id'),
'ip_address': network_cidr[0] + '.21'},
@ -255,10 +268,111 @@ class TestNewCase(feature_manager.FeatureManager):
exceptions.BadRequest, self.ports_client.update_port,
port['id'], **kwargs)
@decorators.idempotent_id('9006123b-91cc-8905-b217-98844caa3423')
def test_boot_instance_with_dhcp_port(self):
"""
Check it should not allow to boot instance
with dhcp port.
"""
# Create single network attached to router topo
topology_dict = self.create_topo_single_network(
"dhcp_port", create_instance=False)
network_state = topology_dict['network_state']
ports = self.ports_client.list_ports()
for port in ports['ports']:
if 'device_owner' in port:
if port['device_owner'] == "network:dhcp" \
and port['network_id'] == network_state['id']:
port = port
break
image_id = self.get_glance_image_id(['cirros', "esx"])
self.assertRaises(exceptions.Conflict, self.create_topology_instance,
"state_vm_1", create_floating_ip=False,
image_id=image_id, port=port)
@decorators.idempotent_id('1206016a-91cc-8905-b217-98844caa2212')
@testtools.skipUnless(
[
i for i in CONF.network_feature_enabled.api_extensions
[i for i in CONF.network_feature_enabled.api_extensions
if i != "provider-security-group"][0],
'provider-security-group feature is not available.')
def test_update_port_with_provider_securtiy_group(self):
"""
Check provider security group attachment should be failed
when port security disabled.
"""
self.create_topology_security_provider_group(self.cmgr_adm,
provider=True)
network_state = self.create_topology_network("pro-network")
self.create_topology_subnet("pro-sub", network_state)
kwargs = {"port_security_enabled": "false",
"security_groups": []}
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client)
port_id = port.get('port')['id']
provider_sec = self.create_topology_security_provider_group(
self.cmgr_adm,
provider=True)
kwargs = {
"provider_security_groups": provider_sec['id']}
self.assertRaises(
exceptions.BadRequest, self.ports_client.update_port,
port_id, **kwargs)
@decorators.idempotent_id('1208238c-91cc-8905-b217-98844caa4434')
@testtools.skipUnless(
[i for i in CONF.network_feature_enabled.api_extensions
if i != "port-security-enabled"][0],
'provider-security-group feature is not available.')
def test_dhcp_port_of_network_with_port_security_disabled(self):
"""
Check port security of dhcp port should be disabled.
"""
network_state = self.create_topology_network("test-network")
kwargs = {"port_security_enabled": "false"}
self.networks_client.update_network(network_state['id'], **kwargs)
self.create_topology_subnet("test-sub", network_state)
ports = self.ports_client.list_ports()
for port in ports['ports']:
if 'device_owner' in port:
if port['device_owner'] == "network:dhcp" and \
port['network_id'] == network_state['id']:
port = port
break
if port['port_security_enabled'] is not False:
raise Exception("Port security of dhcp port is enabled")
@decorators.idempotent_id('1209349d-91cc-8905-b217-98844cab5545')
@testtools.skipUnless(
[i for i in CONF.network_feature_enabled.api_extensions
if i != "provider-security-group"][0],
'provider-security-group feature is not available.')
def test_port_security_disabled_port_in_exclude_list(self):
"""
Check port security disabled port should be in exclude
list at the backend.
"""
network_state = self.create_topology_network("test-network")
self.create_topology_subnet("test-sub", network_state)
kwargs = {"port_security_enabled": "false",
"security_groups": []}
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client, **kwargs)
port_id = port.get('port')['id']
ports = self.nsx.get_logical_ports()
port_tags = None
for port in ports:
if 'tags' in port:
for tag in port['tags']:
if tag['tag'] == port_id:
port_tags = {'tags': port['tags']}
result = (item for item in port_tags['tags'] if
item["tag"] == "Exclude-Port").next()
if result is None:
raise Exception("Port is not in exclude list")
@decorators.idempotent_id('1206016a-91cc-8905-b217-98844caa2212')
@testtools.skipUnless(
[i for i in CONF.network_feature_enabled.api_extensions
if i != "provider-security-group"][0],
'provider-security-group feature is not available.')
def test_mac_learning_with_provider_sec_group_enabled_on_port(self):
@ -274,26 +388,15 @@ class TestNewCase(feature_manager.FeatureManager):
network_state, ports_client=self.cmgr_adm.ports_client)
port_id = port.get('port')['id']
kwargs = {"port_security_enabled": "false",
"mac_learning_enabled": "false", "security_groups": [],
"provider_security_groups": []}
self.assertRaises(exceptions.Forbidden, self.update_topology_port,
port_id, **kwargs)
network_state = self.create_topology_network(
"pro-network-admin", networks_client=self.cmgr_adm.networks_client)
self.create_topology_subnet(
"pro-sub-admin",
network_state,
subnets_client=self.cmgr_adm.subnets_client)
port = self.create_topology_port(
network_state, ports_client=self.cmgr_adm.ports_client)
port_id = port.get('port')['id']
kwargs = {"port_security_enabled": "false",
"mac_learning_enabled": "false", "security_groups": [],
"mac_learning_enabled": "true", "security_groups": [],
"provider_security_groups": []}
self.update_topology_port(
port_id,
ports_client=self.cmgr_adm.ports_client,
**kwargs)
port_id, ports_client=self.cmgr_adm.ports_client, **kwargs)
image_id = self.get_glance_image_id(['cirros', "esx"])
vm_state = self.create_topology_instance(
"state_vm_1", create_floating_ip=False,
image_id=image_id, port=port['port'])
self.assertEqual("ACTIVE", vm_state['status'])
@decorators.idempotent_id('1207561e-91cc-8905-b217-98844caa79f6')
def test_create_port_with_dhcp_port_ip(self):
@ -315,6 +418,10 @@ class TestNewCase(feature_manager.FeatureManager):
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2226016a-91cc-8905-b217-12344caa24a1')
def test_dist_router_update_probhited(self):
"""
Updation on distributed router to exclusive should not be
allowed
"""
kwargs = {"distributed": "true",
"admin_state_up": "True"}
topology_dict = self.create_topo_single_network("rtr_update",
@ -345,11 +452,9 @@ class TestNewCase(feature_manager.FeatureManager):
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
@ -377,7 +482,6 @@ class TestNewCase(feature_manager.FeatureManager):
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
@ -408,11 +512,9 @@ class TestNewCase(feature_manager.FeatureManager):
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
@ -433,11 +535,14 @@ class TestNewCase(feature_manager.FeatureManager):
self._wait_fw_v1_until_ready(firewall_1['id'])
firewall_info = self.show_fw_v1(firewall_1['id'])
self.assertIn("ACTIVE", firewall_info['firewall']['status'])
time.sleep(constants.NSX_BACKEND_TIME_INTERVAL)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2226016a-91cc-8905-b217-12344caa24a1')
def test_update_router_with_static_route_via_0_0_0_0(self):
"""
Check it should not allow to add static route on router with
0.0.0.0/0 next hop.
"""
kwargs = {"distributed": "true",
"admin_state_up": "True"}
topology_dict = self.create_topo_single_network("rtr_update",
@ -455,9 +560,13 @@ class TestNewCase(feature_manager.FeatureManager):
self.routers_client.update_router,
router_id, routes=routes)
@decorators.attr(type='nsxt')
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('2227127b-91cc-8905-b217-12344cab35b2')
def test_update_router_nsxt_with_static_route_via_0_0_0_0(self):
def test_update_router_nsxv3_with_static_route_via_0_0_0_0(self):
"""
Check it should not allow to add static route on router with
0.0.0.0/0 next hop.
"""
kwargs = {"admin_state_up": "True"}
topology_dict = self.create_topo_single_network("rtr_update",
create_instance=False,
@ -474,9 +583,44 @@ class TestNewCase(feature_manager.FeatureManager):
self.routers_client.update_router,
router_id, routes=routes)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('1116016a-91cc-8905-b217-12344caa24a1')
def test_mac_learning_should_not_applied_over_trusted_ports(self):
"""
Test mac learning shouldn't be applied over trusted ports
"""
fip = self.create_floatingip(client=self.cmgr_adm.floating_ips_client)
ports = self.cmgr_adm.ports_client.list_ports()
port_id = [port.get("id")
for port in ports['ports'] if
port.get('fixed_ips')[0]["ip_address"] ==
fip["floating_ip_address"]][0]
kwargs = {"mac_learning_enabled": True}
self.assertRaises(exceptions.BadRequest,
self.cmgr_adm.ports_client.update_port, port_id,
**kwargs)
@decorators.attr(type='nsxv3')
@decorators.idempotent_id('8816016a-91cc-8905-b217-12344caa9112')
def test_create_sec_group_with_invalid_protocol(self):
"""
Security group rule shouldn't be created wrong protocol
"""
sec_group = self.create_topology_empty_security_group()
rule = dict(direction='egress', protocol='ipip')
self.add_security_group_rule(sec_group, rule)
rule = dict(direction='egress', protocol='ipipip')
self.assertRaises(exceptions.BadRequest,
self.add_security_group_rule,
sec_group, rule)
@decorators.attr(type='nsxv')
@decorators.idempotent_id('2226016a-91cc-8905-b217-12344caa24a1')
def test_exc_to_shared_router_update_not_allowed_with_fw(self):
"""
Check if updation of router from exclusive to shared is restricted
if firewall is attatched
"""
kwargs = {"router_type": "exclusive",
"admin_state_up": "True"}
name = "rtr-exc"
@ -486,11 +630,9 @@ class TestNewCase(feature_manager.FeatureManager):
firewall = self.create_fw_v1_rule(action="allow",
protocol="icmp")
fw_rule_id1 = firewall['id']
self.addCleanup(self._delete_rule_if_exists, fw_rule_id1)
# Create firewall policy
body = self.create_fw_v1_policy()
fw_policy_id = body['id']
self.addCleanup(self._delete_policy_if_exists, fw_policy_id)
# Insert rule to firewall policy
self.insert_fw_v1_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')