Move cni plugins to a common folder

This patch moves the cni plugins that were split between api.py,
main.py and daemon/service.py to have then in a unified path.

Change-Id: Ief15e3f8a722237649dd5a1fb8c4e26f51143072
This commit is contained in:
Luis Tomas Bolivar 2018-02-27 10:00:36 +00:00
parent d00b09ce05
commit bcacff60c9
14 changed files with 388 additions and 282 deletions

View File

@ -34,18 +34,6 @@ from kuryr_kubernetes import exceptions as k_exc
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class CNIPlugin(object):
@abc.abstractmethod
def add(self, params):
raise NotImplementedError()
@abc.abstractmethod
def delete(self, params):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class CNIRunner(object):
# TODO(ivc): extend SUPPORTED_VERSIONS and format output based on

View File

@ -24,10 +24,8 @@ import time
import cotyledon
import flask
from pyroute2.ipdb import transactional
import retrying
import os_vif
from os_vif import objects as obj_vif
from os_vif.objects import base
from oslo_concurrency import lockutils
from oslo_config import cfg
@ -35,10 +33,9 @@ from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni import health
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
@ -48,104 +45,8 @@ from kuryr_kubernetes import watcher as k_watcher
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds
HEALTH_CHECKER_DELAY = 5
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
# annotated by controller or even noticed by any watcher. Kubelet
# will try to delete such vif, but we will have no data about it.
# This is currently worked around by returning succesfully in case
# of timing out in delete. To solve this properly we need to watch
# for pod deletes as well.
class K8sCNIRegistryPlugin(api.CNIPlugin):
def __init__(self, registry, healthy):
self.healthy = healthy
self.registry = registry
def _get_name(self, pod):
return pod['metadata']['name']
def add(self, params):
vif = self._do_work(params, b_base.connect)
pod_name = params.args.K8S_POD_NAME
# NOTE(dulek): Saving containerid to be able to distinguish old DEL
# requests that we should ignore. We need a lock to
# prevent race conditions and replace whole object in the
# dict for multiprocessing.Manager to notice that.
with lockutils.lock(pod_name, external=True):
d = self.registry[pod_name]
d['containerid'] = params.CNI_CONTAINERID
self.registry[pod_name] = d
LOG.debug('Saved containerid = %s for pod %s',
params.CNI_CONTAINERID, pod_name)
# Wait for VIF to become active.
timeout = CONF.cni_daemon.vif_annotation_timeout
# Wait for timeout sec, 1 sec between tries, retry when vif not active.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_result=lambda x: not x.active)
def wait_for_active(pod_name):
return base.VersionedObject.obj_from_primitive(
self.registry[pod_name]['vif'])
vif = wait_for_active(pod_name)
if not vif.active:
raise exceptions.ResourceNotReady(pod_name)
return vif
def delete(self, params):
pod_name = params.args.K8S_POD_NAME
try:
reg_ci = self.registry[pod_name]['containerid']
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
if reg_ci and reg_ci != params.CNI_CONTAINERID:
# NOTE(dulek): This is a DEL request for some older (probably
# failed) ADD call. We should ignore it or we'll
# unplug a running pod.
LOG.warning('Received DEL request for unknown ADD call. '
'Ignoring.')
return
except KeyError:
pass
self._do_work(params, b_base.disconnect)
def report_drivers_health(self, driver_healthy):
if not driver_healthy:
with self.healthy.get_lock():
LOG.debug("Reporting CNI driver not healthy.")
self.healthy.value = driver_healthy
def _do_work(self, params, fn):
pod_name = params.args.K8S_POD_NAME
timeout = CONF.cni_daemon.vif_annotation_timeout
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_exception=lambda e: isinstance(e, KeyError))
def find():
return self.registry[pod_name]
try:
d = find()
pod = d['pod']
vif = base.VersionedObject.obj_from_primitive(d['vif'])
except KeyError:
raise exceptions.ResourceNotReady(pod_name)
fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS,
self.report_drivers_health)
return vif
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
class DaemonServer(object):
def __init__(self, plugin, healthy):
@ -250,7 +151,8 @@ class CNIDaemonServerService(cotyledon.Service):
self.run_queue_reading = False
self.registry = registry
self.healthy = healthy
self.plugin = K8sCNIRegistryPlugin(registry, self.healthy)
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry,
self.healthy)
self.server = DaemonServer(self.plugin, self.healthy)
def run(self):

View File

