Prevent pyroute2.IPDB threads leaking

pyroute2.IPDB is an interesting entity. It's much more than a simple
interface to `ip` command, as it is more similar to a database process.
When created IPDB object spawns a thread that will be responsible for
updating the object with changes to the underlying OS. Thread stays up
until the user will call `release()` method.

Turns out code in kuryr_kubernetes.cni.binding wasn't taking that into
account and was slowly leaking threads (and possibly processes). This
became apparent when running Kuryr with CNI daemon enabled.

This commit fixes the problem by switching all IPDB usages to context
managers, so `release()` method is called automatically.  Also the IPDB
objects cache is removed as already released IPDB objects cannot be
reused. kuryr_kubernetes.cni.binding modules were missing unit tests,
this commit adds them as well.

Change-Id: I82afda3f217dac56228677bb66703c3d80e5d751
Closes-Bug: 1728996
This commit is contained in:
Michał Dulko 2017-11-02 19:30:51 +01:00
parent 00c5f00485
commit 1c2320e11e
7 changed files with 304 additions and 63 deletions

View File

@ -18,7 +18,6 @@ import pyroute2
from stevedore import driver as stv_driver
_BINDING_NAMESPACE = 'kuryr_kubernetes.cni.binding'
_IPDB = {}
def _get_binding_driver(vif):
@ -29,14 +28,10 @@ def _get_binding_driver(vif):
def get_ipdb(netns=None):
try:
return _IPDB[netns]
except KeyError:
if netns:
ipdb = pyroute2.IPDB(nl=pyroute2.NetNS(netns))
else:
ipdb = pyroute2.IPDB()
_IPDB[netns] = ipdb
if netns:
ipdb = pyroute2.IPDB(nl=pyroute2.NetNS(netns))
else:
ipdb = pyroute2.IPDB()
return ipdb
@ -56,21 +51,23 @@ def _enable_ipv6(netns):
def _configure_l3(vif, ifname, netns):
with get_ipdb(netns).interfaces[ifname] as iface:
for subnet in vif.network.subnets.objects:
if subnet.cidr.version == 6:
_enable_ipv6(netns)
for fip in subnet.ips.objects:
iface.add_ip('%s/%s' % (fip.address, subnet.cidr.prefixlen))
with get_ipdb(netns) as ipdb:
with ipdb.interfaces[ifname] as iface:
for subnet in vif.network.subnets.objects:
if subnet.cidr.version == 6:
_enable_ipv6(netns)
for fip in subnet.ips.objects:
iface.add_ip('%s/%s' % (fip.address,
subnet.cidr.prefixlen))
routes = get_ipdb(netns).routes
for subnet in vif.network.subnets.objects:
for route in subnet.routes.objects:
routes.add(gateway=str(route.gateway),
dst=str(route.cidr)).commit()
if subnet.gateway:
routes.add(gateway=str(subnet.gateway),
dst='default').commit()
routes = ipdb.routes
for subnet in vif.network.subnets.objects:
for route in subnet.routes.objects:
routes.add(gateway=str(route.gateway),
dst=str(route.cidr)).commit()
if subnet.gateway:
routes.add(gateway=str(subnet.gateway),
dst='default').commit()
def connect(vif, instance_info, ifname, netns=None):

View File

