Merge "Only delete pod from CNI registry after unplugging the vif"
This commit is contained in:
commit
ded5d4779d
|
@ -221,7 +221,9 @@ class CNIDaemonWatcherService(cotyledon.Service):
|
|||
with lockutils.lock(pod_name, external=True):
|
||||
if pod_name not in self.registry:
|
||||
self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict,
|
||||
'containerid': None}
|
||||
'containerid': None,
|
||||
'vif_unplugged': False,
|
||||
'del_received': False}
|
||||
else:
|
||||
# NOTE(dulek): Only update vif if its status changed, we don't
|
||||
# need to care about other changes now.
|
||||
|
@ -241,12 +243,20 @@ class CNIDaemonWatcherService(cotyledon.Service):
|
|||
pod_name = utils.get_pod_unique_name(pod)
|
||||
try:
|
||||
if pod_name in self.registry:
|
||||
# NOTE(dulek): del on dict is atomic as long as we use standard
|
||||
# types as keys. This is the case, so we don't
|
||||
# need to lock here.
|
||||
del self.registry[pod_name]
|
||||
# NOTE(ndesh): We need to lock here to avoid race condition
|
||||
# with the deletion code for CNI DEL so that
|
||||
# we delete the registry entry exactly once
|
||||
with lockutils.lock(pod_name, external=True):
|
||||
if self.registry[pod_name]['vif_unplugged']:
|
||||
del self.registry[pod_name]
|
||||
else:
|
||||
pod_dict = self.registry[pod_name]
|
||||
pod_dict['del_received'] = True
|
||||
self.registry[pod_name] = pod_dict
|
||||
except KeyError:
|
||||
# This means someone else removed it. It's odd but safe to ignore.
|
||||
LOG.debug('Pod %s entry already removed from registry while '
|
||||
'handling DELETED event. Ignoring.', pod_name)
|
||||
pass
|
||||
|
||||
def terminate(self):
|
||||
|
|
|
@ -100,6 +100,23 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
|
|||
except KeyError:
|
||||
pass
|
||||
self._do_work(params, b_base.disconnect)
|
||||
# NOTE(ndesh): We need to lock here to avoid race condition
|
||||
# with the deletion code in the watcher to ensure that
|
||||
# we delete the registry entry exactly once
|
||||
try:
|
||||
with lockutils.lock(pod_name, external=True):
|
||||
if self.registry[pod_name]['del_received']:
|
||||
del self.registry[pod_name]
|
||||
else:
|
||||
pod_dict = self.registry[pod_name]
|
||||
pod_dict['vif_unplugged'] = True
|
||||
self.registry[pod_name] = pod_dict
|
||||
except KeyError:
|
||||
# This means the pod was removed before vif was unplugged. This
|
||||
# shouldn't happen, but we can't do anything about it now
|
||||
LOG.debug('Pod %s not found registry while handling DEL request. '
|
||||
'Ignoring.', pod_name)
|
||||
pass
|
||||
|
||||
def report_drivers_health(self, driver_healthy):
|
||||
if not driver_healthy:
|
||||
|
|
|
@ -29,7 +29,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
'namespace': 'default'}}
|
||||
self.vifs = fake._fake_vifs_dict()
|
||||
registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
|
||||
'containerid': None}}
|
||||
'containerid': None,
|
||||
'vif_unplugged': False,
|
||||
'del_received': False}}
|
||||
healthy = mock.Mock()
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
|
||||
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo',
|
||||
|
@ -50,14 +52,32 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
self.assertEqual('cont_id',
|
||||
self.plugin.registry['default/foo']['containerid'])
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_present(self, m_disconnect):
|
||||
def test_del_present(self, m_disconnect, m_lock):
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_lock.assert_called_with('default/foo', external=True)
|
||||
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123,
|
||||
report_health=mock.ANY,
|
||||
is_default_gateway=mock.ANY,
|
||||
container_id='cont_id')
|
||||
self.assertIn('default/foo', self.plugin.registry)
|
||||
self.assertEqual(True,
|
||||
self.plugin.registry['default/foo']['vif_unplugged'])
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_remove_pod_from_registry_after_del(self, m_disconnect, m_lock):
    # The watcher has already seen the pod's DELETED event, so once the
    # CNI DEL request unplugs the vif the registry entry must be dropped.
    pod_key = 'default/foo'
    entry = self.plugin.registry[pod_key]
    entry['del_received'] = True

    self.plugin.delete(self.params)

    # The registry update has to happen under the per-pod lock.
    m_lock.assert_called_with(pod_key, external=True)

    expected_kwargs = {
        'report_health': mock.ANY,
        'is_default_gateway': mock.ANY,
        'container_id': 'cont_id',
    }
    m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123,
                                    **expected_kwargs)
    self.assertNotIn(pod_key, self.plugin.registry)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_wrong_container_id(self, m_disconnect):
