From c68ebd661b68c9da548a71521610839c38acd0c5 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Tue, 13 Nov 2018 16:36:06 +0000 Subject: [PATCH] Implement IpRuleCommand.list_rules() using pyroute2 Change-Id: I55d5dd756940e5a92f472c9309d49f427e907928 Related-Bug: #1492714 --- neutron/agent/l3/dvr_fip_ns.py | 3 +- neutron/agent/l3/dvr_local_router.py | 2 +- neutron/agent/linux/ip_lib.py | 101 ++++++++--- neutron/privileged/agent/linux/ip_lib.py | 19 +++ .../functional/agent/l3/test_dvr_router.py | 67 ++++---- .../functional/agent/linux/test_ip_lib.py | 5 +- .../privileged/agent/linux/test_ip_lib.py | 57 ++++++- neutron/tests/unit/agent/linux/test_ip_lib.py | 161 +++++++++++++++--- 8 files changed, 328 insertions(+), 87 deletions(-) diff --git a/neutron/agent/l3/dvr_fip_ns.py b/neutron/agent/l3/dvr_fip_ns.py index ce6f76762a6..8c1502cfed1 100644 --- a/neutron/agent/l3/dvr_fip_ns.py +++ b/neutron/agent/l3/dvr_fip_ns.py @@ -262,8 +262,7 @@ class FipNamespace(namespaces.Namespace): return new_gw_ips != old_gw_ips def get_fip_table_indexes(self, ip_version): - ns_ipr = ip_lib.IPRule(namespace=self.get_name()) - ip_rules_list = ns_ipr.rule.list_rules(ip_version) + ip_rules_list = ip_lib.list_ip_rules(self.get_name(), ip_version) tbl_index_list = [] for ip_rule in ip_rules_list: tbl_index = ip_rule['table'] diff --git a/neutron/agent/l3/dvr_local_router.py b/neutron/agent/l3/dvr_local_router.py index 961187f3313..c8a8670e2a7 100644 --- a/neutron/agent/l3/dvr_local_router.py +++ b/neutron/agent/l3/dvr_local_router.py @@ -326,7 +326,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): pass def _stale_ip_rule_cleanup(self, ns_ipr, ns_ipd, ip_version): - ip_rules_list = ns_ipr.rule.list_rules(ip_version) + ip_rules_list = ip_lib.list_ip_rules(ns_ipr.namespace, ip_version) snat_table_list = [] for ip_rule in ip_rules_list: snat_table = ip_rule['table'] diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index 84490b265b4..3533afc2b54 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -44,6 +44,18 @@ IP_NONLOCAL_BIND = 'net.ipv4.ip_nonlocal_bind' LOOPBACK_DEVNAME = 'lo' FB_TUNNEL_DEVICE_NAMES = ['gre0', 'gretap0', 'tunl0', 'erspan0', 'sit0', 'ip6tnl0', 'ip6gre0'] +RULE_TABLES = {'default': 253, + 'main': 254, + 'local': 255} + +# Rule indexes: pyroute2.netlink.rtnl +# Rule names: https://www.systutorials.com/docs/linux/man/8-ip-rule/ +# NOTE(ralonsoh): 'masquerade' type is printed as 'nat' in 'ip rule' command +RULE_TYPES = {1: 'unicast', + 6: 'blackhole', + 7: 'unreachable', + 8: 'prohibit', + 10: 'nat'} SYS_NET_PATH = '/sys/class/net' DEFAULT_GW_PATTERN = re.compile(r"via (\S+)") @@ -427,30 +439,8 @@ class IpRuleCommand(IpCommandBase): return {k: str(v) for k, v in map(canonicalize, settings.items())} - def _parse_line(self, ip_version, line): - # Typical rules from 'ip rule show': - # 4030201: from 1.2.3.4/24 lookup 10203040 - # 1024: from all iif qg-c43b1928-48 lookup noscope - - parts = line.split() - if not parts: - return {} - - # Format of line is: "priority: ... []" - settings = {k: v for k, v in zip(parts[1::2], parts[2::2])} - settings['priority'] = parts[0][:-1] - if len(parts) % 2 == 0: - # When line has an even number of columns, last one is the type. - settings['type'] = parts[-1] - - return self._make_canonical(ip_version, settings) - - def list_rules(self, ip_version): - lines = self._as_root([ip_version], ['show']).splitlines() - return [self._parse_line(ip_version, line) for line in lines] - def _exists(self, ip_version, **kwargs): - return kwargs in self.list_rules(ip_version) + return kwargs in list_ip_rules(self._parent.namespace, ip_version) def _make__flat_args_tuple(self, *args, **kwargs): for kwargs_item in sorted(kwargs.items(), key=lambda i: i[0]): @@ -1309,3 +1299,68 @@ def get_ipv6_forwarding(device, namespace=None): cmd = ['sysctl', '-b', "net.ipv6.conf.%s.forwarding" % device] ip_wrapper = IPWrapper(namespace) return int(ip_wrapper.netns.execute(cmd, run_as_root=True)) + + +def _parse_ip_rule(rule, ip_version): + """Parse a pyroute2 rule and returns a dictionary + + Parameters contained in the returned dictionary: + - priority: rule priority + - from: source IP address + - to: (optional) destination IP address + - type: rule type (see RULE_TYPES) + - table: table name or number (see RULE_TABLES) + - fwmark: (optional) FW mark + - iif: (optional) input interface name + - oif: (optional) output interface name + + :param rule: pyroute2 rule dictionary + :param ip_version: IP version (4, 6) + :return: dictionary with IP rule information + """ + parsed_rule = {'priority': str(rule['attrs'].get('FRA_PRIORITY', 0))} + from_ip = rule['attrs'].get('FRA_SRC') + if from_ip: + parsed_rule['from'] = common_utils.ip_to_cidr( + from_ip, prefix=rule['src_len']) + if common_utils.is_cidr_host(parsed_rule['from']): + parsed_rule['from'] = common_utils.cidr_to_ip(parsed_rule['from']) + else: + parsed_rule['from'] = constants.IP_ANY[ip_version] + to_ip = rule['attrs'].get('FRA_DST') + if to_ip: + parsed_rule['to'] = common_utils.ip_to_cidr( + to_ip, prefix=rule['dst_len']) + if common_utils.is_cidr_host(parsed_rule['to']): + parsed_rule['to'] = common_utils.cidr_to_ip(parsed_rule['to']) + parsed_rule['type'] = RULE_TYPES[rule['action']] + table_num = rule['attrs']['FRA_TABLE'] + for table_name in (name for (name, index) in + RULE_TABLES.items() if index == table_num): + parsed_rule['table'] = table_name + break + else: + parsed_rule['table'] = str(table_num) + fwmark = rule['attrs'].get('FRA_FWMARK') + if fwmark: + fwmask = rule['attrs'].get('FRA_FWMASK') + parsed_rule['fwmark'] = '{0:#x}/{1:#x}'.format(fwmark, fwmask) + iifname = rule['attrs'].get('FRA_IIFNAME') + if iifname: + parsed_rule['iif'] = iifname + oifname = rule['attrs'].get('FRA_OIFNAME') + if oifname: + parsed_rule['oif'] = oifname + + return parsed_rule + + +def list_ip_rules(namespace, ip_version): + """List all IP rules in a namespace + + :param namespace: namespace name + :param ip_version: IP version (4, 6) + :return: list of dictionaries with the rules information + """ + rules = privileged.list_ip_rules(namespace, ip_version) + return [_parse_ip_rule(rule, ip_version) for rule in rules] diff --git a/neutron/privileged/agent/linux/ip_lib.py b/neutron/privileged/agent/linux/ip_lib.py index 4d9c0ffbd2c..450d4a8a463 100644 --- a/neutron/privileged/agent/linux/ip_lib.py +++ b/neutron/privileged/agent/linux/ip_lib.py @@ -469,3 +469,22 @@ def get_devices(namespace, **kwargs): if e.errno == errno.ENOENT: raise NetworkNamespaceNotFound(netns_name=namespace) raise + + +@privileged.default.entrypoint +def list_ip_rules(namespace, ip_version, match=None, **kwargs): + """List all IP rules""" + try: + with _get_iproute(namespace) as ip: + rules = ip.get_rules(family=_IP_VERSION_FAMILY_MAP[ip_version], + match=match, **kwargs) + for rule in rules: + rule['attrs'] = { + key: value for key, value + in ((item[0], item[1]) for item in rule['attrs'])} + return rules + + except OSError as e: + if e.errno == errno.ENOENT: + raise NetworkNamespaceNotFound(netns_name=namespace) + raise diff --git a/neutron/tests/functional/agent/l3/test_dvr_router.py b/neutron/tests/functional/agent/l3/test_dvr_router.py index 571896e5a7b..52874cc2797 100644 --- a/neutron/tests/functional/agent/l3/test_dvr_router.py +++ b/neutron/tests/functional/agent/l3/test_dvr_router.py @@ -104,8 +104,8 @@ class TestDvrRouter(framework.L3AgentTestFramework): self.agent._process_updated_router(router1.router) router_updated = self.agent.router_info[router1.router['id']] self.assertTrue(self._namespace_exists(router_updated.ns_name)) - ns_ipr = ip_lib.IPRule(namespace=router1.ns_name) - ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4) + ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_4) self.assertEqual(6, len(ip4_rules_list)) # IPRule list should have 6 entries. # Three entries from 'default', 'main' and 'local' table. @@ -364,9 +364,10 @@ class TestDvrRouter(framework.L3AgentTestFramework): router1.router['external_gateway_info'] = "" restarted_router = self.manage_router(restarted_agent, router1.router) self.assertTrue(self._namespace_exists(restarted_router.ns_name)) - ns_ipr = ip_lib.IPRule(namespace=router1.ns_name) - ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4) - ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6) + ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_4) + ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_6) # Just make sure the basic set of rules are there in the router # namespace self.assertEqual(3, len(ip4_rules_list)) @@ -808,9 +809,8 @@ class TestDvrRouter(framework.L3AgentTestFramework): self.assertTrue(self._namespace_exists(router1.ns_name)) self.assertTrue(self._namespace_exists(fip_ns_name)) self._assert_snat_namespace_exists(router1) - ns_ipr = ip_lib.IPRule(namespace=router1.ns_name) - ip4_rules_list_with_fip = ns_ipr.rule.list_rules( - lib_constants.IP_VERSION_4) + ip4_rules_list_with_fip = ip_lib.list_ip_rules( + router1.ns_name, lib_constants.IP_VERSION_4) # The rules_list should have 6 entries: # 3 default rules (local, main and default) # 1 Fip forward rule @@ -829,7 +829,8 @@ class TestDvrRouter(framework.L3AgentTestFramework): router_updated = self.agent.router_info[router1.router['id']] self.assertTrue(self._namespace_exists(router_updated.ns_name)) self._assert_snat_namespace_exists(router1) - ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4) + ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_4) self.assertEqual(5, len(ip4_rules_list)) interface_rules_list_count = 0 fip_rule_count = 0 @@ -946,22 +947,16 @@ class TestDvrRouter(framework.L3AgentTestFramework): restarted_router.iptables_manager, 'nat', [prevent_snat_rule]) def _get_fixed_ip_rule_priority(self, namespace, fip): - iprule = ip_lib.IPRule(namespace) - lines = iprule.rule._as_root([4], ['show']).splitlines() - for line in lines: - if fip in line: - info = iprule.rule._parse_line(4, line) - return info['priority'] + ipv4_rules = ip_lib.list_ip_rules(namespace, 4) + for rule in (rule for rule in ipv4_rules + if utils.cidr_to_ip(rule['from']) == fip): + return rule['priority'] def _fixed_ip_rule_exists(self, namespace, ip): - iprule = ip_lib.IPRule(namespace) - lines = iprule.rule._as_root([4], ['show']).splitlines() - for line in lines: - if ip in line: - info = iprule.rule._parse_line(4, line) - if info['from'] == ip: - return True - + ipv4_rules = ip_lib.list_ip_rules(namespace, 4) + for _ in (rule for rule in ipv4_rules + if utils.cidr_to_ip(rule['from']) == ip): + return True return False def test_dvr_router_add_internal_network_set_arp_cache(self): @@ -1582,9 +1577,8 @@ class TestDvrRouter(framework.L3AgentTestFramework): else: self.assertIsNone(fg_device.route.get_gateway(filters=tbl_filter)) - ip_rule = ip_lib.IPRule(namespace=fip_ns_name) - ext_net_fw_rules_list = ip_rule.rule.list_rules( - lib_constants.IP_VERSION_4) + ext_net_fw_rules_list = ip_lib.list_ip_rules( + fip_ns_name, lib_constants.IP_VERSION_4) if not check_fpr_int_rule_delete: # When floatingip are associated, make sure that the # corresponding rules and routes in route table are created @@ -1699,9 +1693,10 @@ class TestDvrRouter(framework.L3AgentTestFramework): self._assert_fip_namespace_deleted( agent_gw_port, assert_ovs_interface=False) if not address_scopes or no_external: - ns_ipr = ip_lib.IPRule(namespace=router_updated.ns_name) - ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4) - ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6) + ip4_rules_list = ip_lib.list_ip_rules(router_updated.ns_name, + lib_constants.IP_VERSION_4) + ip6_rules_list = ip_lib.list_ip_rules(router_updated.ns_name, + lib_constants.IP_VERSION_6) self.assertEqual(3, len(ip4_rules_list)) self.assertEqual(2, len(ip6_rules_list)) @@ -1759,9 +1754,10 @@ class TestDvrRouter(framework.L3AgentTestFramework): fip_2_rtr, rfp_device, rfp_device_name) # Check if any snat redirect rules in the router namespace exist. - ns_ipr = ip_lib.IPRule(namespace=router1.ns_name) - ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4) - ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6) + ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_4) + ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_6) # Just make sure the basic set of rules are there in the router # namespace self.assertEqual(5, len(ip4_rules_list)) @@ -1822,9 +1818,10 @@ class TestDvrRouter(framework.L3AgentTestFramework): self.assertFalse(mock_add_interface_route_rule.called) self.assertFalse(mock_add_fip_interface_route_rule.called) # Check if any snat redirect rules in the router namespace exist. - ns_ipr = ip_lib.IPRule(namespace=router1.ns_name) - ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4) - ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6) + ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_4) + ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name, + lib_constants.IP_VERSION_6) # Just make sure the basic set of rules are there in the router # namespace self.assertEqual(5, len(ip4_rules_list)) diff --git a/neutron/tests/functional/agent/linux/test_ip_lib.py b/neutron/tests/functional/agent/linux/test_ip_lib.py index 1bc66a42b7e..738971ff580 100644 --- a/neutron/tests/functional/agent/linux/test_ip_lib.py +++ b/neutron/tests/functional/agent/linux/test_ip_lib.py @@ -26,6 +26,7 @@ import testtools from neutron.agent.linux import ip_lib from neutron.common import utils from neutron.conf.agent import common as config +from neutron.privileged.agent.linux import ip_lib as priv_ip_lib from neutron.tests.common import net_helpers from neutron.tests.functional import base as functional_base @@ -163,14 +164,14 @@ class IpLibTestCase(IpLibTestFramework): for rule in test_case: ip_rule.rule.add(table=TABLE, priority=PRIORITY, **rule) - rules = ip_rule.rule.list_rules(ip_version) + rules = ip_lib.list_ip_rules(ip_rule.namespace, ip_version) for expected_rule in expected_rules[ip_version]: self.assertIn(expected_rule, rules) for rule in test_case: ip_rule.rule.delete(table=TABLE, priority=PRIORITY, **rule) - rules = ip_rule.rule.list_rules(ip_version) + rules = priv_ip_lib.list_ip_rules(ip_rule.namespace, ip_version) for expected_rule in expected_rules[ip_version]: self.assertNotIn(expected_rule, rules) diff --git a/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py b/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py index c3c74bf522c..bb80f8c5c9b 100644 --- a/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py +++ b/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py @@ -16,10 +16,10 @@ from oslo_utils import uuidutils from neutron.agent.linux import ip_lib from neutron.privileged.agent.linux import ip_lib as priv_ip_lib -from neutron.tests.functional import base +from neutron.tests.functional import base as functional_base -class GetDevicesTestCase(base.BaseLoggingTestCase): +class GetDevicesTestCase(functional_base.BaseLoggingTestCase): def _remove_ns(self, namespace): priv_ip_lib.remove_netns(namespace) @@ -44,3 +44,56 @@ class GetDevicesTestCase(base.BaseLoggingTestCase): device_names = priv_ip_lib.get_devices(namespace) for name in device_names: self.assertNotIn(name, interfaces) + + +class ListIpRulesTestCase(functional_base.BaseSudoTestCase): + + RULE_TABLES = {'default': 253, 'main': 254, 'local': 255} + + def setUp(self): + super(ListIpRulesTestCase, self).setUp() + self.namespace = 'ns_test-' + uuidutils.generate_uuid() + self.ns = priv_ip_lib.create_netns(self.namespace) + self.ip_rule = ip_lib.IPRule(namespace=self.namespace) + self.addCleanup(self._remove_ns) + + def _remove_ns(self): + priv_ip_lib.remove_netns(self.namespace) + + def test_list_default_rules_ipv4(self): + rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4) + self.assertEqual(3, len(rules_ipv4)) + rule_tables = list(self.RULE_TABLES.values()) + for rule in rules_ipv4: + rule_tables.remove(rule['table']) + self.assertEqual(0, len(rule_tables)) + + def test_list_rules_ipv4(self): + self.ip_rule.rule.add('192.168.0.1/24', table=10) + rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4) + for rule in rules_ipv4: + if rule['table'] == 10: + self.assertEqual('192.168.0.1', rule['attrs']['FRA_SRC']) + self.assertEqual(24, rule['src_len']) + break + else: + self.fail('Rule added (192.168.0.1/24, table 10) not found') + + def test_list_default_rules_ipv6(self): + rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6) + self.assertEqual(2, len(rules_ipv6)) + rule_tables = [255, 254] + for rule in rules_ipv6: + rule_tables.remove(rule['table']) + self.assertEqual(0, len(rule_tables)) + + def test_list_rules_ipv6(self): + self.ip_rule.rule.add('2001:db8::1/64', table=20) + rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6) + for rule in rules_ipv6: + if rule['table'] == 20: + self.assertEqual('2001:db8::1', rule['attrs']['FRA_SRC']) + self.assertEqual(64, rule['src_len']) + break + else: + self.fail('Rule added (2001:db8::1/64, table 20) not found') diff --git a/neutron/tests/unit/agent/linux/test_ip_lib.py b/neutron/tests/unit/agent/linux/test_ip_lib.py index 4b62bebb23a..d1fb4dbf180 100644 --- a/neutron/tests/unit/agent/linux/test_ip_lib.py +++ b/neutron/tests/unit/agent/linux/test_ip_lib.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import errno import socket @@ -20,6 +21,7 @@ import mock import netaddr from neutron_lib import constants from neutron_lib import exceptions +from oslo_utils import uuidutils import pyroute2 from pyroute2.netlink.rtnl import ifinfmsg from pyroute2.netlink.rtnl import ndmsg @@ -29,6 +31,7 @@ import testtools from neutron.agent.common import utils # noqa from neutron.agent.linux import ip_lib from neutron.common import exceptions as n_exc +from neutron.common import utils as n_utils from neutron import privileged from neutron.privileged.agent.linux import ip_lib as priv_lib from neutron.tests import base @@ -617,13 +620,23 @@ class TestIpRuleCommand(TestIPCmdBase): def setUp(self): super(TestIpRuleCommand, self).setUp() self.parent._as_root.return_value = '' + self.parent.namespace = uuidutils.generate_uuid() self.command = 'rule' self.rule_cmd = ip_lib.IpRuleCommand(self.parent) + self._mock_priv_list_ip_rules = mock.patch.object(priv_lib, + 'list_ip_rules') + self.mock_priv_list_ip_rules = self._mock_priv_list_ip_rules.start() + self.addCleanup(self._stop_mock) + + def _stop_mock(self): + self._mock_priv_list_ip_rules.stop() def _test_add_rule(self, ip, table, priority): ip_version = netaddr.IPNetwork(ip).version - self.rule_cmd.add(ip, table=table, priority=priority) - self._assert_sudo([ip_version], (['show'])) + with mock.patch.object(ip_lib, '_parse_ip_rule'): + self.rule_cmd.add(ip, table=table, priority=priority) + self.mock_priv_list_ip_rules.assert_called_once_with( + self.parent.namespace, n_utils.get_ip_version(ip)) self._assert_sudo([ip_version], ('add', 'from', ip, 'priority', str(priority), 'table', str(table), @@ -631,9 +644,13 @@ class TestIpRuleCommand(TestIPCmdBase): def _test_add_rule_exists(self, ip, table, priority, output): self.parent._as_root.return_value = output - ip_version = netaddr.IPNetwork(ip).version - self.rule_cmd.add(ip, table=table, priority=priority) - self._assert_sudo([ip_version], (['show'])) + with mock.patch.object(self.rule_cmd, '_exists', return_value=True) \ + as mock_exists: + self.rule_cmd.add(ip, table=table, priority=priority) + kwargs = {'from': ip, 'priority': str(priority), + 'table': str(table), 'type': 'unicast'} + mock_exists.assert_called_once_with(n_utils.get_ip_version(ip), + **kwargs) def _test_delete_rule(self, ip, table, priority): ip_version = netaddr.IPNetwork(ip).version @@ -642,23 +659,6 @@ class TestIpRuleCommand(TestIPCmdBase): ('del', 'from', ip, 'priority', str(priority), 'table', str(table), 'type', 'unicast')) - def test__parse_line(self): - def test(ip_version, line, expected): - actual = self.rule_cmd._parse_line(ip_version, line) - self.assertEqual(expected, actual) - - test(4, "4030201:\tfrom 1.2.3.4/24 lookup 10203040", - {'from': '1.2.3.4/24', - 'table': '10203040', - 'type': 'unicast', - 'priority': '4030201'}) - test(6, "1024: from all iif qg-c43b1928-48 lookup noscope", - {'priority': '1024', - 'from': '::/0', - 'type': 'unicast', - 'iif': 'qg-c43b1928-48', - 'table': 'noscope'}) - def test__make_canonical_all_v4(self): actual = self.rule_cmd._make_canonical(4, {'from': 'all'}) self.assertEqual({'from': '0.0.0.0/0', 'type': 'unicast'}, actual) @@ -1863,3 +1863,120 @@ class TestConntrack(base.BaseTestCase): device.delete_socket_conntrack_state(ip_str, dport, protocol) self.execute.assert_called_once_with(expect_cmd, check_exit_code=True, extra_ok_codes=[1]) + + +class ParseIpRuleTestCase(base.BaseTestCase): + + BASE_RULE = { + 'family': 2, 'dst_len': 0, 'res2': 0, 'tos': 0, 'res1': 0, 'flags': 0, + 'header': { + 'pid': 18152, 'length': 44, 'flags': 2, 'error': None, 'type': 32, + 'sequence_number': 281}, + 'attrs': {'FRA_TABLE': 255, 'FRA_SUPPRESS_PREFIXLEN': 4294967295}, + 'table': 255, 'action': 1, 'src_len': 0, 'event': 'RTM_NEWRULE'} + + def setUp(self): + super(ParseIpRuleTestCase, self).setUp() + self.rule = copy.deepcopy(self.BASE_RULE) + + def test_parse_priority(self): + self.rule['attrs']['FRA_PRIORITY'] = 1000 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('1000', parsed_rule['priority']) + + def test_parse_from_ipv4(self): + self.rule['attrs']['FRA_SRC'] = '192.168.0.1' + self.rule['src_len'] = 24 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('192.168.0.1/24', parsed_rule['from']) + + def test_parse_from_ipv6(self): + self.rule['attrs']['FRA_SRC'] = '2001:db8::1' + self.rule['src_len'] = 64 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 6) + self.assertEqual('2001:db8::1/64', parsed_rule['from']) + + def test_parse_from_any_ipv4(self): + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('0.0.0.0/0', parsed_rule['from']) + + def test_parse_from_any_ipv6(self): + parsed_rule = ip_lib._parse_ip_rule(self.rule, 6) + self.assertEqual('::/0', parsed_rule['from']) + + def test_parse_to_ipv4(self): + self.rule['attrs']['FRA_DST'] = '192.168.10.1' + self.rule['dst_len'] = 24 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('192.168.10.1/24', parsed_rule['to']) + + def test_parse_to_ipv6(self): + self.rule['attrs']['FRA_DST'] = '2001:db8::1' + self.rule['dst_len'] = 64 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 6) + self.assertEqual('2001:db8::1/64', parsed_rule['to']) + + def test_parse_to_none(self): + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertIsNone(parsed_rule.get('to')) + + def test_parse_table(self): + self.rule['attrs']['FRA_TABLE'] = 255 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('local', parsed_rule['table']) + self.rule['attrs']['FRA_TABLE'] = 254 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('main', parsed_rule['table']) + self.rule['attrs']['FRA_TABLE'] = 253 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('default', parsed_rule['table']) + self.rule['attrs']['FRA_TABLE'] = 1000 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('1000', parsed_rule['table']) + + def test_parse_fwmark(self): + self.rule['attrs']['FRA_FWMARK'] = 1000 + self.rule['attrs']['FRA_FWMASK'] = 10 + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('0x3e8/0xa', parsed_rule['fwmark']) + + def test_parse_fwmark_none(self): + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertIsNone(parsed_rule.get('fwmark')) + + def test_parse_iif(self): + self.rule['attrs']['FRA_IIFNAME'] = 'input_interface_name' + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('input_interface_name', parsed_rule['iif']) + + def test_parse_iif_none(self): + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertIsNone(parsed_rule.get('iif')) + + def test_parse_oif(self): + self.rule['attrs']['FRA_OIFNAME'] = 'output_interface_name' + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertEqual('output_interface_name', parsed_rule['oif']) + + def test_parse_oif_none(self): + parsed_rule = ip_lib._parse_ip_rule(self.rule, 4) + self.assertIsNone(parsed_rule.get('oif')) + + +class ListIpRulesTestCase(base.BaseTestCase): + + def test_list_ip_rules(self): + rule1 = {'family': 2, 'src_len': 24, 'action': 1, + 'attrs': {'FRA_SRC': '10.0.0.1', 'FRA_TABLE': 100}} + rule2 = {'family': 2, 'src_len': 0, 'action': 6, + 'attrs': {'FRA_TABLE': 255}} + rules = [rule1, rule2] + with mock.patch.object(priv_lib, 'list_ip_rules') as mock_list_rules: + mock_list_rules.return_value = rules + retval = ip_lib.list_ip_rules(mock.ANY, 4) + reference = [ + {'type': 'unicast', 'from': '10.0.0.1/24', 'priority': '0', + 'table': '100'}, + {'type': 'blackhole', 'from': '0.0.0.0/0', 'priority': '0', + 'table': 'local'}] + self.assertEqual(reference, retval)