Implement IpRuleCommand.list_rules() using pyroute2

Change-Id: I55d5dd756940e5a92f472c9309d49f427e907928
Related-Bug: #1492714
This commit is contained in:
Rodolfo Alonso Hernandez 2018-11-13 16:36:06 +00:00
parent 08bf770055
commit c68ebd661b
8 changed files with 328 additions and 87 deletions

View File

@ -262,8 +262,7 @@ class FipNamespace(namespaces.Namespace):
return new_gw_ips != old_gw_ips
def get_fip_table_indexes(self, ip_version):
ns_ipr = ip_lib.IPRule(namespace=self.get_name())
ip_rules_list = ns_ipr.rule.list_rules(ip_version)
ip_rules_list = ip_lib.list_ip_rules(self.get_name(), ip_version)
tbl_index_list = []
for ip_rule in ip_rules_list:
tbl_index = ip_rule['table']

View File

@ -326,7 +326,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase):
pass
def _stale_ip_rule_cleanup(self, ns_ipr, ns_ipd, ip_version):
ip_rules_list = ns_ipr.rule.list_rules(ip_version)
ip_rules_list = ip_lib.list_ip_rules(ns_ipr.namespace, ip_version)
snat_table_list = []
for ip_rule in ip_rules_list:
snat_table = ip_rule['table']

View File

@ -44,6 +44,18 @@ IP_NONLOCAL_BIND = 'net.ipv4.ip_nonlocal_bind'
LOOPBACK_DEVNAME = 'lo'
FB_TUNNEL_DEVICE_NAMES = ['gre0', 'gretap0', 'tunl0', 'erspan0', 'sit0',
'ip6tnl0', 'ip6gre0']
RULE_TABLES = {'default': 253,
'main': 254,
'local': 255}
# Rule indexes: pyroute2.netlink.rtnl
# Rule names: https://www.systutorials.com/docs/linux/man/8-ip-rule/
# NOTE(ralonsoh): 'masquerade' type is printed as 'nat' in 'ip rule' command
RULE_TYPES = {1: 'unicast',
6: 'blackhole',
7: 'unreachable',
8: 'prohibit',
10: 'nat'}
SYS_NET_PATH = '/sys/class/net'
DEFAULT_GW_PATTERN = re.compile(r"via (\S+)")
@ -427,30 +439,8 @@ class IpRuleCommand(IpCommandBase):
return {k: str(v) for k, v in map(canonicalize, settings.items())}
def _parse_line(self, ip_version, line):
# Typical rules from 'ip rule show':
# 4030201: from 1.2.3.4/24 lookup 10203040
# 1024: from all iif qg-c43b1928-48 lookup noscope
parts = line.split()
if not parts:
return {}
# Format of line is: "priority: <key> <value> ... [<type>]"
settings = {k: v for k, v in zip(parts[1::2], parts[2::2])}
settings['priority'] = parts[0][:-1]
if len(parts) % 2 == 0:
# When line has an even number of columns, last one is the type.
settings['type'] = parts[-1]
return self._make_canonical(ip_version, settings)
def list_rules(self, ip_version):
lines = self._as_root([ip_version], ['show']).splitlines()
return [self._parse_line(ip_version, line) for line in lines]
def _exists(self, ip_version, **kwargs):
return kwargs in self.list_rules(ip_version)
return kwargs in list_ip_rules(self._parent.namespace, ip_version)
def _make__flat_args_tuple(self, *args, **kwargs):
for kwargs_item in sorted(kwargs.items(), key=lambda i: i[0]):
@ -1309,3 +1299,68 @@ def get_ipv6_forwarding(device, namespace=None):
cmd = ['sysctl', '-b', "net.ipv6.conf.%s.forwarding" % device]
ip_wrapper = IPWrapper(namespace)
return int(ip_wrapper.netns.execute(cmd, run_as_root=True))
def _parse_ip_rule(rule, ip_version):
"""Parse a pyroute2 rule and returns a dictionary
Parameters contained in the returned dictionary:
- priority: rule priority
- from: source IP address
- to: (optional) destination IP address
- type: rule type (see RULE_TYPES)
- table: table name or number (see RULE_TABLES)
- fwmark: (optional) FW mark
- iif: (optional) input interface name
- oif: (optional) output interface name
:param rule: pyroute2 rule dictionary
:param ip_version: IP version (4, 6)
:return: dictionary with IP rule information
"""
parsed_rule = {'priority': str(rule['attrs'].get('FRA_PRIORITY', 0))}
from_ip = rule['attrs'].get('FRA_SRC')
if from_ip:
parsed_rule['from'] = common_utils.ip_to_cidr(
from_ip, prefix=rule['src_len'])
if common_utils.is_cidr_host(parsed_rule['from']):
parsed_rule['from'] = common_utils.cidr_to_ip(parsed_rule['from'])
else:
parsed_rule['from'] = constants.IP_ANY[ip_version]
to_ip = rule['attrs'].get('FRA_DST')
if to_ip:
parsed_rule['to'] = common_utils.ip_to_cidr(
to_ip, prefix=rule['dst_len'])
if common_utils.is_cidr_host(parsed_rule['to']):
parsed_rule['to'] = common_utils.cidr_to_ip(parsed_rule['to'])
parsed_rule['type'] = RULE_TYPES[rule['action']]
table_num = rule['attrs']['FRA_TABLE']
for table_name in (name for (name, index) in
RULE_TABLES.items() if index == table_num):
parsed_rule['table'] = table_name
break
else:
parsed_rule['table'] = str(table_num)
fwmark = rule['attrs'].get('FRA_FWMARK')
if fwmark:
fwmask = rule['attrs'].get('FRA_FWMASK')
parsed_rule['fwmark'] = '{0:#x}/{1:#x}'.format(fwmark, fwmask)
iifname = rule['attrs'].get('FRA_IIFNAME')
if iifname:
parsed_rule['iif'] = iifname
oifname = rule['attrs'].get('FRA_OIFNAME')
if oifname:
parsed_rule['oif'] = oifname
return parsed_rule
def list_ip_rules(namespace, ip_version):
"""List all IP rules in a namespace
:param namespace: namespace name
:param ip_version: IP version (4, 6)
:return: list of dictionaries with the rules information
"""
rules = privileged.list_ip_rules(namespace, ip_version)
return [_parse_ip_rule(rule, ip_version) for rule in rules]