|
||||
|
@ -74,9 +94,12 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
|
||||
def test_add_present_on_5_try(self, m_connect, m_lock):
|
||||
se = [KeyError] * 5
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
|
||||
'vif_unplugged': False, 'del_received': False})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
|
||||
'vif_unplugged': False, 'del_received': False})
|
||||
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
|
||||
'vif_unplugged': False, 'del_received': False})
|
||||
m_getitem = mock.Mock(side_effect=se)
|
||||
m_setitem = mock.Mock()
|
||||
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
|
||||
|
@ -87,7 +110,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
|
|||
m_setitem.assert_called_once_with('default/foo',
|
||||
{'pod': self.pod,
|
||||
'vifs': self.vifs,
|
||||
'containerid': 'cont_id'})
|
||||
'containerid': 'cont_id',
|
||||
'vif_unplugged': False,
|
||||
'del_received': False})
|
||||
m_connect.assert_called_with(mock.ANY, mock.ANY, 'eth0', 123,
|
||||
report_health=mock.ANY,
|
||||
is_default_gateway=mock.ANY,
|
||||
|
|
|
@ -101,3 +101,34 @@ class TestDaemonServer(base.TestCase):
|
|||
|
||||
m_delete.assert_called_once_with(mock.ANY)
|
||||
self.assertEqual(500, resp.status_code)
|
||||
|
||||
|
||||
class TestCNIDaemonWatcherService(base.TestCase):
    """Tests for how CNIDaemonWatcherService.on_deleted updates the registry.

    on_deleted() either removes the pod's registry entry (when the vif has
    already been unplugged) or flags the entry with 'del_received' so that
    the CNI DEL handler removes it later.
    """

    def setUp(self):
        super(TestCNIDaemonWatcherService, self).setUp()
        self.registry = {}
        # NOTE: This dict doubles as the pod object passed to on_deleted()
        # and as the registry entry stored under the pod's name, which is
        # why it carries both pod metadata and the registry flags.
        self.pod = {'metadata': {'namespace': 'testing',
                                 'name': 'default'},
                    'vif_unplugged': False,
                    'del_received': False}
        self.healthy = mock.Mock()
        self.watcher = service.CNIDaemonWatcherService(
            0, self.registry, self.healthy)

    @mock.patch('oslo_concurrency.lockutils.lock')
    def test_on_deleted(self, m_lock):
        """Entry is removed when the vif was already unplugged."""
        pod = self.pod
        pod['vif_unplugged'] = True
        # Presumably utils.get_pod_unique_name() yields '<namespace>/<name>';
        # the key below has to match whatever it returns for self.pod.
        pod_name = 'testing/default'
        self.registry[pod_name] = pod

        self.watcher.on_deleted(pod)

        m_lock.assert_called_with(pod_name, external=True)
        self.assertNotIn(pod_name, self.registry)

    @mock.patch('oslo_concurrency.lockutils.lock')
    def test_on_deleted_false(self, m_lock):
        """Entry is kept and flagged when the vif is still plugged."""
        pod = self.pod
        pod_name = 'testing/default'
        self.registry[pod_name] = pod

        self.watcher.on_deleted(pod)

        m_lock.assert_called_with(pod_name, external=True)
        self.assertIn(pod_name, self.registry)
        self.assertIs(True, pod['del_received'])
|
||||
|
|
Loading…
Reference in New Issue