@ -22,49 +22,18 @@ from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api as cni_api
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni.plugins import k8s_cni
from kuryr_kubernetes.cni import utils
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import objects as k_objects
from kuryr_kubernetes import watcher as k_watcher
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
_CNI_TIMEOUT = 180
class K8sCNIPlugin(cni_api.CNIPlugin):
def add(self, params):
self._setup(params)
self._pipeline.register(h_cni.AddHandler(params, self._done))
self._watcher.start()
return self._vif
def delete(self, params):
self._setup(params)
self._pipeline.register(h_cni.DelHandler(params, self._done))
self._watcher.start()
def _done(self, vif):
self._vif = vif
self._watcher.stop()
def _setup(self, params):
clients.setup_kubernetes_client()
self._pipeline = h_cni.CNIPipeline()
self._watcher = k_watcher.Watcher(self._pipeline)
self._watcher.add(
"%(base)s/namespaces/%(namespace)s/pods"
"?fieldSelector=metadata.name=%(pod)s" % {
'base': k_const.K8S_API_BASE,
'namespace': params.args.K8S_POD_NAMESPACE,
'pod': params.args.K8S_POD_NAME})
def run():
# REVISIT(ivc): current CNI implementation provided by this package is
# experimental and its primary purpose is to enable development of other
@ -87,7 +56,7 @@ def run():
if CONF.cni_daemon.daemon_enabled:
runner = cni_api.CNIDaemonizedRunner()
else:
runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
runner = cni_api.CNIStandaloneRunner(k8s_cni.K8sCNIPlugin())
LOG.info("Using '%s' ", runner.__class__.__name__)
def _timeout(signum, frame):

View File

View File

@ -0,0 +1,30 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class CNIPlugin(object):
@abc.abstractmethod
def add(self, params):
raise NotImplementedError()
@abc.abstractmethod
def delete(self, params):
raise NotImplementedError()

View File

@ -0,0 +1,49 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes.cni.plugins import base as base_cni
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import watcher as k_watcher
class K8sCNIPlugin(base_cni.CNIPlugin):
def add(self, params):
self._setup(params)
self._pipeline.register(h_cni.AddHandler(params, self._done))
self._watcher.start()
return self._vif
def delete(self, params):
self._setup(params)
self._pipeline.register(h_cni.DelHandler(params, self._done))
self._watcher.start()
def _done(self, vif):
self._vif = vif
self._watcher.stop()
def _setup(self, params):
clients.setup_kubernetes_client()
self._pipeline = h_cni.CNIPipeline()
self._watcher = k_watcher.Watcher(self._pipeline)
self._watcher.add(
"%(base)s/namespaces/%(namespace)s/pods"
"?fieldSelector=metadata.name=%(pod)s" % {
'base': k_const.K8S_API_BASE,
'namespace': params.args.K8S_POD_NAMESPACE,
'pod': params.args.K8S_POD_NAME})

View File

@ -0,0 +1,124 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import retrying
from os_vif import objects as obj_vif
from os_vif.objects import base
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes.cni.plugins import base as base_cni
from kuryr_kubernetes import exceptions
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
# annotated by controller or even noticed by any watcher. Kubelet
# will try to delete such vif, but we will have no data about it.
# This is currently worked around by returning succesfully in case
# of timing out in delete. To solve this properly we need to watch
# for pod deletes as well.
class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
def __init__(self, registry, healthy):
self.healthy = healthy
self.registry = registry
def _get_name(self, pod):
return pod['metadata']['name']
def add(self, params):
vif = self._do_work(params, b_base.connect)
pod_name = params.args.K8S_POD_NAME
# NOTE(dulek): Saving containerid to be able to distinguish old DEL
# requests that we should ignore. We need a lock to
# prevent race conditions and replace whole object in the
# dict for multiprocessing.Manager to notice that.
with lockutils.lock(pod_name, external=True):
d = self.registry[pod_name]
d['containerid'] = params.CNI_CONTAINERID
self.registry[pod_name] = d
LOG.debug('Saved containerid = %s for pod %s',
params.CNI_CONTAINERID, pod_name)
# Wait for VIF to become active.
timeout = CONF.cni_daemon.vif_annotation_timeout
# Wait for timeout sec, 1 sec between tries, retry when vif not active.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_result=lambda x: not x.active)
def wait_for_active(pod_name):
return base.VersionedObject.obj_from_primitive(
self.registry[pod_name]['vif'])
vif = wait_for_active(pod_name)
if not vif.active:
raise exceptions.ResourceNotReady(pod_name)
return vif
def delete(self, params):
pod_name = params.args.K8S_POD_NAME
try:
reg_ci = self.registry[pod_name]['containerid']
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
if reg_ci and reg_ci != params.CNI_CONTAINERID:
# NOTE(dulek): This is a DEL request for some older (probably
# failed) ADD call. We should ignore it or we'll
# unplug a running pod.
LOG.warning('Received DEL request for unknown ADD call. '
'Ignoring.')
return
except KeyError:
pass
self._do_work(params, b_base.disconnect)
def report_drivers_health(self, driver_healthy):
if not driver_healthy:
with self.healthy.get_lock():
LOG.debug("Reporting CNI driver not healthy.")
self.healthy.value = driver_healthy
def _do_work(self, params, fn):
pod_name = params.args.K8S_POD_NAME
timeout = CONF.cni_daemon.vif_annotation_timeout
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_exception=lambda e: isinstance(e, KeyError))
def find():
return self.registry[pod_name]
try:
d = find()
pod = d['pod']
vif = base.VersionedObject.obj_from_primitive(d['vif'])
except KeyError:
raise exceptions.ResourceNotReady(pod_name)
fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS,
self.report_drivers_health)
return vif
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])