@ -23,22 +23,21 @@ class BaseBridgeDriver(object):
def connect(self, vif, ifname, netns):
host_ifname = vif.vif_name
c_ipdb = b_base.get_ipdb(netns)
h_ipdb = b_base.get_ipdb()
with b_base.get_ipdb(netns) as c_ipdb:
with c_ipdb.create(ifname=ifname, peer=host_ifname,
kind='veth') as c_iface:
c_iface.mtu = vif.network.mtu
c_iface.address = str(vif.address)
c_iface.up()
with c_ipdb.create(ifname=ifname, peer=host_ifname,
kind='veth') as c_iface:
c_iface.mtu = vif.network.mtu
c_iface.address = str(vif.address)
c_iface.up()
if netns:
with c_ipdb.interfaces[host_ifname] as h_iface:
h_iface.net_ns_pid = os.getpid()
if netns:
with c_ipdb.interfaces[host_ifname] as h_iface:
h_iface.net_ns_pid = os.getpid()
with h_ipdb.interfaces[host_ifname] as h_iface:
h_iface.mtu = vif.network.mtu
h_iface.up()
with b_base.get_ipdb() as h_ipdb:
with h_ipdb.interfaces[host_ifname] as h_iface:
h_iface.mtu = vif.network.mtu
h_iface.up()
def disconnect(self, vif, ifname, netns):
pass
@ -50,10 +49,9 @@ class BridgeDriver(BaseBridgeDriver):
host_ifname = vif.vif_name
bridge_name = vif.bridge_name
h_ipdb = b_base.get_ipdb()
with h_ipdb.interfaces[bridge_name] as h_br:
h_br.add_port(host_ifname)
with b_base.get_ipdb() as h_ipdb:
with h_ipdb.interfaces[bridge_name] as h_br:
h_br.add_port(host_ifname)
def disconnect(self, vif, ifname, netns):
# NOTE(ivc): veth pair is destroyed automatically along with the

View File

@ -31,31 +31,31 @@ class NestedDriver(object):
raise NotImplementedError()
def connect(self, vif, ifname, netns):
h_ipdb = b_base.get_ipdb()
c_ipdb = b_base.get_ipdb(netns)
with b_base.get_ipdb() as h_ipdb:
# NOTE(vikasc): Ideally 'ifname' should be used here but instead a
# temporary name is being used while creating the device for
# container in host network namespace. This is because cni expects
# only 'eth0' as interface name and if host already has an
# interface named 'eth0', device creation will fail with 'already
# exists' error.
temp_name = vif.vif_name
# NOTE(vikasc): Ideally 'ifname' should be used here but instead a
# temporary name is being used while creating the device for container
# in host network namespace. This is because cni expects only 'eth0'
# as interface name and if host already has an interface named 'eth0',
# device creation will fail with 'already exists' error.
temp_name = vif.vif_name
# TODO(vikasc): evaluate whether we should have stevedore
# driver for getting the link device.
vm_iface_name = config.CONF.binding.link_iface
# TODO(vikasc): evaluate whether we should have stevedore
# driver for getting the link device.
vm_iface_name = config.CONF.binding.link_iface
args = self._get_iface_create_args(vif)
with h_ipdb.create(ifname=temp_name,
link=h_ipdb.interfaces[vm_iface_name],
**args) as iface:
iface.net_ns_fd = netns
args = self._get_iface_create_args(vif)
with h_ipdb.create(ifname=temp_name,
link=h_ipdb.interfaces[vm_iface_name],
**args) as iface:
iface.net_ns_fd = netns
with c_ipdb.interfaces[temp_name] as iface:
iface.ifname = ifname
iface.mtu = vif.network.mtu
iface.address = str(vif.address)
iface.up()
with b_base.get_ipdb(netns) as c_ipdb:
with c_ipdb.interfaces[temp_name] as iface:
iface.ifname = ifname
iface.mtu = vif.network.mtu
iface.address = str(vif.address)
iface.up()
def disconnect(self, vif, ifname, netns):
# NOTE(vikasc): device will get deleted with container namespace, so

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from kuryr_kubernetes import config
import os_vif
from oslotest import base
@ -20,3 +21,4 @@ class TestCase(base.BaseTestCase):
super(TestCase, self).setUp()
args = []
config.init(args=args)
os_vif.initialize()

View File

