diff --git a/neutron/tests/functional/agent/linux/test_ipset.py b/neutron/tests/functional/agent/linux/test_ipset.py
index 703db22449b..a575ad6318b 100644
--- a/neutron/tests/functional/agent/linux/test_ipset.py
+++ b/neutron/tests/functional/agent/linux/test_ipset.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2014 Red Hat, Inc.
+# Copyright (c) 2015 Red Hat, Inc.
 # All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -21,9 +21,8 @@ from neutron.tests.common import net_helpers
 from neutron.tests.functional.agent.linux import base
 from neutron.tests.functional import base as functional_base
-IPSET_SET = 'test-set'
+MAX_IPSET_NAME_LENGTH = 28
 IPSET_ETHERTYPE = 'IPv4'
-ICMP_ACCEPT_RULE = '-p icmp -m set --match-set %s src -j ACCEPT' % IPSET_SET
 UNRELATED_IP = '1.1.1.1'
@@ -36,13 +35,17 @@ class IpsetBase(functional_base.BaseSudoTestCase):
 self.source, self.destination = self.useFixture(
 machine_fixtures.PeerMachines(bridge)).machines
+ self.ipset_name = base.get_rand_name(MAX_IPSET_NAME_LENGTH, 'set-')
+ self.icmp_accept_rule = ('-p icmp -m set --match-set %s src -j ACCEPT'
+ % self.ipset_name)
 self.ipset = self._create_ipset_manager_and_set(
- ip_lib.IPWrapper(self.destination.namespace), IPSET_SET)
-
+ ip_lib.IPWrapper(self.destination.namespace), self.ipset_name)
+ self.addCleanup(self.ipset._destroy, self.ipset_name)
 self.dst_iptables = iptables_manager.IptablesManager(
 namespace=self.destination.namespace)
- self._add_iptables_ipset_rules(self.dst_iptables)
+ self._add_iptables_ipset_rules()
+ self.addCleanup(self._remove_iptables_ipset_rules)
 def _create_ipset_manager_and_set(self, dst_ns, set_name):
 ipset = ipset_manager.IpsetManager(
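Review bot: `MAX_IPSET_NAME_LENGTH = 28` is added without saying where 28 comes from, so a reader cannot tell whether it is the ipset name limit itself or that limit minus room for something the manager appends. I would put a comment above the constant, for example `# Longest set name these tests generate; must stay within the limit ipset_manager enforces`, with the wording confirmed against the manager, which this hunk does not show. If the manager already exposes a name-length constant, deriving the test value from it would stop the two from drifting apart.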
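Review bot: The two `addCleanup` calls added to `setUp` are only correct because cleanups run last-in, first-out: the iptables rules have to be removed before `self.ipset._destroy` runs, and `test_destroy_ipset_set` in this file expects `_destroy` to raise `RuntimeError` while the rules are still in place. That ordering dependency is implicit, so a comment above the first call, such as `# Cleanups run LIFO: the rules are removed before the set is destroyed`, would keep a later reorder from leaving sets behind after a sudo test run. Separately, `_remove_iptables_ipset_rules` is registered only after `_add_iptables_ipset_rules()` returns, so a failure inside `apply()` skips the rule cleanup; registering it first would be safer if `remove_rule` tolerates a missing rule, which is not visible here. `setUp` begins outside the hunk.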
@@ -51,45 +54,49 @@ class IpsetBase(functional_base.BaseSudoTestCase):
 ipset._create_set(set_name, IPSET_ETHERTYPE)
 return ipset
- @staticmethod
- def _remove_iptables_ipset_rules(iptables_manager):
- iptables_manager.ipv4['filter'].remove_rule('INPUT', ICMP_ACCEPT_RULE)
- iptables_manager.apply()
+ def _remove_iptables_ipset_rules(self):
+ self.dst_iptables.ipv4['filter'].remove_rule(
+ 'INPUT', base.ICMP_BLOCK_RULE)
+ self.dst_iptables.ipv4['filter'].remove_rule(
+ 'INPUT', self.icmp_accept_rule)
+ self.dst_iptables.apply()
- @staticmethod
- def _add_iptables_ipset_rules(iptables_manager):
- iptables_manager.ipv4['filter'].add_rule('INPUT', ICMP_ACCEPT_RULE)
- iptables_manager.ipv4['filter'].add_rule('INPUT', base.ICMP_BLOCK_RULE)
- iptables_manager.apply()
+ def _add_iptables_ipset_rules(self):
+ self.dst_iptables.ipv4['filter'].add_rule(
+ 'INPUT', self.icmp_accept_rule)
+ self.dst_iptables.ipv4['filter'].add_rule(
+ 'INPUT', base.ICMP_BLOCK_RULE)
+ self.dst_iptables.apply()
 class IpsetManagerTestCase(IpsetBase):
 def test_add_member_allows_ping(self):
 self.source.assert_no_ping(self.destination.ip)
- self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
+ self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
 self.source.assert_ping(self.destination.ip)
 def test_del_member_denies_ping(self):
- self.ipset._add_member_to_set(IPSET_SET, self.source.ip)
+ self.ipset._add_member_to_set(self.ipset_name, self.source.ip)
 self.source.assert_ping(self.destination.ip)
- self.ipset._del_member_from_set(IPSET_SET, self.source.ip)
+ self.ipset._del_member_from_set(self.ipset_name, self.source.ip)
 self.source.assert_no_ping(self.destination.ip)
 def test_refresh_ipset_allows_ping(self):
- self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP], IPSET_ETHERTYPE)
+ self.ipset._refresh_set(
+ self.ipset_name, [UNRELATED_IP], IPSET_ETHERTYPE)
 self.source.assert_no_ping(self.destination.ip)
- self.ipset._refresh_set(IPSET_SET, [UNRELATED_IP, self.source.ip],
- IPSET_ETHERTYPE)
+ self.ipset._refresh_set(
+ self.ipset_name, [UNRELATED_IP, self.source.ip], IPSET_ETHERTYPE)
 self.source.assert_ping(self.destination.ip)
- self.ipset._refresh_set(IPSET_SET, [self.source.ip, UNRELATED_IP],
- IPSET_ETHERTYPE)
+ self.ipset._refresh_set(
+ self.ipset_name, [self.source.ip, UNRELATED_IP], IPSET_ETHERTYPE)
 self.source.assert_ping(self.destination.ip)
 def test_destroy_ipset_set(self):
- self.assertRaises(RuntimeError, self.ipset._destroy, IPSET_SET)
- self._remove_iptables_ipset_rules(self.dst_iptables)
- self.ipset._destroy(IPSET_SET)
+ self.assertRaises(RuntimeError, self.ipset._destroy, self.ipset_name)
+ self._remove_iptables_ipset_rules()
+ self.ipset._destroy(self.ipset_name)
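Review bot: `test_destroy_ipset_set` removes the rules and destroys the set in the test body, and the cleanups registered in `setUp` will then call `_remove_iptables_ipset_rules` and `self.ipset._destroy(self.ipset_name)` a second time on state that is already gone. Whether those repeat calls are no-ops or raise depends on `remove_rule` and `IpsetManager._destroy`, neither of which is visible here; if `_destroy` raises for a missing set, this test would error during cleanup rather than in its assertions. I would confirm that both are tolerant of a missing rule or set and record it at the end of the test with a comment such as `# setUp cleanups repeat these calls; both must tolerate an already-removed rule/set`.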