Merge "Add some more tempest scenarios"

This commit is contained in:
Jenkins 2015-12-01 00:31:52 +00:00 committed by Gerrit Code Review
commit ada8022878
3 changed files with 268 additions and 20 deletions

View File

@ -13,9 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
from tempest import config
from tempest import exceptions
from neutron.plugins.common import constants as p_const
from neutron_fwaas.tests.tempest_plugin.services import client
@ -79,6 +85,39 @@ class FWaaSClientMixin(object):
**kwargs)
fw = body['firewall']
self.addCleanup(self._delete_wrapper,
self.firewalls_client.delete_firewall,
self.delete_firewall_and_wait,
fw['id'])
return fw
def delete_firewall_and_wait(self, firewall_id):
self.firewalls_client.delete_firewall(firewall_id)
self._wait_firewall_while(firewall_id, [p_const.PENDING_DELETE],
not_found_ok=True)
def _wait_firewall_ready(self, firewall_id):
self._wait_firewall_while(firewall_id,
[p_const.PENDING_CREATE,
p_const.PENDING_UPDATE])
def _wait_firewall_while(self, firewall_id, statuses, not_found_ok=False):
start = int(time.time())
if not_found_ok:
expected_exceptions = (lib_exc.NotFound)
else:
expected_exceptions = ()
while True:
try:
fw = self.firewalls_client.show_firewall(firewall_id)
except expected_exceptions:
break
status = fw['firewall']['status']
if status not in statuses:
break
if int(time.time()) - start >= self.firewalls_client.build_timeout:
msg = ("Firewall %(firewall)s failed to reach "
"non PENDING status (current %(status)s)") % {
"firewall": firewall_id,
"status": status,
}
raise exceptions.TimeoutException(msg)
time.sleep(1)

View File

@ -13,11 +13,43 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib.common import ssh
from tempest_lib import exceptions as lib_exc
from tempest import config
from tempest.scenario import manager
from neutron_fwaas.tests.tempest_plugin.tests import fwaas_client
CONF = config.CONF
class FWaaSScenarioTest(fwaas_client.FWaaSClientMixin,
manager.NetworkScenarioTest):
_delete_wrapper = manager.NetworkScenarioTest.delete_wrapper
def check_connectivity(self, ip_address, username=None, private_key=None,
should_connect=True,
check_icmp=True, check_ssh=True):
if should_connect:
msg = "Timed out waiting for %s to become reachable" % ip_address
else:
msg = "ip address %s is reachable" % ip_address
if check_icmp:
ok = self.ping_ip_address(ip_address,
should_succeed=should_connect),
self.assertTrue(ok, msg=msg)
if check_ssh:
connect_timeout = CONF.validation.connect_timeout
kwargs = {}
if not should_connect:
# Use a shorter timeout for negative case
kwargs['timeout'] = 1
try:
client = ssh.Client(ip_address, username, pkey=private_key,
channel_timeout=connect_timeout,
**kwargs)
client.test_connection_auth()
except lib_exc.SSHTimeout:
if should_connect:
raise

View File

