Experimental CNI & VIFBridge binding

This patch provides an experimental CNI driver. It's primary purpose
is to enable development of other components (e.g. functional tests,
service/LBaaSv2 support). It is expected to be replaced with daemon
to configure VIF and connect it to the pods and a small lightweight
client to serve as CNI driver called by Kubernetes.

NOTE: unit tests are not provided as part of this patch as it is yet
unclear what parts of it will be reused in daemon-based
implementation.

Change-Id: Iacc8439dd3aee910d542e48ed013d6d3f354786e
Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
Ilya Chukhnakov 2016-11-29 09:34:37 +03:00
parent 1b1e9eb5e8
commit fa03953aff
17 changed files with 525 additions and 13 deletions

View File

@ -73,6 +73,9 @@ function configure_kuryr {
# "$(get_distutils_data_path)/libexec/kuryr"
iniset "$KURYR_CONFIG" kubernetes api_root "$KURYR_K8S_API_URL"
# REVISIT(ivc): 'use_stderr' is required for current CNI driver. Once a
# daemon-based CNI driver is implemented, this could be removed.
iniset "$KURYR_CONFIG" DEFAULT use_stderr true
create_kuryr_cache_dir
@ -83,6 +86,12 @@ function configure_kuryr {
fi
}
function install_kuryr_cni {
local kuryr_cni_bin=$(which kuryr-cni)
sudo install -o "$STACK_USER" -m 0555 -D \
"$kuryr_cni_bin" "${CNI_BIN_DIR}/kuryr-cni"
}
function configure_neutron_defaults {
local project_id=$(get_or_create_project \
"$KURYR_NEUTRON_DEFAULT_PROJECT" default)
@ -349,6 +358,9 @@ function run_k8s_kubelet {
if is_service_enabled kuryr-kubernetes; then
if [[ "$1" == "stack" && "$2" == "install" ]]; then
setup_develop "$KURYR_HOME"
if is_service_enabled kubelet; then
install_kuryr_cni
fi
elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then
create_kuryr_account

View File

@ -1,5 +1,7 @@
{
"cniVersion": "0.3.0",
"name": "kuryr",
"type": "kuryr"
"type": "kuryr-cni",
"kuryr_conf": "/etc/kuryr/kuryr.conf",
"debug": true
}

View File

@ -32,6 +32,14 @@ def get_kubernetes_client():
def setup_clients():
setup_neutron_client()
setup_kubernetes_client()
def setup_neutron_client():
_clients[_NEUTRON_CLIENT] = utils.get_neutron_client()
def setup_kubernetes_client():
_clients[_KUBERNETES_CLIENT] = k8s_client.K8sClient(
config.CONF.kubernetes.api_root)

View File

@ -0,0 +1,22 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr_kubernetes.cni import main
run = main.run
if __name__ == '__main__':
run()

View File

136
kuryr_kubernetes/cni/api.py Normal file
View File

@ -0,0 +1,136 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
import traceback
from kuryr.lib._i18n import _LE
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as k_exc
LOG = logging.getLogger(__name__)
_CNI_TIMEOUT = 60
class CNIConfig(dict):
def __init__(self, cfg):
super(CNIConfig, self).__init__(cfg)
for k, v in six.iteritems(self):
if not k.startswith('_'):
setattr(self, k, v)
class CNIArgs(object):
def __init__(self, value):
for item in value.split(';'):
k, v = item.split('=', 1)
if not k.startswith('_'):
setattr(self, k, v)
class CNIParameters(object):
def __init__(self, env, cfg):
for k, v in six.iteritems(env):
if k.startswith('CNI_'):
setattr(self, k, v)
self.config = CNIConfig(cfg)
self.args = CNIArgs(self.CNI_ARGS)
@six.add_metaclass(abc.ABCMeta)
class CNIPlugin(object):
@abc.abstractmethod
def add(self, params):
raise NotImplementedError()
@abc.abstractmethod
def delete(self, params):
raise NotImplementedError()
class CNIRunner(object):
# TODO(ivc): extend SUPPORTED_VERSIONS and format output based on
# requested params.CNI_VERSION and/or params.config.cniVersion
VERSION = '0.3.0'
SUPPORTED_VERSIONS = ['0.3.0']
def __init__(self, plugin):
self._plugin = plugin
def run(self, env, fin, fout):
try:
params = CNIParameters(env, jsonutils.load(fin))
if params.CNI_COMMAND == 'ADD':
vif = self._plugin.add(params)
self._write_vif(fout, vif)
elif params.CNI_COMMAND == 'DEL':
self._plugin.delete(params)
elif params.CNI_COMMAND == 'VERSION':
self._write_version(fout)
else:
raise k_exc.CNIError(_LE("unknown CNI_COMMAND: %s")
% params.CNI_COMMAND)
except Exception as ex:
# LOG.exception
self._write_exception(fout, str(ex))
return 1
def _write_dict(self, fout, dct):
output = {'cniVersion': self.VERSION}
output.update(dct)
LOG.debug("CNI output: %s", output)
jsonutils.dump(output, fout, sort_keys=True)
def _write_exception(self, fout, msg):
self._write_dict(fout, {
'msg': msg,
'code': k_const.CNI_EXCEPTION_CODE,
'details': traceback.format_exc(),
})
def _write_version(self, fout):
self._write_dict(fout, {'supportedVersions': self.SUPPORTED_VERSIONS})
def _write_vif(self, fout, vif):
result = {}
nameservers = []
for subnet in vif.network.subnets.objects:
nameservers.extend(subnet.dns)
ip = subnet.ips.objects[0].address
cni_ip = result.setdefault("ip%s" % ip.version, {})
cni_ip['ip'] = "%s/%s" % (ip, subnet.cidr.prefixlen)
if subnet.gateway:
cni_ip['gateway'] = str(subnet.gateway)
if subnet.routes.objects:
cni_ip['routes'] = [
{'dst': str(route.cidr), 'gw': str(route.gateway)}
for route in subnet.routes.objects]
if nameservers:
result['dns']['nameservers'] = nameservers
self._write_dict(fout, result)

View File

View File

@ -0,0 +1,72 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os_vif
from oslo_log import log as logging
import pyroute2
from stevedore import driver as stv_driver
_BINDING_NAMESPACE = 'kuryr_kubernetes.cni.binding'
_IPDB = {}
LOG = logging.getLogger(__name__)
def _get_binding_driver(vif):
mgr = stv_driver.DriverManager(namespace=_BINDING_NAMESPACE,
name=type(vif).__name__,
invoke_on_load=True)
return mgr.driver
def get_ipdb(netns=None):
try:
return _IPDB[netns]
except KeyError:
if netns:
ipdb = pyroute2.IPDB(nl=pyroute2.NetNS(netns))
else:
ipdb = pyroute2.IPDB()
_IPDB[netns] = ipdb
return ipdb
def _configure_l3(vif, ifname, netns):
with get_ipdb(netns).interfaces[ifname] as iface:
for subnet in vif.network.subnets.objects:
for fip in subnet.ips.objects:
iface.add_ip(str(fip.address), mask=str(subnet.cidr.netmask))
routes = get_ipdb(netns).routes
for subnet in vif.network.subnets.objects:
for route in subnet.routes.objects:
routes.add(gateway=str(route.gateway),
dst=str(route.cidr)).commit()
if subnet.gateway:
routes.add(gateway=str(subnet.gateway),
dst='default').commit()
def connect(vif, instance_info, ifname, netns=None):
driver = _get_binding_driver(vif)
os_vif.plug(vif, instance_info)
driver.connect(vif, ifname, netns)
_configure_l3(vif, ifname, netns)
def disconnect(vif, instance_info, ifname, netns=None):
driver = _get_binding_driver(vif)
driver.disconnect(vif, ifname, netns)
os_vif.unplug(vif, instance_info)

View File

@ -0,0 +1,49 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from kuryr_kubernetes.cni.binding import base as b_base
class BridgeDriver(object):
def connect(self, vif, ifname, netns):
host_ifname = vif.vif_name
bridge_name = vif.bridge_name
c_ipdb = b_base.get_ipdb(netns)
h_ipdb = b_base.get_ipdb()
with c_ipdb.create(ifname=ifname, peer=host_ifname,
kind='veth') as c_iface:
c_iface.mtu = vif.network.mtu
c_iface.address = str(vif.address)
c_iface.up()
if netns:
with c_ipdb.interfaces[host_ifname] as h_iface:
h_iface.net_ns_pid = os.getpid()
with h_ipdb.interfaces[host_ifname] as h_iface:
h_iface.mtu = vif.network.mtu
h_iface.up()
with h_ipdb.interfaces[bridge_name] as h_br:
h_br.add_port(host_ifname)
def disconnect(self, vif, ifname, netns):
# NOTE(ivc): veth pair is destroyed automatically along with the
# container namespace
pass

View File

@ -0,0 +1,98 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from os_vif import objects as obj_vif
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.handlers import dispatch as k_dis
from kuryr_kubernetes.handlers import k8s_base
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class CNIHandlerBase(k8s_base.ResourceEventHandler):
OBJECT_KIND = k_const.K8S_OBJ_POD
def __init__(self, cni, on_done):
self._cni = cni
self._callback = on_done
self._vif = None
def on_present(self, pod):
vif = self._get_vif(pod)
if vif:
self.on_vif(pod, vif)
@abc.abstractmethod
def on_vif(self, pod, vif):
raise NotImplementedError()
def _get_vif(self, pod):
# TODO(ivc): same as VIFHandler._get_vif
try:
annotations = pod['metadata']['annotations']
vif_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
except KeyError:
return None
vif_dict = jsonutils.loads(vif_annotation)
vif = obj_vif.vif.VIFBase.obj_from_primitive(vif_dict)
LOG.debug("Got VIF from annotation: %r", vif)
return vif
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
class AddHandler(CNIHandlerBase):
def __init__(self, cni, on_done):
super(AddHandler, self).__init__(cni, on_done)
self._vif = None
def on_vif(self, pod, vif):
if not self._vif:
self._vif = vif.obj_clone()
self._vif.active = True
b_base.connect(self._vif, self._get_inst(pod),
self._cni.CNI_IFNAME, self._cni.CNI_NETNS)
if vif.active:
self._callback(vif)
class DelHandler(CNIHandlerBase):
def on_vif(self, pod, vif):
b_base.disconnect(vif, self._get_inst(pod),
self._cni.CNI_IFNAME, self._cni.CNI_NETNS)
self._callback(vif)
class CNIPipeline(k_dis.EventPipeline):
def _wrap_dispatcher(self, dispatcher):
return dispatcher
def _wrap_consumer(self, consumer):
return consumer

View File

@ -0,0 +1,93 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import signal
import sys
import os_vif
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes.cni import api as cni_api
from kuryr_kubernetes.cni import handlers as h_cni
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import watcher as k_watcher
LOG = logging.getLogger(__name__)
_CNI_TIMEOUT = 180
class K8sCNIPlugin(cni_api.CNIPlugin):
def add(self, params):
self._setup(params)
self._pipeline.register(h_cni.AddHandler(params, self._done))
self._watcher.start()
return self._vif
def delete(self, params):
self._setup(params)
self._pipeline.register(h_cni.DelHandler(params, self._done))
self._watcher.start()
def _done(self, vif):
self._vif = vif
self._watcher.stop()
def _setup(self, params):
args = ['--config-file', params.config.kuryr_conf]
try:
if params.config.debug:
args.append('-d')
except AttributeError:
pass
config.init(args)
config.setup_logging()
os_vif.initialize()
clients.setup_kubernetes_client()
self._pipeline = h_cni.CNIPipeline()
self._watcher = k_watcher.Watcher(self._pipeline)
self._watcher.add(
"%(base)s/namespaces/%(namespace)s/pods"
"?fieldSelector=metadata.name=%(pod)s" % {
'base': k_const.K8S_API_BASE,
'namespace': params.args.K8S_POD_NAMESPACE,
'pod': params.args.K8S_POD_NAME})
def run():
# REVISIT(ivc): current CNI implementation provided by this package is
# experimental and its primary purpose is to enable development of other
# components (e.g. functional tests, service/LBaaSv2 support)
runner = cni_api.CNIRunner(K8sCNIPlugin())
def _timeout(signum, frame):
runner._write_dict(sys.stdout, {
'msg': 'timeout',
'code': k_const.CNI_TIMEOUT_CODE,
})
LOG.debug('timed out')
sys.exit(1)
signal.signal(signal.SIGALRM, _timeout)
signal.alarm(_CNI_TIMEOUT)
status = runner.run(os.environ, sys.stdin, sys.stdout)
LOG.debug("Exiting with status %s", status)
if status:
sys.exit(status)

View File

@ -25,3 +25,6 @@ K8S_POD_STATUS_PENDING = 'Pending'
K8S_ANNOTATION_PREFIX = 'openstack.org/kuryr'
K8S_ANNOTATION_VIF = K8S_ANNOTATION_PREFIX + '-vif'
CNI_EXCEPTION_CODE = 100
CNI_TIMEOUT_CODE = 200

View File

@ -30,5 +30,9 @@ class ResourceNotReady(Exception):
% resource)
class CNIError(Exception):
pass
def format_msg(exception):
return "%s: %s" % (exception.__class__.__name__, exception)

View File

@ -19,8 +19,6 @@ import six
from oslo_log import log as logging
from kuryr_kubernetes.handlers import base as h_base
from kuryr_kubernetes.handlers import logging as h_log
LOG = logging.getLogger(__name__)
@ -84,6 +82,7 @@ class EventConsumer(h_base.EventHandler):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class EventPipeline(h_base.EventHandler):
"""Serves as an entry-point for event handling.
@ -110,8 +109,10 @@ class EventPipeline(h_base.EventHandler):
def __call__(self, event):
self._handler(event)
@abc.abstractmethod
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(dispatcher)
raise NotImplementedError()
@abc.abstractmethod
def _wrap_consumer(self, consumer):
return h_log.LogExceptions(consumer)
raise NotImplementedError()

View File

@ -60,22 +60,30 @@ class TestDispatch(test_base.TestCase):
handler.assert_called_once_with(events[key])
class _TestEventPipeline(h_dis.EventPipeline):
def _wrap_dispatcher(self, dispatcher):
pass
def _wrap_consumer(self, consumer):
pass
class TestEventPipeline(test_base.TestCase):
@mock.patch.object(h_dis.EventPipeline, '_wrap_dispatcher')
@mock.patch.object(_TestEventPipeline, '_wrap_dispatcher')
@mock.patch('kuryr_kubernetes.handlers.dispatch.Dispatcher')
def test_init(self, m_dispatcher_type, m_wrapper):
m_dispatcher_type.return_value = mock.sentinel.dispatcher
m_wrapper.return_value = mock.sentinel.handler
pipeline = h_dis.EventPipeline()
pipeline = _TestEventPipeline()
m_dispatcher_type.assert_called_once()
m_wrapper.assert_called_once_with(mock.sentinel.dispatcher)
self.assertEqual(mock.sentinel.dispatcher, pipeline._dispatcher)
self.assertEqual(mock.sentinel.handler, pipeline._handler)
@mock.patch.object(h_dis.EventPipeline, '_wrap_consumer')
@mock.patch.object(h_dis.EventPipeline, '__init__')
@mock.patch.object(_TestEventPipeline, '_wrap_consumer')
@mock.patch.object(_TestEventPipeline, '__init__')
def test_register(self, m_init, m_wrap_consumer):
consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
mock.sentinel.key_fn2: mock.sentinel.key2,
@ -85,7 +93,7 @@ class TestEventPipeline(test_base.TestCase):
m_consumer.consumes = consumes
m_wrap_consumer.return_value = mock.sentinel.handler
m_init.return_value = None
pipeline = h_dis.EventPipeline()
pipeline = _TestEventPipeline()
pipeline._dispatcher = m_dispatcher
pipeline.register(m_consumer)
@ -95,11 +103,11 @@ class TestEventPipeline(test_base.TestCase):
mock.call(key_fn, key, mock.sentinel.handler)
for key_fn, key in consumes.items()], any_order=True)
@mock.patch.object(h_dis.EventPipeline, '__init__')
@mock.patch.object(_TestEventPipeline, '__init__')
def test_call(self, m_init):
m_init.return_value = None
m_handler = mock.Mock()
pipeline = h_dis.EventPipeline()
pipeline = _TestEventPipeline()
pipeline._handler = m_handler
pipeline(mock.sentinel.event)

View File

@ -2,7 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
-e git://github.com/openstack/kuryr.git@master#egg=kuryr_lib
kuryr-lib>=0.1.0 # Apache-2.0
pbr>=1.6 # Apache-2.0
requests>=2.10.0 # Apache-2.0
eventlet!=0.18.3,>=0.18.2 # MIT

View File

@ -25,10 +25,14 @@ oslo.config.opts =
console_scripts =
kuryr-k8s-controller = kuryr_kubernetes.cmd.eventlet.controller:start
kuryr-cni = kuryr_kubernetes.cmd.cni:run
kuryr_kubernetes.vif_translators =
ovs = kuryr_kubernetes.os_vif_util:neutron_to_osvif_vif_ovs
kuryr_kubernetes.cni.binding =
VIFBridge = kuryr_kubernetes.cni.binding.bridge:BridgeDriver
kuryr_kubernetes.controller.drivers.pod_project =
default = kuryr_kubernetes.controller.drivers.default_project:DefaultPodProjectDriver