View File

@ -0,0 +1,67 @@
# Copyright (c) 2017 NEC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes.cni.plugins import k8s_cni
from kuryr_kubernetes import constants
from kuryr_kubernetes.tests import base as test_base
class TestK8sCNIPlugin(test_base.TestCase):
@mock.patch('kuryr_kubernetes.watcher.Watcher')
@mock.patch('kuryr_kubernetes.cni.handlers.CNIPipeline')
@mock.patch('kuryr_kubernetes.cni.handlers.DelHandler')
@mock.patch('kuryr_kubernetes.cni.handlers.AddHandler')
def _test_method(self, method, m_add_handler, m_del_handler, m_cni_pipe,
m_watcher_class):
self.passed_handler = None
def _save_handler(params, handler):
self.passed_handler = handler
def _call_handler(*args):
self.passed_handler(mock.sentinel.vif)
m_add_handler.side_effect = _save_handler
m_del_handler.side_effect = _save_handler
m_watcher = mock.MagicMock(
add=mock.MagicMock(),
start=mock.MagicMock(side_effect=_call_handler))
m_watcher_class.return_value = m_watcher
m_params = mock.MagicMock()
m_params.args.K8S_POD_NAMESPACE = 'k8s_pod_namespace'
m_params.args.K8S_POD_NAME = 'k8s_pod'
cni_plugin = k8s_cni.K8sCNIPlugin()
result = getattr(cni_plugin, method)(m_params)
self.assertEqual(mock.sentinel.vif, cni_plugin._vif)
m_watcher.add.assert_called_with(
"%(base)s/namespaces/%(namespace)s/pods"
"?fieldSelector=metadata.name=%(pod)s" % {
'base': constants.K8S_API_BASE,
'namespace': m_params.args.K8S_POD_NAMESPACE,
'pod': m_params.args.K8S_POD_NAME})
return result
def test_add(self):
result = self._test_method('add')
self.assertEqual(result, mock.sentinel.vif)
def test_delete(self):
self._test_method('delete')

View File

@ -0,0 +1,94 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base
from kuryr_kubernetes.tests import fake
class TestK8sCNIRegistryPlugin(base.TestCase):
def setUp(self):
super(TestK8sCNIRegistryPlugin, self).setUp()
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar'}}
self.vif = fake._fake_vif_dict()
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': None}}
healthy = mock.Mock()
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'),
CNI_IFNAME='baz', CNI_NETNS=123,
CNI_CONTAINERID='cont_id')
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present(self, m_connect, m_lock):
self.plugin.add(self.params)
m_lock.assert_called_with('foo', external=True)
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_present(self, m_disconnect):
self.plugin.delete(self.params)
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123,
mock.ANY)
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect):
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': 'different'}}
healthy = mock.Mock()
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
self.plugin.delete(self.params)
m_disconnect.assert_not_called()
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('time.sleep', mock.Mock())
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present_on_5_try(self, m_connect, m_lock):
se = [KeyError] * 5
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
m_getitem = mock.Mock(side_effect=se)
m_setitem = mock.Mock()
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
self.plugin.registry = m_registry
self.plugin.add(self.params)
m_lock.assert_called_with('foo', external=True)
m_setitem.assert_called_once_with('foo', {'pod': self.pod,
'vif': self.vif,
'containerid': 'cont_id'})
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
@mock.patch('time.sleep', mock.Mock())
def test_add_not_present(self):
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
group='cni_daemon')
m_getitem = mock.Mock(side_effect=KeyError)
m_registry = mock.Mock(__getitem__=m_getitem)
self.plugin.registry = m_registry
self.assertRaises(exceptions.ResourceNotReady, self.plugin.add,
self.params)