View File

@ -469,3 +469,22 @@ def get_devices(namespace, **kwargs):
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
raise
@privileged.default.entrypoint
def list_ip_rules(namespace, ip_version, match=None, **kwargs):
"""List all IP rules"""
try:
with _get_iproute(namespace) as ip:
rules = ip.get_rules(family=_IP_VERSION_FAMILY_MAP[ip_version],
match=match, **kwargs)
for rule in rules:
rule['attrs'] = {
key: value for key, value
in ((item[0], item[1]) for item in rule['attrs'])}
return rules
except OSError as e:
if e.errno == errno.ENOENT:
raise NetworkNamespaceNotFound(netns_name=namespace)
raise

View File

@ -104,8 +104,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.agent._process_updated_router(router1.router)
router_updated = self.agent.router_info[router1.router['id']]
self.assertTrue(self._namespace_exists(router_updated.ns_name))
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_4)
self.assertEqual(6, len(ip4_rules_list))
# IPRule list should have 6 entries.
# Three entries from 'default', 'main' and 'local' table.
@ -364,9 +364,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router1.router['external_gateway_info'] = ""
restarted_router = self.manage_router(restarted_agent, router1.router)
self.assertTrue(self._namespace_exists(restarted_router.ns_name))
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_4)
ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_6)
# Just make sure the basic set of rules are there in the router
# namespace
self.assertEqual(3, len(ip4_rules_list))
@ -808,9 +809,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertTrue(self._namespace_exists(router1.ns_name))
self.assertTrue(self._namespace_exists(fip_ns_name))
self._assert_snat_namespace_exists(router1)
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
ip4_rules_list_with_fip = ns_ipr.rule.list_rules(
lib_constants.IP_VERSION_4)
ip4_rules_list_with_fip = ip_lib.list_ip_rules(
router1.ns_name, lib_constants.IP_VERSION_4)
# The rules_list should have 6 entries:
# 3 default rules (local, main and default)
# 1 Fip forward rule
@ -829,7 +829,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
router_updated = self.agent.router_info[router1.router['id']]
self.assertTrue(self._namespace_exists(router_updated.ns_name))
self._assert_snat_namespace_exists(router1)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_4)
self.assertEqual(5, len(ip4_rules_list))
interface_rules_list_count = 0
fip_rule_count = 0
@ -946,22 +947,16 @@ class TestDvrRouter(framework.L3AgentTestFramework):
restarted_router.iptables_manager, 'nat', [prevent_snat_rule])
def _get_fixed_ip_rule_priority(self, namespace, fip):
iprule = ip_lib.IPRule(namespace)
lines = iprule.rule._as_root([4], ['show']).splitlines()
for line in lines:
if fip in line:
info = iprule.rule._parse_line(4, line)
return info['priority']
ipv4_rules = ip_lib.list_ip_rules(namespace, 4)
for rule in (rule for rule in ipv4_rules
if utils.cidr_to_ip(rule['from']) == fip):
return rule['priority']
def _fixed_ip_rule_exists(self, namespace, ip):
iprule = ip_lib.IPRule(namespace)
lines = iprule.rule._as_root([4], ['show']).splitlines()
for line in lines:
if ip in line:
info = iprule.rule._parse_line(4, line)
if info['from'] == ip:
return True
ipv4_rules = ip_lib.list_ip_rules(namespace, 4)
for _ in (rule for rule in ipv4_rules
if utils.cidr_to_ip(rule['from']) == ip):
return True
return False
def test_dvr_router_add_internal_network_set_arp_cache(self):
@ -1582,9 +1577,8 @@ class TestDvrRouter(framework.L3AgentTestFramework):
else:
self.assertIsNone(fg_device.route.get_gateway(filters=tbl_filter))
ip_rule = ip_lib.IPRule(namespace=fip_ns_name)
ext_net_fw_rules_list = ip_rule.rule.list_rules(
lib_constants.IP_VERSION_4)
ext_net_fw_rules_list = ip_lib.list_ip_rules(
fip_ns_name, lib_constants.IP_VERSION_4)
if not check_fpr_int_rule_delete:
# When floatingip are associated, make sure that the
# corresponding rules and routes in route table are created
@ -1699,9 +1693,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self._assert_fip_namespace_deleted(
agent_gw_port, assert_ovs_interface=False)
if not address_scopes or no_external:
ns_ipr = ip_lib.IPRule(namespace=router_updated.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
ip4_rules_list = ip_lib.list_ip_rules(router_updated.ns_name,
lib_constants.IP_VERSION_4)
ip6_rules_list = ip_lib.list_ip_rules(router_updated.ns_name,
lib_constants.IP_VERSION_6)
self.assertEqual(3, len(ip4_rules_list))
self.assertEqual(2, len(ip6_rules_list))
@ -1759,9 +1754,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
fip_2_rtr, rfp_device, rfp_device_name)
# Check if any snat redirect rules in the router namespace exist.
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_4)
ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_6)
# Just make sure the basic set of rules are there in the router
# namespace
self.assertEqual(5, len(ip4_rules_list))
@ -1822,9 +1818,10 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self.assertFalse(mock_add_interface_route_rule.called)
self.assertFalse(mock_add_fip_interface_route_rule.called)
# Check if any snat redirect rules in the router namespace exist.
ns_ipr = ip_lib.IPRule(namespace=router1.ns_name)
ip4_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_4)
ip6_rules_list = ns_ipr.rule.list_rules(lib_constants.IP_VERSION_6)
ip4_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_4)
ip6_rules_list = ip_lib.list_ip_rules(router1.ns_name,
lib_constants.IP_VERSION_6)
# Just make sure the basic set of rules are there in the router
# namespace
self.assertEqual(5, len(ip4_rules_list))