@ -44,8 +44,158 @@ class TestFWaaS(base.FWaaSScenarioTest):
server = self.create_server(create_kwargs=kwargs)
return server, keys
@test.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8')
def test_firewall_basic(self):
def _empty_policy(self, server1_ip):
# NOTE(yamamoto): an empty policy would deny all
fw_policy = self.create_firewall_policy(firewall_rules=[])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
'fw_policy': fw_policy,
}
def _all_disabled_rules(self, server1_ip):
# NOTE(yamamoto): a policy whose rules are all disabled would deny all
fw_rule = self.create_firewall_rule(action="allow", enabled=False)
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
'fw_policy': fw_policy,
'fw_rule': fw_rule,
}
def _block_ip(self, server1_ip):
# NOTE(yamamoto): this rule does NOT match with icmp packets
fw_rule = self.create_firewall_rule(
source_ip_address=server1_ip,
action="deny")
fw_rule_allow = self.create_firewall_rule(
action="allow")
fw_policy = self.create_firewall_policy(
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
'fw_policy': fw_policy,
'fw_rule': fw_rule,
}
def _block_icmp(self, server1_ip):
fw_rule = self.create_firewall_rule(
protocol="icmp",
action="deny")
fw_rule_allow = self.create_firewall_rule(
action="allow")
fw_policy = self.create_firewall_policy(
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
'fw_policy': fw_policy,
'fw_rule': fw_rule,
}
def _block_all_with_default_allow(self, server1_ip):
fw_rule = self.create_firewall_rule(
action="deny")
fw_rule_allow = self.create_firewall_rule(
action="allow")
fw_policy = self.create_firewall_policy(
firewall_rules=[fw_rule['id'], fw_rule_allow['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
'fw_policy': fw_policy,
'fw_rule': fw_rule,
}
def _admin_disable(self, server1_ip):
# NOTE(yamamoto): A firewall with admin_state_up=False would block all
fw_rule = self.create_firewall_rule(action="allow")
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'],
admin_state_up=False)
self._wait_firewall_ready(fw['id'])
return {
'fw': fw,
'fw_policy': fw_policy,
'fw_rule': fw_rule,
}
def _allow_ssh_and_icmp(self, ctx):
fw_ssh_rule = self.create_firewall_rule(
protocol="tcp",
destination_port=22,
action="allow")
fw_icmp_rule = self.create_firewall_rule(
protocol="icmp",
action="allow")
for rule in [fw_ssh_rule, fw_icmp_rule]:
self.firewall_policies_client.insert_firewall_rule_in_policy(
firewall_policy_id=ctx['fw_policy']['id'],
firewall_rule_id=rule['id'],
insert_before=ctx['fw_rule']['id'])
self.addCleanup(
self._remove_rule_and_wait,
firewall_id=ctx['fw']['id'],
firewall_policy_id=ctx['fw_policy']['id'],
firewall_rule_id=rule['id'])
self._wait_firewall_ready(ctx['fw']['id'])
def _remove_rule_and_wait(self, firewall_id, firewall_policy_id,
firewall_rule_id):
self.firewall_policies_client.remove_firewall_rule_from_policy(
firewall_policy_id=firewall_policy_id,
firewall_rule_id=firewall_rule_id)
self._wait_firewall_ready(firewall_id)
def _delete_fw(self, ctx):
self.delete_firewall_and_wait(ctx['fw']['id'])
def _set_admin_up(self, firewall_id, up):
self.firewalls_client.update_firewall(firewall_id=firewall_id,
admin_state_up=up)
self._wait_firewall_ready(firewall_id=firewall_id)
def _admin_enable(self, ctx):
self._set_admin_up(ctx['fw']['id'], up=True)
def _remove_rule(self, ctx):
self._remove_rule_and_wait(
firewall_id=ctx['fw']['id'],
firewall_policy_id=ctx['fw_policy']['id'],
firewall_rule_id=ctx['fw_rule']['id'])
def _disable_rule(self, ctx):
self.firewall_rules_client.update_firewall_rule(
firewall_rule_id=ctx['fw_rule']['id'],
enabled=False)
self._wait_firewall_ready(ctx['fw']['id'])
def _confirm_allowed(self, **kwargs):
self.check_connectivity(**kwargs)
def _confirm_blocked(self, **kwargs):
self.check_connectivity(should_connect=False, **kwargs)
def _confirm_tcp_blocked_but_icmp(self, **kwargs):
self.check_connectivity(should_connect=False, check_icmp=False,
**kwargs)
self.check_connectivity(check_ssh=False, **kwargs)
def _test_firewall_basic(self, block, allow=None,
confirm_allowed=None, confirm_blocked=None):
if allow is None:
allow = self._delete_fw
if confirm_allowed is None:
confirm_allowed = self._confirm_allowed
if confirm_blocked is None:
confirm_blocked = self._confirm_blocked
ssh_login = CONF.compute.image_ssh_user
public_network_id = CONF.network.public_network_id
@ -58,22 +208,49 @@ class TestFWaaS(base.FWaaSScenarioTest):
public_network_id)
server1_ip = server1_floating_ip.floating_ip_address
self.check_vm_connectivity(server1_ip, username=ssh_login,
private_key=private_key,
should_connect=True)
confirm_allowed(ip_address=server1_ip, username=ssh_login,
private_key=private_key)
ctx = block(server1_ip)
confirm_blocked(ip_address=server1_ip, username=ssh_login,
private_key=private_key)
allow(ctx)
confirm_allowed(ip_address=server1_ip, username=ssh_login,
private_key=private_key)
# Create a firewall to block traffic.
fw_rule = self.create_firewall_rule(
source_ip_address=server1_ip,
action="deny")
fw_policy = self.create_firewall_policy(firewall_rules=[fw_rule['id']])
fw = self.create_firewall(firewall_policy_id=fw_policy['id'])
self.check_vm_connectivity(server1_ip, username=ssh_login,
private_key=private_key,
should_connect=False)
@test.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8')
def test_firewall_block_ip(self):
self._test_firewall_basic(
block=self._block_ip,
confirm_blocked=self._confirm_tcp_blocked_but_icmp)
# Remove the firewall so that the VM is reachable again.
self.firewalls_client.delete_firewall(fw['id'])
self.check_vm_connectivity(server1_ip, username=ssh_login,
private_key=private_key,
should_connect=True)
@test.idempotent_id('b985d010-994a-4055-bd5c-9e961464ccde')
def test_firewall_block_icmp(self):
self._test_firewall_basic(block=self._block_icmp)
@test.idempotent_id('ca473af0-26f9-4fad-9550-1c34371c900e')
def test_firewall_insert_rule(self):
self._test_firewall_basic(block=self._block_icmp,
allow=self._allow_ssh_and_icmp)
@test.idempotent_id('54a937a6-cecf-444c-b3f9-b67a1c1b7411')
def test_firewall_remove_rule(self):
self._test_firewall_basic(block=self._block_all_with_default_allow,
allow=self._remove_rule)
@test.idempotent_id('12a18776-9b60-4479-9988-f45971c96a92')
def test_firewall_disable_rule(self):
self._test_firewall_basic(block=self._block_all_with_default_allow,
allow=self._disable_rule)
@test.idempotent_id('a2a58c1f-49ad-4b5f-9463-e746b9efe08a')
def test_firewall_empty_policy(self):
self._test_firewall_basic(block=self._empty_policy)
@test.idempotent_id('477a47e0-5156-4784-9417-f77970d85c36')
def test_firewall_all_disabled_rules(self):
self._test_firewall_basic(block=self._all_disabled_rules)
@test.idempotent_id('a83f51c5-1a18-4d2a-a778-c368e4d95c29')
def test_firewall_admin_disable(self):
self._test_firewall_basic(block=self._admin_disable,
allow=self._admin_enable)