View File

@ -22,7 +22,7 @@ from oslo_config import cfg
from oslo_serialization import jsonutils
from kuryr_kubernetes.cni import api
from kuryr_kubernetes.cni import main
from kuryr_kubernetes.cni.plugins import k8s_cni
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests import fake
@ -54,9 +54,9 @@ class TestCNIRunnerMixin(object):
class TestCNIStandaloneRunner(test_base.TestCase, TestCNIRunnerMixin):
def setUp(self):
super(TestCNIStandaloneRunner, self).setUp()
self.runner = api.CNIStandaloneRunner(main.K8sCNIPlugin())
self.runner = api.CNIStandaloneRunner(k8s_cni.K8sCNIPlugin())
@mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.add')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni.K8sCNIPlugin.add')
def test_run_add(self, m_k8s_add):
vif = fake._fake_vif()
m_k8s_add.return_value = vif
@ -76,7 +76,7 @@ class TestCNIStandaloneRunner(test_base.TestCase, TestCNIRunnerMixin):
"ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"}},
result)
@mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.delete')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni.K8sCNIPlugin.delete')
def test_run_del(self, m_k8s_delete):
vif = fake._fake_vif()
m_k8s_delete.return_value = vif

View File

@ -104,8 +104,8 @@ class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
super(TestOpenVSwitchDriver, self).setUp()
self.vif = fake._fake_vif(osv_objects.vif.VIFOpenVSwitch)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.'
'report_drivers_health')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.report_drivers_health')
@mock.patch('os.getpid', mock.Mock(return_value=123))
@mock.patch('kuryr_kubernetes.linux_net_utils.create_ovs_vif_port')
def test_connect(self, mock_create_ovs, m_report):
@ -126,8 +126,8 @@ class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
'bridge', 'h_interface', '89eccd45-43e9-43d8-b4cc-4c13db13f782',
'3e:94:b7:31:a0:83', 'kuryr')
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.'
'report_drivers_health')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.report_drivers_health')
@mock.patch('kuryr_kubernetes.linux_net_utils.delete_ovs_vif_port')
def test_disconnect(self, mock_delete_ovs, m_report):
self._test_disconnect(report=m_report)

View File

@ -18,7 +18,6 @@ import mock
from oslo_config import cfg
from kuryr_kubernetes.cni import main
from kuryr_kubernetes import constants
from kuryr_kubernetes.tests import base as test_base
@ -70,50 +69,3 @@ class TestCNIMain(test_base.TestCase):
m_setup_logging.assert_called()
m_cni_daemon.run.assert_called()
m_sysexit.assert_called()
class TestK8sCNIPlugin(test_base.TestCase):
@mock.patch('kuryr_kubernetes.watcher.Watcher')
@mock.patch('kuryr_kubernetes.cni.handlers.CNIPipeline')
@mock.patch('kuryr_kubernetes.cni.handlers.DelHandler')
@mock.patch('kuryr_kubernetes.cni.handlers.AddHandler')
def _test_method(self, method, m_add_handler, m_del_handler, m_cni_pipe,
m_watcher_class):
self.passed_handler = None
def _save_handler(params, handler):
self.passed_handler = handler
def _call_handler(*args):
self.passed_handler(mock.sentinel.vif)
m_add_handler.side_effect = _save_handler
m_del_handler.side_effect = _save_handler
m_watcher = mock.MagicMock(
add=mock.MagicMock(),
start=mock.MagicMock(side_effect=_call_handler))
m_watcher_class.return_value = m_watcher
m_params = mock.MagicMock()
m_params.args.K8S_POD_NAMESPACE = 'k8s_pod_namespace'
m_params.args.K8S_POD_NAME = 'k8s_pod'
cni_plugin = main.K8sCNIPlugin()
result = getattr(cni_plugin, method)(m_params)
self.assertEqual(mock.sentinel.vif, cni_plugin._vif)
m_watcher.add.assert_called_with(
"%(base)s/namespaces/%(namespace)s/pods"
"?fieldSelector=metadata.name=%(pod)s" % {
'base': constants.K8S_API_BASE,
'namespace': m_params.args.K8S_POD_NAMESPACE,
'pod': m_params.args.K8S_POD_NAME})
return result
def test_add(self):
result = self._test_method('add')
self.assertEqual(result, mock.sentinel.vif)
def test_delete(self):
self._test_method('delete')

