Move cni plugins to a common folder
This patch moves the cni plugins that were split between api.py, main.py and daemon/service.py to have then in a unified path. Change-Id: Ief15e3f8a722237649dd5a1fb8c4e26f51143072
This commit is contained in:
parent
d00b09ce05
commit
bcacff60c9
|
@ -34,18 +34,6 @@ from kuryr_kubernetes import exceptions as k_exc
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CNIPlugin(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def add(self, params):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, params):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CNIRunner(object):
|
||||
# TODO(ivc): extend SUPPORTED_VERSIONS and format output based on
|
||||
|
|
|
@ -24,10 +24,8 @@ import time
|
|||
import cotyledon
|
||||
import flask
|
||||
from pyroute2.ipdb import transactional
|
||||
import retrying
|
||||
|
||||
import os_vif
|
||||
from os_vif import objects as obj_vif
|
||||
from os_vif.objects import base
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
|
@ -35,10 +33,9 @@ from oslo_log import log as logging
|
|||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes.cni import api
|
||||
from kuryr_kubernetes.cni.binding import base as b_base
|
||||
from kuryr_kubernetes.cni import handlers as h_cni
|
||||
from kuryr_kubernetes.cni import health
|
||||
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
|
||||
from kuryr_kubernetes.cni import utils
|
||||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
|
@ -48,104 +45,8 @@ from kuryr_kubernetes import watcher as k_watcher
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
RETRY_DELAY = 1000 # 1 second in milliseconds
|
||||
HEALTH_CHECKER_DELAY = 5
|
||||
|
||||
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
|
||||
# annotated by controller or even noticed by any watcher. Kubelet
|
||||
# will try to delete such vif, but we will have no data about it.
|
||||
# This is currently worked around by returning succesfully in case
|
||||
# of timing out in delete. To solve this properly we need to watch
|
||||
# for pod deletes as well.
|
||||
|
||||
|
||||
class K8sCNIRegistryPlugin(api.CNIPlugin):
|
||||
def __init__(self, registry, healthy):
|
||||
self.healthy = healthy
|
||||
self.registry = registry
|
||||
|
||||
def _get_name(self, pod):
|
||||
return pod['metadata']['name']
|
||||
|
||||
def add(self, params):
|
||||
vif = self._do_work(params, b_base.connect)
|
||||
|
||||
pod_name = params.args.K8S_POD_NAME
|
||||
# NOTE(dulek): Saving containerid to be able to distinguish old DEL
|
||||
# requests that we should ignore. We need a lock to
|
||||
# prevent race conditions and replace whole object in the
|
||||
# dict for multiprocessing.Manager to notice that.
|
||||
with lockutils.lock(pod_name, external=True):
|
||||
d = self.registry[pod_name]
|
||||
d['containerid'] = params.CNI_CONTAINERID
|
||||
self.registry[pod_name] = d
|
||||
LOG.debug('Saved containerid = %s for pod %s',
|
||||
params.CNI_CONTAINERID, pod_name)
|
||||
|
||||
# Wait for VIF to become active.
|
||||
timeout = CONF.cni_daemon.vif_annotation_timeout
|
||||
|
||||
# Wait for timeout sec, 1 sec between tries, retry when vif not active.
|
||||
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
|
||||
retry_on_result=lambda x: not x.active)
|
||||
def wait_for_active(pod_name):
|
||||
return base.VersionedObject.obj_from_primitive(
|
||||
self.registry[pod_name]['vif'])
|
||||
|
||||
vif = wait_for_active(pod_name)
|
||||
if not vif.active:
|
||||
raise exceptions.ResourceNotReady(pod_name)
|
||||
|
||||
return vif
|
||||
|
||||
def delete(self, params):
|
||||
pod_name = params.args.K8S_POD_NAME
|
||||
try:
|
||||
reg_ci = self.registry[pod_name]['containerid']
|
||||
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
|
||||
if reg_ci and reg_ci != params.CNI_CONTAINERID:
|
||||
# NOTE(dulek): This is a DEL request for some older (probably
|
||||
# failed) ADD call. We should ignore it or we'll
|
||||
# unplug a running pod.
|
||||
LOG.warning('Received DEL request for unknown ADD call. '
|
||||
'Ignoring.')
|
||||
return
|
||||
except KeyError:
|
||||
pass
|
||||
self._do_work(params, b_base.disconnect)
|
||||
|
||||
def report_drivers_health(self, driver_healthy):
|
||||
if not driver_healthy:
|
||||
with self.healthy.get_lock():
|
||||
LOG.debug("Reporting CNI driver not healthy.")
|
||||
self.healthy.value = driver_healthy
|
||||
|
||||
def _do_work(self, params, fn):
|
||||
pod_name = params.args.K8S_POD_NAME
|
||||
|
||||
timeout = CONF.cni_daemon.vif_annotation_timeout
|
||||
|
||||
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
|
||||
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
|
||||
retry_on_exception=lambda e: isinstance(e, KeyError))
|
||||
def find():
|
||||
return self.registry[pod_name]
|
||||
|
||||
try:
|
||||
d = find()
|
||||
pod = d['pod']
|
||||
vif = base.VersionedObject.obj_from_primitive(d['vif'])
|
||||
except KeyError:
|
||||
raise exceptions.ResourceNotReady(pod_name)
|
||||
|
||||
fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS,
|
||||
self.report_drivers_health)
|
||||
return vif
|
||||
|
||||
def _get_inst(self, pod):
|
||||
return obj_vif.instance_info.InstanceInfo(
|
||||
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
|
||||
|
||||
|
||||
class DaemonServer(object):
|
||||
def __init__(self, plugin, healthy):
|
||||
|
@ -250,7 +151,8 @@ class CNIDaemonServerService(cotyledon.Service):
|
|||
self.run_queue_reading = False
|
||||
self.registry = registry
|
||||
self.healthy = healthy
|
||||
self.plugin = K8sCNIRegistryPlugin(registry, self.healthy)
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry,
|
||||
self.healthy)
|
||||
self.server = DaemonServer(self.plugin, self.healthy)
|
||||
|
||||
def run(self):
|
||||
|
|
|
@ -22,49 +22,18 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes.cni import api as cni_api
|
||||
from kuryr_kubernetes.cni import handlers as h_cni
|
||||
from kuryr_kubernetes.cni.plugins import k8s_cni
|
||||
from kuryr_kubernetes.cni import utils
|
||||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes import objects as k_objects
|
||||
from kuryr_kubernetes import watcher as k_watcher
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
_CNI_TIMEOUT = 180
|
||||
|
||||
|
||||
class K8sCNIPlugin(cni_api.CNIPlugin):
|
||||
|
||||
def add(self, params):
|
||||
self._setup(params)
|
||||
self._pipeline.register(h_cni.AddHandler(params, self._done))
|
||||
self._watcher.start()
|
||||
return self._vif
|
||||
|
||||
def delete(self, params):
|
||||
self._setup(params)
|
||||
self._pipeline.register(h_cni.DelHandler(params, self._done))
|
||||
self._watcher.start()
|
||||
|
||||
def _done(self, vif):
|
||||
self._vif = vif
|
||||
self._watcher.stop()
|
||||
|
||||
def _setup(self, params):
|
||||
clients.setup_kubernetes_client()
|
||||
self._pipeline = h_cni.CNIPipeline()
|
||||
self._watcher = k_watcher.Watcher(self._pipeline)
|
||||
self._watcher.add(
|
||||
"%(base)s/namespaces/%(namespace)s/pods"
|
||||
"?fieldSelector=metadata.name=%(pod)s" % {
|
||||
'base': k_const.K8S_API_BASE,
|
||||
'namespace': params.args.K8S_POD_NAMESPACE,
|
||||
'pod': params.args.K8S_POD_NAME})
|
||||
|
||||
|
||||
def run():
|
||||
# REVISIT(ivc): current CNI implementation provided by this package is
|
||||
# experimental and its primary purpose is to enable development of other
|
||||
|
@ -87,7 +56,7 @@ def run():
|
|||
if CONF.cni_daemon.daemon_enabled:
|
||||
runner = cni_api.CNIDaemonizedRunner()
|
||||
else:
|
||||
runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
|
||||
runner = cni_api.CNIStandaloneRunner(k8s_cni.K8sCNIPlugin())
|
||||
LOG.info("Using '%s' ", runner.__class__.__name__)
|
||||
|
||||
def _timeout(signum, frame):
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import abc
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CNIPlugin(object):
|
||||
|
||||
@abc.abstractmethod
|
||||
def add(self, params):
|
||||
raise NotImplementedError()
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, params):
|
||||
raise NotImplementedError()
|
|
@ -0,0 +1,49 @@
|
|||
# Copyright (c) 2016 Mirantis, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
from kuryr_kubernetes.cni import handlers as h_cni
|
||||
from kuryr_kubernetes.cni.plugins import base as base_cni
|
||||
from kuryr_kubernetes import constants as k_const
|
||||
from kuryr_kubernetes import watcher as k_watcher
|
||||
|
||||
|
||||
class K8sCNIPlugin(base_cni.CNIPlugin):
|
||||
|
||||
def add(self, params):
|
||||
self._setup(params)
|
||||
self._pipeline.register(h_cni.AddHandler(params, self._done))
|
||||
self._watcher.start()
|
||||
return self._vif
|
||||
|
||||
def delete(self, params):
|
||||
self._setup(params)
|
||||
self._pipeline.register(h_cni.DelHandler(params, self._done))
|
||||
self._watcher.start()
|
||||
|
||||
def _done(self, vif):
|
||||
self._vif = vif
|
||||
self._watcher.stop()
|
||||
|
||||
def _setup(self, params):
|
||||
clients.setup_kubernetes_client()
|
||||
self._pipeline = h_cni.CNIPipeline()
|
||||
self._watcher = k_watcher.Watcher(self._pipeline)
|
||||
self._watcher.add(
|
||||
"%(base)s/namespaces/%(namespace)s/pods"
|
||||
"?fieldSelector=metadata.name=%(pod)s" % {
|
||||
'base': k_const.K8S_API_BASE,
|
||||
'namespace': params.args.K8S_POD_NAMESPACE,
|
||||
'pod': params.args.K8S_POD_NAME})
|
|
@ -0,0 +1,124 @@
|
|||
# Copyright 2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import retrying
|
||||
|
||||
from os_vif import objects as obj_vif
|
||||
from os_vif.objects import base
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from kuryr_kubernetes.cni.binding import base as b_base
|
||||
from kuryr_kubernetes.cni.plugins import base as base_cni
|
||||
from kuryr_kubernetes import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
RETRY_DELAY = 1000 # 1 second in milliseconds
|
||||
|
||||
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
|
||||
# annotated by controller or even noticed by any watcher. Kubelet
|
||||
# will try to delete such vif, but we will have no data about it.
|
||||
# This is currently worked around by returning succesfully in case
|
||||
# of timing out in delete. To solve this properly we need to watch
|
||||
# for pod deletes as well.
|
||||
|
||||
|
||||
class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
|
||||
def __init__(self, registry, healthy):
|
||||
self.healthy = healthy
|
||||
self.registry = registry
|
||||
|
||||
def _get_name(self, pod):
|
||||
return pod['metadata']['name']
|
||||
|
||||
def add(self, params):
|
||||
vif = self._do_work(params, b_base.connect)
|
||||
|
||||
pod_name = params.args.K8S_POD_NAME
|
||||
# NOTE(dulek): Saving containerid to be able to distinguish old DEL
|
||||
# requests that we should ignore. We need a lock to
|
||||
# prevent race conditions and replace whole object in the
|
||||
# dict for multiprocessing.Manager to notice that.
|
||||
with lockutils.lock(pod_name, external=True):
|
||||
d = self.registry[pod_name]
|
||||
d['containerid'] = params.CNI_CONTAINERID
|
||||
self.registry[pod_name] = d
|
||||
LOG.debug('Saved containerid = %s for pod %s',
|
||||
params.CNI_CONTAINERID, pod_name)
|
||||
|
||||
# Wait for VIF to become active.
|
||||
timeout = CONF.cni_daemon.vif_annotation_timeout
|
||||
|
||||
# Wait for timeout sec, 1 sec between tries, retry when vif not active.
|
||||
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
|
||||
retry_on_result=lambda x: not x.active)
|
||||
def wait_for_active(pod_name):
|
||||
return base.VersionedObject.obj_from_primitive(
|
||||
self.registry[pod_name]['vif'])
|
||||
|
||||
vif = wait_for_active(pod_name)
|
||||
if not vif.active:
|
||||
raise exceptions.ResourceNotReady(pod_name)
|
||||
|
||||
return vif
|
||||
|
||||
def delete(self, params):
|
||||
pod_name = params.args.K8S_POD_NAME
|
||||
try:
|
||||
reg_ci = self.registry[pod_name]['containerid']
|
||||
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
|
||||
if reg_ci and reg_ci != params.CNI_CONTAINERID:
|
||||
# NOTE(dulek): This is a DEL request for some older (probably
|
||||
# failed) ADD call. We should ignore it or we'll
|
||||
# unplug a running pod.
|
||||
LOG.warning('Received DEL request for unknown ADD call. '
|
||||
'Ignoring.')
|
||||
return
|
||||
except KeyError:
|
||||
pass
|
||||
self._do_work(params, b_base.disconnect)
|
||||
|
||||
def report_drivers_health(self, driver_healthy):
|
||||
if not driver_healthy:
|
||||
with self.healthy.get_lock():
|
||||
LOG.debug("Reporting CNI driver not healthy.")
|
||||
self.healthy.value = driver_healthy
|
||||
|
||||
def _do_work(self, params, fn):
|
||||
pod_name = params.args.K8S_POD_NAME
|
||||
|
||||
timeout = CONF.cni_daemon.vif_annotation_timeout
|
||||
|
||||
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
|
||||
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
|
||||
retry_on_exception=lambda e: isinstance(e, KeyError))
|
||||
def find():
|
||||
return self.registry[pod_name]
|
||||
|
||||
try:
|
||||
d = find()
|
||||
pod = d['pod']
|
||||
vif = base.VersionedObject.obj_from_primitive(d['vif'])
|
||||
except KeyError:
|
||||
raise exceptions.ResourceNotReady(pod_name)
|
||||
|
||||
fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS,
|
||||
self.report_drivers_health)
|
||||
return vif
|
||||
|
||||
def _get_inst(self, pod):
|
||||
return obj_vif.instance_info.InstanceInfo(
|
||||
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
|
|
@ -0,0 +1,67 @@
|
|||
# Copyright (c) 2017 NEC Corporation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from kuryr_kubernetes.cni.plugins import k8s_cni
|
||||
from kuryr_kubernetes import constants
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
|
||||
|
||||
class TestK8sCNIPlugin(test_base.TestCase):
|
||||
@mock.patch('kuryr_kubernetes.watcher.Watcher')
|
||||
@mock.patch('kuryr_kubernetes.cni.handlers.CNIPipeline')
|
||||
@mock.patch('kuryr_kubernetes.cni.handlers.DelHandler')
|
||||
@mock.patch('kuryr_kubernetes.cni.handlers.AddHandler')
|
||||
def _test_method(self, method, m_add_handler, m_del_handler, m_cni_pipe,
|
||||
m_watcher_class):
|
||||
self.passed_handler = None
|
||||
|
||||
def _save_handler(params, handler):
|
||||
self.passed_handler = handler
|
||||
|
||||
def _call_handler(*args):
|
||||
self.passed_handler(mock.sentinel.vif)
|
||||
|
||||
m_add_handler.side_effect = _save_handler
|
||||
m_del_handler.side_effect = _save_handler
|
||||
|
||||
m_watcher = mock.MagicMock(
|
||||
add=mock.MagicMock(),
|
||||
start=mock.MagicMock(side_effect=_call_handler))
|
||||
m_watcher_class.return_value = m_watcher
|
||||
|
||||
m_params = mock.MagicMock()
|
||||
m_params.args.K8S_POD_NAMESPACE = 'k8s_pod_namespace'
|
||||
m_params.args.K8S_POD_NAME = 'k8s_pod'
|
||||
|
||||
cni_plugin = k8s_cni.K8sCNIPlugin()
|
||||
result = getattr(cni_plugin, method)(m_params)
|
||||
self.assertEqual(mock.sentinel.vif, cni_plugin._vif)
|
||||
m_watcher.add.assert_called_with(
|
||||
"%(base)s/namespaces/%(namespace)s/pods"
|
||||
"?fieldSelector=metadata.name=%(pod)s" % {
|
||||
'base': constants.K8S_API_BASE,
|
||||
'namespace': m_params.args.K8S_POD_NAMESPACE,
|
||||
'pod': m_params.args.K8S_POD_NAME})
|
||||
|
||||
return result
|
||||
|
||||
def test_add(self):
|
||||
result = self._test_method('add')
|
||||
self.assertEqual(result, mock.sentinel.vif)
|
||||
|
||||
def test_delete(self):
|
||||
self._test_method('delete')
|
|
@ -0,0 +1,94 @@
|
|||
# Copyright 2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
|
||||
from kuryr_kubernetes import exceptions
|
||||
from kuryr_kubernetes.tests import base
|
||||
from kuryr_kubernetes.tests import fake
|
||||
|
||||
|
||||
class TestK8sCNIRegistryPlugin(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestK8sCNIRegistryPlugin, self).setUp()
|
||||
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar'}}
|
||||
self.vif = fake._fake_vif_dict()
|
||||
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
|
||||
'containerid': None}}
|
||||
healthy = mock.Mock()
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
|
||||
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'),
|
||||
CNI_IFNAME='baz', CNI_NETNS=123,
|
||||
CNI_CONTAINERID='cont_id')
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
|
||||
def test_add_present(self, m_connect, m_lock):
|
||||
self.plugin.add(self.params)
|
||||
|
||||
m_lock.assert_called_with('foo', external=True)
|
||||
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
|
||||
self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_present(self, m_disconnect):
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123,
|
||||
mock.ANY)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_wrong_container_id(self, m_disconnect):
|
||||
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
|
||||
'containerid': 'different'}}
|
||||
healthy = mock.Mock()
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_disconnect.assert_not_called()
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('time.sleep', mock.Mock())
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
|
||||
def test_add_present_on_5_try(self, m_connect, m_lock):
|
||||
se = [KeyError] * 5
|
||||
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
|
||||
m_getitem = mock.Mock(side_effect=se)
|
||||
m_setitem = mock.Mock()
|
||||
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
|
||||
self.plugin.registry = m_registry
|
||||
self.plugin.add(self.params)
|
||||
|
||||
m_lock.assert_called_with('foo', external=True)
|
||||
m_setitem.assert_called_once_with('foo', {'pod': self.pod,
|
||||
'vif': self.vif,
|
||||
'containerid': 'cont_id'})
|
||||
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
|
||||
|
||||
@mock.patch('time.sleep', mock.Mock())
|
||||
def test_add_not_present(self):
|
||||
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
|
||||
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
|
||||
group='cni_daemon')
|
||||
|
||||
m_getitem = mock.Mock(side_effect=KeyError)
|
||||
m_registry = mock.Mock(__getitem__=m_getitem)
|
||||
self.plugin.registry = m_registry
|
||||
self.assertRaises(exceptions.ResourceNotReady, self.plugin.add,
|
||||
self.params)
|
|
@ -22,7 +22,7 @@ from oslo_config import cfg
|
|||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes.cni import api
|
||||
from kuryr_kubernetes.cni import main
|
||||
from kuryr_kubernetes.cni.plugins import k8s_cni
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
from kuryr_kubernetes.tests import fake
|
||||
|
||||
|
@ -54,9 +54,9 @@ class TestCNIRunnerMixin(object):
|
|||
class TestCNIStandaloneRunner(test_base.TestCase, TestCNIRunnerMixin):
|
||||
def setUp(self):
|
||||
super(TestCNIStandaloneRunner, self).setUp()
|
||||
self.runner = api.CNIStandaloneRunner(main.K8sCNIPlugin())
|
||||
self.runner = api.CNIStandaloneRunner(k8s_cni.K8sCNIPlugin())
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.add')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni.K8sCNIPlugin.add')
|
||||
def test_run_add(self, m_k8s_add):
|
||||
vif = fake._fake_vif()
|
||||
m_k8s_add.return_value = vif
|
||||
|
@ -76,7 +76,7 @@ class TestCNIStandaloneRunner(test_base.TestCase, TestCNIRunnerMixin):
|
|||
"ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"}},
|
||||
result)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.delete')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni.K8sCNIPlugin.delete')
|
||||
def test_run_del(self, m_k8s_delete):
|
||||
vif = fake._fake_vif()
|
||||
m_k8s_delete.return_value = vif
|
||||
|
|
|
@ -104,8 +104,8 @@ class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
|
|||
super(TestOpenVSwitchDriver, self).setUp()
|
||||
self.vif = fake._fake_vif(osv_objects.vif.VIFOpenVSwitch)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.'
|
||||
'report_drivers_health')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.report_drivers_health')
|
||||
@mock.patch('os.getpid', mock.Mock(return_value=123))
|
||||
@mock.patch('kuryr_kubernetes.linux_net_utils.create_ovs_vif_port')
|
||||
def test_connect(self, mock_create_ovs, m_report):
|
||||
|
@ -126,8 +126,8 @@ class TestOpenVSwitchDriver(TestDriverMixin, test_base.TestCase):
|
|||
'bridge', 'h_interface', '89eccd45-43e9-43d8-b4cc-4c13db13f782',
|
||||
'3e:94:b7:31:a0:83', 'kuryr')
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.'
|
||||
'report_drivers_health')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.report_drivers_health')
|
||||
@mock.patch('kuryr_kubernetes.linux_net_utils.delete_ovs_vif_port')
|
||||
def test_disconnect(self, mock_delete_ovs, m_report):
|
||||
self._test_disconnect(report=m_report)
|
||||
|
|
|
@ -18,7 +18,6 @@ import mock
|
|||
from oslo_config import cfg
|
||||
|
||||
from kuryr_kubernetes.cni import main
|
||||
from kuryr_kubernetes import constants
|
||||
from kuryr_kubernetes.tests import base as test_base
|
||||
|
||||
|
||||
|
@ -70,50 +69,3 @@ class TestCNIMain(test_base.TestCase):
|
|||
m_setup_logging.assert_called()
|
||||
m_cni_daemon.run.assert_called()
|
||||
m_sysexit.assert_called()
|
||||
|
||||
|
||||
class TestK8sCNIPlugin(test_base.TestCase):
|
||||
@mock.patch('kuryr_kubernetes.watcher.Watcher')
|
||||
@mock.patch('kuryr_kubernetes.cni.handlers.CNIPipeline')
|
||||
@mock.patch('kuryr_kubernetes.cni.handlers.DelHandler')
|
||||
@mock.patch('kuryr_kubernetes.cni.handlers.AddHandler')
|
||||
def _test_method(self, method, m_add_handler, m_del_handler, m_cni_pipe,
|
||||
m_watcher_class):
|
||||
self.passed_handler = None
|
||||
|
||||
def _save_handler(params, handler):
|
||||
self.passed_handler = handler
|
||||
|
||||
def _call_handler(*args):
|
||||
self.passed_handler(mock.sentinel.vif)
|
||||
|
||||
m_add_handler.side_effect = _save_handler
|
||||
m_del_handler.side_effect = _save_handler
|
||||
|
||||
m_watcher = mock.MagicMock(
|
||||
add=mock.MagicMock(),
|
||||
start=mock.MagicMock(side_effect=_call_handler))
|
||||
m_watcher_class.return_value = m_watcher
|
||||
|
||||
m_params = mock.MagicMock()
|
||||
m_params.args.K8S_POD_NAMESPACE = 'k8s_pod_namespace'
|
||||
m_params.args.K8S_POD_NAME = 'k8s_pod'
|
||||
|
||||
cni_plugin = main.K8sCNIPlugin()
|
||||
result = getattr(cni_plugin, method)(m_params)
|
||||
self.assertEqual(mock.sentinel.vif, cni_plugin._vif)
|
||||
m_watcher.add.assert_called_with(
|
||||
"%(base)s/namespaces/%(namespace)s/pods"
|
||||
"?fieldSelector=metadata.name=%(pod)s" % {
|
||||
'base': constants.K8S_API_BASE,
|
||||
'namespace': m_params.args.K8S_POD_NAMESPACE,
|
||||
'pod': m_params.args.K8S_POD_NAME})
|
||||
|
||||
return result
|
||||
|
||||
def test_add(self):
|
||||
result = self._test_method('add')
|
||||
self.assertEqual(result, mock.sentinel.vif)
|
||||
|
||||
def test_delete(self):
|
||||
self._test_method('delete')
|
||||
|
|
|
@ -14,92 +14,20 @@
|
|||
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes.cni.daemon import service
|
||||
from kuryr_kubernetes.cni.plugins import k8s_cni_registry
|
||||
from kuryr_kubernetes import exceptions
|
||||
from kuryr_kubernetes.tests import base
|
||||
from kuryr_kubernetes.tests import fake
|
||||
|
||||
|
||||
class TestK8sCNIRegistryPlugin(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestK8sCNIRegistryPlugin, self).setUp()
|
||||
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar'}}
|
||||
self.vif = fake._fake_vif_dict()
|
||||
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
|
||||
'containerid': None}}
|
||||
healthy = mock.Mock()
|
||||
self.plugin = service.K8sCNIRegistryPlugin(registry, healthy)
|
||||
self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'),
|
||||
CNI_IFNAME='baz', CNI_NETNS=123,
|
||||
CNI_CONTAINERID='cont_id')
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
|
||||
def test_add_present(self, m_connect, m_lock):
|
||||
self.plugin.add(self.params)
|
||||
|
||||
m_lock.assert_called_with('foo', external=True)
|
||||
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
|
||||
self.assertEqual('cont_id', self.plugin.registry['foo']['containerid'])
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_present(self, m_disconnect):
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123,
|
||||
mock.ANY)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
|
||||
def test_del_wrong_container_id(self, m_disconnect):
|
||||
registry = {'foo': {'pod': self.pod, 'vif': self.vif,
|
||||
'containerid': 'different'}}
|
||||
healthy = mock.Mock()
|
||||
self.plugin = service.K8sCNIRegistryPlugin(registry, healthy)
|
||||
self.plugin.delete(self.params)
|
||||
|
||||
m_disconnect.assert_not_called()
|
||||
|
||||
@mock.patch('oslo_concurrency.lockutils.lock')
|
||||
@mock.patch('time.sleep', mock.Mock())
|
||||
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
|
||||
def test_add_present_on_5_try(self, m_connect, m_lock):
|
||||
se = [KeyError] * 5
|
||||
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
|
||||
se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None})
|
||||
m_getitem = mock.Mock(side_effect=se)
|
||||
m_setitem = mock.Mock()
|
||||
m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem)
|
||||
self.plugin.registry = m_registry
|
||||
self.plugin.add(self.params)
|
||||
|
||||
m_lock.assert_called_with('foo', external=True)
|
||||
m_setitem.assert_called_once_with('foo', {'pod': self.pod,
|
||||
'vif': self.vif,
|
||||
'containerid': 'cont_id'})
|
||||
m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123, mock.ANY)
|
||||
|
||||
@mock.patch('time.sleep', mock.Mock())
|
||||
def test_add_not_present(self):
|
||||
cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon')
|
||||
self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120,
|
||||
group='cni_daemon')
|
||||
|
||||
m_getitem = mock.Mock(side_effect=KeyError)
|
||||
m_registry = mock.Mock(__getitem__=m_getitem)
|
||||
self.plugin.registry = m_registry
|
||||
self.assertRaises(exceptions.ResourceNotReady, self.plugin.add,
|
||||
self.params)
|
||||
|
||||
|
||||
class TestDaemonServer(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestDaemonServer, self).setUp()
|
||||
healthy = mock.Mock()
|
||||
self.plugin = service.K8sCNIRegistryPlugin({}, healthy)
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy)
|
||||
self.health_registry = mock.Mock()
|
||||
self.srv = service.DaemonServer(self.plugin, self.health_registry)
|
||||
|
||||
|
@ -109,7 +37,8 @@ class TestDaemonServer(base.TestCase):
|
|||
'CNI_CONTAINERID': 'baz', 'CNI_COMMAND': 'ADD'}
|
||||
self.params_str = jsonutils.dumps(params)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.add')
|
||||
def test_add(self, m_add):
|
||||
vif = fake._fake_vif()
|
||||
m_add.return_value = vif
|
||||
|
@ -122,7 +51,8 @@ class TestDaemonServer(base.TestCase):
|
|||
fake._fake_vif_string(vif.obj_to_primitive()).encode(), resp.data)
|
||||
self.assertEqual(202, resp.status_code)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.add')
|
||||
def test_add_timeout(self, m_add):
|
||||
m_add.side_effect = exceptions.ResourceNotReady(mock.Mock())
|
||||
|
||||
|
@ -132,7 +62,8 @@ class TestDaemonServer(base.TestCase):
|
|||
m_add.assert_called_once_with(mock.ANY)
|
||||
self.assertEqual(504, resp.status_code)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add')
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.add')
|
||||
def test_add_error(self, m_add):
|
||||
m_add.side_effect = Exception
|
||||
|
||||
|
@ -142,7 +73,7 @@ class TestDaemonServer(base.TestCase):
|
|||
m_add.assert_called_once_with(mock.ANY)
|
||||
self.assertEqual(500, resp.status_code)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.delete')
|
||||
def test_delete(self, m_delete):
|
||||
resp = self.test_client.post('/delNetwork', data=self.params_str,
|
||||
|
@ -151,7 +82,7 @@ class TestDaemonServer(base.TestCase):
|
|||
m_delete.assert_called_once_with(mock.ANY)
|
||||
self.assertEqual(204, resp.status_code)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.delete')
|
||||
def test_delete_timeout(self, m_delete):
|
||||
m_delete.side_effect = exceptions.ResourceNotReady(mock.Mock())
|
||||
|
@ -161,7 +92,7 @@ class TestDaemonServer(base.TestCase):
|
|||
m_delete.assert_called_once_with(mock.ANY)
|
||||
self.assertEqual(204, resp.status_code)
|
||||
|
||||
@mock.patch('kuryr_kubernetes.cni.daemon.service.'
|
||||
@mock.patch('kuryr_kubernetes.cni.plugins.k8s_cni_registry.'
|
||||
'K8sCNIRegistryPlugin.delete')
|
||||
def test_delete_error(self, m_delete):
|
||||
m_delete.side_effect = Exception
|
||||
|
|
Loading…
Reference in New Issue