Merge "Add unit tests for nova.virt.firewall.IpTablesFirewallDriver (Part 3)"

This commit is contained in:
Jenkins 2016-08-16 03:01:34 +00:00 committed by Gerrit Code Review
commit fb190f30a9
1 changed files with 175 additions and 2 deletions

View File

@ -20,6 +20,8 @@ from nova.virt import firewall
_IPT_DRIVER_CLS = firewall.IptablesFirewallDriver
_FN_INSTANCE_RULES = 'instance_rules'
_FN_ADD_FILTERS = 'add_filters_for_instance'
_FN_DO_BASIC_RULES = '_do_basic_rules'
_FN_DO_DHCP_RULES = '_do_dhcp_rules'
class TestIptablesFirewallDriver(test.NoDBTestCase):
@ -68,7 +70,8 @@ class TestIptablesFirewallDriver(test.NoDBTestCase):
fake_instance = objects.Instance(id=123)
ii_mock.pop.return_value = True
self.driver.unfilter_instance(fake_instance, 'fakenetinfo')
self.driver.unfilter_instance(fake_instance,
mock.sentinel.net_info)
ii_mock.pop.assert_called_once_with(fake_instance.id, None)
rfii_mock.assert_called_once_with(fake_instance)
@ -81,7 +84,8 @@ class TestIptablesFirewallDriver(test.NoDBTestCase):
fake_instance = objects.Instance(id=123)
ii_mock.pop.return_value = False
self.driver.unfilter_instance(fake_instance, 'fakenetinfo')
self.driver.unfilter_instance(fake_instance,
mock.sentinel.net_info)
ii_mock.pop.assert_called_once_with(fake_instance.id, None)
self.assertFalse(rfii_mock.called)
@ -270,3 +274,172 @@ class TestIptablesFirewallDriver(test.NoDBTestCase):
instance.id = "myinstanceid"
instance_chain_name = self.driver._instance_chain_name(instance)
self.assertEqual(instance_chain_name, 'inst-myinstanceid')
def test_do_basic_rules(self):
ipv4_rules = ['rule1']
ipv6_rules = ['rule2']
self.driver._do_basic_rules(ipv4_rules, ipv6_rules,
mock.sentinel.net_info)
self.assertEqual(ipv4_rules,
['rule1', '-m state --state INVALID -j DROP',
'-m state --state ESTABLISHED,RELATED -j ACCEPT'])
self.assertEqual(ipv6_rules,
['rule2', '-m state --state INVALID -j DROP',
'-m state --state ESTABLISHED,RELATED -j ACCEPT'])
def test_do_dhcp_rules(self):
subnet1 = mock.Mock()
subnet1.get_meta = mock.Mock(return_value='mydhcp')
subnet2 = mock.Mock()
subnet2.get_meta = mock.Mock(return_value=None)
self.driver._get_subnets = mock.Mock(return_value=[subnet1, subnet2])
ipv4_rules = ['rule1']
self.driver._do_dhcp_rules(ipv4_rules, mock.sentinel.net_info)
self.assertEqual(ipv4_rules,
['rule1',
'-s mydhcp -p udp --sport 67 --dport 68 -j ACCEPT'])
def test_do_project_network_rules(self):
self.flags(use_ipv6=True)
subnet1 = {'cidr': 'mycidr1'}
subnet2 = {'cidr': 'mycidr2'}
ipv4_rules = ['rule1']
ipv6_rules = ['rule2']
self.driver._get_subnets = mock.Mock(return_value=[subnet1, subnet2])
self.driver._do_project_network_rules(ipv4_rules, ipv6_rules,
mock.sentinel.net_info)
self.assertEqual(ipv4_rules,
['rule1',
'-s mycidr1 -j ACCEPT', '-s mycidr2 -j ACCEPT'])
self.assertEqual(ipv6_rules,
['rule2',
'-s mycidr1 -j ACCEPT', '-s mycidr2 -j ACCEPT'])
def test_do_ra_rules(self):
subnet1 = {'gateway': {'address': 'myaddress1'}}
subnet2 = {'gateway': {'address': 'myaddress2'}}
self.driver._get_subnets = \
mock.Mock(return_value=[subnet1, subnet2])
ipv6_rules = ['rule1']
self.driver._do_ra_rules(ipv6_rules, mock.sentinel.net_info)
self.assertEqual(ipv6_rules, ['rule1',
'-s myaddress1/128 -p icmpv6 -j ACCEPT',
'-s myaddress2/128 -p icmpv6 -j ACCEPT'])
def test_build_icmp_rule(self):
rule = mock.Mock()
# invalid icmp type
rule.from_port = -1
icmp_rule = self.driver._build_icmp_rule(rule, 4)
self.assertEqual(icmp_rule, [])
# version 4 invalid icmp code
rule.from_port = 123
rule.to_port = -1
icmp_rule = self.driver._build_icmp_rule(rule, 4)
self.assertEqual(icmp_rule,
['-m', 'icmp', '--icmp-type', '123'])
# version 6 valid icmp code
rule.from_port = 123
rule.to_port = 456
icmp_rule = self.driver._build_icmp_rule(rule, 6)
self.assertEqual(icmp_rule,
['-m', 'icmp6', '--icmpv6-type', '123/456'])
def test_build_tcp_udp_rule(self):
rule = mock.Mock()
# equal from and to port
rule.from_port = 123
rule.to_port = 123
tu_rule = self.driver._build_tcp_udp_rule(rule, 42)
self.assertEqual(tu_rule, ['--dport', '123'])
# different from and to port
rule.to_port = 456
tu_rule = self.driver._build_tcp_udp_rule(rule, 42)
self.assertEqual(tu_rule, ['-m', 'multiport', '--dports', '123:456'])
def setup_instance_rules(self, ins_obj_cls_mock):
"""Create necessary mock varibles for instance_rules.
The i_mock and ni_mock represent instance_rules parameters
instance and network_info.
The i_obj_mock represent the return vaue for nova.objects.Instance.
"""
i_mock = mock.MagicMock(spec=dict)
ni_mock = mock.MagicMock(spec=dict)
i_obj_mock = mock.MagicMock()
ins_obj_cls_mock._from_db_object.return_value = i_obj_mock
driver = firewall.IptablesFirewallDriver()
return i_mock, ni_mock, i_obj_mock, driver
@mock.patch('nova.objects.SecurityGroupRuleList')
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_DHCP_RULES)
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_BASIC_RULES)
@mock.patch('nova.objects.Instance')
@mock.patch('nova.context.get_admin_context',
return_value=mock.sentinel.ctx)
@mock.patch('nova.network.linux_net.iptables_manager')
def test_instance_rules_no_secgroups(self, _iptm_mock, ctx_mock,
ins_obj_cls_mock, _do_basic_mock, _do_dhcp_mock,
sec_grp_list_mock):
i_mock, ni_mock, i_obj_mock, driver = self.setup_instance_rules(
ins_obj_cls_mock)
# Simple unit test that verifies that the fallback jump
# is the only rule added to the returned list of rules if
# no secgroups are found (we ignore the basic and DHCP
# rule additions here)
sec_grp_list_mock.get_by_instance.return_value = []
v4_rules, v6_rules = driver.instance_rules(i_mock, ni_mock)
ins_obj_cls_mock._from_db_object.assert_called_once_with(
mock.sentinel.ctx, mock.ANY, i_mock, mock.ANY)
sec_grp_list_mock.get_by_instance.assert_called_once_with(
mock.sentinel.ctx, i_obj_mock)
expected = ['-j $sg-fallback']
self.assertEqual(expected, v4_rules)
self.assertEqual(expected, v6_rules)
@mock.patch('nova.objects.SecurityGroupRuleList')
@mock.patch('nova.objects.SecurityGroupList')
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_DHCP_RULES)
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_BASIC_RULES)
@mock.patch('nova.objects.Instance')
@mock.patch('nova.context.get_admin_context',
return_value=mock.sentinel.ctx)
@mock.patch('nova.network.linux_net.iptables_manager')
def test_instance_rules_cidr(self, _iptm_mock, ctx_mock,
ins_obj_cls_mock, _do_basic_mock, _do_dhcp_mock,
sec_grp_list_mock, sec_grp_rule_list_mock):
i_mock, ni_mock, i_obj_mock, driver = self.setup_instance_rules(
ins_obj_cls_mock)
# Tests that sec group rules that contain a CIDR (i.e. the
# rule does not contain a grantee group of instances) populates
# the returned iptables rules with appropriate ingress and
# egress filters.
sec_grp_list_mock.get_by_instance.return_value = [
mock.sentinel.sec_grp
]
sec_grp_rule_list_mock.get_by_security_group.return_value = [
{
"cidr": "192.168.1.0/24",
"protocol": "tcp",
"to_port": "22",
"from_port": "22"
}
]
v4_rules, v6_rules = driver.instance_rules(i_mock, ni_mock)
expected = [
# '-j ACCEPT -p tcp --dport 22 -s 192.168.1.0/24',
'-j $sg-fallback'
]
self.assertEqual(expected, v4_rules)
expected = ['-j $sg-fallback']
self.assertEqual(expected, v6_rules)