@ -0,0 +1,60 @@
# Copyright (c) 2017 Red Hat.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from os_vif import objects as osv_objects
from os_vif.objects import vif as osv_vif
from oslo_serialization import jsonutils
def _fake_vif(cls=osv_vif.VIFOpenVSwitch):
vif = cls(
id=uuid.uuid4(),
vif_name='h_interface',
bridge_name='bridge',
address='3e:94:b7:31:a0:83',
port_profile=osv_objects.vif.VIFPortProfileOpenVSwitch(
interface_id='89eccd45-43e9-43d8-b4cc-4c13db13f782',
profile_id=str(uuid.uuid4()),
),
)
vif.network = osv_objects.network.Network(id=uuid.uuid4(), mtu=1)
subnet = osv_objects.subnet.Subnet(
uuid=uuid.uuid4(),
dns=['192.168.0.1'],
cidr='192.168.0.0/24',
gateway='192.168.0.1',
routes=osv_objects.route.RouteList(objects=[]),
)
subnet.ips = osv_objects.fixed_ip.FixedIPList(objects=[])
subnet.ips.objects.append(
osv_objects.fixed_ip.FixedIP(address='192.168.0.2'))
vif.network.subnets.objects.append(subnet)
return vif
def _fake_vif_dict(obj=None):
if obj:
return obj.obj_to_primitive()
else:
return _fake_vif().obj_to_primitive()
def _fake_vif_string(dictionary=None):
if dictionary:
return jsonutils.dumps(dictionary)
else:
return jsonutils.dumps(_fake_vif_dict())

View File

