Avoid testing code duplication which introduced testing bugs

SecurityGroupAgentEnhancedRpcTestCase duplicated code in
SecurityGroupAgentRpcTestCase setUp, also
TestSecurityGroupAgentEnhancedRpcWithIptables duplicated
code from TestSecurityGroupAgentWithIptables setUp()
introducing bugs by improper initialization, like a missing
    self.iptables.use_ipv6 = True
which in combination with tests.unit.test_ipv6.TestIsEnabled
produced inconsistent testing results.

Change-Id: Ic0b596b6fe1d8273df197e9426abad818832c00a
Closes-bug: #1365829
This commit is contained in:
Miguel Angel Ajo 2014-09-05 14:10:35 +02:00
parent 3b46f2ca28
commit 74a292ceba
1 changed files with 31 additions and 61 deletions

View File

@ -943,21 +943,15 @@ class SecurityGroupAgentRpcTestCaseForNoneDriver(base.BaseTestCase):
'NoopFirewallDriver')
class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
class BaseSecurityGroupAgentRpcTestCase(base.BaseTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp()
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
super(BaseSecurityGroupAgentRpcTestCase, self).setUp()
set_firewall_driver(FIREWALL_NOOP_DRIVER)
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
rpc = mock.Mock()
rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
self.agent.plugin_rpc = rpc
self.agent.plugin_rpc = mock.Mock()
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
@ -970,9 +964,18 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
'fake_sgid1',
'remote_group_id':
'fake_sgid2'}]}
fake_devices = {'fake_device': self.fake_device}
self.firewall.ports = fake_devices
rpc.security_group_rules_for_devices.return_value = fake_devices
self.firewall.ports = {'fake_device': self.fake_device}
class SecurityGroupAgentRpcTestCase(BaseSecurityGroupAgentRpcTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentRpcTestCase, self).setUp(
defer_refresh_firewall)
rpc = self.agent.plugin_rpc
rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
rpc.security_group_rules_for_devices.return_value = (
self.firewall.ports)
def test_prepare_and_remove_devices_filter(self):
self.agent.prepare_devices_filter(['fake_device'])
@ -1041,39 +1044,20 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase):
self.firewall.assert_has_calls([])
class SecurityGroupAgentEnhancedRpcTestCase(base.BaseTestCase):
class SecurityGroupAgentEnhancedRpcTestCase(
BaseSecurityGroupAgentRpcTestCase):
def setUp(self, defer_refresh_firewall=False):
super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp()
cfg.CONF.set_default('firewall_driver',
'neutron.agent.firewall.NoopFirewallDriver',
group='SECURITYGROUP')
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
mock.patch('neutron.agent.linux.iptables_manager').start()
self.agent.root_helper = 'sudo'
rpc = mock.Mock()
self.agent.plugin_rpc = rpc
self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall)
self.firewall = mock.Mock()
firewall_object = firewall_base.FirewallDriver()
self.firewall.defer_apply.side_effect = firewall_object.defer_apply
self.agent.firewall = self.firewall
self.fake_device = {'device': 'fake_device',
'security_groups': ['fake_sgid1', 'fake_sgid2'],
'security_group_source_groups': ['fake_sgid2'],
'security_group_rules': [{'security_group_id':
'fake_sgid1',
'remote_group_id':
'fake_sgid2'}]}
fake_devices = {'fake_device': self.fake_device}
super(SecurityGroupAgentEnhancedRpcTestCase, self).setUp(
defer_refresh_firewall=defer_refresh_firewall)
fake_sg_info = {
'security_groups': {
'fake_sgid1': [
{'remote_group_id': 'fake_sgid2'}], 'fake_sgid2': []},
'sg_member_ips': {'fake_sgid2': {'IPv4': [], 'IPv6': []}},
'devices': fake_devices}
self.firewall.ports = fake_devices
rpc.security_group_info_for_devices.return_value = fake_sg_info
'devices': self.firewall.ports}
self.agent.plugin_rpc.security_group_info_for_devices.return_value = (
fake_sg_info)
def test_prepare_and_remove_devices_filter_enhanced_rpc(self):
self.agent.prepare_devices_filter(['fake_device'])
@ -2015,16 +1999,13 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
PHYSDEV_INGRESS = 'physdev-out'
PHYSDEV_EGRESS = 'physdev-in'
def setUp(self, defer_refresh_firewall=False):
def setUp(self, defer_refresh_firewall=False, test_rpc_v1_1=True):
super(TestSecurityGroupAgentWithIptables, self).setUp()
config.register_root_helper(cfg.CONF)
cfg.CONF.set_override(
'lock_path',
'$state_path/lock')
cfg.CONF.set_override(
'firewall_driver',
self.FIREWALL_DRIVER,
group='SECURITYGROUP')
set_firewall_driver(self.FIREWALL_DRIVER)
self.agent = sg_rpc.SecurityGroupAgentRpcMixin()
self.agent.context = None
@ -2033,8 +2014,10 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase):
self.agent.root_helper = 'sudo'
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
self.rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
if test_rpc_v1_1:
self.rpc.security_group_info_for_devices.side_effect = (
messaging.UnsupportedVersion('1.2'))
self.agent.init_firewall(
defer_refresh_firewall=defer_refresh_firewall)
@ -2216,21 +2199,8 @@ class TestSecurityGroupAgentEnhancedRpcWithIptables(
TestSecurityGroupAgentWithIptables):
def setUp(self, defer_refresh_firewall=False):
super(TestSecurityGroupAgentEnhancedRpcWithIptables, self).setUp(
defer_refresh_firewall)
self.rpc = mock.Mock()
self.agent.plugin_rpc = self.rpc
defer_refresh_firewall=defer_refresh_firewall, test_rpc_v1_1=False)
self.sg_info = self.rpc.security_group_info_for_devices
self.agent.init_firewall(
defer_refresh_firewall=defer_refresh_firewall)
self.iptables = self.agent.firewall.iptables
self.iptables_execute = mock.patch.object(self.iptables,
"execute").start()
self.iptables_execute_return_values = []
self.expected_call_count = 0
self.expected_calls = []
self.expected_process_inputs = []
self.iptables_execute.side_effect = (
self.iptables_execute_return_values)
rule1 = [{'direction': 'ingress',
'protocol': const.PROTO_NAME_UDP,