diff --git a/devstack/local.conf.sample b/devstack/local.conf.sample index be4bcf784..45003e6d8 100644 --- a/devstack/local.conf.sample +++ b/devstack/local.conf.sample @@ -176,6 +176,20 @@ enable_service kubelet # resource events and convert them to Neutron actions enable_service kuryr-kubernetes + +# Kuryr Daemon +# ============ +# +# Kuryr can run CNI plugin in daemonized way - i.e. kubelet will run kuryr CNI +# driver and the driver will pass requests to Kuryr daemon running on the node, +# instead of processing them on its own. This limits the number of Kubernetes +# API requests (as only Kuryr Daemon will watch for new pod events) and should +# increase scalability in environments that often delete and create pods. +# Please note that kuryr-daemon is not yet supported in containerized +# deployment. To enable kuryr-daemon uncomment next line. +# enable_service kuryr-daemon + + # Containerized Kuryr # =================== # diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 146469c92..fc05a04c7 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -63,6 +63,10 @@ function configure_kuryr { iniset "$KURYR_CONFIG" kubernetes port_debug "$KURYR_PORT_DEBUG" + if is_service_enabled kuryr-daemon; then + iniset "$KURYR_CONFIG" cni_daemon daemon_enabled True + fi + create_kuryr_cache_dir # Neutron API server & Neutron plugin @@ -540,10 +544,23 @@ function run_kuryr_kubernetes { } +function run_kuryr_daemon { + local daemon_bin=$(which kuryr-daemon) + run_process kuryr-daemon "$daemon_bin --config-file $KURYR_CONFIG" root root +} + + source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes # main loop -if [[ "$1" == "stack" && "$2" == "install" ]]; then +if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then + KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT) + if is_service_enabled kuryr-daemon && [[ "$KURYR_K8S_CONTAINERIZED_DEPLOYMENT" == "True" ]]; then + die $LINENO "Cannot enable kuryr-daemon with KURYR_K8S_CONTAINERIZED_DEPLOYMENT." + fi + + +elif [[ "$1" == "stack" && "$2" == "install" ]]; then setup_develop "$KURYR_HOME" if is_service_enabled kubelet || is_service_enabled openshift-node; then KURYR_K8S_CONTAINERIZED_DEPLOYMENT=$(trueorfalse False KURYR_K8S_CONTAINERIZED_DEPLOYMENT) @@ -623,6 +640,8 @@ if [[ "$1" == "stack" && "$2" == "extra" ]]; then run_k8s_scheduler fi + run_kuryr_daemon + if is_service_enabled kubelet; then prepare_kubelet extract_hyperkube @@ -663,6 +682,7 @@ if [[ "$1" == "unstack" ]]; then elif is_service_enabled kubelet; then $KURYR_HYPERKUBE_BINARY kubectl delete nodes ${HOSTNAME} fi + stop_process kuryr-daemon docker kill devstack-k8s-setup-files docker rm devstack-k8s-setup-files diff --git a/kuryr_kubernetes/cmd/daemon.py b/kuryr_kubernetes/cmd/daemon.py new file mode 100644 index 000000000..0f572ad6d --- /dev/null +++ b/kuryr_kubernetes/cmd/daemon.py @@ -0,0 +1,22 @@ +# Copyright 2017 NEC Technologies India Pvt. Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kuryr_kubernetes.cni.daemon import service + + +start = service.start + +if __name__ == '__main__': + start() diff --git a/kuryr_kubernetes/cni/api.py b/kuryr_kubernetes/cni/api.py index 6794e34bc..990c3e0d7 100644 --- a/kuryr_kubernetes/cni/api.py +++ b/kuryr_kubernetes/cni/api.py @@ -13,49 +13,25 @@ # License for the specific language governing permissions and limitations # under the License. + import abc import six +from six.moves import http_client as httplib import traceback +import requests + from kuryr.lib._i18n import _ +from os_vif.objects import base from oslo_log import log as logging from oslo_serialization import jsonutils +from kuryr_kubernetes.cni import utils +from kuryr_kubernetes import config from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import exceptions as k_exc LOG = logging.getLogger(__name__) -_CNI_TIMEOUT = 60 - - -class CNIConfig(dict): - def __init__(self, cfg): - super(CNIConfig, self).__init__(cfg) - - for k, v in self.items(): - if not k.startswith('_'): - setattr(self, k, v) - - -class CNIArgs(object): - def __init__(self, value): - for item in value.split(';'): - k, v = item.split('=', 1) - if not k.startswith('_'): - setattr(self, k, v) - - -class CNIParameters(object): - def __init__(self, env, cfg): - for k, v in env.items(): - if k.startswith('CNI_'): - setattr(self, k, v) - self.config = CNIConfig(cfg) - self.args = CNIArgs(self.CNI_ARGS) - - def __repr__(self): - return repr({key: value for key, value in self.__dict__.items() if - key.startswith('CNI_')}) @six.add_metaclass(abc.ABCMeta) @@ -70,35 +46,20 @@ class CNIPlugin(object): raise NotImplementedError() +@six.add_metaclass(abc.ABCMeta) class CNIRunner(object): - # TODO(ivc): extend SUPPORTED_VERSIONS and format output based on # requested params.CNI_VERSION and/or params.config.cniVersion VERSION = '0.3.0' SUPPORTED_VERSIONS = ['0.3.0'] - def __init__(self, plugin): - self._plugin = plugin + @abc.abstractmethod + def _add(self, params): + raise NotImplementedError() - def run(self, env, fin, fout): - try: - params = CNIParameters(env, jsonutils.load(fin)) - - if params.CNI_COMMAND == 'ADD': - vif = self._plugin.add(params) - self._write_vif(fout, vif) - elif params.CNI_COMMAND == 'DEL': - self._plugin.delete(params) - elif params.CNI_COMMAND == 'VERSION': - self._write_version(fout) - else: - raise k_exc.CNIError(_("unknown CNI_COMMAND: %s") - % params.CNI_COMMAND) - return 0 - except Exception as ex: - # LOG.exception - self._write_exception(fout, str(ex)) - return 1 + @abc.abstractmethod + def _delete(self, params): + raise NotImplementedError() def _write_dict(self, fout, dct): output = {'cniVersion': self.VERSION} @@ -116,7 +77,31 @@ class CNIRunner(object): def _write_version(self, fout): self._write_dict(fout, {'supportedVersions': self.SUPPORTED_VERSIONS}) - def _write_vif(self, fout, vif): + @abc.abstractmethod + def prepare_env(self, env, stdin): + raise NotImplementedError() + + def run(self, env, fin, fout): + try: + # Prepare params according to calling Object + params = self.prepare_env(env, fin) + if env.get('CNI_COMMAND') == 'ADD': + vif = self._add(params) + self._write_dict(fout, vif) + elif env.get('CNI_COMMAND') == 'DEL': + self._delete(params) + elif env.get('CNI_COMMAND') == 'VERSION': + self._write_version(fout) + else: + raise k_exc.CNIError(_("unknown CNI_COMMAND: %s") + % env['CNI_COMMAND']) + return 0 + except Exception as ex: + # LOG.exception + self._write_exception(fout, str(ex)) + return 1 + + def _vif_data(self, vif): result = {} nameservers = [] @@ -137,5 +122,61 @@ class CNIRunner(object): if nameservers: result['dns'] = {'nameservers': nameservers} + return result - self._write_dict(fout, result) + +class CNIStandaloneRunner(CNIRunner): + + def __init__(self, plugin): + self._plugin = plugin + + def _add(self, params): + vif = self._plugin.add(params) + return self._vif_data(vif) + + def _delete(self, params): + self._plugin.delete(params) + + def prepare_env(self, env, stdin): + return utils.CNIParameters(env, stdin) + + +class CNIDaemonizedRunner(CNIRunner): + + def _add(self, params): + resp = self._make_request('addNetwork', params, httplib.ACCEPTED) + vif = base.VersionedObject.obj_from_primitive(resp.json()) + return self._vif_data(vif) + + def _delete(self, params): + self._make_request('delNetwork', params, httplib.NO_CONTENT) + + def prepare_env(self, env, stdin): + cni_envs = {} + cni_envs.update( + {k: v for k, v in env.items() if k.startswith('CNI_')}) + cni_envs['config_kuryr'] = dict(stdin) + return cni_envs + + def _make_request(self, path, cni_envs, expected_status=None): + method = 'POST' + + address = config.CONF.cni_daemon.bind_address + url = 'http://%s/%s' % (address, path) + try: + LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n' + '%(body)s', + {'method': method, 'path': url, 'body': cni_envs}) + resp = requests.post(url, json=cni_envs, + headers={'Connection': 'close'}) + except requests.ConnectionError: + LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon ' + 'running?', address) + raise + LOG.debug('CNI Daemon returned "%(status)d %(reason)s".', + {'status': resp.status_code, 'reason': resp.reason}) + if expected_status and resp.status_code != expected_status: + LOG.error('CNI daemon returned error "%(status)d %(reason)s".', + {'status': resp.status_code, 'reason': resp.reason}) + raise k_exc.CNIError('Got invalid status code from CNI daemon.') + return resp diff --git a/kuryr_kubernetes/cni/daemon/__init__.py b/kuryr_kubernetes/cni/daemon/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py new file mode 100644 index 000000000..e019786d0 --- /dev/null +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -0,0 +1,267 @@ +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +from six.moves import http_client as httplib +import socket +import sys + +import cotyledon +import flask +from pyroute2.ipdb import transactional +import retrying + +import os_vif +from os_vif import objects as obj_vif +from os_vif.objects import base +from oslo_config import cfg +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from kuryr_kubernetes import clients +from kuryr_kubernetes.cni import api +from kuryr_kubernetes.cni.binding import base as b_base +from kuryr_kubernetes.cni import handlers as h_cni +from kuryr_kubernetes.cni import utils +from kuryr_kubernetes import config +from kuryr_kubernetes import constants as k_const +from kuryr_kubernetes import exceptions +from kuryr_kubernetes import objects +from kuryr_kubernetes import watcher as k_watcher + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +# TODO(dulek): Another corner case is (and was) when pod is deleted before it's +# annotated by controller or even noticed by any watcher. Kubelet +# will try to delete such vif, but we will have no data about it. +# This is currently worked around by returning succesfully in case +# of timing out in delete. To solve this properly we need to watch +# for pod deletes as well. + + +class K8sCNIRegistryPlugin(api.CNIPlugin): + def __init__(self, registry): + self.registry = registry + + def _get_name(self, pod): + return pod['metadata']['name'] + + def add(self, params): + vif = self._do_work(params, b_base.connect) + + # NOTE(dulek): Saving containerid to be able to distinguish old DEL + # requests that we should ignore. We need to replace whole + # object in the dict for multiprocessing.Manager to work. + pod_name = params.args.K8S_POD_NAME + d = self.registry[pod_name] + d['containerid'] = params.CNI_CONTAINERID + self.registry[pod_name] = d + LOG.debug('Saved containerid = %s for pod %s', params.CNI_CONTAINERID, + pod_name) + + return vif + + def delete(self, params): + pod_name = params.args.K8S_POD_NAME + try: + reg_ci = self.registry[pod_name]['containerid'] + LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name) + if reg_ci and reg_ci != params.CNI_CONTAINERID: + # NOTE(dulek): This is a DEL request for some older (probably + # failed) ADD call. We should ignore it or we'll + # unplug a running pod. + LOG.warning('Received DEL request for unknown ADD call. ' + 'Ignoring.') + return + except KeyError: + pass + self._do_work(params, b_base.disconnect) + + def _do_work(self, params, fn): + pod_name = params.args.K8S_POD_NAME + + timeout = CONF.cni_daemon.vif_annotation_timeout + + # In case of KeyError retry for `timeout` s, wait 1 s between tries. + @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000, + retry_on_exception=lambda e: isinstance(e, KeyError)) + def find(): + return self.registry[pod_name] + + try: + d = find() + pod = d['pod'] + vif = base.VersionedObject.obj_from_primitive(d['vif']) + except KeyError: + raise exceptions.ResourceNotReady(pod_name) + + fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS) + return vif + + def _get_inst(self, pod): + return obj_vif.instance_info.InstanceInfo( + uuid=pod['metadata']['uid'], name=pod['metadata']['name']) + + +class DaemonServer(object): + def __init__(self, plugin): + self.ctx = None + self.plugin = plugin + + self.application = flask.Flask('kuryr-daemon') + self.application.add_url_rule( + '/addNetwork', methods=['POST'], view_func=self.add) + self.application.add_url_rule( + '/delNetwork', methods=['POST'], view_func=self.delete) + self.headers = {'ContentType': 'application/json', + 'Connection': 'close'} + + def add(self): + params = None + try: + params = utils.CNIParameters(flask.request.get_json()) + LOG.debug('Received addNetwork request. CNI Params: %s', params) + vif = self.plugin.add(params) + data = jsonutils.dumps(vif.obj_to_primitive()) + except exceptions.ResourceNotReady as e: + LOG.error("Timed out waiting for requested pod to appear in " + "registry: %s.", e) + return '', httplib.GATEWAY_TIMEOUT, self.headers + except Exception: + LOG.exception('Error when processing addNetwork request. CNI ' + 'Params: %s', params) + return '', httplib.INTERNAL_SERVER_ERROR, self.headers + return data, httplib.ACCEPTED, self.headers + + def delete(self): + params = None + try: + params = utils.CNIParameters(flask.request.get_json()) + LOG.debug('Received delNetwork request. CNI Params: %s', params) + self.plugin.delete(params) + except exceptions.ResourceNotReady as e: + # NOTE(dulek): It's better to ignore this error - most of the time + # it will happen when pod is long gone and kubelet + # overzealously tries to delete it from the network. + # We cannot really do anything without VIF annotation, + # so let's just tell kubelet to move along. + LOG.warning("Timed out waiting for requested pod to appear in " + "registry: %s. Ignoring.", e) + return '', httplib.NO_CONTENT, self.headers + except Exception: + LOG.exception('Error when processing delNetwork request. CNI ' + 'Params: %s.', params) + return '', httplib.INTERNAL_SERVER_ERROR, self.headers + return '', httplib.NO_CONTENT, self.headers + + def run(self): + server_pair = CONF.cni_daemon.bind_address + LOG.info('Starting server on %s.', server_pair) + try: + address, port = server_pair.split(':') + except ValueError: + LOG.exception('Cannot start server on %s.', server_pair) + raise + + try: + self.application.run(address, port, + processes=CONF.cni_daemon.worker_num) + except Exception: + LOG.exception('Failed to start kuryr-daemon.') + raise + + +class CNIDaemonServerService(cotyledon.Service): + name = "server" + + def __init__(self, worker_id, registry): + super(CNIDaemonServerService, self).__init__(worker_id) + self.run_queue_reading = False + self.registry = registry + self.plugin = K8sCNIRegistryPlugin(registry) + self.server = DaemonServer(self.plugin) + + def run(self): + # NOTE(dulek): We might do a *lot* of pyroute2 operations, let's + # make the pyroute2 timeout configurable to make sure + # kernel will have chance to catch up. + transactional.SYNC_TIMEOUT = CONF.cni_daemon.pyroute2_timeout + + # Run HTTP server + self.server.run() + + +class CNIDaemonWatcherService(cotyledon.Service): + name = "watcher" + + def __init__(self, worker_id, registry): + super(CNIDaemonWatcherService, self).__init__(worker_id) + self.pipeline = None + self.watcher = None + self.registry = registry + + def run(self): + self.pipeline = h_cni.CNIPipeline() + self.pipeline.register(h_cni.CallbackHandler(self.on_done)) + self.watcher = k_watcher.Watcher(self.pipeline) + self.watcher.add( + "%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % { + 'base': k_const.K8S_API_BASE, + 'node_name': socket.gethostname()}) + self.watcher.start() + + def on_done(self, pod, vif): + # Add to registry only if it isn't already there. + if pod['metadata']['name'] not in self.registry: + vif_dict = vif.obj_to_primitive() + self.registry[pod['metadata']['name']] = {'pod': pod, + 'vif': vif_dict, + 'containerid': None} + + def terminate(self): + if self.watcher: + self.watcher.stop() + + +class CNIDaemonServiceManager(cotyledon.ServiceManager): + def __init__(self): + super(CNIDaemonServiceManager, self).__init__() + # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload. + + # TODO(vikasc): Should be done using dynamically loadable OVO types + # plugin. + objects.register_locally_defined_vifs() + + os_vif.initialize() + clients.setup_kubernetes_client() + + self.manager = multiprocessing.Manager() + registry = self.manager.dict() # For Watcher->Server communication. + self.add(CNIDaemonWatcherService, workers=1, args=(registry,)) + self.add(CNIDaemonServerService, workers=1, args=(registry,)) + self.register_hooks(on_terminate=self.terminate) + + def run(self): + super(CNIDaemonServiceManager, self).run() + + def terminate(self): + self.manager.shutdown() + + +def start(): + config.init(sys.argv[1:]) + config.setup_logging() + + CNIDaemonServiceManager().run() diff --git a/kuryr_kubernetes/cni/handlers.py b/kuryr_kubernetes/cni/handlers.py index 2504fa097..fdd2ed351 100644 --- a/kuryr_kubernetes/cni/handlers.py +++ b/kuryr_kubernetes/cni/handlers.py @@ -90,6 +90,15 @@ class DelHandler(CNIHandlerBase): self._callback(vif) +class CallbackHandler(CNIHandlerBase): + + def __init__(self, on_vif): + super(CallbackHandler, self).__init__(None, on_vif) + + def on_vif(self, pod, vif): + self._callback(pod, vif) + + class CNIPipeline(k_dis.EventPipeline): def _wrap_dispatcher(self, dispatcher): diff --git a/kuryr_kubernetes/cni/main.py b/kuryr_kubernetes/cni/main.py index 3188600d4..471fb4443 100644 --- a/kuryr_kubernetes/cni/main.py +++ b/kuryr_kubernetes/cni/main.py @@ -18,16 +18,20 @@ import signal import sys import os_vif +from oslo_config import cfg from oslo_log import log as logging +from oslo_serialization import jsonutils from kuryr_kubernetes import clients from kuryr_kubernetes.cni import api as cni_api from kuryr_kubernetes.cni import handlers as h_cni +from kuryr_kubernetes.cni import utils from kuryr_kubernetes import config from kuryr_kubernetes import constants as k_const -from kuryr_kubernetes import objects +from kuryr_kubernetes import objects as k_objects from kuryr_kubernetes import watcher as k_watcher +CONF = cfg.CONF LOG = logging.getLogger(__name__) _CNI_TIMEOUT = 180 @@ -50,17 +54,6 @@ class K8sCNIPlugin(cni_api.CNIPlugin): self._watcher.stop() def _setup(self, params): - args = ['--config-file', params.config.kuryr_conf] - - try: - if params.config.debug: - args.append('-d') - except AttributeError: - pass - - config.init(args) - config.setup_logging() - os_vif.initialize() clients.setup_kubernetes_client() self._pipeline = h_cni.CNIPipeline() self._watcher = k_watcher.Watcher(self._pipeline) @@ -76,11 +69,26 @@ def run(): # REVISIT(ivc): current CNI implementation provided by this package is # experimental and its primary purpose is to enable development of other # components (e.g. functional tests, service/LBaaSv2 support) + cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin)) + args = ['--config-file', cni_conf.kuryr_conf] - # TODO(vikasc): Should be done using dynamically loadable OVO types plugin. - objects.register_locally_defined_vifs() + try: + if cni_conf.debug: + args.append('-d') + except AttributeError: + pass + config.init(args) + config.setup_logging() - runner = cni_api.CNIRunner(K8sCNIPlugin()) + # Initialize o.vo registry. + k_objects.register_locally_defined_vifs() + os_vif.initialize() + + if CONF.cni_daemon.daemon_enabled: + runner = cni_api.CNIDaemonizedRunner() + else: + runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin()) + LOG.info("Using '%s' ", runner.__class__.__name__) def _timeout(signum, frame): runner._write_dict(sys.stdout, { @@ -92,7 +100,7 @@ def run(): signal.signal(signal.SIGALRM, _timeout) signal.alarm(_CNI_TIMEOUT) - status = runner.run(os.environ, sys.stdin, sys.stdout) + status = runner.run(os.environ, cni_conf, sys.stdout) LOG.debug("Exiting with status %s", status) if status: sys.exit(status) diff --git a/kuryr_kubernetes/cni/utils.py b/kuryr_kubernetes/cni/utils.py new file mode 100644 index 000000000..da9c16d20 --- /dev/null +++ b/kuryr_kubernetes/cni/utils.py @@ -0,0 +1,47 @@ +# Copyright (c) 2017 NEC Technologies India Pvt Ltd. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class CNIConfig(dict): + def __init__(self, cfg): + super(CNIConfig, self).__init__(cfg) + + for k, v in self.items(): + if not k.startswith('_'): + setattr(self, k, v) + + +class CNIArgs(object): + def __init__(self, value): + for item in value.split(';'): + k, v = item.split('=', 1) + if not k.startswith('_'): + setattr(self, k, v) + + +class CNIParameters(object): + def __init__(self, env, cfg=None): + for k, v in env.items(): + if k.startswith('CNI_'): + setattr(self, k, v) + if cfg is None: + self.config = CNIConfig(env['config_kuryr']) + else: + self.config = cfg + self.args = CNIArgs(self.CNI_ARGS) + + def __repr__(self): + return repr({key: value for key, value in self.__dict__.items() if + key.startswith('CNI_')}) diff --git a/kuryr_kubernetes/config.py b/kuryr_kubernetes/config.py index 4e693f05d..e15d05320 100644 --- a/kuryr_kubernetes/config.py +++ b/kuryr_kubernetes/config.py @@ -30,6 +30,32 @@ kuryr_k8s_opts = [ '../../'))), ] +daemon_opts = [ + cfg.BoolOpt('daemon_enabled', + help=_('Enable CNI Daemon configuration.'), + default=False), + cfg.StrOpt('bind_address', + help=_('Bind address for CNI daemon HTTP server. It is ' + 'recommened to allow only local connections.'), + default='127.0.0.1:50036'), + cfg.IntOpt('worker_num', + help=_('Maximum number of processes that will be spawned to ' + 'process requests from CNI driver.'), + default=30), + cfg.IntOpt('vif_annotation_timeout', + help=_('Time (in seconds) the CNI daemon will wait for VIF ' + 'annotation to appear in pod metadata before failing ' + 'the CNI request.'), + default=60), + cfg.IntOpt('pyroute2_timeout', + help=_('Kuryr uses pyroute2 library to manipulate networking ' + 'interfaces. When processing a high number of Kuryr ' + 'requests in parallel, it may take kernel more time to ' + 'process all networking stack changes. This option ' + 'allows to tune internal pyroute2 timeout.'), + default=10), +] + k8s_opts = [ cfg.StrOpt('api_root', help=_("The root URL of the Kubernetes API"), @@ -125,6 +151,7 @@ octavia_defaults = [ CONF = cfg.CONF CONF.register_opts(kuryr_k8s_opts) +CONF.register_opts(daemon_opts, group='cni_daemon') CONF.register_opts(k8s_opts, group='kubernetes') CONF.register_opts(neutron_defaults, group='neutron_defaults') CONF.register_opts(octavia_defaults, group='octavia_defaults') diff --git a/kuryr_kubernetes/opts.py b/kuryr_kubernetes/opts.py index 755ab45a4..2d5d89fff 100644 --- a/kuryr_kubernetes/opts.py +++ b/kuryr_kubernetes/opts.py @@ -27,6 +27,7 @@ _kuryr_k8s_opts = [ ('vif_pool', vif_pool.vif_pool_driver_opts), ('octavia_defaults', config.octavia_defaults), ('pool_manager', pool.pool_manager_opts), + ('cni_daemon', config.daemon_opts), ] diff --git a/kuryr_kubernetes/tests/unit/cmd/test_daemon.py b/kuryr_kubernetes/tests/unit/cmd/test_daemon.py new file mode 100644 index 000000000..81a073e12 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/cmd/test_daemon.py @@ -0,0 +1,27 @@ +# Copyright (c) 2017 NEC Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from kuryr_kubernetes.tests import base as test_base + + +class TestDaemonCmd(test_base.TestCase): + @mock.patch('kuryr_kubernetes.cni.daemon.service.start') + def test_start(self, m_start): + from kuryr_kubernetes.cmd import daemon # To make it import a mock. + daemon.start() + + m_start.assert_called() diff --git a/kuryr_kubernetes/tests/unit/cni/test_api.py b/kuryr_kubernetes/tests/unit/cni/test_api.py new file mode 100644 index 000000000..ecdf51641 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/cni/test_api.py @@ -0,0 +1,142 @@ +# Copyright (c) 2017 NEC Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from six import StringIO + +import requests + +from oslo_config import cfg +from oslo_serialization import jsonutils + +from kuryr_kubernetes.cni import api +from kuryr_kubernetes.cni import main +from kuryr_kubernetes.tests import base as test_base +from kuryr_kubernetes.tests import fake + +CONF = cfg.CONF + + +class TestCNIRunnerMixin(object): + def test_run_invalid(self, *args): + m_fin = StringIO() + m_fout = StringIO() + code = self.runner.run( + {'CNI_COMMAND': 'INVALID', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout) + + self.assertEqual(1, code) + + def test_run_write_version(self, *args): + m_fin = StringIO() + m_fout = StringIO() + code = self.runner.run( + {'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout) + result = jsonutils.loads(m_fout.getvalue()) + + self.assertEqual(0, code) + self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS, + result['supportedVersions']) + self.assertEqual(api.CNIRunner.VERSION, result['cniVersion']) + + +class TestCNIStandaloneRunner(test_base.TestCase, TestCNIRunnerMixin): + def setUp(self): + super(TestCNIStandaloneRunner, self).setUp() + self.runner = api.CNIStandaloneRunner(main.K8sCNIPlugin()) + + @mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.add') + def test_run_add(self, m_k8s_add): + vif = fake._fake_vif() + m_k8s_add.return_value = vif + m_fin = StringIO() + m_fout = StringIO() + env = { + 'CNI_COMMAND': 'ADD', + 'CNI_ARGS': 'foo=bar', + } + self.runner.run(env, m_fin, m_fout) + self.assertTrue(m_k8s_add.called) + self.assertEqual('foo=bar', m_k8s_add.call_args[0][0].CNI_ARGS) + result = jsonutils.loads(m_fout.getvalue()) + self.assertDictEqual( + {"cniVersion": "0.3.0", + "dns": {"nameservers": ["192.168.0.1"]}, + "ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"}}, + result) + + @mock.patch('kuryr_kubernetes.cni.main.K8sCNIPlugin.delete') + def test_run_del(self, m_k8s_delete): + vif = fake._fake_vif() + m_k8s_delete.return_value = vif + m_fin = StringIO() + m_fout = StringIO() + env = { + 'CNI_COMMAND': 'DEL', + 'CNI_ARGS': 'foo=bar', + } + self.runner.run(env, m_fin, m_fout) + self.assertTrue(m_k8s_delete.called) + self.assertEqual('foo=bar', m_k8s_delete.call_args[0][0].CNI_ARGS) + + +@mock.patch('requests.post') +class TestCNIDaemonizedRunner(test_base.TestCase, TestCNIRunnerMixin): + def setUp(self): + super(TestCNIDaemonizedRunner, self).setUp() + self.runner = api.CNIDaemonizedRunner() + self.port = int(CONF.cni_daemon.bind_address.split(':')[1]) + + def _test_run(self, cni_cmd, path, m_post): + m_fin = StringIO() + m_fout = StringIO() + env = { + 'CNI_COMMAND': cni_cmd, + 'CNI_ARGS': 'foo=bar', + } + result = self.runner.run(env, m_fin, m_fout) + m_post.assert_called_with( + 'http://127.0.0.1:%d/%s' % (self.port, path), + json=mock.ANY, headers={'Connection': 'close'}) + return result + + def test_run_add(self, m_post): + m_response = mock.Mock(status_code=202) + m_response.json = mock.Mock(return_value=fake._fake_vif_dict()) + m_post.return_value = m_response + result = self._test_run('ADD', 'addNetwork', m_post) + self.assertEqual(0, result) + + def test_run_add_invalid(self, m_post): + m_response = mock.Mock(status_code=400) + m_response.json = mock.Mock() + m_post.return_value = m_response + result = self._test_run('ADD', 'addNetwork', m_post) + self.assertEqual(1, result) + m_response.json.assert_not_called() + + def test_run_del(self, m_post): + m_post.return_value = mock.Mock(status_code=204) + result = self._test_run('DEL', 'delNetwork', m_post) + self.assertEqual(0, result) + + def test_run_del_invalid(self, m_post): + m_post.return_value = mock.Mock(status_code=400) + result = self._test_run('DEL', 'delNetwork', m_post) + self.assertEqual(1, result) + + def test_run_socket_error(self, m_post): + m_post.side_effect = requests.ConnectionError + result = self._test_run('DEL', 'delNetwork', m_post) + self.assertEqual(1, result) diff --git a/kuryr_kubernetes/tests/unit/cni/test_main.py b/kuryr_kubernetes/tests/unit/cni/test_main.py new file mode 100644 index 000000000..ffc5efada --- /dev/null +++ b/kuryr_kubernetes/tests/unit/cni/test_main.py @@ -0,0 +1,119 @@ +# Copyright (c) 2017 NEC Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from oslo_config import cfg + +from kuryr_kubernetes.cni import main +from kuryr_kubernetes import constants +from kuryr_kubernetes.tests import base as test_base + + +class TestCNIMain(test_base.TestCase): + @mock.patch('kuryr_kubernetes.cni.main.jsonutils.load') + @mock.patch('sys.exit') + @mock.patch('sys.stdin') + @mock.patch('kuryr_kubernetes.cni.utils.CNIConfig') + @mock.patch('kuryr_kubernetes.cni.api') + @mock.patch('kuryr_kubernetes.config.init') + @mock.patch('kuryr_kubernetes.config.setup_logging') + @mock.patch('kuryr_kubernetes.cni.api.CNIDaemonizedRunner') + def test_daemonized_run(self, m_cni_dr, m_setup_logging, m_config_init, + m_api, m_conf, m_sys, m_sysexit, m_json): + m_conf.debug = mock.Mock() + m_conf.debug.return_value = True + m_cni_dr.return_value = mock.MagicMock() + m_cni_daemon = m_cni_dr.return_value + + cfg.CONF.set_override('daemon_enabled', True, group='cni_daemon') + + main.run() + + m_config_init.assert_called() + m_setup_logging.assert_called() + m_cni_daemon.run.assert_called() + m_sysexit.assert_called() + + @mock.patch('kuryr_kubernetes.cni.main.jsonutils.load') + @mock.patch('sys.exit') + @mock.patch('sys.stdin') + @mock.patch('kuryr_kubernetes.cni.utils.CNIConfig') + @mock.patch('kuryr_kubernetes.cni.api') + @mock.patch('kuryr_kubernetes.config.init') + @mock.patch('kuryr_kubernetes.config.setup_logging') + @mock.patch('kuryr_kubernetes.cni.api.CNIStandaloneRunner') + def test_standalone_run(self, m_cni_sr, m_setup_logging, m_config_init, + m_api, m_conf, m_sys, m_sysexit, m_json): + m_conf.debug = mock.Mock() + m_conf.debug.return_value = True + m_cni_sr.return_value = mock.MagicMock() + m_cni_daemon = m_cni_sr.return_value + + cfg.CONF.set_override('daemon_enabled', False, group='cni_daemon') + + main.run() + + m_config_init.assert_called() + m_setup_logging.assert_called() + m_cni_daemon.run.assert_called() + m_sysexit.assert_called() + + +class TestK8sCNIPlugin(test_base.TestCase): + @mock.patch('kuryr_kubernetes.watcher.Watcher') + @mock.patch('kuryr_kubernetes.cni.handlers.CNIPipeline') + @mock.patch('kuryr_kubernetes.cni.handlers.DelHandler') + @mock.patch('kuryr_kubernetes.cni.handlers.AddHandler') + def _test_method(self, method, m_add_handler, m_del_handler, m_cni_pipe, + m_watcher_class): + self.passed_handler = None + + def _save_handler(params, handler): + self.passed_handler = handler + + def _call_handler(*args): + self.passed_handler(mock.sentinel.vif) + + m_add_handler.side_effect = _save_handler + m_del_handler.side_effect = _save_handler + + m_watcher = mock.MagicMock( + add=mock.MagicMock(), + start=mock.MagicMock(side_effect=_call_handler)) + m_watcher_class.return_value = m_watcher + + m_params = mock.MagicMock() + m_params.args.K8S_POD_NAMESPACE = 'k8s_pod_namespace' + m_params.args.K8S_POD_NAME = 'k8s_pod' + + cni_plugin = main.K8sCNIPlugin() + result = getattr(cni_plugin, method)(m_params) + self.assertEqual(mock.sentinel.vif, cni_plugin._vif) + m_watcher.add.assert_called_with( + "%(base)s/namespaces/%(namespace)s/pods" + "?fieldSelector=metadata.name=%(pod)s" % { + 'base': constants.K8S_API_BASE, + 'namespace': m_params.args.K8S_POD_NAMESPACE, + 'pod': m_params.args.K8S_POD_NAME}) + + return result + + def test_add(self): + result = self._test_method('add') + self.assertEqual(result, mock.sentinel.vif) + + def test_delete(self): + self._test_method('delete') diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py new file mode 100644 index 000000000..286187adb --- /dev/null +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -0,0 +1,161 @@ +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg +from oslo_serialization import jsonutils + +from kuryr_kubernetes.cni.daemon import service +from kuryr_kubernetes import exceptions +from kuryr_kubernetes.tests import base +from kuryr_kubernetes.tests import fake + + +class TestK8sCNIRegistryPlugin(base.TestCase): + def setUp(self): + super(TestK8sCNIRegistryPlugin, self).setUp() + self.pod = {'metadata': {'name': 'foo', 'uid': 'bar'}} + self.vif = fake._fake_vif_dict() + registry = {'foo': {'pod': self.pod, 'vif': self.vif, + 'containerid': None}} + self.plugin = service.K8sCNIRegistryPlugin(registry) + self.params = mock.Mock(args=mock.Mock(K8S_POD_NAME='foo'), + CNI_IFNAME='baz', CNI_NETNS=123, + CNI_CONTAINERID='cont_id') + + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_present(self, m_connect): + self.plugin.add(self.params) + + m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) + self.assertEqual('cont_id', self.plugin.registry['foo']['containerid']) + + @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') + def test_del_present(self, m_disconnect): + self.plugin.delete(self.params) + + m_disconnect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) + + @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') + def test_del_wrong_container_id(self, m_disconnect): + registry = {'foo': {'pod': self.pod, 'vif': self.vif, + 'containerid': 'different'}} + self.plugin = service.K8sCNIRegistryPlugin(registry) + self.plugin.delete(self.params) + + m_disconnect.assert_not_called() + + @mock.patch('time.sleep', mock.Mock()) + @mock.patch('kuryr_kubernetes.cni.binding.base.connect') + def test_add_present_on_5_try(self, m_connect): + se = [KeyError] * 5 + se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) + se.append({'pod': self.pod, 'vif': self.vif, 'containerid': None}) + m_getitem = mock.Mock(side_effect=se) + m_setitem = mock.Mock() + m_registry = mock.Mock(__getitem__=m_getitem, __setitem__=m_setitem) + self.plugin.registry = m_registry + self.plugin.add(self.params) + + m_setitem.assert_called_once_with('foo', {'pod': self.pod, + 'vif': self.vif, + 'containerid': 'cont_id'}) + m_connect.assert_called_with(mock.ANY, mock.ANY, 'baz', 123) + + @mock.patch('time.sleep', mock.Mock()) + def test_add_not_present(self): + cfg.CONF.set_override('vif_annotation_timeout', 0, group='cni_daemon') + self.addCleanup(cfg.CONF.set_override, 'vif_annotation_timeout', 120, + group='cni_daemon') + + m_getitem = mock.Mock(side_effect=KeyError) + m_registry = mock.Mock(__getitem__=m_getitem) + self.plugin.registry = m_registry + self.assertRaises(exceptions.ResourceNotReady, self.plugin.add, + self.params) + + +class TestDaemonServer(base.TestCase): + def setUp(self): + super(TestDaemonServer, self).setUp() + self.plugin = service.K8sCNIRegistryPlugin({}) + self.srv = service.DaemonServer(self.plugin) + + self.srv.application.testing = True + self.test_client = self.srv.application.test_client() + params = {'config_kuryr': {}, 'CNI_ARGS': 'foo=bar'} + self.params_str = jsonutils.dumps(params) + + @mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add') + def test_add(self, m_add): + vif = fake._fake_vif() + m_add.return_value = vif + + resp = self.test_client.post('/addNetwork', data=self.params_str, + content_type='application/json') + + m_add.assert_called_once_with(mock.ANY) + self.assertEqual( + fake._fake_vif_string(vif.obj_to_primitive()).encode(), resp.data) + self.assertEqual(202, resp.status_code) + + @mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add') + def test_add_timeout(self, m_add): + m_add.side_effect = exceptions.ResourceNotReady(mock.Mock()) + + resp = self.test_client.post('/addNetwork', data=self.params_str, + content_type='application/json') + + m_add.assert_called_once_with(mock.ANY) + self.assertEqual(504, resp.status_code) + + @mock.patch('kuryr_kubernetes.cni.daemon.service.K8sCNIRegistryPlugin.add') + def test_add_error(self, m_add): + m_add.side_effect = Exception + + resp = self.test_client.post('/addNetwork', data=self.params_str, + content_type='application/json') + + m_add.assert_called_once_with(mock.ANY) + self.assertEqual(500, resp.status_code) + + @mock.patch('kuryr_kubernetes.cni.daemon.service.' + 'K8sCNIRegistryPlugin.delete') + def test_delete(self, m_delete): + resp = self.test_client.post('/delNetwork', data=self.params_str, + content_type='application/json') + + m_delete.assert_called_once_with(mock.ANY) + self.assertEqual(204, resp.status_code) + + @mock.patch('kuryr_kubernetes.cni.daemon.service.' + 'K8sCNIRegistryPlugin.delete') + def test_delete_timeout(self, m_delete): + m_delete.side_effect = exceptions.ResourceNotReady(mock.Mock()) + resp = self.test_client.post('/delNetwork', data=self.params_str, + content_type='application/json') + + m_delete.assert_called_once_with(mock.ANY) + self.assertEqual(204, resp.status_code) + + @mock.patch('kuryr_kubernetes.cni.daemon.service.' + 'K8sCNIRegistryPlugin.delete') + def test_delete_error(self, m_delete): + m_delete.side_effect = Exception + resp = self.test_client.post('/delNetwork', data=self.params_str, + content_type='application/json') + + m_delete.assert_called_once_with(mock.ANY) + self.assertEqual(500, resp.status_code) diff --git a/requirements.txt b/requirements.txt index dd73a11a9..81e3c43aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,8 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. +cotyledon>=1.3.0 # Apache-2.0 +Flask>=0.10,!=0.11,<1.0 # BSD kuryr-lib>=0.5.0 # Apache-2.0 pbr!=2.1.0,>=2.0.0 # Apache-2.0 requests>=2.14.2 # Apache-2.0 @@ -14,5 +16,6 @@ oslo.service>=1.24.0 # Apache-2.0 oslo.utils>=3.28.0 # Apache-2.0 os-vif>=1.7.0 # Apache-2.0 pyroute2>=0.4.21 # Apache-2.0 (+ dual licensed GPL2) +retrying>=1.2.3,!=1.3.0 # Apache-2.0 six>=1.9.0 # MIT stevedore>=1.20.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 059305d63..88ee49eb2 100755 --- a/setup.cfg +++ b/setup.cfg @@ -28,6 +28,7 @@ os_vif = console_scripts = kuryr-k8s-controller = kuryr_kubernetes.cmd.eventlet.controller:start + kuryr-daemon = kuryr_kubernetes.cmd.daemon:start kuryr-cni = kuryr_kubernetes.cmd.cni:run kuryr_kubernetes.vif_translators =