Switch ip addr add/del/flush commands to use pyroute2
This patch replaces usage of "ip addr add/del/flush" commands
with pyroute2 library.
This also switches from rootwrap to privsep when doing those
actions.
This patch adds also UT for _run_iproute_neigh/addr functions
from privileged module.
Change-Id: I6f4df391ec1899f8a4b10b50735dc9a79fa8483f
Related-Bug: #1492714
(cherry picked from commit e0223edf88
)
This commit is contained in:
parent
3f8ee68f11
commit
3f7ff45e73
|
@ -587,22 +587,15 @@ class IpAddrCommand(IpDeviceCommandBase):
|
|||
COMMAND = 'addr'
|
||||
|
||||
def add(self, cidr, scope='global', add_broadcast=True):
|
||||
net = netaddr.IPNetwork(cidr)
|
||||
args = ['add', cidr,
|
||||
'scope', scope,
|
||||
'dev', self.name]
|
||||
if add_broadcast and net.version == 4:
|
||||
args += ['brd', str(net[-1])]
|
||||
self._as_root([net.version], tuple(args))
|
||||
add_ip_address(self.name, self._parent.namespace, cidr, scope,
|
||||
add_broadcast)
|
||||
|
||||
def delete(self, cidr):
|
||||
ip_version = common_utils.get_ip_version(cidr)
|
||||
self._as_root([ip_version],
|
||||
('del', cidr,
|
||||
'dev', self.name))
|
||||
delete_ip_address(self.name, self._parent.namespace, cidr)
|
||||
|
||||
def flush(self, ip_version):
|
||||
self._as_root([ip_version], ('flush', self.name))
|
||||
flush_ip_addresses(
|
||||
self.name, self._parent.namespace, ip_version)
|
||||
|
||||
def get_devices_with_ip(self, name=None, scope=None, to=None,
|
||||
filters=None, ip_version=None):
|
||||
|
@ -960,6 +953,30 @@ NetworkNamespaceNotFound = privileged.NetworkNamespaceNotFound
|
|||
NetworkInterfaceNotFound = privileged.NetworkInterfaceNotFound
|
||||
|
||||
|
||||
def add_ip_address(device, namespace, cidr, scope='global',
|
||||
add_broadcast=True):
|
||||
net = netaddr.IPNetwork(cidr)
|
||||
broadcast = None
|
||||
if add_broadcast and net.version == 4:
|
||||
# NOTE(slaweq): in case if cidr is /32 net.broadcast is None so
|
||||
# same IP address as cidr should be set as broadcast
|
||||
broadcast = str(net.broadcast or net.ip)
|
||||
privileged.add_ip_address(
|
||||
net.version, str(net.ip), net.prefixlen,
|
||||
device, namespace, scope, broadcast)
|
||||
|
||||
|
||||
def delete_ip_address(device, namespace, cidr):
|
||||
net = netaddr.IPNetwork(cidr)
|
||||
privileged.delete_ip_address(
|
||||
net.version, str(net.ip), net.prefixlen, device, namespace)
|
||||
|
||||
|
||||
def flush_ip_addresses(device, namespace, ip_version):
|
||||
privileged.flush_ip_addresses(
|
||||
ip_version, device, namespace)
|
||||
|
||||
|
||||
def get_routing_table(ip_version, namespace=None):
|
||||
"""Return a list of dictionaries, each representing a route.
|
||||
|
||||
|
|
|
@ -29,7 +29,11 @@ _IP_VERSION_FAMILY_MAP = {4: socket.AF_INET, 6: socket.AF_INET6}
|
|||
def _get_scope_name(scope):
|
||||
"""Return the name of the scope (given as a number), or the scope number
|
||||
if the name is unknown.
|
||||
|
||||
For backward compatibility (with "ip" tool) "global" scope is converted to
|
||||
"universe" before converting to number
|
||||
"""
|
||||
scope = 'universe' if scope == 'global' else scope
|
||||
return rtnl.rt_scope.get(scope, scope)
|
||||
|
||||
|
||||
|
@ -86,7 +90,7 @@ def _get_iproute(namespace):
|
|||
return pyroute2.IPRoute()
|
||||
|
||||
|
||||
def _run_iproute(command, device, namespace, **kwargs):
|
||||
def _run_iproute_neigh(command, device, namespace, **kwargs):
|
||||
try:
|
||||
with _get_iproute(namespace) as ip:
|
||||
idx = ip.link_lookup(ifname=device)[0]
|
||||
|
@ -102,6 +106,65 @@ def _run_iproute(command, device, namespace, **kwargs):
|
|||
raise
|
||||
|
||||
|
||||
def _run_iproute_addr(command, device, namespace, **kwargs):
|
||||
try:
|
||||
with _get_iproute(namespace) as ip:
|
||||
idx = ip.link_lookup(ifname=device)[0]
|
||||
return ip.addr(command, index=idx, **kwargs)
|
||||
except IndexError:
|
||||
msg = _("Network interface %(device)s not found in namespace "
|
||||
"%(namespace)s.") % {'device': device,
|
||||
'namespace': namespace}
|
||||
raise NetworkInterfaceNotFound(msg)
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def add_ip_address(ip_version, ip, prefixlen, device, namespace, scope,
|
||||
broadcast=None):
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
_run_iproute_addr('add',
|
||||
device,
|
||||
namespace,
|
||||
address=ip,
|
||||
mask=prefixlen,
|
||||
family=family,
|
||||
broadcast=broadcast,
|
||||
scope=_get_scope_name(scope))
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def delete_ip_address(ip_version, ip, prefixlen, device, namespace):
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
_run_iproute_addr("delete",
|
||||
device,
|
||||
namespace,
|
||||
address=ip,
|
||||
mask=prefixlen,
|
||||
family=family)
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def flush_ip_addresses(ip_version, device, namespace):
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
try:
|
||||
with _get_iproute(namespace) as ip:
|
||||
idx = ip.link_lookup(ifname=device)[0]
|
||||
ip.flush_addr(index=idx, family=family)
|
||||
except IndexError:
|
||||
msg = _("Network interface %(device)s not found in namespace "
|
||||
"%(namespace)s.") % {'device': device,
|
||||
'namespace': namespace}
|
||||
raise NetworkInterfaceNotFound(msg)
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise NetworkNamespaceNotFound(netns_name=namespace)
|
||||
raise
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
def open_namespace(namespace):
|
||||
"""Open namespace to test if the namespace is ready to be manipulated"""
|
||||
|
@ -120,14 +183,14 @@ def add_neigh_entry(ip_version, ip_address, mac_address, device, namespace,
|
|||
:param namespace: The name of the namespace in which to add the entry
|
||||
"""
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
_run_iproute('replace',
|
||||
device,
|
||||
namespace,
|
||||
dst=ip_address,
|
||||
lladdr=mac_address,
|
||||
family=family,
|
||||
state=ndmsg.states['permanent'],
|
||||
**kwargs)
|
||||
_run_iproute_neigh('replace',
|
||||
device,
|
||||
namespace,
|
||||
dst=ip_address,
|
||||
lladdr=mac_address,
|
||||
family=family,
|
||||
state=ndmsg.states['permanent'],
|
||||
**kwargs)
|
||||
|
||||
|
||||
@privileged.default.entrypoint
|
||||
|
@ -142,13 +205,13 @@ def delete_neigh_entry(ip_version, ip_address, mac_address, device, namespace,
|
|||
"""
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
try:
|
||||
_run_iproute('delete',
|
||||
device,
|
||||
namespace,
|
||||
dst=ip_address,
|
||||
lladdr=mac_address,
|
||||
family=family,
|
||||
**kwargs)
|
||||
_run_iproute_neigh('delete',
|
||||
device,
|
||||
namespace,
|
||||
dst=ip_address,
|
||||
lladdr=mac_address,
|
||||
family=family,
|
||||
**kwargs)
|
||||
except NetlinkError as e:
|
||||
# trying to delete a non-existent entry shouldn't raise an error
|
||||
if e.code == errno.ENOENT:
|
||||
|
@ -171,11 +234,11 @@ def dump_neigh_entries(ip_version, device, namespace, **kwargs):
|
|||
"""
|
||||
family = _IP_VERSION_FAMILY_MAP[ip_version]
|
||||
entries = []
|
||||
dump = _run_iproute('dump',
|
||||
device,
|
||||
namespace,
|
||||
family=family,
|
||||
**kwargs)
|
||||
dump = _run_iproute_neigh('dump',
|
||||
device,
|
||||
namespace,
|
||||
family=family,
|
||||
**kwargs)
|
||||
|
||||
for entry in dump:
|
||||
attrs = dict(entry['attrs'])
|
||||
|
|
|
@ -842,37 +842,57 @@ class TestIpAddrCommand(TestIPCmdBase):
|
|||
self.command = 'addr'
|
||||
self.addr_cmd = ip_lib.IpAddrCommand(self.parent)
|
||||
|
||||
def test_add_address(self):
|
||||
@mock.patch.object(priv_lib, 'add_ip_address')
|
||||
def test_add_address(self, add):
|
||||
self.addr_cmd.add('192.168.45.100/24')
|
||||
self._assert_sudo([4],
|
||||
('add', '192.168.45.100/24',
|
||||
'scope', 'global',
|
||||
'dev', 'tap0',
|
||||
'brd', '192.168.45.255'))
|
||||
add.assert_called_once_with(
|
||||
4,
|
||||
'192.168.45.100',
|
||||
24,
|
||||
self.parent.name,
|
||||
self.addr_cmd._parent.namespace,
|
||||
'global',
|
||||
'192.168.45.255')
|
||||
|
||||
def test_add_address_scoped(self):
|
||||
@mock.patch.object(priv_lib, 'add_ip_address')
|
||||
def test_add_address_scoped(self, add):
|
||||
self.addr_cmd.add('192.168.45.100/24', scope='link')
|
||||
self._assert_sudo([4],
|
||||
('add', '192.168.45.100/24',
|
||||
'scope', 'link',
|
||||
'dev', 'tap0',
|
||||
'brd', '192.168.45.255'))
|
||||
add.assert_called_once_with(
|
||||
4,
|
||||
'192.168.45.100',
|
||||
24,
|
||||
self.parent.name,
|
||||
self.addr_cmd._parent.namespace,
|
||||
'link',
|
||||
'192.168.45.255')
|
||||
|
||||
def test_add_address_no_broadcast(self):
|
||||
@mock.patch.object(priv_lib, 'add_ip_address')
|
||||
def test_add_address_no_broadcast(self, add):
|
||||
self.addr_cmd.add('192.168.45.100/24', add_broadcast=False)
|
||||
self._assert_sudo([4],
|
||||
('add', '192.168.45.100/24',
|
||||
'scope', 'global',
|
||||
'dev', 'tap0'))
|
||||
add.assert_called_once_with(
|
||||
4,
|
||||
'192.168.45.100',
|
||||
24,
|
||||
self.parent.name,
|
||||
self.addr_cmd._parent.namespace,
|
||||
'global',
|
||||
None)
|
||||
|
||||
def test_del_address(self):
|
||||
@mock.patch.object(priv_lib, 'delete_ip_address')
|
||||
def test_del_address(self, delete):
|
||||
self.addr_cmd.delete('192.168.45.100/24')
|
||||
self._assert_sudo([4],
|
||||
('del', '192.168.45.100/24', 'dev', 'tap0'))
|
||||
delete.assert_called_once_with(
|
||||
4,
|
||||
'192.168.45.100',
|
||||
24,
|
||||
self.parent.name,
|
||||
self.addr_cmd._parent.namespace)
|
||||
|
||||
def test_flush(self):
|
||||
@mock.patch.object(priv_lib, 'flush_ip_addresses')
|
||||
def test_flush(self, flush):
|
||||
self.addr_cmd.flush(6)
|
||||
self._assert_sudo([6], ('flush', 'tap0'))
|
||||
flush.assert_called_once_with(
|
||||
6, self.parent.name, self.addr_cmd._parent.namespace)
|
||||
|
||||
def test_list(self):
|
||||
expected = [
|
||||
|
@ -1666,7 +1686,7 @@ class TestIpNeighCommand(TestIPCmdBase):
|
|||
family=2,
|
||||
ifindex=1)
|
||||
|
||||
@mock.patch.object(priv_lib, '_run_iproute')
|
||||
@mock.patch.object(priv_lib, '_run_iproute_neigh')
|
||||
def test_delete_entry_not_exist(self, mock_run_iproute):
|
||||
# trying to delete a non-existent entry shouldn't raise an error
|
||||
mock_run_iproute.side_effect = NetlinkError(errno.ENOENT, None)
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import errno
|
||||
|
||||
import mock
|
||||
import pyroute2
|
||||
|
||||
from neutron.privileged.agent.linux import ip_lib as priv_lib
|
||||
from neutron.tests import base
|
||||
|
||||
|
||||
class IpLibTestCase(base.BaseTestCase):
|
||||
|
||||
def _test_run_iproute_neigh(self, namespace=None):
|
||||
ip_obj = "NetNS" if namespace else "IPRoute"
|
||||
with mock.patch.object(pyroute2, ip_obj) as ip_mock_cls:
|
||||
ip_mock = ip_mock_cls()
|
||||
ip_mock.__enter__().link_lookup.return_value = [2]
|
||||
priv_lib._run_iproute_neigh("test_cmd", "eth0", namespace,
|
||||
test_param="test_value")
|
||||
ip_mock.assert_has_calls([
|
||||
mock.call.__enter__().link_lookup(ifname="eth0"),
|
||||
mock.call.__enter__().neigh("test_cmd", ifindex=2,
|
||||
test_param="test_value")])
|
||||
|
||||
def test_run_iproute_neigh_no_namespace(self):
|
||||
self._test_run_iproute_neigh()
|
||||
|
||||
def test_run_iproute_neigh_in_namespace(self):
|
||||
self._test_run_iproute_neigh(namespace="testns")
|
||||
|
||||
def test_run_iproute_neigh_interface_not_exists(self):
|
||||
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
|
||||
ip_mock = iproute_mock()
|
||||
ip_mock.__enter__().link_lookup.return_value = []
|
||||
self.assertRaises(
|
||||
priv_lib.NetworkInterfaceNotFound,
|
||||
priv_lib._run_iproute_neigh,
|
||||
"test_cmd", "eth0", None, test_param="test_value")
|
||||
|
||||
def test_run_iproute_neigh_namespace_not_exists(self):
|
||||
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
|
||||
iproute_mock.side_effect = OSError(
|
||||
errno.ENOENT, "Test no netns exception")
|
||||
self.assertRaises(
|
||||
priv_lib.NetworkNamespaceNotFound,
|
||||
priv_lib._run_iproute_neigh,
|
||||
"test_cmd", "eth0", None, test_param="test_value")
|
||||
|
||||
def test_run_iproute_neigh_error(self):
|
||||
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
|
||||
iproute_mock.side_effect = OSError(
|
||||
errno.EINVAL, "Test invalid argument exception")
|
||||
try:
|
||||
priv_lib._run_iproute_neigh(
|
||||
"test_cmd", "eth0", None, test_param="test_value")
|
||||
self.fail("OSError exception not raised")
|
||||
except OSError as e:
|
||||
self.assertEqual(errno.EINVAL, e.errno)
|
||||
|
||||
def _test_run_iproute_addr(self, namespace=None):
|
||||
ip_obj = "NetNS" if namespace else "IPRoute"
|
||||
with mock.patch.object(pyroute2, ip_obj) as ip_mock_cls:
|
||||
ip_mock = ip_mock_cls()
|
||||
ip_mock.__enter__().link_lookup.return_value = [2]
|
||||
priv_lib._run_iproute_addr("test_cmd", "eth0", namespace,
|
||||
test_param="test_value")
|
||||
ip_mock.assert_has_calls([
|
||||
mock.call.__enter__().link_lookup(ifname="eth0"),
|
||||
mock.call.__enter__().addr("test_cmd", index=2,
|
||||
test_param="test_value")])
|
||||
|
||||
def test_run_iproute_addr_no_namespace(self):
|
||||
self._test_run_iproute_addr()
|
||||
|
||||
def test_run_iproute_addr_in_namespace(self):
|
||||
self._test_run_iproute_addr(namespace="testns")
|
||||
|
||||
def test_run_iproute_addr_interface_not_exists(self):
|
||||
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
|
||||
ip_mock = iproute_mock()
|
||||
ip_mock.__enter__().link_lookup.return_value = []
|
||||
self.assertRaises(
|
||||
priv_lib.NetworkInterfaceNotFound,
|
||||
priv_lib._run_iproute_addr,
|
||||
"test_cmd", "eth0", None, test_param="test_value")
|
||||
|
||||
def test_run_iproute_addr_namespace_not_exists(self):
|
||||
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
|
||||
iproute_mock.side_effect = OSError(
|
||||
errno.ENOENT, "Test no netns exception")
|
||||
self.assertRaises(
|
||||
priv_lib.NetworkNamespaceNotFound,
|
||||
priv_lib._run_iproute_addr,
|
||||
"test_cmd", "eth0", None, test_param="test_value")
|
||||
|
||||
def test_run_iproute_addr_error(self):
|
||||
with mock.patch.object(pyroute2, "IPRoute") as iproute_mock:
|
||||
iproute_mock.side_effect = OSError(
|
||||
errno.EINVAL, "Test invalid argument exception")
|
||||
try:
|
||||
priv_lib._run_iproute_addr(
|
||||
"test_cmd", "eth0", None, test_param="test_value")
|
||||
self.fail("OSError exception not raised")
|
||||
except OSError as e:
|
||||
self.assertEqual(errno.EINVAL, e.errno)
|
Loading…
Reference in New Issue