Add unit tests for nova.virt.firewall.IpTablesFirewallDriver (Part 4)

There are no unit tests for the base IptablesFirewallDriver.
This patch adds those unit tests.
(This is the fourth part of the abandoned commit
Ie5372b060da1621927a638af915e15f29e885a4c )

Co-Authored-By:
Jay Pipes <jaypipes@gmail.com>
(cherry picked from commit Ia9ef5ead95508cfb27b53b9899a9cfe97d150662)

Chen Li <shchenli@cn.ibm.com>
(Cherry picked from commit Ie2827051e1795c717f8a7762b932a49d182891cd

Change-Id: Ib1d41fe55bf8fcf4ded8b48765f9f1d6fd3d19c1
Partial-bug: #1295889
This commit is contained in:
Julian Sy 2016-08-15 15:53:45 +00:00
parent 07b8a35bac
commit 37d1acd772
1 changed files with 142 additions and 0 deletions

View File

@ -13,6 +13,7 @@
import mock
from nova import exception
from nova import objects
from nova import test
from nova.virt import firewall
@ -443,3 +444,144 @@ class TestIptablesFirewallDriver(test.NoDBTestCase):
self.assertEqual(expected, v4_rules)
expected = ['-j $sg-fallback']
self.assertEqual(expected, v6_rules)
def setup_grantee_group(
self, ins_obj_cls_mock, sec_grp_list_mock, sec_grp_rule_list_mock,
ins_list_mock):
i_mock, ni_mock, i_obj_mock, driver = self.setup_instance_rules(
ins_obj_cls_mock)
# Tests that sec group rules that DO NOT contain a CIDR (i.e. the
# rule contains a grantee group of instances) populates
# the returned iptables rules with appropriate ingress and
# egress filters after calling out to the network API for information
# about the instances in the grantee group.
sec_grp_list_mock.get_by_instance.return_value = [
mock.sentinel.sec_grp
]
sec_grp_rule_list_mock.get_by_security_group.return_value = [
{
"cidr": None,
"grantee_group": mock.sentinel.gg,
"protocol": "tcp",
"to_port": "22",
"from_port": "22"
}
]
i_obj_list_mock = mock.MagicMock()
i_obj_list_mock.info_cache.return_value = {
"deleted": False
}
ins_list_mock.get_by_security_group.return_value = [i_obj_list_mock]
return i_mock, i_obj_mock, ni_mock, driver
@mock.patch('nova.compute.utils.get_nw_info_for_instance')
@mock.patch('nova.objects.InstanceList')
@mock.patch('nova.objects.SecurityGroupRuleList')
@mock.patch('nova.objects.SecurityGroupList')
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_DHCP_RULES)
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_BASIC_RULES)
@mock.patch('nova.objects.Instance')
@mock.patch('nova.context.get_admin_context',
return_value=mock.sentinel.ctx)
@mock.patch('nova.network.linux_net.iptables_manager')
def test_instance_rules_grantee_group(self, _iptm_mock, ctx_mock,
ins_obj_cls_mock, _do_basic_mock, _do_dhcp_mock,
sec_grp_list_mock, sec_grp_rule_list_mock, ins_list_mock,
get_nw_info_mock):
i_mock, i_obj_mock, ni_mock, driver = self.setup_grantee_group(
ins_obj_cls_mock, sec_grp_list_mock, sec_grp_rule_list_mock,
ins_list_mock)
nw_info_mock = mock.MagicMock()
nw_info_mock.fixed_ips.return_value = [
{
"address": "10.0.1.4",
"version": 4
}
]
get_nw_info_mock.return_value = nw_info_mock
v4_rules, v6_rules = driver.instance_rules(i_mock, ni_mock)
expected = ['-j $sg-fallback']
self.assertEqual(expected, v4_rules)
self.assertEqual(expected, v6_rules)
@mock.patch('nova.compute.utils.get_nw_info_for_instance')
@mock.patch('nova.objects.InstanceList')
@mock.patch('nova.objects.SecurityGroupRuleList')
@mock.patch('nova.objects.SecurityGroupList')
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_DHCP_RULES)
@mock.patch.object(_IPT_DRIVER_CLS, _FN_DO_BASIC_RULES)
@mock.patch('nova.objects.Instance')
@mock.patch('nova.context.get_admin_context',
return_value=mock.sentinel.ctx)
@mock.patch('nova.network.linux_net.iptables_manager')
def test_instance_rules_grantee_group_instance_deleted(
self, _iptm_mock, ctx_mock, ins_obj_cls_mock, _do_basic_mock,
_do_dhcp_mock, sec_grp_list_mock, sec_grp_rule_list_mock,
ins_list_mock, get_nw_info_mock):
i_mock, i_obj_mock, ni_mock, driver = self.setup_grantee_group(
ins_obj_cls_mock, sec_grp_list_mock, sec_grp_rule_list_mock,
ins_list_mock)
# Emulate one of the instances in the grantee group being deleted
# in between when the spawn of this instance and when we set up
# network for that instance, and ensure that we do not crash and
# burn but just skip the deleted instance from the iptables filters
get_nw_info_mock.side_effect = exception.InstanceNotFound(
instance_id="_ignored")
v4_rules, v6_rules = driver.instance_rules(i_mock, ni_mock)
expected = ['-j $sg-fallback']
self.assertEqual(expected, v4_rules)
self.assertEqual(expected, v6_rules)
def test_refresh_security_group_rules(self):
self.driver.do_refresh_security_group_rules = mock.Mock()
self.driver.iptables.apply = mock.Mock()
self.driver.refresh_security_group_rules('mysecgroup')
self.driver.do_refresh_security_group_rules \
.assert_called_with('mysecgroup')
self.driver.iptables.apply.assert_called()
def test_refresh_instance_security_rules(self):
self.driver.do_refresh_instance_rules = mock.Mock()
self.driver.iptables.apply = mock.Mock()
self.driver.refresh_instance_security_rules('myinstance')
self.driver.do_refresh_instance_rules.assert_called_with('myinstance')
self.driver.iptables.apply.assert_called()
def test_do_refresh_security_group_rules(self):
self.driver.instance_info = \
{'1': ['myinstance1', 'netinfo1'],
'2': ['myinstance2', 'netinfo2']}
self.driver.instance_rules = \
mock.Mock(return_value=['myipv4rules', 'myipv6rules'])
self.driver._inner_do_refresh_rules = mock.Mock()
self.driver.do_refresh_security_group_rules('mysecgroup')
self.driver.instance_rules.assert_any_call('myinstance1', 'netinfo1')
self.driver.instance_rules.assert_any_call('myinstance2', 'netinfo2')
self.driver._inner_do_refresh_rules.assert_any_call(
'myinstance1', 'netinfo1',
'myipv4rules', 'myipv6rules')
self.driver._inner_do_refresh_rules.assert_any_call(
'myinstance2', 'netinfo2',
'myipv4rules', 'myipv6rules')
def test_do_refresh_instance_rules(self):
instance = mock.Mock()
instance.id = 'myid'
self.driver.instance_info = {instance.id: ['myinstance', 'mynetinfo']}
self.driver.instance_rules = \
mock.Mock(return_value=['myipv4rules', 'myipv6rules'])
self.driver._inner_do_refresh_rules = mock.Mock()
self.driver.do_refresh_instance_rules(instance)
self.driver.instance_rules.assert_called_with(instance, 'mynetinfo')
self.driver._inner_do_refresh_rules.assert_called_with(
instance, 'mynetinfo', 'myipv4rules', 'myipv6rules')