Add test cases to testing firewall drivers
Part of this patch is also preparation for having common test plan for firewall driver testing. Following test cases were implemented: - dhcp works by default - dhcp server is prevented on vm by default - ip spoofing from vm - allowed address pairs allows traffic to given ip - arp can go through - ingress/egress traffic with src/dest port ranges Related-bug: #1461000 Change-Id: Ib00c99f236855e6556f43f4ffc55014c73b077bb
This commit is contained in:
parent
c429ee18b6
commit
a94777a005
|
@ -7,6 +7,7 @@
|
|||
# enable ping from namespace
|
||||
ping_filter: CommandFilter, ping, root
|
||||
ping6_filter: CommandFilter, ping6, root
|
||||
ping_kill: KillFilter, root, ping, -2
|
||||
|
||||
# enable curl from namespace
|
||||
curl_filter: RegExpFilter, /usr/bin/curl, root, curl, --max-time, \d+, -D-, http://[0-9a-z:./-]+
|
||||
|
|
|
@ -1,197 +0,0 @@
|
|||
# Copyright 2015 Intel Corporation.
|
||||
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
|
||||
# <isaku.yamahata at gmail com>
|
||||
# All Rights Reserved.
|
||||
#
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
|
||||
from neutron.agent.linux import iptables_firewall
|
||||
from neutron.agent import securitygroups_rpc as sg_cfg
|
||||
from neutron.common import constants
|
||||
from neutron.tests.common import machine_fixtures
|
||||
from neutron.tests.common import net_helpers
|
||||
from neutron.tests.functional import base
|
||||
from oslo_config import cfg
|
||||
|
||||
DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
|
||||
|
||||
|
||||
class IptablesFirewallTestCase(base.BaseSudoTestCase):
|
||||
MAC_REAL = "fa:16:3e:9a:2f:49"
|
||||
MAC_SPOOFED = "fa:16:3e:9a:2f:48"
|
||||
FAKE_SECURITY_GROUP_ID = "fake_sg_id"
|
||||
|
||||
def _set_src_mac(self, mac):
|
||||
self.client.port.link.set_down()
|
||||
self.client.port.link.set_address(mac)
|
||||
self.client.port.link.set_up()
|
||||
|
||||
def setUp(self):
|
||||
cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
|
||||
super(IptablesFirewallTestCase, self).setUp()
|
||||
|
||||
bridge = self.useFixture(net_helpers.LinuxBridgeFixture()).bridge
|
||||
self.client, self.server = self.useFixture(
|
||||
machine_fixtures.PeerMachines(bridge)).machines
|
||||
|
||||
self.firewall = iptables_firewall.IptablesFirewallDriver(
|
||||
namespace=bridge.namespace)
|
||||
|
||||
self._set_src_mac(self.MAC_REAL)
|
||||
|
||||
client_br_port_name = net_helpers.VethFixture.get_peer_name(
|
||||
self.client.port.name)
|
||||
self.src_port_desc = {'admin_state_up': True,
|
||||
'device': client_br_port_name,
|
||||
'device_owner': DEVICE_OWNER_COMPUTE,
|
||||
'fixed_ips': [self.client.ip],
|
||||
'mac_address': self.MAC_REAL,
|
||||
'port_security_enabled': True,
|
||||
'security_groups': [self.FAKE_SECURITY_GROUP_ID],
|
||||
'status': 'ACTIVE'}
|
||||
|
||||
# setup firewall on bridge and send packet from src_veth and observe
|
||||
# if sent packet can be observed on dst_veth
|
||||
def test_port_sec_within_firewall(self):
|
||||
|
||||
# update the sg_group to make ping pass
|
||||
sg_rules = [{'ethertype': 'IPv4', 'direction': 'ingress',
|
||||
'source_ip_prefix': '0.0.0.0/0', 'protocol': 'icmp'},
|
||||
{'ethertype': 'IPv4', 'direction': 'egress'}]
|
||||
|
||||
with self.firewall.defer_apply():
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID,
|
||||
sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self.client.assert_ping(self.server.ip)
|
||||
|
||||
# modify the src_veth's MAC and test again
|
||||
self._set_src_mac(self.MAC_SPOOFED)
|
||||
self.client.assert_no_ping(self.server.ip)
|
||||
|
||||
# update the port's port_security_enabled value and test again
|
||||
self.src_port_desc['port_security_enabled'] = False
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
self.client.assert_ping(self.server.ip)
|
||||
|
||||
def test_rule_application_converges(self):
|
||||
sg_rules = [{'ethertype': 'IPv4', 'direction': 'egress'},
|
||||
{'ethertype': 'IPv6', 'direction': 'egress'},
|
||||
{'ethertype': 'IPv4', 'direction': 'ingress',
|
||||
'source_ip_prefix': '0.0.0.0/0', 'protocol': 'icmp'},
|
||||
{'ethertype': 'IPv6', 'direction': 'ingress',
|
||||
'source_ip_prefix': '0::0/0', 'protocol': 'ipv6-icmp'}]
|
||||
# make sure port ranges converge on all protocols with and without
|
||||
# port ranges (prevents regression of bug 1502924)
|
||||
for proto in ('tcp', 'udp', 'icmp'):
|
||||
for version in ('IPv4', 'IPv6'):
|
||||
if proto == 'icmp' and version == 'IPv6':
|
||||
proto = 'ipv6-icmp'
|
||||
base = {'ethertype': version, 'direction': 'ingress',
|
||||
'protocol': proto}
|
||||
sg_rules.append(copy.copy(base))
|
||||
base['port_range_min'] = 50
|
||||
base['port_range_max'] = 50
|
||||
sg_rules.append(copy.copy(base))
|
||||
base['port_range_max'] = 55
|
||||
sg_rules.append(copy.copy(base))
|
||||
base['source_port_range_min'] = 60
|
||||
base['source_port_range_max'] = 60
|
||||
sg_rules.append(copy.copy(base))
|
||||
base['source_port_range_max'] = 65
|
||||
sg_rules.append(copy.copy(base))
|
||||
|
||||
# add some single-host rules to prevent regression of bug 1502917
|
||||
sg_rules.append({'ethertype': 'IPv4', 'direction': 'ingress',
|
||||
'source_ip_prefix': '77.77.77.77/32'})
|
||||
sg_rules.append({'ethertype': 'IPv6', 'direction': 'ingress',
|
||||
'source_ip_prefix': 'fe80::1/128'})
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
# after one prepare call, another apply should be a NOOP
|
||||
self.assertEqual([], self.firewall.iptables._apply())
|
||||
|
||||
orig_sg_rules = copy.copy(sg_rules)
|
||||
for proto in ('tcp', 'udp', 'icmp'):
|
||||
for version in ('IPv4', 'IPv6'):
|
||||
if proto == 'icmp' and version == 'IPv6':
|
||||
proto = 'ipv6-icmp'
|
||||
# make sure firewall is in converged state
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, orig_sg_rules)
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
sg_rules = copy.copy(orig_sg_rules)
|
||||
|
||||
# remove one rule and add another to make sure it results in
|
||||
# exactly one delete and insert
|
||||
sg_rules.pop(0 if version == 'IPv4' else 1)
|
||||
sg_rules.append({'ethertype': version, 'direction': 'egress',
|
||||
'protocol': proto})
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
result = self.firewall.update_port_filter(self.src_port_desc)
|
||||
deletes = [r for r in result if r.startswith('-D ')]
|
||||
creates = [r for r in result if r.startswith('-I ')]
|
||||
self.assertEqual(1, len(deletes))
|
||||
self.assertEqual(1, len(creates))
|
||||
# quick sanity check to make sure the insert was for the
|
||||
# correct proto
|
||||
self.assertIn('-p %s' % proto, creates[0])
|
||||
# another apply should be a NOOP if the right rule was removed
|
||||
# and the new one was inserted in the correct position
|
||||
self.assertEqual([], self.firewall.iptables._apply())
|
||||
|
||||
def test_rule_ordering_correct(self):
|
||||
sg_rules = [
|
||||
{'ethertype': 'IPv4', 'direction': 'egress', 'protocol': 'tcp',
|
||||
'port_range_min': i, 'port_range_max': i}
|
||||
for i in range(50, 61)
|
||||
]
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
|
||||
# remove a rule and add a new one
|
||||
sg_rules.pop(5)
|
||||
sg_rules.insert(8, {'ethertype': 'IPv4', 'direction': 'egress',
|
||||
'protocol': 'tcp', 'port_range_min': 400,
|
||||
'port_range_max': 400})
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
|
||||
|
||||
# reverse all of the rules (requires lots of deletes and inserts)
|
||||
sg_rules = list(reversed(sg_rules))
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
|
||||
|
||||
def _assert_sg_out_tcp_rules_appear_in_order(self, sg_rules):
|
||||
outgoing_rule_pref = '-A %s-o%s' % (self.firewall.iptables.wrap_name,
|
||||
self.src_port_desc['device'][3:13])
|
||||
rules = [
|
||||
r for r in self.firewall.iptables.get_rules_for_table('filter')
|
||||
if r.startswith(outgoing_rule_pref)
|
||||
]
|
||||
# we want to ensure the rules went in in the same order we sent
|
||||
indexes = [rules.index('%s -p tcp -m tcp --dport %s -j RETURN' %
|
||||
(outgoing_rule_pref, i['port_range_min']))
|
||||
for i in sg_rules]
|
||||
# all indexes should be in order with no unexpected rules in between
|
||||
self.assertEqual(range(indexes[0], indexes[-1] + 1), indexes)
|
|
@ -0,0 +1,424 @@
|
|||
# Copyright 2015 Intel Corporation.
|
||||
# Copyright 2015 Isaku Yamahata <isaku.yamahata at intel com>
|
||||
# <isaku.yamahata at gmail com>
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import testscenarios
|
||||
|
||||
import netaddr
|
||||
from oslo_config import cfg
|
||||
|
||||
from neutron.agent import firewall
|
||||
from neutron.agent.linux import iptables_firewall
|
||||
from neutron.agent import securitygroups_rpc as sg_cfg
|
||||
from neutron.common import constants
|
||||
from neutron.tests.common import conn_testers
|
||||
from neutron.tests.functional import base
|
||||
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
reverse_direction = {
|
||||
conn_testers.ConnectionTester.INGRESS:
|
||||
conn_testers.ConnectionTester.EGRESS,
|
||||
conn_testers.ConnectionTester.EGRESS:
|
||||
conn_testers.ConnectionTester.INGRESS}
|
||||
reverse_transport_protocol = {
|
||||
conn_testers.ConnectionTester.TCP: conn_testers.ConnectionTester.UDP,
|
||||
conn_testers.ConnectionTester.UDP: conn_testers.ConnectionTester.TCP}
|
||||
|
||||
|
||||
DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
|
||||
|
||||
|
||||
def _add_rule(sg_rules, base, port_range_min=None, port_range_max=None):
|
||||
rule = copy.copy(base)
|
||||
if port_range_min:
|
||||
rule['port_range_min'] = port_range_min
|
||||
if port_range_max:
|
||||
rule['port_range_max'] = port_range_max
|
||||
sg_rules.append(rule)
|
||||
|
||||
|
||||
class FirewallTestCase(base.BaseSudoTestCase):
|
||||
FAKE_SECURITY_GROUP_ID = 'fake_sg_id'
|
||||
MAC_SPOOFED = "fa:16:3e:9a:2f:48"
|
||||
scenarios = [('IptablesFirewallDriver without ipset',
|
||||
{'enable_ipset': False}),
|
||||
('IptablesFirewallDriver with ipset',
|
||||
{'enable_ipset': True})]
|
||||
|
||||
def create_iptables_firewall(self):
|
||||
cfg.CONF.set_override('enable_ipset', self.enable_ipset,
|
||||
'SECURITYGROUP')
|
||||
return iptables_firewall.IptablesFirewallDriver(
|
||||
namespace=self.tester.bridge_namespace)
|
||||
|
||||
@staticmethod
|
||||
def _create_port_description(port_id, ip_addresses, mac_address, sg_ids):
|
||||
return {'admin_state_up': True,
|
||||
'device': port_id,
|
||||
'device_owner': DEVICE_OWNER_COMPUTE,
|
||||
'fixed_ips': ip_addresses,
|
||||
'mac_address': mac_address,
|
||||
'port_security_enabled': True,
|
||||
'security_groups': sg_ids,
|
||||
'status': 'ACTIVE'}
|
||||
|
||||
def setUp(self):
|
||||
cfg.CONF.register_opts(sg_cfg.security_group_opts, 'SECURITYGROUP')
|
||||
super(FirewallTestCase, self).setUp()
|
||||
self.tester = self.useFixture(
|
||||
conn_testers.LinuxBridgeConnectionTester())
|
||||
self.firewall = self.create_iptables_firewall()
|
||||
vm_mac = self.tester.vm_mac_address
|
||||
vm_port_id = self.tester.vm_port_id
|
||||
self.src_port_desc = self._create_port_description(
|
||||
vm_port_id, [self.tester.vm_ip_address], vm_mac,
|
||||
[self.FAKE_SECURITY_GROUP_ID])
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
|
||||
def _apply_security_group_rules(self, sg_id, sg_rules):
|
||||
with self.firewall.defer_apply():
|
||||
self.firewall.update_security_group_rules(sg_id, sg_rules)
|
||||
|
||||
def test_rule_application_converges(self):
|
||||
sg_rules = [{'ethertype': 'IPv4', 'direction': 'egress'},
|
||||
{'ethertype': 'IPv6', 'direction': 'egress'},
|
||||
{'ethertype': 'IPv4', 'direction': 'ingress',
|
||||
'source_ip_prefix': '0.0.0.0/0', 'protocol': 'icmp'},
|
||||
{'ethertype': 'IPv6', 'direction': 'ingress',
|
||||
'source_ip_prefix': '0::0/0', 'protocol': 'ipv6-icmp'}]
|
||||
# make sure port ranges converge on all protocols with and without
|
||||
# port ranges (prevents regression of bug 1502924)
|
||||
for proto in ('tcp', 'udp', 'icmp'):
|
||||
for version in ('IPv4', 'IPv6'):
|
||||
if proto == 'icmp' and version == 'IPv6':
|
||||
proto = 'ipv6-icmp'
|
||||
base = {'ethertype': version, 'direction': 'ingress',
|
||||
'protocol': proto}
|
||||
sg_rules.append(copy.copy(base))
|
||||
_add_rule(sg_rules, base, port_range_min=50,
|
||||
port_range_max=50)
|
||||
_add_rule(sg_rules, base, port_range_max=55)
|
||||
_add_rule(sg_rules, base, port_range_min=60,
|
||||
port_range_max=60)
|
||||
_add_rule(sg_rules, base, port_range_max=65)
|
||||
|
||||
# add some single-host rules to prevent regression of bug 1502917
|
||||
sg_rules.append({'ethertype': 'IPv4', 'direction': 'ingress',
|
||||
'source_ip_prefix': '77.77.77.77/32'})
|
||||
sg_rules.append({'ethertype': 'IPv6', 'direction': 'ingress',
|
||||
'source_ip_prefix': 'fe80::1/128'})
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
# after one prepare call, another apply should be a NOOP
|
||||
self.assertEqual([], self.firewall.iptables._apply())
|
||||
|
||||
orig_sg_rules = copy.copy(sg_rules)
|
||||
for proto in ('tcp', 'udp', 'icmp'):
|
||||
for version in ('IPv4', 'IPv6'):
|
||||
if proto == 'icmp' and version == 'IPv6':
|
||||
proto = 'ipv6-icmp'
|
||||
# make sure firewall is in converged state
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, orig_sg_rules)
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
sg_rules = copy.copy(orig_sg_rules)
|
||||
|
||||
# remove one rule and add another to make sure it results in
|
||||
# exactly one delete and insert
|
||||
sg_rules.pop(0 if version == 'IPv4' else 1)
|
||||
sg_rules.append({'ethertype': version, 'direction': 'egress',
|
||||
'protocol': proto})
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
result = self.firewall.update_port_filter(self.src_port_desc)
|
||||
deletes = [r for r in result if r.startswith('-D ')]
|
||||
creates = [r for r in result if r.startswith('-I ')]
|
||||
self.assertEqual(1, len(deletes))
|
||||
self.assertEqual(1, len(creates))
|
||||
# quick sanity check to make sure the insert was for the
|
||||
# correct proto
|
||||
self.assertIn('-p %s' % proto, creates[0])
|
||||
# another apply should be a NOOP if the right rule was removed
|
||||
# and the new one was inserted in the correct position
|
||||
self.assertEqual([], self.firewall.iptables._apply())
|
||||
|
||||
def test_rule_ordering_correct(self):
|
||||
sg_rules = [
|
||||
{'ethertype': 'IPv4', 'direction': 'egress', 'protocol': 'tcp',
|
||||
'port_range_min': i, 'port_range_max': i}
|
||||
for i in range(50, 61)
|
||||
]
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
|
||||
# remove a rule and add a new one
|
||||
sg_rules.pop(5)
|
||||
sg_rules.insert(8, {'ethertype': 'IPv4', 'direction': 'egress',
|
||||
'protocol': 'tcp', 'port_range_min': 400,
|
||||
'port_range_max': 400})
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
|
||||
|
||||
# reverse all of the rules (requires lots of deletes and inserts)
|
||||
sg_rules = list(reversed(sg_rules))
|
||||
self.firewall.update_security_group_rules(
|
||||
self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.firewall.prepare_port_filter(self.src_port_desc)
|
||||
self._assert_sg_out_tcp_rules_appear_in_order(sg_rules)
|
||||
|
||||
def _assert_sg_out_tcp_rules_appear_in_order(self, sg_rules):
|
||||
outgoing_rule_pref = '-A %s-o%s' % (self.firewall.iptables.wrap_name,
|
||||
self.src_port_desc['device'][3:13])
|
||||
rules = [
|
||||
r for r in self.firewall.iptables.get_rules_for_table('filter')
|
||||
if r.startswith(outgoing_rule_pref)
|
||||
]
|
||||
# we want to ensure the rules went in in the same order we sent
|
||||
indexes = [rules.index('%s -p tcp -m tcp --dport %s -j RETURN' %
|
||||
(outgoing_rule_pref, i['port_range_min']))
|
||||
for i in sg_rules]
|
||||
# all indexes should be in order with no unexpected rules in between
|
||||
self.assertEqual(range(indexes[0], indexes[-1] + 1), indexes)
|
||||
|
||||
def test_ingress_icmp_secgroup(self):
|
||||
# update the sg_group to make ping pass
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_ICMP},
|
||||
{'ethertype': constants.IPv4,
|
||||
'direction': firewall.EGRESS_DIRECTION}]
|
||||
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
|
||||
def test_mac_spoofing(self):
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_ICMP},
|
||||
{'ethertype': constants.IPv4,
|
||||
'direction': firewall.EGRESS_DIRECTION}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.vm_mac_address = self.MAC_SPOOFED
|
||||
self.tester.flush_arp_tables()
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.EGRESS)
|
||||
|
||||
def test_mac_spoofing_works_without_port_security_enabled(self):
|
||||
self.src_port_desc['port_security_enabled'] = False
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.vm_mac_address = self.MAC_SPOOFED
|
||||
self.tester.flush_arp_tables()
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.EGRESS)
|
||||
|
||||
def test_port_security_enabled_set_to_false(self):
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.src_port_desc['port_security_enabled'] = False
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
|
||||
def test_dhcp_requests_from_vm(self):
|
||||
# DHCPv4 uses source port 67, destination port 68
|
||||
self.tester.assert_connection(direction=self.tester.EGRESS,
|
||||
protocol=self.tester.UDP,
|
||||
src_port=68, dst_port=67)
|
||||
|
||||
def test_dhcp_server_forbidden_on_vm(self):
|
||||
self.tester.assert_no_connection(direction=self.tester.EGRESS,
|
||||
protocol=self.tester.UDP,
|
||||
src_port=67, dst_port=68)
|
||||
self.tester.assert_no_connection(direction=self.tester.INGRESS,
|
||||
protocol=self.tester.UDP,
|
||||
src_port=68, dst_port=67)
|
||||
|
||||
def test_ip_spoofing(self):
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_ICMP}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
not_allowed_ip = "%s/24" % (
|
||||
netaddr.IPAddress(self.tester.vm_ip_address) + 1)
|
||||
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.vm_ip_cidr = not_allowed_ip
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.EGRESS)
|
||||
|
||||
def test_ip_spoofing_works_without_port_security_enabled(self):
|
||||
self.src_port_desc['port_security_enabled'] = False
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_ICMP}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
not_allowed_ip = "%s/24" % (
|
||||
netaddr.IPAddress(self.tester.vm_ip_address) + 1)
|
||||
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.vm_ip_cidr = not_allowed_ip
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.EGRESS)
|
||||
|
||||
def test_allowed_address_pairs(self):
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_ICMP},
|
||||
{'ethertype': constants.IPv4,
|
||||
'direction': firewall.EGRESS_DIRECTION}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
|
||||
port_mac = self.tester.vm_mac_address
|
||||
allowed_ip = netaddr.IPAddress(self.tester.vm_ip_address) + 1
|
||||
not_allowed_ip = "%s/24" % (allowed_ip + 1)
|
||||
self.src_port_desc['allowed_address_pairs'] = [
|
||||
{'mac_address': port_mac,
|
||||
'ip_address': allowed_ip}]
|
||||
allowed_ip = "%s/24" % allowed_ip
|
||||
|
||||
self.firewall.update_port_filter(self.src_port_desc)
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.vm_ip_cidr = allowed_ip
|
||||
self.tester.assert_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
self.tester.vm_ip_cidr = not_allowed_ip
|
||||
self.tester.assert_no_connection(protocol=self.tester.ICMP,
|
||||
direction=self.tester.INGRESS)
|
||||
|
||||
def test_arp_is_allowed(self):
|
||||
self.tester.assert_connection(protocol=self.tester.ARP,
|
||||
direction=self.tester.EGRESS)
|
||||
self.tester.assert_connection(protocol=self.tester.ARP,
|
||||
direction=self.tester.INGRESS)
|
||||
|
||||
def _test_rule(self, direction, protocol):
|
||||
sg_rules = [{'ethertype': constants.IPv4, 'direction': direction,
|
||||
'protocol': protocol}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
not_allowed_direction = reverse_direction[direction]
|
||||
not_allowed_protocol = reverse_transport_protocol[protocol]
|
||||
|
||||
self.tester.assert_connection(protocol=protocol,
|
||||
direction=direction)
|
||||
self.tester.assert_no_connection(protocol=not_allowed_protocol,
|
||||
direction=direction)
|
||||
self.tester.assert_no_connection(protocol=protocol,
|
||||
direction=not_allowed_direction)
|
||||
|
||||
def test_ingress_tcp_rule(self):
|
||||
self._test_rule(self.tester.INGRESS, self.tester.TCP)
|
||||
|
||||
def test_ingress_udp_rule(self):
|
||||
self._test_rule(self.tester.INGRESS, self.tester.UDP)
|
||||
|
||||
def test_egress_tcp_rule(self):
|
||||
self._test_rule(self.tester.EGRESS, self.tester.TCP)
|
||||
|
||||
def test_egress_udp_rule(self):
|
||||
self._test_rule(self.tester.EGRESS, self.tester.UDP)
|
||||
|
||||
def test_connection_with_destination_port_range(self):
|
||||
port_min = 12345
|
||||
port_max = 12346
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_TCP,
|
||||
'port_range_min': port_min,
|
||||
'port_range_max': port_max}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
|
||||
self.tester.assert_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.INGRESS,
|
||||
dst_port=port_min)
|
||||
self.tester.assert_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.INGRESS,
|
||||
dst_port=port_max)
|
||||
self.tester.assert_no_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.INGRESS,
|
||||
dst_port=port_min - 1)
|
||||
self.tester.assert_no_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.INGRESS,
|
||||
dst_port=port_max + 1)
|
||||
|
||||
def test_connection_with_source_port_range(self):
|
||||
source_port_min = 12345
|
||||
source_port_max = 12346
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.EGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_TCP,
|
||||
'source_port_range_min': source_port_min,
|
||||
'source_port_range_max': source_port_max}]
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
|
||||
self.tester.assert_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.EGRESS,
|
||||
src_port=source_port_min)
|
||||
self.tester.assert_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.EGRESS,
|
||||
src_port=source_port_max)
|
||||
self.tester.assert_no_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.EGRESS,
|
||||
src_port=source_port_min - 1)
|
||||
self.tester.assert_no_connection(protocol=self.tester.TCP,
|
||||
direction=self.tester.EGRESS,
|
||||
src_port=source_port_max + 1)
|
||||
|
||||
def test_established_connection_is_not_cut(self):
|
||||
port = 12345
|
||||
sg_rules = [{'ethertype': constants.IPv4,
|
||||
'direction': firewall.INGRESS_DIRECTION,
|
||||
'protocol': constants.PROTO_NAME_TCP,
|
||||
'port_range_min': port,
|
||||
'port_range_max': port}]
|
||||
connection = {'protocol': self.tester.TCP,
|
||||
'direction': self.tester.INGRESS,
|
||||
'dst_port': port}
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, sg_rules)
|
||||
self.tester.establish_connection(**connection)
|
||||
|
||||
self._apply_security_group_rules(self.FAKE_SECURITY_GROUP_ID, list())
|
||||
self.tester.assert_established_connection(**connection)
|
Loading…
Reference in New Issue