View File

@ -14,92 +14,20 @@
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from kuryr_kubernetes.cni.daemon import service
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base
from kuryr_kubernetes.tests import fake
class TestK8sCNIRegistryPlugin(base.TestCase):
def setUp(self):
super(TestK8sCNIRegistryPlugin, self).setUp()
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar'}}
self.vif = fake._fake_vif_dict()
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': None}}
healthy = mock.Mock()
self.plugin = service.K8sCNIRegistryPlugin(registry, healthy)
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'),
CNI_IFNAME='baz', CNI_NETNS=123,
CNI_CONTAINERID='cont_id')
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present(self, m_connect, m_lock):
self.plugin.add(self.params)
m_lock.assert_called_with('foo', external=True)
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_present(self, m_disconnect):
self.plugin.delete(self.params)
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123,
mock.ANY)
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect):
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
'containerid': 'different'}}
healthy = mock.Mock()
self.plugin = service.K8sCNIRegistryPlugin(registry, healthy)
self.plugin.delete(self.params)
m_disconnect.assert_not_called()
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('time.sleep', mock.Mock())
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present_on_5_try(self, m_connect, m_lock):
se = [KeyError] * 5
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
m_getitem = mock.Mock(side_effect=se)
m_setitem = mock.Mock()
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
self.plugin.registry = m_registry
self.plugin.add(self.params)
m_lock.assert_called_with('foo', external=True)
m_setitem.assert_called_once_with('foo', {'pod': self.pod,
'vif': self.vif,
'containerid': 'cont_id'})
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
@mock.patch('time.sleep', mock.Mock())
def test_add_not_present(self):
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
group='cni_daemon')
m_getitem = mock.Mock(side_effect=KeyError)
m_registry = mock.Mock(__getitem__=m_getitem)
self.plugin.registry = m_registry
self.assertRaises(exceptions.ResourceNotReady, self.plugin.add,
self.params)
class TestDaemonServer(base.TestCase):
def setUp(self):
super(TestDaemonServer, self).setUp()
healthy = mock.Mock()
self.plugin = service.K8sCNIRegistryPlugin({}, healthy)
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy)
self.health_registry = mock.Mock()
self.srv = service.DaemonServer(self.plugin, self.health_registry)
@ -109,7 +37,8 @@ class TestDaemonServer(base.TestCase):
'CNI_CONTAINERID': 'baz', 'CNI_COMMAND': 'ADD'}
self.params_str = jsonutils.dumps(params)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.add')
def test_add(self, m_add):
vif = fake._fake_vif()
m_add.return_value = vif
@ -122,7 +51,8 @@ class TestDaemonServer(base.TestCase):
fake._fake_vif_string(vif.obj_to_primitive()).encode(), resp.data)
self.assertEqual(202, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.add')
def test_add_timeout(self, m_add):
m_add.side_effect = exceptions.ResourceNotReady(mock.Mock())
@ -132,7 +62,8 @@ class TestDaemonServer(base.TestCase):
m_add.assert_called_once_with(mock.ANY)
self.assertEqual(504, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.add')
def test_add_error(self, m_add):
m_add.side_effect = Exception
@ -142,7 +73,7 @@ class TestDaemonServer(base.TestCase):
m_add.assert_called_once_with(mock.ANY)
self.assertEqual(500, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.delete')
def test_delete(self, m_delete):
resp = self.test_client.post('/delNetwork', data=self.params_str,
@ -151,7 +82,7 @@ class TestDaemonServer(base.TestCase):
m_delete.assert_called_once_with(mock.ANY)
self.assertEqual(204, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.delete')
def test_delete_timeout(self, m_delete):
m_delete.side_effect = exceptions.ResourceNotReady(mock.Mock())
@ -161,7 +92,7 @@ class TestDaemonServer(base.TestCase):
m_delete.assert_called_once_with(mock.ANY)
self.assertEqual(204, resp.status_code)
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
'K8sCNIRegistryPlugin.delete')
def test_delete_error(self, m_delete):
m_delete.side_effect = Exception