View File

@ -26,6 +26,7 @@ import testtools
from neutron.agent.linux import ip_lib
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.common import net_helpers
from neutron.tests.functional import base as functional_base
@ -163,14 +164,14 @@ class IpLibTestCase(IpLibTestFramework):
for rule in test_case:
ip_rule.rule.add(table=TABLE, priority=PRIORITY, **rule)
rules = ip_rule.rule.list_rules(ip_version)
rules = ip_lib.list_ip_rules(ip_rule.namespace, ip_version)
for expected_rule in expected_rules[ip_version]:
self.assertIn(expected_rule, rules)
for rule in test_case:
ip_rule.rule.delete(table=TABLE, priority=PRIORITY, **rule)
rules = ip_rule.rule.list_rules(ip_version)
rules = priv_ip_lib.list_ip_rules(ip_rule.namespace, ip_version)
for expected_rule in expected_rules[ip_version]:
self.assertNotIn(expected_rule, rules)

View File

@ -16,10 +16,10 @@ from oslo_utils import uuidutils
from neutron.agent.linux import ip_lib
from neutron.privileged.agent.linux import ip_lib as priv_ip_lib
from neutron.tests.functional import base
from neutron.tests.functional import base as functional_base
class GetDevicesTestCase(base.BaseLoggingTestCase):
class GetDevicesTestCase(functional_base.BaseLoggingTestCase):
def _remove_ns(self, namespace):
priv_ip_lib.remove_netns(namespace)
@ -44,3 +44,56 @@ class GetDevicesTestCase(base.BaseLoggingTestCase):
device_names = priv_ip_lib.get_devices(namespace)
for name in device_names:
self.assertNotIn(name, interfaces)
class ListIpRulesTestCase(functional_base.BaseSudoTestCase):
RULE_TABLES = {'default': 253, 'main': 254, 'local': 255}
def setUp(self):
super(ListIpRulesTestCase, self).setUp()
self.namespace = 'ns_test-' + uuidutils.generate_uuid()
self.ns = priv_ip_lib.create_netns(self.namespace)
self.ip_rule = ip_lib.IPRule(namespace=self.namespace)
self.addCleanup(self._remove_ns)
def _remove_ns(self):
priv_ip_lib.remove_netns(self.namespace)
def test_list_default_rules_ipv4(self):
rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4)
self.assertEqual(3, len(rules_ipv4))
rule_tables = list(self.RULE_TABLES.values())
for rule in rules_ipv4:
rule_tables.remove(rule['table'])
self.assertEqual(0, len(rule_tables))
def test_list_rules_ipv4(self):
self.ip_rule.rule.add('192.168.0.1/24', table=10)
rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4)
for rule in rules_ipv4:
if rule['table'] == 10:
self.assertEqual('192.168.0.1', rule['attrs']['FRA_SRC'])
self.assertEqual(24, rule['src_len'])
break
else:
self.fail('Rule added (192.168.0.1/24, table 10) not found')
def test_list_default_rules_ipv6(self):
rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6)
self.assertEqual(2, len(rules_ipv6))
rule_tables = [255, 254]
for rule in rules_ipv6:
rule_tables.remove(rule['table'])
self.assertEqual(0, len(rule_tables))
def test_list_rules_ipv6(self):
self.ip_rule.rule.add('2001:db8::1/64', table=20)
rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6)
for rule in rules_ipv6:
if rule['table'] == 20:
self.assertEqual('2001:db8::1', rule['attrs']['FRA_SRC'])
self.assertEqual(64, rule['src_len'])
break
else:
self.fail('Rule added (2001:db8::1/64, table 20) not found')

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import errno
import socket
@ -20,6 +21,7 @@ import mock
import netaddr
from neutron_lib import constants
from neutron_lib import exceptions
from oslo_utils import uuidutils
import pyroute2
from pyroute2.netlink.rtnl import ifinfmsg
from pyroute2.netlink.rtnl import ndmsg
@ -29,6 +31,7 @@ import testtools
from neutron.agent.common import utils # noqa
from neutron.agent.linux import ip_lib
from neutron.common import exceptions as n_exc
from neutron.common import utils as n_utils
from neutron import privileged
from neutron.privileged.agent.linux import ip_lib as priv_lib
from neutron.tests import base
@ -617,13 +620,23 @@ class TestIpRuleCommand(TestIPCmdBase):
def setUp(self):
super(TestIpRuleCommand, self).setUp()
self.parent._as_root.return_value = ''
self.parent.namespace = uuidutils.generate_uuid()
self.command = 'rule'
self.rule_cmd = ip_lib.IpRuleCommand(self.parent)
self._mock_priv_list_ip_rules = mock.patch.object(priv_lib,
'list_ip_rules')
self.mock_priv_list_ip_rules = self._mock_priv_list_ip_rules.start()
self.addCleanup(self._stop_mock)
def _stop_mock(self):
self._mock_priv_list_ip_rules.stop()
def _test_add_rule(self, ip, table, priority):
ip_version = netaddr.IPNetwork(ip).version
self.rule_cmd.add(ip, table=table, priority=priority)
self._assert_sudo([ip_version], (['show']))
with mock.patch.object(ip_lib, '_parse_ip_rule'):
self.rule_cmd.add(ip, table=table, priority=priority)
self.mock_priv_list_ip_rules.assert_called_once_with(
self.parent.namespace, n_utils.get_ip_version(ip))
self._assert_sudo([ip_version], ('add', 'from', ip,
'priority', str(priority),
'table', str(table),
@ -631,9 +644,13 @@ class TestIpRuleCommand(TestIPCmdBase):
def _test_add_rule_exists(self, ip, table, priority, output):
self.parent._as_root.return_value = output
ip_version = netaddr.IPNetwork(ip).version
self.rule_cmd.add(ip, table=table, priority=priority)
self._assert_sudo([ip_version], (['show']))
with mock.patch.object(self.rule_cmd, '_exists', return_value=True) \
as mock_exists:
self.rule_cmd.add(ip, table=table, priority=priority)
kwargs = {'from': ip, 'priority': str(priority),
'table': str(table), 'type': 'unicast'}
mock_exists.assert_called_once_with(n_utils.get_ip_version(ip),
**kwargs)
def _test_delete_rule(self, ip, table, priority):
ip_version = netaddr.IPNetwork(ip).version
@ -642,23 +659,6 @@ class TestIpRuleCommand(TestIPCmdBase):
('del', 'from', ip, 'priority', str(priority),
'table', str(table), 'type', 'unicast'))
def test__parse_line(self):
def test(ip_version, line, expected):
actual = self.rule_cmd._parse_line(ip_version, line)
self.assertEqual(expected, actual)
test(4, "4030201:\tfrom 1.2.3.4/24 lookup 10203040",
{'from': '1.2.3.4/24',
'table': '10203040',
'type': 'unicast',
'priority': '4030201'})
test(6, "1024: from all iif qg-c43b1928-48 lookup noscope",
{'priority': '1024',
'from': '::/0',
'type': 'unicast',
'iif': 'qg-c43b1928-48',
'table': 'noscope'})
def test__make_canonical_all_v4(self):
actual = self.rule_cmd._make_canonical(4, {'from': 'all'})
self.assertEqual({'from': '0.0.0.0/0', 'type': 'unicast'}, actual)
@ -1863,3 +1863,120 @@ class TestConntrack(base.BaseTestCase):
device.delete_socket_conntrack_state(ip_str, dport, protocol)
self.execute.assert_called_once_with(expect_cmd, check_exit_code=True,
extra_ok_codes=[1])
class ParseIpRuleTestCase(base.BaseTestCase):
BASE_RULE = {
'family': 2, 'dst_len': 0, 'res2': 0, 'tos': 0, 'res1': 0, 'flags': 0,
'header': {
'pid': 18152, 'length': 44, 'flags': 2, 'error': None, 'type': 32,
'sequence_number': 281},
'attrs': {'FRA_TABLE': 255, 'FRA_SUPPRESS_PREFIXLEN': 4294967295},
'table': 255, 'action': 1, 'src_len': 0, 'event': 'RTM_NEWRULE'}
def setUp(self):
super(ParseIpRuleTestCase, self).setUp()
self.rule = copy.deepcopy(self.BASE_RULE)
def test_parse_priority(self):
self.rule['attrs']['FRA_PRIORITY'] = 1000
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('1000', parsed_rule['priority'])
def test_parse_from_ipv4(self):
self.rule['attrs']['FRA_SRC'] = '192.168.0.1'
self.rule['src_len'] = 24
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('192.168.0.1/24', parsed_rule['from'])
def test_parse_from_ipv6(self):
self.rule['attrs']['FRA_SRC'] = '2001:db8::1'
self.rule['src_len'] = 64
parsed_rule = ip_lib._parse_ip_rule(self.rule, 6)
self.assertEqual('2001:db8::1/64', parsed_rule['from'])
def test_parse_from_any_ipv4(self):
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('0.0.0.0/0', parsed_rule['from'])
def test_parse_from_any_ipv6(self):
parsed_rule = ip_lib._parse_ip_rule(self.rule, 6)
self.assertEqual('::/0', parsed_rule['from'])
def test_parse_to_ipv4(self):
self.rule['attrs']['FRA_DST'] = '192.168.10.1'
self.rule['dst_len'] = 24
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('192.168.10.1/24', parsed_rule['to'])
def test_parse_to_ipv6(self):
self.rule['attrs']['FRA_DST'] = '2001:db8::1'
self.rule['dst_len'] = 64
parsed_rule = ip_lib._parse_ip_rule(self.rule, 6)
self.assertEqual('2001:db8::1/64', parsed_rule['to'])
def test_parse_to_none(self):
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertIsNone(parsed_rule.get('to'))
def test_parse_table(self):
self.rule['attrs']['FRA_TABLE'] = 255
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('local', parsed_rule['table'])
self.rule['attrs']['FRA_TABLE'] = 254
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('main', parsed_rule['table'])
self.rule['attrs']['FRA_TABLE'] = 253
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('default', parsed_rule['table'])
self.rule['attrs']['FRA_TABLE'] = 1000
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('1000', parsed_rule['table'])
def test_parse_fwmark(self):
self.rule['attrs']['FRA_FWMARK'] = 1000
self.rule['attrs']['FRA_FWMASK'] = 10
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('0x3e8/0xa', parsed_rule['fwmark'])
def test_parse_fwmark_none(self):
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertIsNone(parsed_rule.get('fwmark'))
def test_parse_iif(self):
self.rule['attrs']['FRA_IIFNAME'] = 'input_interface_name'
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('input_interface_name', parsed_rule['iif'])
def test_parse_iif_none(self):
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertIsNone(parsed_rule.get('iif'))
def test_parse_oif(self):
self.rule['attrs']['FRA_OIFNAME'] = 'output_interface_name'
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertEqual('output_interface_name', parsed_rule['oif'])
def test_parse_oif_none(self):
parsed_rule = ip_lib._parse_ip_rule(self.rule, 4)
self.assertIsNone(parsed_rule.get('oif'))
class ListIpRulesTestCase(base.BaseTestCase):
def test_list_ip_rules(self):
rule1 = {'family': 2, 'src_len': 24, 'action': 1,
'attrs': {'FRA_SRC': '10.0.0.1', 'FRA_TABLE': 100}}
rule2 = {'family': 2, 'src_len': 0, 'action': 6,
'attrs': {'FRA_TABLE': 255}}
rules = [rule1, rule2]
with mock.patch.object(priv_lib, 'list_ip_rules') as mock_list_rules:
mock_list_rules.return_value = rules
retval = ip_lib.list_ip_rules(mock.ANY, 4)
reference = [
{'type': 'unicast', 'from': '10.0.0.1/24', 'priority': '0',
'table': '100'},
{'type': 'blackhole', 'from': '0.0.0.0/0', 'priority': '0',
'table': 'local'}]
self.assertEqual(reference, retval)