@ -0,0 +1,184 @@
# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import uuid
from os_vif import objects as osv_objects
from kuryr_kubernetes.cni.binding import base
from kuryr_kubernetes import objects
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests import fake
class TestDriverMixin(test_base.TestCase):
def setUp(self):
super(TestDriverMixin, self).setUp()
self.instance_info = osv_objects.instance_info.InstanceInfo(
uuid=uuid.uuid4(), name='foo')
self.ifname = 'c_interface'
self.netns = '/proc/netns/1234'
# Mock IPDB context managers
self.ipdbs = {}
self.m_bridge_iface = mock.Mock(__exit__=mock.Mock())
self.m_c_iface = mock.Mock()
self.m_h_iface = mock.Mock()
self.h_ipdb, self.h_ipdb_exit = self._mock_ipdb_context_manager(None)
self.c_ipdb, self.c_ipdb_exit = self._mock_ipdb_context_manager(
self.netns)
self.m_create = mock.Mock()
self.c_ipdb.create = mock.Mock(
return_value=mock.Mock(
__enter__=mock.Mock(return_value=self.m_create),
__exit__=mock.Mock()))
def _mock_ipdb_context_manager(self, netns):
mock_ipdb = mock.Mock(
interfaces={
'bridge': mock.Mock(
__enter__=mock.Mock(return_value=self.m_bridge_iface),
__exit__=mock.Mock(),
),
'c_interface': mock.Mock(
__enter__=mock.Mock(return_value=self.m_c_iface),
__exit__=mock.Mock(),
),
'h_interface': mock.Mock(
__enter__=mock.Mock(return_value=self.m_h_iface),
__exit__=mock.Mock(),
),
}
)
mock_exit = mock.Mock()
self.ipdbs[netns] = mock.Mock(
__enter__=mock.Mock(return_value=mock_ipdb),
__exit__=mock_exit)
return mock_ipdb, mock_exit
@mock.patch('kuryr_kubernetes.cni.binding.base.get_ipdb')
@mock.patch('os_vif.plug')
def _test_connect(self, m_vif_plug, m_get_ipdb):
def get_ipdb(netns=None):
return self.ipdbs[netns]
m_get_ipdb.side_effect = get_ipdb
base.connect(self.vif, self.instance_info, self.ifname, self.netns)
m_vif_plug.assert_called_once_with(self.vif, self.instance_info)
self.m_c_iface.add_ip.assert_called_once_with('192.168.0.2/24')
@mock.patch('os_vif.unplug')
def _test_disconnect(self, m_vif_unplug):
base.disconnect(self.vif, self.instance_info, self.ifname, self.netns)
m_vif_unplug.assert_called_once_with(self.vif, self.instance_info)
class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
def setUp(self):
super(TestOpenVSwitchDriver, self).setUp()
self.vif = fake._fake_vif(osv_objects.vif.VIFOpenVSwitch)
@mock.patch('os.getpid', mock.Mock(return_value=123))
@mock.patch('kuryr_kubernetes.linux_net_utils.create_ovs_vif_port')
def test_connect(self, mock_create_ovs):
self._test_connect()
self.assertEqual(1, self.h_ipdb_exit.call_count)
self.assertEqual(2, self.c_ipdb_exit.call_count)
self.c_ipdb.create.assert_called_once_with(
ifname=self.ifname, peer='h_interface', kind='veth')
self.assertEqual(1, self.m_create.mtu)
self.assertEqual(str(self.vif.address),
self.m_create.address)
self.m_create.up.assert_called_once_with()
self.assertEqual(123, self.m_h_iface.net_ns_pid)
self.assertEqual(1, self.m_h_iface.mtu)
self.m_h_iface.up.assert_called_once_with()
mock_create_ovs.assert_called_once_with(
'bridge', 'h_interface', '89eccd45-43e9-43d8-b4cc-4c13db13f782',
'3e:94:b7:31:a0:83', 'kuryr')
@mock.patch('kuryr_kubernetes.linux_net_utils.delete_ovs_vif_port')
def test_disconnect(self, mock_delete_ovs):
self._test_disconnect()
mock_delete_ovs.assert_called_once_with('bridge', 'h_interface')
class TestBridgeDriver(TestDriverMixin, test_base.TestCase):
def setUp(self):
super(TestBridgeDriver, self).setUp()
self.vif = fake._fake_vif(osv_objects.vif.VIFBridge)
@mock.patch('os.getpid', mock.Mock(return_value=123))
def test_connect(self):
self._test_connect()
self.assertEqual(2, self.h_ipdb_exit.call_count)
self.assertEqual(2, self.c_ipdb_exit.call_count)
self.c_ipdb.create.assert_called_once_with(
ifname=self.ifname, peer='h_interface', kind='veth')
self.assertEqual(1, self.m_create.mtu)
self.assertEqual(str(self.vif.address),
self.m_create.address)
self.m_create.up.assert_called_once_with()
self.assertEqual(123, self.m_h_iface.net_ns_pid)
self.assertEqual(1, self.m_h_iface.mtu)
self.m_h_iface.up.assert_called_once_with()
self.m_bridge_iface.add_port.assert_called_once_with('h_interface')
def test_disconnect(self):
self._test_disconnect()
class TestNestedVlanDriver(TestDriverMixin, test_base.TestCase):
def setUp(self):
super(TestNestedVlanDriver, self).setUp()
self.vif = fake._fake_vif(objects.vif.VIFVlanNested)
def test_connect(self):
self._test_connect()
self.assertEqual(1, self.h_ipdb_exit.call_count)
self.assertEqual(2, self.c_ipdb_exit.call_count)
self.assertEqual(self.ifname, self.m_h_iface.ifname)
self.assertEqual(1, self.m_h_iface.mtu)
self.assertEqual(str(self.vif.address), self.m_h_iface.address)
self.m_h_iface.up.assert_called_once_with()
def test_disconnect(self):
self._test_disconnect()
class TestNestedMacvlanDriver(TestDriverMixin, test_base.TestCase):
def setUp(self):
super(TestNestedMacvlanDriver, self).setUp()
self.vif = fake._fake_vif(objects.vif.VIFMacvlanNested)
def test_connect(self):
self._test_connect()
self.assertEqual(1, self.h_ipdb_exit.call_count)
self.assertEqual(2, self.c_ipdb_exit.call_count)
self.assertEqual(self.ifname, self.m_h_iface.ifname)
self.assertEqual(1, self.m_h_iface.mtu)
self.assertEqual(str(self.vif.address), self.m_h_iface.address)
self.m_h_iface.up.assert_called_once_with()
def test_disconnect(self):
self._test_disconnect()