Merge "tempest: Add test cases for router insertion"
This commit is contained in:
commit
cb0093d185
|
@ -13,6 +13,8 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import testscenarios
|
||||||
|
|
||||||
from tempest import config
|
from tempest import config
|
||||||
from tempest import test
|
from tempest import test
|
||||||
|
|
||||||
|
@ -20,16 +22,29 @@ from neutron_fwaas.tests.tempest_plugin.tests.scenario import base
|
||||||
|
|
||||||
CONF = config.CONF
|
CONF = config.CONF
|
||||||
|
|
||||||
|
load_tests = testscenarios.load_tests_apply_scenarios
|
||||||
|
|
||||||
|
|
||||||
class TestFWaaS(base.FWaaSScenarioTest):
|
class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
|
scenarios = [
|
||||||
|
('without router insersion', {
|
||||||
|
'router_insertion': False,
|
||||||
|
}),
|
||||||
|
('with router insersion', {
|
||||||
|
'router_insertion': True,
|
||||||
|
}),
|
||||||
|
]
|
||||||
|
|
||||||
@classmethod
|
def setUp(self):
|
||||||
def resource_setup(cls):
|
super(TestFWaaS, self).setUp()
|
||||||
super(TestFWaaS, cls).resource_setup()
|
required_exts = ['fwaas', 'security-group', 'router']
|
||||||
for ext in ['fwaas', 'security-group', 'router']:
|
if self.router_insertion:
|
||||||
|
required_exts.append('fwaasrouterinsertion')
|
||||||
|
for ext in required_exts:
|
||||||
if not test.is_extension_enabled(ext, 'network'):
|
if not test.is_extension_enabled(ext, 'network'):
|
||||||
msg = "%s Extension not enabled." % ext
|
msg = "%s Extension not enabled." % ext
|
||||||
raise cls.skipException(msg)
|
raise self.skipException(msg)
|
||||||
|
self._router_ids = None
|
||||||
|
|
||||||
def _create_server(self, network, security_group=None):
|
def _create_server(self, network, security_group=None):
|
||||||
keys = self.create_keypair()
|
keys = self.create_keypair()
|
||||||
|
@ -43,10 +58,15 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
**kwargs)
|
**kwargs)
|
||||||
return server, keys
|
return server, keys
|
||||||
|
|
||||||
|
def _create_firewall(self, **kwargs):
|
||||||
|
if self._router_ids is not None:
|
||||||
|
kwargs['router_ids'] = self._router_ids
|
||||||
|
return self.create_firewall(**kwargs)
|
||||||
|
|
||||||
def _empty_policy(self, server1_ip):
|
def _empty_policy(self, server1_ip):
|
||||||
# NOTE(yamamoto): an empty policy would deny all
|
# NOTE(yamamoto): an empty policy would deny all
|
||||||
fw_policy = self.create_firewall_policy(firewall_rules=[])
|
fw_policy = self.create_firewall_policy(firewall_rules=[])
|
||||||
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
|
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
|
||||||
self._wait_firewall_ready(fw['id'])
|
self._wait_firewall_ready(fw['id'])
|
||||||
return {
|
return {
|
||||||
'fw': fw,
|
'fw': fw,
|
||||||
|
@ -57,7 +77,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
# NOTE(yamamoto): a policy whose rules are all disabled would deny all
|
# NOTE(yamamoto): a policy whose rules are all disabled would deny all
|
||||||
fw_rule = self.create_firewall_rule(action="allow", enabled=False)
|
fw_rule = self.create_firewall_rule(action="allow", enabled=False)
|
||||||
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
|
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
|
||||||
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
|
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
|
||||||
self._wait_firewall_ready(fw['id'])
|
self._wait_firewall_ready(fw['id'])
|
||||||
return {
|
return {
|
||||||
'fw': fw,
|
'fw': fw,
|
||||||
|
@ -74,7 +94,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
action="allow")
|
action="allow")
|
||||||
fw_policy = self.create_firewall_policy(
|
fw_policy = self.create_firewall_policy(
|
||||||
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
|
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
|
||||||
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
|
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
|
||||||
self._wait_firewall_ready(fw['id'])
|
self._wait_firewall_ready(fw['id'])
|
||||||
return {
|
return {
|
||||||
'fw': fw,
|
'fw': fw,
|
||||||
|
@ -90,7 +110,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
action="allow")
|
action="allow")
|
||||||
fw_policy = self.create_firewall_policy(
|
fw_policy = self.create_firewall_policy(
|
||||||
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
|
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
|
||||||
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
|
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
|
||||||
self._wait_firewall_ready(fw['id'])
|
self._wait_firewall_ready(fw['id'])
|
||||||
return {
|
return {
|
||||||
'fw': fw,
|
'fw': fw,
|
||||||
|
@ -105,7 +125,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
action="allow")
|
action="allow")
|
||||||
fw_policy = self.create_firewall_policy(
|
fw_policy = self.create_firewall_policy(
|
||||||
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
|
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
|
||||||
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
|
fw = self._create_firewall(firewall_policy_id=fw_policy['id'])
|
||||||
self._wait_firewall_ready(fw['id'])
|
self._wait_firewall_ready(fw['id'])
|
||||||
return {
|
return {
|
||||||
'fw': fw,
|
'fw': fw,
|
||||||
|
@ -117,7 +137,7 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
# NOTE(yamamoto): A firewall with admin_state_up=False would block all
|
# NOTE(yamamoto): A firewall with admin_state_up=False would block all
|
||||||
fw_rule = self.create_firewall_rule(action="allow")
|
fw_rule = self.create_firewall_rule(action="allow")
|
||||||
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
|
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
|
||||||
fw = self.create_firewall(firewall_policy_id=fw_policy['id'],
|
fw = self._create_firewall(firewall_policy_id=fw_policy['id'],
|
||||||
admin_state_up=False)
|
admin_state_up=False)
|
||||||
self._wait_firewall_ready(fw['id'])
|
self._wait_firewall_ready(fw['id'])
|
||||||
return {
|
return {
|
||||||
|
@ -187,6 +207,17 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
**kwargs)
|
**kwargs)
|
||||||
self.check_connectivity(check_ssh=False, **kwargs)
|
self.check_connectivity(check_ssh=False, **kwargs)
|
||||||
|
|
||||||
|
def _create_topology(self):
|
||||||
|
public_network_id = CONF.network.public_network_id
|
||||||
|
network, subnet, router = self.create_networks()
|
||||||
|
security_group = self._create_security_group()
|
||||||
|
server, keys = self._create_server(network,
|
||||||
|
security_group=security_group)
|
||||||
|
private_key = keys['private_key']
|
||||||
|
server_floating_ip = self.create_floating_ip(server, public_network_id)
|
||||||
|
server_ip = server_floating_ip.floating_ip_address
|
||||||
|
return server_ip, private_key, router
|
||||||
|
|
||||||
def _test_firewall_basic(self, block, allow=None,
|
def _test_firewall_basic(self, block, allow=None,
|
||||||
confirm_allowed=None, confirm_blocked=None):
|
confirm_allowed=None, confirm_blocked=None):
|
||||||
if allow is None:
|
if allow is None:
|
||||||
|
@ -196,25 +227,34 @@ class TestFWaaS(base.FWaaSScenarioTest):
|
||||||
if confirm_blocked is None:
|
if confirm_blocked is None:
|
||||||
confirm_blocked = self._confirm_blocked
|
confirm_blocked = self._confirm_blocked
|
||||||
ssh_login = CONF.validation.image_ssh_user
|
ssh_login = CONF.validation.image_ssh_user
|
||||||
public_network_id = CONF.network.public_network_id
|
|
||||||
|
|
||||||
network1, subnet1, router1 = self.create_networks()
|
|
||||||
security_group = self._create_security_group()
|
|
||||||
server1, keys1 = self._create_server(network1,
|
|
||||||
security_group=security_group)
|
|
||||||
private_key = keys1['private_key']
|
|
||||||
server1_floating_ip = self.create_floating_ip(server1,
|
|
||||||
public_network_id)
|
|
||||||
server1_ip = server1_floating_ip.floating_ip_address
|
|
||||||
|
|
||||||
|
server1_ip, private_key1, router1 = self._create_topology()
|
||||||
|
server2_ip, private_key2, router2 = self._create_topology()
|
||||||
|
if self.router_insertion:
|
||||||
|
# Specify the router when creating a firewall and ensures that
|
||||||
|
# the other router (router2) is not affected by the firewall
|
||||||
|
self._router_ids = [router1['id']]
|
||||||
|
confirm_allowed2 = self.check_connectivity
|
||||||
|
confirm_blocked2 = self.check_connectivity
|
||||||
|
else:
|
||||||
|
# Without router insertion, all routers should be affected
|
||||||
|
# equally
|
||||||
|
confirm_allowed2 = confirm_allowed
|
||||||
|
confirm_blocked2 = confirm_blocked
|
||||||
confirm_allowed(ip_address=server1_ip, username=ssh_login,
|
confirm_allowed(ip_address=server1_ip, username=ssh_login,
|
||||||
private_key=private_key)
|
private_key=private_key1)
|
||||||
|
confirm_allowed2(ip_address=server2_ip, username=ssh_login,
|
||||||
|
private_key=private_key2)
|
||||||
ctx = block(server1_ip)
|
ctx = block(server1_ip)
|
||||||
confirm_blocked(ip_address=server1_ip, username=ssh_login,
|
confirm_blocked(ip_address=server1_ip, username=ssh_login,
|
||||||
private_key=private_key)
|
private_key=private_key1)
|
||||||
|
confirm_blocked2(ip_address=server2_ip, username=ssh_login,
|
||||||
|
private_key=private_key2)
|
||||||
allow(ctx)
|
allow(ctx)
|
||||||
confirm_allowed(ip_address=server1_ip, username=ssh_login,
|
confirm_allowed(ip_address=server1_ip, username=ssh_login,
|
||||||
private_key=private_key)
|
private_key=private_key1)
|
||||||
|
confirm_allowed2(ip_address=server2_ip, username=ssh_login,
|
||||||
|
private_key=private_key2)
|
||||||
|
|
||||||
@test.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8')
|
@test.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8')
|
||||||
def test_firewall_block_ip(self):
|
def test_firewall_block_ip(self):
|
||||||
|
|
Loading…
Reference in New Issue