diff --git a/neutron/agent/l3/dvr_fip_ns.py b/neutron/agent/l3/dvr_fip_ns.py index 8c1502cfed1..dad9fc0ae93 100644 --- a/neutron/agent/l3/dvr_fip_ns.py +++ b/neutron/agent/l3/dvr_fip_ns.py @@ -308,11 +308,9 @@ class FipNamespace(namespaces.Namespace): 'failed') # Now add the filter match rule for the table. - ip_rule = ip_lib.IPRule(namespace=self.get_name()) - ip_rule.rule.add(ip=str(fip_2_rtr.ip), - iif=fip_2_rtr_name, - table=rt_tbl_index, - priority=rt_tbl_index) + ip_lib.add_ip_rule(namespace=self.get_name(), ip=str(fip_2_rtr.ip), + iif=fip_2_rtr_name, table=rt_tbl_index, + priority=rt_tbl_index) def _update_gateway_port(self, agent_gateway_port, interface_name): if (self.agent_gateway_port and diff --git a/neutron/agent/l3/dvr_local_router.py b/neutron/agent/l3/dvr_local_router.py index c8a8670e2a7..6783a758345 100644 --- a/neutron/agent/l3/dvr_local_router.py +++ b/neutron/agent/l3/dvr_local_router.py @@ -149,10 +149,10 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): def _add_floating_ip_rule(self, floating_ip, fixed_ip): rule_pr = self.fip_ns.allocate_rule_priority(floating_ip) self.floating_ips_dict[floating_ip] = (fixed_ip, rule_pr) - ip_rule = ip_lib.IPRule(namespace=self.ns_name) - ip_rule.rule.add(ip=fixed_ip, - table=dvr_fip_ns.FIP_RT_TBL, - priority=rule_pr) + + ip_lib.add_ip_rule(namespace=self.ns_name, ip=fixed_ip, + table=dvr_fip_ns.FIP_RT_TBL, + priority=int(str(rule_pr))) def _remove_floating_ip_rule(self, floating_ip): if floating_ip in self.floating_ips_dict: @@ -160,7 +160,7 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): ip_rule = ip_lib.IPRule(namespace=self.ns_name) ip_rule.rule.delete(ip=fixed_ip, table=dvr_fip_ns.FIP_RT_TBL, - priority=rule_pr) + priority=int(str(rule_pr))) self.fip_ns.deallocate_rule_priority(floating_ip) # TODO(rajeev): Handle else case - exception/log? @@ -371,9 +371,10 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): if is_add: ns_ipd.route.add_gateway(gw_ip_addr, table=snat_idx) - ns_ipr.rule.add(ip=sn_port_cidr, - table=snat_idx, - priority=snat_idx) + ip_lib.add_ip_rule(namespace=self.ns_name, + ip=sn_port_cidr, + table=snat_idx, + priority=snat_idx) ip_lib.sysctl(cmd, namespace=self.ns_name) else: self._delete_gateway_device_if_exists(ns_ipd, @@ -688,12 +689,11 @@ class DvrLocalRouter(dvr_router_base.DvrRouterBase): device.route.add_route(rtr_port_cidr, str(rtr_2_fip_ip)) def _add_interface_routing_rule_to_router_ns(self, router_port): - ip_rule = ip_lib.IPRule(namespace=self.ns_name) for subnet in router_port['subnets']: rtr_port_cidr = subnet['cidr'] - ip_rule.rule.add(ip=rtr_port_cidr, - table=dvr_fip_ns.FIP_RT_TBL, - priority=dvr_fip_ns.FAST_PATH_EXIT_PR) + ip_lib.add_ip_rule(namespace=self.ns_name, ip=rtr_port_cidr, + table=dvr_fip_ns.FIP_RT_TBL, + priority=dvr_fip_ns.FAST_PATH_EXIT_PR) def _delete_interface_routing_rule_in_router_ns(self, router_port): ip_rule = ip_lib.IPRule(namespace=self.ns_name) diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index 3533afc2b54..353164b2ac4 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -51,7 +51,8 @@ RULE_TABLES = {'default': 253, # Rule indexes: pyroute2.netlink.rtnl # Rule names: https://www.systutorials.com/docs/linux/man/8-ip-rule/ # NOTE(ralonsoh): 'masquerade' type is printed as 'nat' in 'ip rule' command -RULE_TYPES = {1: 'unicast', +RULE_TYPES = {0: 'unspecified', + 1: 'unicast', 6: 'blackhole', 7: 'unreachable', 8: 'prohibit', @@ -439,30 +440,11 @@ class IpRuleCommand(IpCommandBase): return {k: str(v) for k, v in map(canonicalize, settings.items())} - def _exists(self, ip_version, **kwargs): - return kwargs in list_ip_rules(self._parent.namespace, ip_version) - def _make__flat_args_tuple(self, *args, **kwargs): for kwargs_item in sorted(kwargs.items(), key=lambda i: i[0]): args += kwargs_item return tuple(args) - def add(self, ip, **kwargs): - ip_version = common_utils.get_ip_version(ip) - - # In case we need to add a rule based on an incoming - # interface, pass the "any" IP address, for example, 0.0.0.0/0, - # else pass the given IP. - if kwargs.get('iif'): - kwargs.update({'from': constants.IP_ANY[ip_version]}) - else: - kwargs.update({'from': ip}) - canonical_kwargs = self._make_canonical(ip_version, kwargs) - - if not self._exists(ip_version, **canonical_kwargs): - args_tuple = self._make__flat_args_tuple('add', **canonical_kwargs) - self._as_root([ip_version], args_tuple) - def delete(self, ip, **kwargs): ip_version = common_utils.get_ip_version(ip) @@ -1364,3 +1346,70 @@ def list_ip_rules(namespace, ip_version): """ rules = privileged.list_ip_rules(namespace, ip_version) return [_parse_ip_rule(rule, ip_version) for rule in rules] + + +def _make_pyroute2_args(ip, iif, table, priority, to): + """Returns a dictionary of arguments to be used in pyroute rule commands + + :param ip: (string) source IP or CIDR address (IPv4, IPv6) + :param iif: (string) input interface name + :param table: (string, int) table number + :param priority: (string, int) rule priority + :param to: (string) destination IP or CIDR address (IPv4, IPv6) + :return: a dictionary with the kwargs needed in pyroute rule commands + """ + ip_version = common_utils.get_ip_version(ip) + # In case we need to add a rule based on an incoming interface, no + # IP address is given; the rule default source ("from") address is + # "all". + cmd_args = {'family': common_utils.get_socket_address_family(ip_version)} + if iif: + cmd_args['iifname'] = iif + else: + cmd_args['src'] = common_utils.cidr_to_ip(ip) + cmd_args['src_len'] = common_utils.cidr_mask(ip) + if to: + cmd_args['dst'] = common_utils.cidr_to_ip(to) + cmd_args['dst_len'] = common_utils.cidr_mask(to) + if table: + cmd_args['table'] = RULE_TABLES.get(int(table), int(table)) + if priority: + cmd_args['priority'] = int(priority) + return cmd_args + + +def _exist_ip_rule(rules, ip, iif, table, priority, to): + """Check if any rule matches the conditions""" + for rule in rules: + if iif and rule.get('iif') != iif: + continue + if not iif and rule['from'] != ip: + continue + if table and rule.get('table') != str(table): + continue + if priority and rule['priority'] != str(priority): + continue + if to and rule.get('to') != to: + continue + break + else: + return False + return True + + +def add_ip_rule(namespace, ip, iif=None, table=None, priority=None, to=None): + """Create an IP rule in a namespace + + :param namespace: (string) namespace name + :param ip: (string) source IP or CIDR address (IPv4, IPv6) + :param iif: (Optional) (string) input interface name + :param table: (Optional) (string, int) table number + :param priority: (Optional) (string, int) rule priority + :param to: (Optional) (string) destination IP or CIDR address (IPv4, IPv6) + """ + ip_version = common_utils.get_ip_version(ip) + rules = list_ip_rules(namespace, ip_version) + if _exist_ip_rule(rules, ip, iif, table, priority, to): + return + cmd_args = _make_pyroute2_args(ip, iif, table, priority, to) + privileged.add_ip_rule(namespace, **cmd_args) diff --git a/neutron/common/utils.py b/neutron/common/utils.py index c1160df0be7..e4c0137a980 100644 --- a/neutron/common/utils.py +++ b/neutron/common/utils.py @@ -25,6 +25,7 @@ import os.path import random import re import signal +import socket import sys import threading import time @@ -232,6 +233,14 @@ def cidr_to_ip(ip_cidr): return str(net.ip) +def cidr_mask(ip_cidr): + """Returns the subnet mask length from a cidr + + :param: An ipv4 or ipv6 cidr mask length + """ + return netaddr.IPNetwork(ip_cidr).netmask.netmask_bits() + + def fixed_ip_cidrs(fixed_ips): """Create a list of a port's fixed IPs in cidr notation. @@ -269,6 +278,18 @@ def ip_version_from_int(ip_version_int): raise ValueError(_('Illegal IP version number')) +def get_network_length(ip_version): + """Returns the network length depeding on the IP version""" + return (n_const.IPv4_BITS if ip_version == n_const.IP_VERSION_4 + else n_const.IPv6_BITS) + + +def get_socket_address_family(ip_version): + """Returns the address family depending on the IP version""" + return (int(socket.AF_INET if ip_version == n_const.IP_VERSION_4 + else socket.AF_INET6)) + + class DelayedStringRenderer(object): """Takes a callable and its args and calls when __str__ is called diff --git a/neutron/privileged/agent/linux/ip_lib.py b/neutron/privileged/agent/linux/ip_lib.py index 450d4a8a463..c71e3f7e2b8 100644 --- a/neutron/privileged/agent/linux/ip_lib.py +++ b/neutron/privileged/agent/linux/ip_lib.py @@ -15,6 +15,7 @@ import socket from neutron_lib import constants import pyroute2 +from pyroute2.netlink import exceptions as netlink_exceptions from pyroute2.netlink import rtnl from pyroute2.netlink.rtnl import ifinfmsg from pyroute2.netlink.rtnl import ndmsg @@ -488,3 +489,19 @@ def list_ip_rules(namespace, ip_version, match=None, **kwargs): if e.errno == errno.ENOENT: raise NetworkNamespaceNotFound(netns_name=namespace) raise + + +@privileged.default.entrypoint +def add_ip_rule(namespace, **kwargs): + """Add a new IP rule""" + try: + with _get_iproute(namespace) as ip: + ip.rule('add', **kwargs) + except netlink_exceptions.NetlinkError as e: + if e.code == errno.EEXIST: + return + raise + except OSError as e: + if e.errno == errno.ENOENT: + raise NetworkNamespaceNotFound(netns_name=namespace) + raise diff --git a/neutron/tests/functional/agent/linux/test_ip_lib.py b/neutron/tests/functional/agent/linux/test_ip_lib.py index 738971ff580..e8f5835b0a7 100644 --- a/neutron/tests/functional/agent/linux/test_ip_lib.py +++ b/neutron/tests/functional/agent/linux/test_ip_lib.py @@ -162,7 +162,8 @@ class IpLibTestCase(IpLibTestFramework): ip_rule = ip_lib.IPRule(namespace=device.namespace) for ip_version, test_case in test_cases.items(): for rule in test_case: - ip_rule.rule.add(table=TABLE, priority=PRIORITY, **rule) + ip_lib.add_ip_rule(namespace=device.namespace, table=TABLE, + priority=PRIORITY, **rule) rules = ip_lib.list_ip_rules(ip_rule.namespace, ip_version) for expected_rule in expected_rules[ip_version]: diff --git a/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py b/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py index bb80f8c5c9b..d9a1b0491c6 100644 --- a/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py +++ b/neutron/tests/functional/privileged/agent/linux/test_ip_lib.py @@ -13,8 +13,10 @@ # under the License. from oslo_utils import uuidutils +import testtools from neutron.agent.linux import ip_lib +from neutron.common import utils as common_utils from neutron.privileged.agent.linux import ip_lib as priv_ip_lib from neutron.tests.functional import base as functional_base @@ -69,7 +71,7 @@ class ListIpRulesTestCase(functional_base.BaseSudoTestCase): self.assertEqual(0, len(rule_tables)) def test_list_rules_ipv4(self): - self.ip_rule.rule.add('192.168.0.1/24', table=10) + ip_lib.add_ip_rule(self.namespace, '192.168.0.1/24', table=10) rules_ipv4 = priv_ip_lib.list_ip_rules(self.namespace, 4) for rule in rules_ipv4: if rule['table'] == 10: @@ -88,7 +90,7 @@ class ListIpRulesTestCase(functional_base.BaseSudoTestCase): self.assertEqual(0, len(rule_tables)) def test_list_rules_ipv6(self): - self.ip_rule.rule.add('2001:db8::1/64', table=20) + ip_lib.add_ip_rule(self.namespace, '2001:db8::1/64', table=20) rules_ipv6 = priv_ip_lib.list_ip_rules(self.namespace, 6) for rule in rules_ipv6: if rule['table'] == 20: @@ -97,3 +99,103 @@ class ListIpRulesTestCase(functional_base.BaseSudoTestCase): break else: self.fail('Rule added (2001:db8::1/64, table 20) not found') + + +class RuleTestCase(functional_base.BaseSudoTestCase): + + def setUp(self): + super(RuleTestCase, self).setUp() + self.namespace = 'ns_test-' + uuidutils.generate_uuid() + self.ns = priv_ip_lib.create_netns(self.namespace) + self.addCleanup(self._remove_ns) + + def _remove_ns(self): + priv_ip_lib.remove_netns(self.namespace) + + def _check_rules(self, rules, parameters, values, exception_string): + for rule in rules: + if all(rule.get(parameter) == value + for parameter, value in zip(parameters, values)): + break + else: + self.fail('Rule with %s was expected' % exception_string) + + def test_add_rule_ip(self): + ip_addresses = ['192.168.200.250', '2001::250'] + for ip_address in ip_addresses: + ip_version = common_utils.get_ip_version(ip_address) + ip_lenght = common_utils.get_network_length(ip_version) + ip_family = common_utils.get_socket_address_family(ip_version) + priv_ip_lib.add_ip_rule(self.namespace, src=ip_address, + src_len=ip_lenght, family=ip_family) + rules = ip_lib.list_ip_rules(self.namespace, ip_version) + self._check_rules(rules, ['from'], [ip_address], + '"from" IP address %s' % ip_address) + + def test_add_rule_iif(self): + iif = 'iif_device_1' + priv_ip_lib.create_interface(iif, self.namespace, 'dummy') + priv_ip_lib.add_ip_rule(self.namespace, iifname=iif) + rules = ip_lib.list_ip_rules(self.namespace, 4) + self._check_rules(rules, ['iif'], [iif], 'iif name %s' % iif) + + def test_add_rule_table(self): + table = 212 + ip_addresses = ['192.168.200.251', '2001::251'] + for ip_address in ip_addresses: + ip_version = common_utils.get_ip_version(ip_address) + ip_lenght = common_utils.get_network_length(ip_version) + ip_family = common_utils.get_socket_address_family(ip_version) + priv_ip_lib.add_ip_rule(self.namespace, table=table, + src=ip_address, src_len=ip_lenght, + family=ip_family) + rules = ip_lib.list_ip_rules(self.namespace, ip_version) + self._check_rules( + rules, ['table', 'from'], [str(table), ip_address], + 'table %s and "from" IP address %s' % (table, ip_address)) + + def test_add_rule_priority(self): + priority = 12345 + ip_addresses = ['192.168.200.252', '2001::252'] + for ip_address in ip_addresses: + ip_version = common_utils.get_ip_version(ip_address) + ip_lenght = common_utils.get_network_length(ip_version) + ip_family = common_utils.get_socket_address_family(ip_version) + priv_ip_lib.add_ip_rule(self.namespace, priority=priority, + src=ip_address, src_len=ip_lenght, + family=ip_family) + rules = ip_lib.list_ip_rules(self.namespace, ip_version) + self._check_rules( + rules, ['priority', 'from'], [str(priority), ip_address], + 'priority %s and "from" IP address %s' % + (priority, ip_address)) + + def test_add_rule_priority_table_iif(self): + table = 213 + priority = 12346 + iif = 'iif_device_2' + priv_ip_lib.create_interface(iif, self.namespace, 'dummy') + priv_ip_lib.add_ip_rule(self.namespace, priority=priority, iifname=iif, + table=table) + + rules = ip_lib.list_ip_rules(self.namespace, 4) + self._check_rules( + rules, ['priority', 'iif', 'table'], + [str(priority), iif, str(table)], + 'priority %s, table %s and iif name %s' % (priority, table, iif)) + + @testtools.skip('https://github.com/svinota/pyroute2/issues/566') + def test_add_rule_exists(self): + iif = 'iif_device_1' + priv_ip_lib.create_interface(iif, self.namespace, 'dummy') + priv_ip_lib.add_ip_rule(self.namespace, iifname=iif) + rules = ip_lib.list_ip_rules(self.namespace, 4) + self._check_rules(rules, ['iif'], [iif], 'iif name %s' % iif) + self.assertEqual(4, len(rules)) + + # pyroute2.netlink.exceptions.NetlinkError(17, 'File exists') + # exception is catch. + priv_ip_lib.add_ip_rule(self.namespace, iifname=iif) + rules = ip_lib.list_ip_rules(self.namespace, 4) + self._check_rules(rules, ['iif'], [iif], 'iif name %s' % iif) + self.assertEqual(4, len(rules)) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index 8f3887bdf60..b812a84748c 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -45,6 +45,7 @@ from neutron.agent.l3 import namespaces from neutron.agent.l3 import router_info as l3router from neutron.agent.linux import dibbler from neutron.agent.linux import interface +from neutron.agent.linux import ip_lib from neutron.agent.linux import iptables_manager from neutron.agent.linux import pd from neutron.agent.linux import ra @@ -137,6 +138,9 @@ class BasicRouterOperationsFramework(base.BaseTestCase): self.mock_rule = mock.MagicMock() ip_rule.return_value = self.mock_rule + self.mock_add_ip_rule = mock.patch.object(ip_lib, + 'add_ip_rule').start() + ip_dev = mock.patch('neutron.agent.linux.ip_lib.IPDevice').start() self.mock_ip_dev = mock.MagicMock() ip_dev.return_value = self.mock_ip_dev diff --git a/neutron/tests/unit/agent/l3/test_dvr_local_router.py b/neutron/tests/unit/agent/l3/test_dvr_local_router.py index b2300fa9bea..7fa459692b7 100644 --- a/neutron/tests/unit/agent/l3/test_dvr_local_router.py +++ b/neutron/tests/unit/agent/l3/test_dvr_local_router.py @@ -304,8 +304,9 @@ class TestDvrRouterOperations(base.BaseTestCase): @mock.patch.object(ip_lib, 'send_ip_addr_adv_notif') @mock.patch.object(ip_lib, 'IPDevice') - @mock.patch.object(ip_lib, 'IPRule') - def test_floating_ip_added_dist(self, mIPRule, mIPDevice, mock_adv_notif): + @mock.patch.object(ip_lib, 'add_ip_rule') + def test_floating_ip_added_dist(self, mock_add_ip_rule, mIPDevice, + mock_adv_notif): router = mock.MagicMock() ri = self._create_router(router) ri.ex_gw_port = ri.router['gw_port'] @@ -340,18 +341,18 @@ class TestDvrRouterOperations(base.BaseTestCase): ri.fip_ns.local_subnets.allocate.return_value = subnet ip_cidr = common_utils.ip_to_cidr(fip['floating_ip_address']) ri.floating_ip_added_dist(fip, ip_cidr) - mIPRule().rule.add.assert_called_with(ip='192.168.0.1', - table=16, - priority=FIP_PRI) + mock_add_ip_rule.assert_called_with( + namespace=ri.router_namespace.name, ip='192.168.0.1', + table=16, priority=FIP_PRI) ri.fip_ns.local_subnets.allocate.assert_not_called() # Validate that fip_ns.local_subnets is called when # ri.rtr_fip_subnet is None ri.rtr_fip_subnet = None ri.floating_ip_added_dist(fip, ip_cidr) - mIPRule().rule.add.assert_called_with(ip='192.168.0.1', - table=16, - priority=FIP_PRI) + mock_add_ip_rule.assert_called_with( + namespace=ri.router_namespace.name, ip='192.168.0.1', + table=16, priority=FIP_PRI) ri.fip_ns.local_subnets.allocate.assert_called_once_with(ri.router_id) # TODO(mrsmith): add more asserts @@ -390,7 +391,8 @@ class TestDvrRouterOperations(base.BaseTestCase): ri.fip_ns.local_subnets.allocate.assert_not_called() @mock.patch.object(ip_lib, 'IPRule') - def test_floating_ip_moved_dist(self, mIPRule): + @mock.patch.object(ip_lib, 'add_ip_rule') + def test_floating_ip_moved_dist(self, mock_add_ip_rule, mIPRule): router = mock.MagicMock() ri = self._create_router(router) floating_ip_address = '15.1.2.3' @@ -408,9 +410,9 @@ class TestDvrRouterOperations(base.BaseTestCase): floating_ip_address) ri.fip_ns.allocate_rule_priority.assert_called_once_with( floating_ip_address) - mIPRule().rule.add.assert_called_with(ip=fixed_ip, - table=16, - priority=FIP_PRI) + mock_add_ip_rule.assert_called_with( + namespace=ri.router_namespace.name, ip=fixed_ip, + table=16, priority=FIP_PRI) def _test_add_floating_ip(self, ri, fip, is_failure=False): if not is_failure: diff --git a/neutron/tests/unit/agent/linux/test_ip_lib.py b/neutron/tests/unit/agent/linux/test_ip_lib.py index d1fb4dbf180..adaaac081a0 100644 --- a/neutron/tests/unit/agent/linux/test_ip_lib.py +++ b/neutron/tests/unit/agent/linux/test_ip_lib.py @@ -31,7 +31,7 @@ import testtools from neutron.agent.common import utils # noqa from neutron.agent.linux import ip_lib from neutron.common import exceptions as n_exc -from neutron.common import utils as n_utils +from neutron.common import utils as common_utils from neutron import privileged from neutron.privileged.agent.linux import ip_lib as priv_lib from neutron.tests import base @@ -150,20 +150,6 @@ SUBNET_SAMPLE1 = ("10.0.0.0/24 dev qr-23380d11-d2 scope link src 10.0.0.1\n" SUBNET_SAMPLE2 = ("10.0.0.0/24 dev tap1d7888a7-10 scope link src 10.0.0.2\n" "10.0.0.0/24 dev qr-23380d11-d2 scope link src 10.0.0.1") -RULE_V4_SAMPLE = (""" -0: from all lookup local -32766: from all lookup main -32767: from all lookup default -101: from 192.168.45.100 lookup 2 -""") - -RULE_V6_SAMPLE = (""" -0: from all lookup local -32766: from all lookup main -32767: from all lookup default -201: from 2001:db8::1 lookup 3 -""") - class TestSubProcessBase(base.BaseTestCase): def setUp(self): @@ -631,16 +617,22 @@ class TestIpRuleCommand(TestIPCmdBase): def _stop_mock(self): self._mock_priv_list_ip_rules.stop() - def _test_add_rule(self, ip, table, priority): + def _test_add_rule(self, ip, iif, table, priority): ip_version = netaddr.IPNetwork(ip).version - with mock.patch.object(ip_lib, '_parse_ip_rule'): - self.rule_cmd.add(ip, table=table, priority=priority) - self.mock_priv_list_ip_rules.assert_called_once_with( - self.parent.namespace, n_utils.get_ip_version(ip)) - self._assert_sudo([ip_version], ('add', 'from', ip, - 'priority', str(priority), - 'table', str(table), - 'type', 'unicast')) + ip_family = common_utils.get_socket_address_family(ip_version) + cmd_args = {'table': table, + 'priority': priority, + 'family': ip_family} + if iif: + cmd_args['iifname'] = iif + else: + cmd_args['src'] = ip + cmd_args['src_len'] = common_utils.get_network_length(ip_version) + + with mock.patch.object(priv_lib, 'add_ip_rule') as mock_add_ip_rule: + ip_lib.add_ip_rule('namespace', ip, iif=iif, table=table, + priority=priority) + mock_add_ip_rule.assert_called_once_with('namespace', **cmd_args) def _test_add_rule_exists(self, ip, table, priority, output): self.parent._as_root.return_value = output @@ -649,8 +641,8 @@ class TestIpRuleCommand(TestIPCmdBase): self.rule_cmd.add(ip, table=table, priority=priority) kwargs = {'from': ip, 'priority': str(priority), 'table': str(table), 'type': 'unicast'} - mock_exists.assert_called_once_with(n_utils.get_ip_version(ip), - **kwargs) + mock_exists.assert_called_once_with( + common_utils.get_ip_version(ip), **kwargs) def _test_delete_rule(self, ip, table, priority): ip_version = netaddr.IPNetwork(ip).version @@ -694,16 +686,13 @@ class TestIpRuleCommand(TestIPCmdBase): self.assertEqual({'fwmark': '0x400/0xffff', 'type': 'unicast'}, actual) def test_add_rule_v4(self): - self._test_add_rule('192.168.45.100', 2, 100) + self._test_add_rule('192.168.45.100', None, 2, 100) - def test_add_rule_v4_exists(self): - self._test_add_rule_exists('192.168.45.100', 2, 101, RULE_V4_SAMPLE) + def test_add_rule_v4_iif(self): + self._test_add_rule('192.168.45.100', 'iif_name', 2, 100) def test_add_rule_v6(self): - self._test_add_rule('2001:db8::1', 3, 200) - - def test_add_rule_v6_exists(self): - self._test_add_rule_exists('2001:db8::1', 3, 201, RULE_V6_SAMPLE) + self._test_add_rule('2001:db8::1', None, 3, 200) def test_delete_rule_v4(self): self._test_delete_rule('192.168.45.100', 2, 100)