Merge "Pod annotations to KuryrPort CRD."

This commit is contained in:
Zuul 2020-07-30 08:17:28 +00:00 committed by Gerrit Code Review
commit cecd86b282
26 changed files with 1548 additions and 586 deletions

View File

@ -99,7 +99,7 @@
vars:
devstack_localrc:
DOCKER_CGROUP_DRIVER: "systemd"
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace
devstack_services:
@ -120,7 +120,7 @@
vars:
devstack_localrc:
KURYR_SUBNET_DRIVER: namespace
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_USE_PORT_POOLS: true
KURYR_POD_VIF_DRIVER: neutron-vif
@ -134,7 +134,7 @@
parent: kuryr-kubernetes-tempest-containerized
vars:
devstack_localrc:
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace

View File

@ -98,7 +98,7 @@
KURYR_LB_ALGORITHM: SOURCE_IP_PORT
KURYR_SUBNET_DRIVER: namespace
KURYR_SG_DRIVER: policy
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
voting: false
- job:
@ -144,7 +144,7 @@
KURYR_ENFORCE_SG_RULES: false
KURYR_LB_ALGORITHM: SOURCE_IP_PORT
KURYR_HYPERKUBE_VERSION: v1.16.0
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork
KURYR_ENABLED_HANDLERS: vif,lb,lbaasspec,namespace,pod_label,policy,kuryrnetpolicy,kuryrnetwork,kuryrport
KURYR_SG_DRIVER: policy
KURYR_SUBNET_DRIVER: namespace
KURYR_K8S_CONTAINERIZED_DEPLOYMENT: true

View File

@ -453,6 +453,7 @@ rules:
- kuryrnetworks
- kuryrnetpolicies
- kuryrloadbalancers
- kuryrports
- apiGroups: ["networking.k8s.io"]
resources:
- networkpolicies

View File

@ -973,6 +973,7 @@ function update_tempest_conf_file {
fi
iniset $TEMPEST_CONFIG kuryr_kubernetes validate_crd True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrnetworks True
iniset $TEMPEST_CONFIG kuryr_kubernetes kuryrports True
}
source $DEST/kuryr-kubernetes/devstack/lib/kuryr_kubernetes

View File

@ -43,7 +43,7 @@ KURYR_K8S_API_LB_PORT=${KURYR_K8S_API_LB_PORT:-443}
KURYR_PORT_DEBUG=${KURYR_PORT_DEBUG:-True}
KURYR_SUBNET_DRIVER=${KURYR_SUBNET_DRIVER:-default}
KURYR_SG_DRIVER=${KURYR_SG_DRIVER:-default}
KURYR_ENABLED_HANDLERS=${KURYR_ENABLED_HANDLERS:-vif,lb,lbaasspec}
KURYR_ENABLED_HANDLERS=${KURYR_ENABLED_HANDLERS:-vif,lb,lbaasspec,kuryrport}
# OpenShift
OPENSHIFT_BINARY_VERSION=${OPENSHIFT_BINARY_VERSION:-v3.11.0}

View File

@ -0,0 +1,48 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: kuryrports.openstack.org
spec:
group: openstack.org
scope: Namespaced
names:
plural: kuryrports
singular: kuryrport
kind: KuryrPort
shortNames:
- kp
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
required:
- podUid
- podNodeName
- vifs
properties:
podUid:
type: string
podNodeName:
type: string
vifs:
type: object
x-kubernetes-preserve-unknown-fields: true
additionalPrinterColumns:
- name: PodUID
type: string
description: Pod UID
jsonPath: .spec.podUid
- name: Nodename
type: string
description: Name of the node corresponding pod lives in
jsonPath: .spec.podNodeName
- name: labels
type: string
description: Labels for the CRD
jsonPath: .metadata.labels

View File

@ -15,6 +15,7 @@
import os
from os_vif import objects
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
@ -23,7 +24,6 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes import constants
from kuryr_kubernetes.handlers import health
from kuryr_kubernetes import utils
from kuryr.lib._i18n import _
@ -143,42 +143,46 @@ class DpdkDriver(health.HealthHandler, b_base.BaseBindingDriver):
def _set_vif(self, vif):
# TODO(ivc): extract annotation interactions
state, labels, resource_version = self._get_pod_details(
vifs, labels, resource_version, kp_link = self._get_pod_details(
vif.port_profile.selflink)
for ifname, vif_ex in state.vifs.items():
if vif.id == vif_ex.id:
state.vifs[ifname] = vif
for ifname, data in vifs.items():
if vif.id == data['vif'].id:
vifs[ifname] = data
break
self._set_pod_details(state, vif.port_profile.selflink, labels,
resource_version)
self._set_pod_details(vifs, vif.port_profile.selflink, labels,
resource_version, kp_link)
def _get_pod_details(self, selflink):
k8s = clients.get_kubernetes_client()
pod = k8s.get(selflink)
annotations = pod['metadata']['annotations']
kp = k8s.get(f'{constants.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/kuryrports/'
f'{pod["metadata"]["name"]}')
try:
vifs = {k: {'default': v['default'],
'vif': objects.base.VersionedObject
.obj_from_primitive(v['vif'])}
for k, v in kp['spec']['vifs'].items()}
except (KeyError, AttributeError):
LOG.exception(f"No vifs found on KuryrPort: {kp}")
raise
LOG.info(f"Got VIFs from Kuryrport: {vifs}")
resource_version = pod['metadata']['resourceVersion']
labels = pod['metadata'].get('labels')
try:
annotations = annotations[constants.K8S_ANNOTATION_VIF]
state_annotation = jsonutils.loads(annotations)
state = utils.extract_pod_annotation(state_annotation)
except KeyError:
LOG.exception("No annotations %s", constants.K8S_ANNOTATION_VIF)
raise
except ValueError:
LOG.exception("Unable encode annotations")
raise
LOG.info("Got VIFs from annotation: %s", state.vifs)
return state, labels, resource_version
return vifs, labels, resource_version, kp['metadata']['selflink']
def _set_pod_details(self, state, selflink, labels, resource_version):
if not state:
LOG.info("Removing VIFs annotation: %r", state)
annotation = None
else:
state_dict = state.obj_to_primitive()
annotation = jsonutils.dumps(state_dict, sort_keys=True)
LOG.info("Setting VIFs annotation: %r", annotation)
def _set_pod_details(self, vifs, selflink, labels, resource_version,
kp_link):
k8s = clients.get_kubernetes_client()
if vifs:
spec = {k: {'default': v['default'],
'vif': v['vif'].obj_to_primitive()}
for k, v in vifs.items()}
LOG.info("Setting VIFs in KuryrPort %r", spec)
k8s.patch_crd('spec', kp_link, {'vifs': spec})
if not labels:
LOG.info("Removing Label annotation: %r", labels)
@ -187,8 +191,6 @@ class DpdkDriver(health.HealthHandler, b_base.BaseBindingDriver):
labels_annotation = jsonutils.dumps(labels, sort_keys=True)
LOG.info("Setting Labels annotation: %r", labels_annotation)
k8s = clients.get_kubernetes_client()
k8s.annotate(selflink,
{constants.K8S_ANNOTATION_VIF: annotation,
constants.K8S_ANNOTATION_LABEL: labels_annotation},
{constants.K8S_ANNOTATION_LABEL: labels_annotation},
resource_version=resource_version)

View File

@ -20,14 +20,14 @@ import socket
import sys
import threading
import time
import urllib.parse
import urllib3
import cotyledon
import flask
from pyroute2.ipdb import transactional
import urllib3
import os_vif
from os_vif.objects import base
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -193,10 +193,12 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
self.on_deleted))
self.watcher = k_watcher.Watcher(self.pipeline)
self.watcher.add(
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
'base': k_const.K8S_API_BASE,
'node_name': self._get_nodename()})
query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}='
f'{self._get_nodename()}')
self.watcher.add(f'{k_const.K8S_API_CRD_KURYRPORTS}'
f'?labelSelector={query_label}')
self.is_running = True
self.health_thread = threading.Thread(
target=self._start_watcher_health_checker)
@ -211,55 +213,43 @@ class CNIDaemonWatcherService(cotyledon.Service):
self.healthy.value = False
time.sleep(HEALTH_CHECKER_DELAY)
def on_done(self, pod, vifs):
pod_name = utils.get_pod_unique_name(pod)
vif_dict = {
ifname: vif.obj_to_primitive() for
ifname, vif in vifs.items()
}
# NOTE(dulek): We need a lock when modifying shared self.registry dict
# to prevent race conditions with other processes/threads.
with lockutils.lock(pod_name, external=True):
if (pod_name not in self.registry or
self.registry[pod_name]['pod']['metadata']['uid']
!= pod['metadata']['uid']):
self.registry[pod_name] = {'pod': pod, 'vifs': vif_dict,
'containerid': None,
'vif_unplugged': False,
'del_received': False}
def on_done(self, kuryrport, vifs):
kp_name = utils.get_res_unique_name(kuryrport)
with lockutils.lock(kp_name, external=True):
if (kp_name not in self.registry or
self.registry[kp_name]['kp']['metadata']['uid']
!= kuryrport['metadata']['uid']):
self.registry[kp_name] = {'kp': kuryrport,
'vifs': vifs,
'containerid': None,
'vif_unplugged': False,
'del_received': False}
else:
# NOTE(dulek): Only update vif if its status changed, we don't
# need to care about other changes now.
old_vifs = {
ifname:
base.VersionedObject.obj_from_primitive(vif_obj) for
ifname, vif_obj in (
self.registry[pod_name]['vifs'].items())
}
old_vifs = self.registry[kp_name]['vifs']
for iface in vifs:
if old_vifs[iface].active != vifs[iface].active:
pod_dict = self.registry[pod_name]
pod_dict['vifs'] = vif_dict
self.registry[pod_name] = pod_dict
kp_dict = self.registry[kp_name]
kp_dict['vifs'] = vifs
self.registry[kp_name] = kp_dict
def on_deleted(self, pod):
pod_name = utils.get_pod_unique_name(pod)
def on_deleted(self, kp):
kp_name = utils.get_res_unique_name(kp)
try:
if pod_name in self.registry:
if kp_name in self.registry:
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code for CNI DEL so that
# we delete the registry entry exactly once
with lockutils.lock(pod_name, external=True):
if self.registry[pod_name]['vif_unplugged']:
del self.registry[pod_name]
with lockutils.lock(kp_name, external=True):
if self.registry[kp_name]['vif_unplugged']:
del self.registry[kp_name]
else:
pod_dict = self.registry[pod_name]
pod_dict['del_received'] = True
self.registry[pod_name] = pod_dict
kp_dict = self.registry[kp_name]
kp_dict['del_received'] = True
self.registry[kp_name] = kp_dict
except KeyError:
# This means someone else removed it. It's odd but safe to ignore.
LOG.debug('Pod %s entry already removed from registry while '
'handling DELETED event. Ignoring.', pod_name)
LOG.debug('KuryrPort %s entry already removed from registry while '
'handling DELETED event. Ignoring.', kp_name)
pass
def terminate(self):

View File

@ -17,18 +17,20 @@ import abc
from os_vif import objects as obj_vif
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import dispatch as k_dis
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta):
OBJECT_KIND = k_const.K8S_OBJ_POD
OBJECT_KIND = k_const.K8S_OBJ_KURYRPORT
def __init__(self, cni, on_done):
self._cni = cni
@ -59,16 +61,18 @@ class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta):
raise NotImplementedError()
def _get_vifs(self, pod):
# TODO(ivc): same as VIFHandler._get_vif
k8s = clients.get_kubernetes_client()
try:
annotations = pod['metadata']['annotations']
state_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
except KeyError:
kuryrport_crd = k8s.get(f'{k_const.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/'
f'kuryrports/{pod["metadata"]["name"]}')
LOG.debug("Got CRD: %r", kuryrport_crd)
except k_exc.K8sClientException:
return {}
state_annotation = jsonutils.loads(state_annotation)
state = utils.extract_pod_annotation(state_annotation)
vifs_dict = state.vifs
LOG.debug("Got VIFs from annotation: %r", vifs_dict)
vifs_dict = utils.get_vifs_from_crd(kuryrport_crd)
LOG.debug("Got vifs: %r", vifs_dict)
return vifs_dict
def _get_inst(self, pod):
@ -81,31 +85,32 @@ class CallbackHandler(CNIHandlerBase):
def __init__(self, on_vif, on_del=None):
super(CallbackHandler, self).__init__(None, on_vif)
self._del_callback = on_del
self._pod = None
self._kuryrport = None
self._callback_vifs = None
def should_callback(self, pod, vifs):
def should_callback(self, kuryrport, vifs):
"""Called after all vifs have been processed
Calls callback if there was at least one vif in the Pod
Calls callback if there was at least one vif in the CRD
:param pod: dict containing Kubernetes Pod object
:param kuryrport: dict containing Kubernetes KuryrPort CRD object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
self._pod = pod
self._kuryrport = kuryrport
self._callback_vifs = vifs
if vifs:
return True
return False
def callback(self):
self._callback(self._pod, self._callback_vifs)
self._callback(self._kuryrport, self._callback_vifs)
def on_deleted(self, pod):
LOG.debug("Got pod %s deletion event.", pod['metadata']['name'])
def on_deleted(self, kuryrport):
LOG.debug("Got kuryrport %s deletion event.",
kuryrport['metadata']['name'])
if self._del_callback:
self._del_callback(pod)
self._del_callback(kuryrport)
class CNIPipeline(k_dis.EventPipeline):

View File

@ -15,7 +15,6 @@
import retrying
from os_vif import objects as obj_vif
from os_vif.objects import base
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -31,12 +30,14 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds
# TODO(dulek): Another corner case is (and was) when pod is deleted before it's
# annotated by controller or even noticed by any watcher. Kubelet
# will try to delete such vif, but we will have no data about it.
# This is currently worked around by returning successfully in
# case of timing out in delete. To solve this properly we need
# to watch for pod deletes as well.
# TODO(dulek, gryf): Another corner case is (and was) when pod is deleted
# before it's corresponding CRD was created and populated by vifs by
# controller or even noticed by any watcher. Kubelet will try to delete such
# vif, but we will have no data about it. This is currently worked around by
# returning successfully in case of timing out in delete. To solve this
# properly we need to watch for pod deletes as well, or perhaps create
# finalizer for the pod as soon, as we know, that kuryrport CRD will be
# created.
class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
@ -45,32 +46,32 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
self.registry = registry
self.k8s = clients.get_kubernetes_client()
def _get_pod_name(self, params):
def _get_obj_name(self, params):
return "%(namespace)s/%(name)s" % {
'namespace': params.args.K8S_POD_NAMESPACE,
'name': params.args.K8S_POD_NAME}
def add(self, params):
pod_name = self._get_pod_name(params)
kp_name = self._get_obj_name(params)
timeout = CONF.cni_daemon.vif_annotation_timeout
# Try to confirm if pod in the registry is not stale cache. If it is,
# Try to confirm if CRD in the registry is not stale cache. If it is,
# remove it.
with lockutils.lock(pod_name, external=True):
if pod_name in self.registry:
cached_pod = self.registry[pod_name]['pod']
with lockutils.lock(kp_name, external=True):
if kp_name in self.registry:
cached_kp = self.registry[kp_name]['kp']
try:
pod = self.k8s.get(cached_pod['metadata']['selfLink'])
kp = self.k8s.get(cached_kp['metadata']['selfLink'])
except Exception:
LOG.exception('Error when getting pod %s', pod_name)
raise exceptions.ResourceNotReady(pod_name)
LOG.exception('Error when getting KuryrPort %s', kp_name)
raise exceptions.ResourceNotReady(kp_name)
if pod['metadata']['uid'] != cached_pod['metadata']['uid']:
LOG.warning('Stale pod %s detected in cache. (API '
if kp['metadata']['uid'] != cached_kp['metadata']['uid']:
LOG.warning('Stale KuryrPort %s detected in cache. (API '
'uid=%s, cached uid=%s). Removing it from '
'cache.', pod_name, pod['metadata']['uid'],
cached_pod['metadata']['uid'])
del self.registry[pod_name]
'cache.', kp_name, kp['metadata']['uid'],
cached_kp['metadata']['uid'])
del self.registry[kp_name]
vifs = self._do_work(params, b_base.connect, timeout)
@ -78,70 +79,68 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
# requests that we should ignore. We need a lock to
# prevent race conditions and replace whole object in the
# dict for multiprocessing.Manager to notice that.
with lockutils.lock(pod_name, external=True):
d = self.registry[pod_name]
with lockutils.lock(kp_name, external=True):
d = self.registry[kp_name]
d['containerid'] = params.CNI_CONTAINERID
self.registry[pod_name] = d
LOG.debug('Saved containerid = %s for pod %s',
params.CNI_CONTAINERID, pod_name)
self.registry[kp_name] = d
LOG.debug('Saved containerid = %s for CRD %s',
params.CNI_CONTAINERID, kp_name)
# Wait for timeout sec, 1 sec between tries, retry when even one
# vif is not active.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_result=utils.any_vif_inactive)
def wait_for_active(pod_name):
return {
ifname: base.VersionedObject.obj_from_primitive(vif_obj) for
ifname, vif_obj in self.registry[pod_name]['vifs'].items()
}
def wait_for_active(kp_name):
return self.registry[kp_name]['vifs']
vifs = wait_for_active(pod_name)
vifs = wait_for_active(kp_name)
for vif in vifs.values():
if not vif.active:
LOG.error("Timed out waiting for vifs to become active")
raise exceptions.ResourceNotReady(pod_name)
raise exceptions.ResourceNotReady(kp_name)
return vifs[k_const.DEFAULT_IFNAME]
def delete(self, params):
pod_name = self._get_pod_name(params)
kp_name = self._get_obj_name(params)
try:
reg_ci = self.registry[pod_name]['containerid']
LOG.debug('Read containerid = %s for pod %s', reg_ci, pod_name)
reg_ci = self.registry[kp_name]['containerid']
LOG.debug('Read containerid = %s for KuryrPort %s', reg_ci,
kp_name)
if reg_ci and reg_ci != params.CNI_CONTAINERID:
# NOTE(dulek): This is a DEL request for some older (probably
# failed) ADD call. We should ignore it or we'll
# unplug a running pod.
LOG.warning('Received DEL request for unknown ADD call for '
'pod %s (CNI_CONTAINERID=%s). Ignoring.', pod_name,
params.CNI_CONTAINERID)
'Kuryrport %s (CNI_CONTAINERID=%s). Ignoring.',
kp_name, params.CNI_CONTAINERID)
return
except KeyError:
pass
# Passing arbitrary 5 seconds as timeout, as it does not make any sense
# to wait on CNI DEL. If pod got deleted from API - VIF info is gone.
# If pod got the annotation removed - it is now gone too. The number's
# not 0, because we need to anticipate for restarts and delay before
# registry is populated by watcher.
# to wait on CNI DEL. If kuryrport got deleted from API - VIF info is
# gone. If kuryrport got the vif info removed - it is now gone too.
# The number's not 0, because we need to anticipate for restarts and
# delay before registry is populated by watcher.
self._do_work(params, b_base.disconnect, 5)
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code in the watcher to ensure that
# we delete the registry entry exactly once
try:
with lockutils.lock(pod_name, external=True):
if self.registry[pod_name]['del_received']:
del self.registry[pod_name]
with lockutils.lock(kp_name, external=True):
if self.registry[kp_name]['del_received']:
del self.registry[kp_name]
else:
pod_dict = self.registry[pod_name]
pod_dict['vif_unplugged'] = True
self.registry[pod_name] = pod_dict
kp_dict = self.registry[kp_name]
kp_dict['vif_unplugged'] = True
self.registry[kp_name] = kp_dict
except KeyError:
# This means the pod was removed before vif was unplugged. This
# shouldn't happen, but we can't do anything about it now
LOG.debug('Pod %s not found registry while handling DEL request. '
'Ignoring.', pod_name)
# This means the kuryrport was removed before vif was unplugged.
# This shouldn't happen, but we can't do anything about it now
LOG.debug('KuryrPort %s not found registry while handling DEL '
'request. Ignoring.', kp_name)
pass
def report_drivers_health(self, driver_healthy):
@ -151,25 +150,22 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
self.healthy.value = driver_healthy
def _do_work(self, params, fn, timeout):
pod_name = self._get_pod_name(params)
kp_name = self._get_obj_name(params)
# In case of KeyError retry for `timeout` s, wait 1 s between tries.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_exception=lambda e: isinstance(e, KeyError))
def find():
return self.registry[pod_name]
return self.registry[kp_name]
try:
d = find()
pod = d['pod']
vifs = {
ifname: base.VersionedObject.obj_from_primitive(vif_obj) for
ifname, vif_obj in d['vifs'].items()
}
kp = d['kp']
vifs = d['vifs']
except KeyError:
LOG.error("Timed out waiting for requested pod to appear in "
LOG.error("Timed out waiting for requested KuryrPort to appear in "
"registry")
raise exceptions.ResourceNotReady(pod_name)
raise exceptions.ResourceNotReady(kp_name)
for ifname, vif in vifs.items():
is_default_gateway = (ifname == k_const.DEFAULT_IFNAME)
@ -178,12 +174,13 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin):
# use the ifname supplied in the CNI ADD request
ifname = params.CNI_IFNAME
fn(vif, self._get_inst(pod), ifname, params.CNI_NETNS,
fn(vif, self._get_inst(kp), ifname, params.CNI_NETNS,
report_health=self.report_drivers_health,
is_default_gateway=is_default_gateway,
container_id=params.CNI_CONTAINERID)
return vifs
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
def _get_inst(self, kp):
return (obj_vif.instance_info
.InstanceInfo(uuid=kp['spec']['podUid'],
name=kp['metadata']['name']))

View File

@ -13,14 +13,18 @@
# License for the specific language governing permissions and limitations
# under the License.
KURYR_FQDN = 'kuryr.openstack.org'
K8S_API_BASE = '/api/v1'
K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces'
K8S_API_CRD = '/apis/openstack.org/v1'
K8S_API_CRD_VERSION = 'openstack.org/v1'
K8S_API_CRD = '/apis/' + K8S_API_CRD_VERSION
K8S_API_CRD_NAMESPACES = K8S_API_CRD + '/namespaces'
K8S_API_CRD_KURYRNETS = K8S_API_CRD + '/kuryrnets'
K8S_API_CRD_KURYRNETWORKS = K8S_API_CRD + '/kuryrnetworks'
K8S_API_CRD_KURYRNETPOLICIES = K8S_API_CRD + '/kuryrnetpolicies'
K8S_API_CRD_KURYRLOADBALANCERS = K8S_API_CRD + '/kuryrloadbalancers'
K8S_API_CRD_KURYRPORTS = K8S_API_CRD + '/kuryrports'
K8S_API_POLICIES = '/apis/networking.k8s.io/v1/networkpolicies'
K8S_API_NPWG_CRD = '/apis/k8s.cni.cncf.io/v1'
@ -34,6 +38,7 @@ K8S_OBJ_KURYRNET = 'KuryrNet'
K8S_OBJ_KURYRNETWORK = 'KuryrNetwork'
K8S_OBJ_KURYRNETPOLICY = 'KuryrNetPolicy'
K8S_OBJ_KURYRLOADBALANCER = 'KuryrLoadBalancer'
K8S_OBJ_KURYRPORT = 'KuryrPort'
K8S_POD_STATUS_PENDING = 'Pending'
K8S_POD_STATUS_SUCCEEDED = 'Succeeded'
@ -59,8 +64,12 @@ K8S_ANNOTATION_OLD_DRIVER = 'old_driver'
K8S_ANNOTATION_CURRENT_DRIVER = 'current_driver'
K8S_ANNOTATION_NEUTRON_PORT = 'neutron_id'
POD_FINALIZER = KURYR_FQDN + '/pod-finalizer'
KURYRNETWORK_FINALIZER = 'kuryrnetwork.finalizers.kuryr.openstack.org'
KURYRPORT_FINALIZER = KURYR_FQDN + '/kuryrport-finalizer'
KURYRPORT_LABEL = KURYR_FQDN + '/nodeName'
K8S_OS_VIF_NOOP_PLUGIN = "noop"
CNI_EXCEPTION_CODE = 100

View File

@ -95,10 +95,10 @@ class NeutronPodVIFDriver(base.PodVIFDriver):
def update_vif_sgs(self, pod, security_groups):
os_net = clients.get_network_client()
pod_state = utils.get_pod_state(pod)
if pod_state:
vifs = utils.get_vifs(pod)
if vifs:
# NOTE(ltomasbo): It just updates the default_vif security group
port_id = pod_state.vifs[constants.DEFAULT_IFNAME].id
port_id = vifs[constants.DEFAULT_IFNAME].id
os_net.update_port(port_id, security_groups=list(security_groups))
def _get_port_request(self, pod, project_id, subnets, security_groups,

View File

@ -17,6 +17,7 @@ import urllib
import netaddr
from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
@ -24,7 +25,6 @@ from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes import utils
OPERATORS_WITH_VALUES = [constants.K8S_OPERATOR_IN,
@ -59,15 +59,23 @@ def get_host_id(pod):
return pod['spec']['nodeName']
def get_pod_state(pod):
def get_kuryrport(pod):
k8s = clients.get_kubernetes_client()
try:
annotations = pod['metadata']['annotations']
state_annotation = annotations[constants.K8S_ANNOTATION_VIF]
except KeyError:
return k8s.get(f'{constants.K8S_API_CRD_NAMESPACES}/'
f'{pod["metadata"]["namespace"]}/kuryrports/'
f'{pod["metadata"]["name"]}')
except k_exc.K8sResourceNotFound:
return None
state_annotation = jsonutils.loads(state_annotation)
state = utils.extract_pod_annotation(state_annotation)
return state
def get_vifs(pod):
kp = get_kuryrport(pod)
try:
return {k: objects.base.VersionedObject.obj_from_primitive(v['vif'])
for k, v in kp['spec']['vifs'].items()}
except (KeyError, AttributeError, TypeError):
return {}
def is_host_network(pod):
@ -274,19 +282,17 @@ def create_security_group_rule_body(
def get_pod_ip(pod):
try:
pod_metadata = pod['metadata']['annotations']
vif = pod_metadata[constants.K8S_ANNOTATION_VIF]
except KeyError:
kp = get_kuryrport(pod)
vif = [x['vif'] for x in kp['spec']['vifs'].values()
if x['default']][0]
except (KeyError, TypeError, IndexError):
return None
vif = jsonutils.loads(vif)
vif = vif['versioned_object.data']['default_vif']
network = (vif['versioned_object.data']['network']
['versioned_object.data'])
first_subnet = (network['subnets']['versioned_object.data']
['objects'][0]['versioned_object.data'])
first_subnet_ip = (first_subnet['ips']['versioned_object.data']
['objects'][0]['versioned_object.data']['address'])
return first_subnet_ip
return (vif['versioned_object.data']['network']
['versioned_object.data']['subnets']
['versioned_object.data']['objects'][0]
['versioned_object.data']['ips']
['versioned_object.data']['objects'][0]
['versioned_object.data']['address'])
def get_annotations(resource, annotation):

View File

@ -27,7 +27,6 @@ from oslo_concurrency import lockutils
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from oslo_log import versionutils
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
@ -280,16 +279,9 @@ class BaseVIFPool(base.VIFPoolDriver, metaclass=abc.ABCMeta):
in_use_ports = []
running_pods = kubernetes.get(constants.K8S_API_BASE + '/pods')
for pod in running_pods['items']:
try:
annotations = jsonutils.loads(pod['metadata']['annotations'][
constants.K8S_ANNOTATION_VIF])
pod_state = utils.extract_pod_annotation(annotations)
except KeyError:
LOG.debug("Skipping pod without kuryr VIF annotation: %s",
pod)
else:
for vif in pod_state.vifs.values():
in_use_ports.append(vif.id)
vifs = c_utils.get_vifs(pod)
for data in vifs.values():
in_use_ports.append(data.id)
return in_use_ports
def list_pools(self):

View File

@ -0,0 +1,267 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
LOG = logging.getLogger(__name__)
KURYRPORT_URI = constants.K8S_API_CRD_NAMESPACES + '/{ns}/kuryrports/{crd}'
class KuryrPortHandler(k8s_base.ResourceEventHandler):
"""Controller side of KuryrPort process for Kubernetes pods.
`KuryrPortHandler` runs on the Kuryr-Kubernetes controller and is
responsible for creating/removing the OpenStack resources associated to
the newly created pods, namely ports and update the KuryrPort CRD data.
"""
OBJECT_KIND = constants.K8S_OBJ_KURYRPORT
OBJECT_WATCH_PATH = constants.K8S_API_CRD_KURYRPORTS
def __init__(self):
super(KuryrPortHandler, self).__init__()
self._drv_project = drivers.PodProjectDriver.get_instance()
self._drv_subnets = drivers.PodSubnetsDriver.get_instance()
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
# REVISIT(ltomasbo): The VIF Handler should not be aware of the pool
# directly. Due to the lack of a mechanism to load and set the
# VIFHandler driver, for now it is aware of the pool driver, but this
# will be reverted as soon as a mechanism is in place.
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
self._drv_multi_vif = drivers.MultiVIFDriver.get_enabled_drivers()
if self._is_network_policy_enabled():
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = (drivers.ServiceSecurityGroupsDriver
.get_instance())
self.k8s = clients.get_kubernetes_client()
def on_present(self, kuryrport_crd):
if not kuryrport_crd['spec']['vifs']:
# Get vifs
if not self.get_vifs(kuryrport_crd):
# Ignore this event, according to one of the cases logged in
# get_vifs method.
return
vifs = {ifname: {'default': data['default'],
'vif': objects.base.VersionedObject
.obj_from_primitive(data['vif'])}
for ifname, data in kuryrport_crd['spec']['vifs'].items()}
if all([v['vif'].active for v in vifs.values()]):
return
changed = False
try:
for ifname, data in vifs.items():
if (data['vif'].plugin == constants.KURYR_VIF_TYPE_SRIOV and
oslo_cfg.CONF.sriov.enable_node_annotations):
pod_node = kuryrport_crd['spec']['podNodeName']
# TODO(gryf): This probably will need adoption, so it will
# add information to CRD instead of the pod.
driver_utils.update_port_pci_info(pod_node, data['vif'])
if not data['vif'].active:
try:
self._drv_vif_pool.activate_vif(data['vif'])
changed = True
except os_exc.ResourceNotFound:
LOG.debug("Port not found, possibly already deleted. "
"No need to activate it")
finally:
if changed:
try:
name = kuryrport_crd['metadata']['name']
namespace = kuryrport_crd['metadata']['namespace']
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
f"/{namespace}/pods/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get pod: %s", ex)
raise
project_id = self._drv_project.get_project(pod)
try:
self._update_kuryrport_crd(kuryrport_crd, vifs)
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to update KuryrPort CRD: %s", ex)
security_groups = self._drv_sg.get_security_groups(
pod, project_id)
for ifname, data in vifs.items():
self._drv_vif_pool.release_vif(pod, data['vif'],
project_id,
security_groups)
except k_exc.K8sClientException:
raise k_exc.ResourceNotReady(pod['metadata']['name'])
if self._is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors,
project_id)
def on_finalize(self, kuryrport_crd):
name = kuryrport_crd['metadata']['name']
namespace = kuryrport_crd['metadata']['namespace']
try:
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
f"/{namespace}/pods/{name}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get pod: %s", ex)
# TODO(gryf): Free resources
self.k8s.remove_finalizer(kuryrport_crd, constants.POD_FINALIZER)
raise
if (driver_utils.is_host_network(pod) or
not pod['spec'].get('nodeName')):
return
project_id = self._drv_project.get_project(pod)
try:
crd_pod_selectors = self._drv_sg.delete_sg_rules(pod)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the pod is being deleted before
# kuryr-controller annotated any information about the port
# associated, there is no need for deleting sg rules associated to
# it. So this exception could be safetly ignored for the current
# sg drivers. Only the NP driver associates rules to the pods ips,
# and that waits for annotations to start.
#
# NOTE(gryf): perhaps we don't need to handle this case, since
# during CRD creation all the things, including security groups
# rules would be created too.
LOG.debug("Skipping SG rules deletion associated to the pod %s",
pod)
crd_pod_selectors = []
try:
security_groups = self._drv_sg.get_security_groups(pod, project_id)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the namespace object gets deleted first the
# namespace security group driver will raise a ResourceNotReady
# exception as it cannot access anymore the kuryrnetwork CRD
# annotated on the namespace object. In such case we set security
# groups to empty list so that if pools are enabled they will be
# properly released.
security_groups = []
for data in kuryrport_crd['spec']['vifs'].values():
vif = objects.base.VersionedObject.obj_from_primitive(data['vif'])
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if (self._is_network_policy_enabled() and crd_pod_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors, project_id)
# Remove finalizer out of pod.
self.k8s.remove_finalizer(pod, constants.POD_FINALIZER)
# Finally, remove finalizer from KuryrPort CRD
self.k8s.remove_finalizer(kuryrport_crd, constants.KURYRPORT_FINALIZER)
def get_vifs(self, kuryrport_crd):
try:
pod = self.k8s.get(f"{constants.K8S_API_NAMESPACES}"
f"/{kuryrport_crd['metadata']['namespace']}"
f"/pods"
f"/{kuryrport_crd['metadata']['name']}")
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to get pod: %s", ex)
# TODO(gryf): Release resources
self.k8s.remove_finalizer(kuryrport_crd,
constants.KURYRPORT_FINALIZER)
raise
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
try:
subnets = self._drv_subnets.get_subnets(pod, project_id)
except (os_exc.ResourceNotFound, k_exc.K8sResourceNotFound):
LOG.warning("Subnet does not exists. If namespace driver is "
"used, probably the namespace for the pod is "
"already deleted. So this pod does not need to "
"get a port as it will be deleted too. If the "
"default subnet driver is used, then you must "
"select an existing subnet to be used by Kuryr.")
return False
# Request the default interface of pod
main_vif = self._drv_vif_pool.request_vif(
pod, project_id, subnets, security_groups)
if not main_vif:
pod_name = pod['metadata']['name']
LOG.warning("Ignoring event due to pod %s not being "
"scheduled yet.", pod_name)
return False
vifs = {constants.DEFAULT_IFNAME: {'default': True, 'vif': main_vif}}
# Request the additional interfaces from multiple drivers
index = 0
for driver in self._drv_multi_vif:
additional_vifs = driver.request_additional_vifs(pod, project_id,
security_groups)
for index, vif in enumerate(additional_vifs, start=index+1):
ifname = (oslo_cfg.CONF.kubernetes.additional_ifname_prefix +
str(index))
vifs[ifname] = {'default': False, 'vif': vif}
try:
self._update_kuryrport_crd(kuryrport_crd, vifs)
except k_exc.K8sClientException as ex:
LOG.exception("Kubernetes Client Exception creating "
"KuryrPort CRD: %s", ex)
for ifname, data in vifs.items():
self._drv_vif_pool.release_vif(pod, data['vif'],
project_id,
security_groups)
return True
def _update_kuryrport_crd(self, kuryrport_crd, vifs):
LOG.debug('Updatting CRD %s', kuryrport_crd["metadata"]["name"])
spec = {}
for ifname, data in vifs.items():
data['vif'].obj_reset_changes(recursive=True)
spec[ifname] = {'default': data['default'],
'vif': data['vif'].obj_to_primitive()}
self.k8s.patch_crd('spec', kuryrport_crd['metadata']['selfLink'],
{'vifs': spec})
def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')
def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(
service, crd_pod_selectors):
continue
sgs = self._drv_svc_sg.get_security_groups(service,
project_id)
self._drv_lbaas.update_lbaas_sg(service, sgs)

View File

@ -47,11 +47,17 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
def on_present(self, pod):
if driver_utils.is_host_network(pod) or not self._has_pod_state(pod):
if driver_utils.is_host_network(pod) or not self._has_vifs(pod):
# NOTE(ltomasbo): The event will be retried once the vif handler
# annotates the pod with the pod state.
return
if (constants.K8S_ANNOTATION_VIF in
pod['metadata'].get('annotations', {})):
# NOTE(dulek): This might happen on upgrade, we need to wait for
# annotation to be moved to KuryrPort CRD.
return
current_pod_labels = pod['metadata'].get('labels')
previous_pod_labels = self._get_pod_labels(pod)
LOG.debug("Got previous pod labels from annotation: %r",
@ -97,11 +103,11 @@ class PodLabelHandler(k8s_base.ResourceEventHandler):
{constants.K8S_ANNOTATION_LABEL: annotation},
resource_version=pod['metadata']['resourceVersion'])
def _has_pod_state(self, pod):
def _has_vifs(self, pod):
try:
pod_state = pod['metadata']['annotations'][
constants.K8S_ANNOTATION_VIF]
LOG.debug("Pod state is: %s", pod_state)
kp = driver_utils.get_vifs(pod)
vifs = kp['spec']['vifs']
LOG.debug("Pod have associated KuryrPort with vifs: %s", vifs)
except KeyError:
return False
return True

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack import exceptions as os_exc
from os_vif import objects
from oslo_config import cfg as oslo_cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
@ -24,10 +24,10 @@ from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import objects
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
KURYRPORT_URI = constants.K8S_API_CRD_NAMESPACES + '/{ns}/kuryrports/{crd}'
class VIFHandler(k8s_base.ResourceEventHandler):
@ -63,20 +63,16 @@ class VIFHandler(k8s_base.ResourceEventHandler):
drivers.ServiceSecurityGroupsDriver.get_instance())
def on_present(self, pod):
state = driver_utils.get_pod_state(pod)
if (self._is_pod_completed(pod)):
if state:
if self._move_annotations_to_crd(pod):
return
kp = driver_utils.get_kuryrport(pod)
if self._is_pod_completed(pod):
if kp:
LOG.debug("Pod has completed execution, removing the vifs")
self.on_deleted(pod)
try:
self._set_pod_state(pod, None)
except k_exc.K8sClientException:
LOG.exception("Could not clear pod annotation")
raise k_exc.ResourceNotReady(pod['metadata']['name'])
except k_exc.K8sResourceNotFound:
pass
self.on_finalize(pod)
else:
LOG.debug("Pod has completed execution, no annotation found."
LOG.debug("Pod has completed execution, no KuryrPort found."
" Skipping")
return
@ -87,129 +83,31 @@ class VIFHandler(k8s_base.ResourceEventHandler):
# where certain pods/namespaces/nodes can be managed by other
# networking solutions/CNI drivers.
return
LOG.debug("Got VIFs from annotation: %r", state)
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
if not state:
LOG.debug("Got KuryrPort: %r", kp)
if not kp:
try:
subnets = self._drv_subnets.get_subnets(pod, project_id)
except (os_exc.ResourceNotFound, k_exc.K8sResourceNotFound):
LOG.warning("Subnet does not exists. If namespace driver is "
"used, probably the namespace for the pod is "
"already deleted. So this pod does not need to "
"get a port as it will be deleted too. If the "
"default subnet driver is used, then you must "
"select an existing subnet to be used by Kuryr.")
return
# Request the default interface of pod
main_vif = self._drv_vif_pool.request_vif(
pod, project_id, subnets, security_groups)
if not main_vif:
pod_name = pod['metadata']['name']
LOG.warning("Ignoring event due to pod %s not being "
"scheduled yet.", pod_name)
return
state = objects.vif.PodState(default_vif=main_vif)
# Request the additional interfaces from multiple dirvers
additional_vifs = []
for driver in self._drv_multi_vif:
additional_vifs.extend(
driver.request_additional_vifs(
pod, project_id, security_groups))
if additional_vifs:
state.additional_vifs = {}
for i, vif in enumerate(additional_vifs, start=1):
k = (oslo_cfg.CONF.kubernetes.additional_ifname_prefix
+ str(i))
state.additional_vifs[k] = vif
try:
self._set_pod_state(pod, state)
self._add_kuryrport_crd(pod)
except k_exc.K8sClientException as ex:
LOG.debug("Failed to set annotation: %s", ex)
# FIXME(ivc): improve granularity of K8sClient exceptions:
# only resourceVersion conflict should be ignored
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(pod, vif,
project_id,
security_groups)
else:
changed = False
try:
for ifname, vif in state.vifs.items():
if (vif.plugin == constants.KURYR_VIF_TYPE_SRIOV and
oslo_cfg.CONF.sriov.enable_node_annotations):
driver_utils.update_port_pci_info(pod, vif)
if not vif.active:
try:
self._drv_vif_pool.activate_vif(vif)
changed = True
except os_exc.ResourceNotFound:
LOG.debug("Port not found, possibly already "
"deleted. No need to activate it")
finally:
if changed:
try:
self._set_pod_state(pod, state)
except k_exc.K8sResourceNotFound as ex:
LOG.exception("Failed to set annotation: %s", ex)
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(
pod, vif, project_id,
security_groups)
except k_exc.K8sClientException:
pod_name = pod['metadata']['name']
raise k_exc.ResourceNotReady(pod_name)
if self._is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
services = driver_utils.get_services()
self._update_services(
services, crd_pod_selectors, project_id)
LOG.exception("Kubernetes Client Exception creating "
"KuryrPort CRD: %s", ex)
raise k_exc.ResourceNotReady(pod)
def on_deleted(self, pod):
if (driver_utils.is_host_network(pod) or
not pod['spec'].get('nodeName')):
return
k8s = clients.get_kubernetes_client()
k8s.add_finalizer(pod, constants.POD_FINALIZER)
project_id = self._drv_project.get_project(pod)
def on_finalize(self, pod):
k8s = clients.get_kubernetes_client()
try:
crd_pod_selectors = self._drv_sg.delete_sg_rules(pod)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the pod is being deleted before
# kuryr-controller annotated any information about the port
# associated, there is no need for deleting sg rules associated to
# it. So this exception could be safetly ignored for the current
# sg drivers. Only the NP driver associates rules to the pods ips,
# and that waits for annotations to start.
LOG.debug("Pod was not yet annotated by Kuryr-controller. "
"Skipping SG rules deletion associated to the pod %s",
pod)
crd_pod_selectors = []
try:
security_groups = self._drv_sg.get_security_groups(pod, project_id)
except k_exc.ResourceNotReady:
# NOTE(ltomasbo): If the namespace object gets deleted first the
# namespace security group driver will raise a ResourceNotReady
# exception as it cannot access anymore the kuryrnetwork CRD
# annotated on the namespace object. In such case we set security
# groups to empty list so that if pools are enabled they will be
# properly released.
security_groups = []
k8s.delete(KURYRPORT_URI.format(ns=pod["metadata"]["namespace"],
crd=pod["metadata"]["name"]))
except k_exc.K8sResourceNotFound:
k8s.remove_finalizer(pod, constants.POD_FINALIZER)
state = driver_utils.get_pod_state(pod)
LOG.debug("Got VIFs from annotation: %r", state)
if state:
for ifname, vif in state.vifs.items():
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if (self._is_network_policy_enabled() and crd_pod_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors, project_id)
except k_exc.K8sClientException:
LOG.exception("Could not remove KuryrPort CRD for pod %s.",
pod['metadata']['name'])
raise k_exc.ResourceNotReady(pod['metadata']['name'])
def is_ready(self, quota):
if (utils.has_limit(quota.ports) and
@ -236,42 +134,6 @@ class VIFHandler(k8s_base.ResourceEventHandler):
except KeyError:
return False
def _set_pod_state(self, pod, state):
# TODO(ivc): extract annotation interactions
if not state:
old_annotation = pod['metadata'].get('annotations', {})
LOG.debug("Removing VIFs annotation: %r for pod %s/%s (uid: %s)",
old_annotation.get(constants.K8S_ANNOTATION_VIF),
pod['metadata']['namespace'], pod['metadata']['name'],
pod['metadata']['uid'])
annotation = None
else:
state_dict = state.obj_to_primitive()
annotation = jsonutils.dumps(state_dict, sort_keys=True)
LOG.debug("Setting VIFs annotation: %r for pod %s/%s (uid: %s)",
annotation, pod['metadata']['namespace'],
pod['metadata']['name'], pod['metadata']['uid'])
labels = pod['metadata'].get('labels')
if not labels:
LOG.debug("Removing Label annotation: %r", labels)
labels_annotation = None
else:
labels_annotation = jsonutils.dumps(labels, sort_keys=True)
LOG.debug("Setting Labels annotation: %r", labels_annotation)
# NOTE(dulek): We don't care about compatibility with Queens format
# here, as eventually all Kuryr services will be upgraded
# and cluster will start working normally. Meanwhile
# we just ignore issue of old services being unable to
# read new annotations.
k8s = clients.get_kubernetes_client()
k8s.annotate(pod['metadata']['selfLink'],
{constants.K8S_ANNOTATION_VIF: annotation,
constants.K8S_ANNOTATION_LABEL: labels_annotation},
resource_version=pod['metadata']['resourceVersion'])
def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(
@ -285,3 +147,59 @@ class VIFHandler(k8s_base.ResourceEventHandler):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')
def _add_kuryrport_crd(self, pod, vifs=None):
LOG.debug('Adding CRD %s', pod["metadata"]["name"])
if not vifs:
vifs = {}
kuryr_port = {
'apiVersion': constants.K8S_API_CRD_VERSION,
'kind': constants.K8S_OBJ_KURYRPORT,
'metadata': {
'name': pod['metadata']['name'],
'finalizers': [constants.KURYRPORT_FINALIZER],
'labels': {
constants.KURYRPORT_LABEL: pod['spec']['nodeName']
}
},
'spec': {
'podUid': pod['metadata']['uid'],
'podNodeName': pod['spec']['nodeName'],
'vifs': vifs
}
}
k8s = clients.get_kubernetes_client()
k8s.post(KURYRPORT_URI.format(ns=pod["metadata"]["namespace"],
crd=''), kuryr_port)
def _move_annotations_to_crd(self, pod):
"""Support upgrade from annotations to KuryrPort CRD."""
try:
state = (pod['metadata']['annotations']
[constants.K8S_ANNOTATION_VIF])
except KeyError:
return False
_dict = jsonutils.loads(state)
state = objects.base.VersionedObject.obj_from_primitive(_dict)
vifs = {ifname: {'default': state.default_vif == vif,
'vif': objects.base.VersionedObject
.obj_to_primitive(vif)}
for ifname, vif in state.vifs.items()}
try:
self._add_kuryrport_crd(pod, vifs)
except k_exc.K8sClientException as ex:
LOG.exception("Kubernetes Client Exception recreating "
"KuryrPort CRD from annotation: %s", ex)
raise k_exc.ResourceNotReady(pod)
k8s = clients.get_kubernetes_client()
k8s.remove_annotations(pod['metadata']['selfLink'],
constants.K8S_ANNOTATION_VIF)
return True

View File

@ -316,7 +316,7 @@ class K8sClient(object):
headers=header, cert=self.cert,
verify=self.verify_server)
if response.ok:
return response.json()['metadata']['annotations']
return response.json()['metadata'].get('annotations', {})
if response.status_code == requests.codes.conflict:
resource = self.get(path)
new_version = resource['metadata']['resourceVersion']

View File

@ -29,10 +29,11 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient()).client
self.default_iface = 'baz'
self.additional_iface = 'eth1'
self.pod = {'metadata': {'name': 'foo', 'uid': 'bar',
'namespace': 'default', 'selfLink': 'baz'}}
self.vifs = fake._fake_vifs_dict()
registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
self.kp = {'metadata': {'name': 'foo', 'uid': 'bar',
'namespace': 'default', 'selfLink': 'baz'},
'spec': {'podUid': 'bar'}}
self.vifs = fake._fake_vifs()
registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs,
'containerid': None,
'vif_unplugged': False,
'del_received': False}}
@ -46,7 +47,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
@mock.patch('oslo_concurrency.lockutils.lock')
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present(self, m_connect, m_lock):
self.k8s_mock.get.return_value = self.pod
self.k8s_mock.get.return_value = self.kp
self.plugin.add(self.params)
@ -99,7 +100,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
@mock.patch('kuryr_kubernetes.cni.binding.base.disconnect')
def test_del_wrong_container_id(self, m_disconnect):
registry = {'default/foo': {'pod': self.pod, 'vifs': self.vifs,
registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs,
'containerid': 'different'}}
healthy = mock.Mock()
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin(registry, healthy)
@ -112,11 +113,11 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
@mock.patch('kuryr_kubernetes.cni.binding.base.connect')
def test_add_present_on_5_try(self, m_connect, m_lock):
se = [KeyError] * 5
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
se.append({'kp': self.kp, 'vifs': self.vifs, 'containerid': None,
'vif_unplugged': False, 'del_received': False})
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
se.append({'kp': self.kp, 'vifs': self.vifs, 'containerid': None,
'vif_unplugged': False, 'del_received': False})
se.append({'pod': self.pod, 'vifs': self.vifs, 'containerid': None,
se.append({'kp': self.kp, 'vifs': self.vifs, 'containerid': None,
'vif_unplugged': False, 'del_received': False})
m_getitem = mock.Mock(side_effect=se)
m_setitem = mock.Mock()
@ -127,7 +128,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase):
m_lock.assert_called_with('default/foo', external=True)
m_setitem.assert_called_once_with('default/foo',
{'pod': self.pod,
{'kp': self.kp,
'vifs': self.vifs,
'containerid': 'cont_id',
'vif_unplugged': False,

View File

@ -20,7 +20,6 @@ import ddt
import munch
from openstack import exceptions as os_exc
from oslo_config import cfg as oslo_cfg
from oslo_serialization import jsonutils
from os_vif.objects import vif as osv_vif
@ -29,7 +28,6 @@ from kuryr_kubernetes.controller.drivers import nested_vlan_vif
from kuryr_kubernetes.controller.drivers import neutron_vif
from kuryr_kubernetes.controller.drivers import vif_pool
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.objects import vif
from kuryr_kubernetes import os_vif_util as ovu
from kuryr_kubernetes.tests import base as test_base
from kuryr_kubernetes.tests import fake
@ -276,7 +274,8 @@ class BaseVIFPool(test_base.TestCase):
m_driver._return_ports_to_pool.assert_not_called()
def test__get_in_use_ports(self):
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_vifs')
def test__get_in_use_ports(self, get_vifs):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
@ -284,10 +283,7 @@ class BaseVIFPool(test_base.TestCase):
pod = get_pod_obj()
port_id = str(uuid.uuid4())
pod_vif = osv_vif.VIFBase(id=port_id)
pod_state = vif.PodState(default_vif=pod_vif)
pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF] = (
jsonutils.dumps(pod_state.obj_to_primitive()))
get_vifs.return_value = {'eth0': pod_vif}
items = [pod]
kubernetes.get.return_value = {'items': items}
@ -295,20 +291,6 @@ class BaseVIFPool(test_base.TestCase):
self.assertEqual(resp, [port_id])
def test__get_in_use_ports_exception(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
kubernetes = self.useFixture(k_fix.MockK8sClient()).client
pod = get_pod_obj()
del pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF]
items = [pod]
kubernetes.get.return_value = {'items': items}
resp = cls._get_in_use_ports(m_driver)
self.assertEqual(resp, [])
def test__get_in_use_ports_empty(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)

View File

@ -0,0 +1,751 @@
# Copyright (c) 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from openstack import exceptions as os_exc
from os_vif import objects as os_obj
from oslo_config import cfg
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import multi_vif
from kuryr_kubernetes.controller.handlers import kuryrport
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.tests import base as test_base
CONF = cfg.CONF
class TestKuryrPortHandler(test_base.TestCase):
def setUp(self):
super().setUp()
self._project_id = mock.sentinel.project_id
self._subnets = mock.sentinel.subnets
self._security_groups = mock.sentinel.security_groups
self._host = mock.sentinel.hostname
self._pod_version = mock.sentinel.pod_version
self._pod_link = mock.sentinel.pod_link
self._kp_version = mock.sentinel.kp_version
self._kp_link = mock.sentinel.kp_link
self._kp_namespace = mock.sentinel.namespace
self._kp_uid = mock.sentinel.kp_uid
self._kp_name = 'pod1'
self._pod = {'metadata': {'resourceVersion': self._pod_version,
'selfLink': self._pod_link,
'name': self._kp_name,
'namespace': self._kp_namespace},
'spec': {'nodeName': self._host}}
self._kp = {
'metadata': {
'resourceVersion': self._kp_version,
'selfLink': self._kp_link,
'name': self._kp_name,
'namespace': self._kp_namespace,
'labels': {
constants.KURYRPORT_LABEL: self._host
}
},
'spec': {
'podUid': 'deadbeef',
'podNodeName': self._host,
'vifs': {}
}
}
self._vif1 = os_obj.vif.VIFBase()
self._vif2 = os_obj.vif.VIFBase()
self._vif1.active = False
self._vif2.active = False
self._vif1.plugin = 'object'
self._vif2.plugin = 'object'
self._vif1_primitive = self._vif1.obj_to_primitive()
self._vif2_primitive = self._vif2.obj_to_primitive()
self._vifs_primitive = {'eth0': {'default': True,
'vif': self._vif1_primitive},
'eth1': {'default': False,
'vif': self._vif2_primitive}}
self._vifs = {'eth0': {'default': True,
'vif': self._vif1},
'eth1': {'default': False,
'vif': self._vif2}}
self._pod_uri = (f"{constants.K8S_API_NAMESPACES}"
f"/{self._kp['metadata']['namespace']}/pods/"
f"{self._kp['metadata']['name']}")
self._driver = multi_vif.NoopMultiVIFDriver()
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler.get_vifs')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_no_vifs_create(self, ged, get_k8s_client, get_vifs):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
get_vifs.return_value = True
kp.on_present(self._kp)
get_vifs.assert_called_once_with(self._kp)
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler.get_vifs')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_getting_vifs_failed(self, ged, get_k8s_client,
get_vifs):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
get_vifs.return_value = False
self.assertFalse(kp.on_present(self._kp))
get_vifs.assert_called_once_with(self._kp)
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present(self, ged, get_k8s_client, activate_vif,
update_crd, get_project):
ged.return_value = [mock.MagicMock]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
get_project.return_value = self._project_id
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
kp.on_present(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
activate_vif.assert_has_calls([mock.call(self._vif1),
mock.call(self._vif2)])
update_crd.assert_called_once_with(self._kp, self._vifs)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_active(self, ged, get_k8s_client):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._vif1.active = True
self._vif2.active = True
self._kp['spec']['vifs'] = {
'eth0': {'default': True,
'vif': self._vif1.obj_to_primitive()},
'eth1': {'default': False,
'vif': self._vif2.obj_to_primitive()}}
kp.on_present(self._kp)
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_port_not_found(self, ged, get_k8s_client, activate_vif,
update_crd):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
activate_vif.side_effect = os_exc.ResourceNotFound()
kp.on_present(self._kp)
activate_vif.assert_has_calls([mock.call(self._vif1),
mock.call(self._vif2)])
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_pod_not_found(self, ged, get_k8s_client, activate_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.side_effect = k_exc.K8sResourceNotFound(self._pod)
self.assertRaises(k_exc.K8sResourceNotFound, kp.on_present,
self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_fail_update_crd(self, ged, get_k8s_client,
activate_vif, update_crd, get_project,
get_sg, release_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
update_crd.side_effect = k_exc.K8sResourceNotFound(self._kp)
get_project.return_value = self._project_id
get_sg.return_value = self._security_groups
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
kp.on_present(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_exception_during_update_crd(self, ged, get_k8s_client,
activate_vif,
update_crd, get_project,
get_sg, release_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
update_crd.side_effect = k_exc.K8sClientException()
get_project.return_value = self._project_id
get_sg.return_value = self._security_groups
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
self.assertRaises(k_exc.ResourceNotReady, kp.on_present, self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
update_crd.assert_called_once_with(self._kp, self._vifs)
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'update_port_pci_info')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_sriov(self, ged, get_k8s_client, update_port_pci_info,
activate_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._vif2.plugin = constants.KURYR_VIF_TYPE_SRIOV
self._vif2.active = True
self._kp['spec']['vifs'] = {
'eth0': {'default': True,
'vif': self._vif2.obj_to_primitive()},
'eth1': {'default': False,
'vif': self._vif1.obj_to_primitive()}}
CONF.set_override('enable_node_annotations', True, group='sriov')
self.addCleanup(CONF.clear_override, 'enable_node_annotations',
group='sriov')
activate_vif.side_effect = os_exc.ResourceNotFound()
kp.on_present(self._kp)
update_port_pci_info.assert_called_once_with(self._host, self._vif2)
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_services')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.create_sg_rules')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.LBaaSDriver.'
'get_instance')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'activate_vif')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_present_np(self, ged, is_np_enabled, get_k8s_client,
activate_vif, update_crd, get_lb_instance,
get_sg_instance, create_sgr, update_services,
get_services, get_project):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
kp.on_present(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
activate_vif.assert_has_calls([mock.call(self._vif1),
mock.call(self._vif2)])
update_crd.assert_called_once_with(self._kp, self._vifs)
create_sgr.assert_called_once_with(self._pod)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_finalize_exception_on_pod(self, ged, k8s):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.side_effect = k_exc.K8sResourceNotFound(self._pod)
self.assertRaises(k_exc.K8sResourceNotFound, kp.on_finalize,
self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
k8s.remove_finalizer.assert_called_once_with(
self._kp, constants.POD_FINALIZER)
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_finalize_host_net_or_no_nodename(self, ged, k8s,
is_host_network):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
is_host_network.return_value = False
_pod = dict(self._pod)
del _pod['spec']['nodeName']
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = _pod
kp.on_finalize(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
is_host_network.assert_called_once_with(self._pod)
is_host_network.reset_mock()
is_host_network.return_value = False
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
kp.on_finalize(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
is_host_network.assert_called_once_with(self._pod)
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.delete_sg_rules')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_finalize_crd_sg_exceptions(self, ged, k8s, is_host_network,
get_project, delete_sg_rules,
get_sg, release_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
is_host_network.return_value = False
get_project.return_value = self._project_id
delete_sg_rules.side_effect = k_exc.ResourceNotReady(self._pod)
get_sg.side_effect = k_exc.ResourceNotReady(self._pod)
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
kp.on_finalize(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
k8s.remove_finalizer.assert_has_calls(
[mock.call(self._pod, constants.POD_FINALIZER),
mock.call(self._kp, constants.KURYRPORT_FINALIZER)])
is_host_network.assert_called_once_with(self._pod)
delete_sg_rules.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
release_vif.assert_has_calls([mock.call(self._pod, self._vif1,
self._project_id, []),
mock.call(self._pod, self._vif2,
self._project_id, [])])
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.base.'
'ServiceSecurityGroupsDriver.get_instance')
@mock.patch('kuryr_kubernetes.controller.drivers.base.LBaaSDriver.'
'get_instance')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._is_network_policy_enabled')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.delete_sg_rules')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_on_finalize_np(self, ged, k8s, is_host_network, get_project,
delete_sg_rules, get_sg, release_vif,
is_np_enabled, get_lb_instance, get_sg_instance,
get_services, update_services):
ged.return_value = [self._driver]
CONF.set_override('enforce_sg_rules', True, group='octavia_defaults')
self.addCleanup(CONF.clear_override, 'enforce_sg_rules',
group='octavia_defaults')
kp = kuryrport.KuryrPortHandler()
self._kp['spec']['vifs'] = self._vifs_primitive
is_host_network.return_value = False
get_project.return_value = self._project_id
selector = mock.sentinel.selector
delete_sg_rules.return_value = selector
get_sg.return_value = self._security_groups
get_services.return_value = mock.sentinel.services
with mock.patch.object(kp, 'k8s') as k8s:
k8s.get.return_value = self._pod
kp.on_finalize(self._kp)
k8s.get.assert_called_once_with(self._pod_uri)
k8s.remove_finalizer.assert_has_calls(
[mock.call(self._pod, constants.POD_FINALIZER),
mock.call(self._kp, constants.KURYRPORT_FINALIZER)])
is_host_network.assert_called_once_with(self._pod)
delete_sg_rules.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
release_vif.assert_has_calls([mock.call(self._pod, self._vif1,
self._project_id,
self._security_groups),
mock.call(self._pod, self._vif2,
self._project_id,
self._security_groups)])
get_services.assert_called_once()
update_services.assert_called_once_with(mock.sentinel.services,
selector, self._project_id)
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'request_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_subnet.'
'DefaultPodSubnetDriver.get_subnets')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_vifs(self, ged, k8s, get_project, get_sg, get_subnets,
request_vif, update_crd):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.return_value = self._pod
get_sg.return_value = self._security_groups
get_project.return_value = self._project_id
get_subnets.return_value = mock.sentinel.subnets
request_vif.return_value = self._vif1
self.assertTrue(kp.get_vifs(self._kp))
kp.k8s.get.assert_called_once_with(self._pod_uri)
get_project.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
get_subnets.assert_called_once_with(self._pod, self._project_id)
request_vif.assert_called_once_with(self._pod, self._project_id,
mock.sentinel.subnets,
self._security_groups)
update_crd.assert_called_once_with(self._kp,
{constants.DEFAULT_IFNAME:
{'default': True,
'vif': self._vif1}})
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_vifs_pod_not_found(self, ged, k8s):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.side_effect = k_exc.K8sResourceNotFound(self._pod)
self.assertRaises(k_exc.K8sResourceNotFound, kp.get_vifs, self._kp)
kp.k8s.get.assert_called_once_with(self._pod_uri)
kp.k8s.remove_finalizer.assert_called_once_with(
self._kp, constants.KURYRPORT_FINALIZER)
@mock.patch('kuryr_kubernetes.controller.drivers.default_subnet.'
'DefaultPodSubnetDriver.get_subnets')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_vifs_subnet_error(self, ged, k8s, get_project, get_sg,
get_subnets):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.return_value = self._pod
get_sg.return_value = self._security_groups
get_project.return_value = self._project_id
get_subnets.side_effect = os_exc.ResourceNotFound()
self.assertFalse(kp.get_vifs(self._kp))
kp.k8s.get.assert_called_once_with(self._pod_uri)
get_project.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
get_subnets.assert_called_once_with(self._pod, self._project_id)
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'request_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_subnet.'
'DefaultPodSubnetDriver.get_subnets')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_vifs_no_vif(self, ged, k8s, get_project, get_sg, get_subnets,
request_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.return_value = self._pod
get_sg.return_value = self._security_groups
get_project.return_value = self._project_id
get_subnets.return_value = mock.sentinel.subnets
request_vif.return_value = None
self.assertFalse(kp.get_vifs(self._kp))
kp.k8s.get.assert_called_once_with(self._pod_uri)
get_project.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
get_subnets.assert_called_once_with(self._pod, self._project_id)
request_vif.assert_called_once_with(self._pod, self._project_id,
mock.sentinel.subnets,
self._security_groups)
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'request_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_subnet.'
'DefaultPodSubnetDriver.get_subnets')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_vifs_with_additional_vif(self, ged, k8s, get_project, get_sg,
get_subnets, request_vif,
update_crd):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.return_value = self._pod
fake_driver = mock.MagicMock()
fake_driver.request_additional_vifs.return_value = [self._vif2]
kp._drv_multi_vif.append(fake_driver)
get_sg.return_value = self._security_groups
get_project.return_value = self._project_id
get_subnets.return_value = mock.sentinel.subnets
request_vif.return_value = self._vif1
self.assertTrue(kp.get_vifs(self._kp))
kp.k8s.get.assert_called_once_with(self._pod_uri)
get_project.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
get_subnets.assert_called_once_with(self._pod, self._project_id)
request_vif.assert_called_once_with(self._pod, self._project_id,
mock.sentinel.subnets,
self._security_groups)
update_crd.assert_called_once_with(self._kp,
{'eth0': {'default': True,
'vif': self._vif1},
'eth1': {'default': False,
'vif': self._vif2}})
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'release_vif')
@mock.patch('kuryr_kubernetes.controller.handlers.kuryrport.'
'KuryrPortHandler._update_kuryrport_crd')
@mock.patch('kuryr_kubernetes.controller.drivers.vif_pool.MultiVIFPool.'
'request_vif')
@mock.patch('kuryr_kubernetes.controller.drivers.default_subnet.'
'DefaultPodSubnetDriver.get_subnets')
@mock.patch('kuryr_kubernetes.controller.drivers.default_security_groups.'
'DefaultPodSecurityGroupsDriver.get_security_groups')
@mock.patch('kuryr_kubernetes.controller.drivers.default_project.'
'DefaultPodProjectDriver.get_project')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_get_exception_on_update_crd(self, ged, k8s, get_project, get_sg,
get_subnets, request_vif, update_crd,
release_vif):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp.k8s.get.return_value = self._pod
get_sg.return_value = self._security_groups
get_project.return_value = self._project_id
get_subnets.return_value = mock.sentinel.subnets
request_vif.return_value = self._vif1
update_crd.side_effect = k_exc.K8sClientException()
self.assertTrue(kp.get_vifs(self._kp))
kp.k8s.get.assert_called_once_with(self._pod_uri)
get_project.assert_called_once_with(self._pod)
get_sg.assert_called_once_with(self._pod, self._project_id)
get_subnets.assert_called_once_with(self._pod, self._project_id)
request_vif.assert_called_once_with(self._pod, self._project_id,
mock.sentinel.subnets,
self._security_groups)
update_crd.assert_called_once_with(self._kp,
{constants.DEFAULT_IFNAME:
{'default': True,
'vif': self._vif1}})
release_vif.assert_called_once_with(self._pod, self._vif1,
self._project_id,
self._security_groups)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_update_kuryrport_crd(self, ged, k8s):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp._update_kuryrport_crd(self._kp, self._vifs)
self._vif1.obj_reset_changes()
self._vif2.obj_reset_changes()
vif1 = self._vif1.obj_to_primitive()
vif2 = self._vif2.obj_to_primitive()
kp.k8s.patch_crd.assert_called_once_with(
'spec', self._kp_link, {'vifs': {'eth0': {'default': True,
'vif': vif1},
'eth1': {'default': False,
'vif': vif2}}})
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_is_network_policy_enabled(self, ged, k8s):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
CONF.set_override('enabled_handlers', ['fake_handler'],
group='kubernetes')
CONF.set_override('service_security_groups_driver', 'foo',
group='kubernetes')
self.assertFalse(kp._is_network_policy_enabled())
CONF.set_override('enabled_handlers', ['policy'],
group='kubernetes')
CONF.set_override('service_security_groups_driver', 'foo',
group='kubernetes')
self.assertFalse(kp._is_network_policy_enabled())
CONF.set_override('enabled_handlers', ['policy'],
group='kubernetes')
self.addCleanup(CONF.clear_override, 'enabled_handlers',
group='kubernetes')
CONF.set_override('service_security_groups_driver', 'policy',
group='kubernetes')
self.addCleanup(CONF.clear_override, 'service_security_groups_driver',
group='kubernetes')
self.assertTrue(kp._is_network_policy_enabled())
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'service_matches_affected_pods')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.base.MultiVIFDriver.'
'get_enabled_drivers')
def test_update_services(self, ged, k8s, smap):
ged.return_value = [self._driver]
kp = kuryrport.KuryrPortHandler()
kp._drv_lbaas = mock.MagicMock()
kp._drv_svc_sg = mock.MagicMock()
kp._drv_svc_sg.get_security_groups.return_value = self._security_groups
smap.side_effect = [True, False]
services = {'items': ['service1', 'service2']}
kp._update_services(services, mock.sentinel.crd_pod_selectors,
self._project_id)
smap.assert_has_calls([mock.call('service1',
mock.sentinel.crd_pod_selectors),
mock.call('service2',
mock.sentinel.crd_pod_selectors)])
kp._drv_svc_sg.get_security_groups.assert_called_once_with(
'service1', self._project_id)
kp._drv_lbaas.update_lbaas_sg.assert_called_once_with(
'service1', self._security_groups)

View File

@ -49,7 +49,7 @@ class TestPodLabelHandler(test_base.TestCase):
self._set_vif_driver = self._handler._drv_vif_pool.set_vif_driver
self._get_pod_labels = self._handler._get_pod_labels
self._set_pod_labels = self._handler._set_pod_labels
self._has_pod_state = self._handler._has_pod_state
self._has_vifs = self._handler._has_vifs
self._update_vif_sgs = self._handler._drv_vif_pool.update_vif_sgs
self._get_project.return_value = self._project_id
@ -80,12 +80,12 @@ class TestPodLabelHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
def test_on_present(self, m_get_services):
m_get_services.return_value = {"items": []}
self._has_pod_state.return_value = True
self._has_vifs.return_value = True
self._get_pod_labels.return_value = {'test1': 'test'}
p_label.PodLabelHandler.on_present(self._handler, self._pod)
self._has_pod_state.assert_called_once_with(self._pod)
self._has_vifs.assert_called_once_with(self._pod)
self._get_pod_labels.assert_called_once_with(self._pod)
self._get_project.assert_called_once()
self._get_security_groups.assert_called_once()
@ -93,33 +93,33 @@ class TestPodLabelHandler(test_base.TestCase):
self._set_pod_labels.assert_called_once_with(self._pod, None)
def test_on_present_no_state(self):
self._has_pod_state.return_value = False
self._has_vifs.return_value = False
resp = p_label.PodLabelHandler.on_present(self._handler, self._pod)
self.assertIsNone(resp)
self._has_pod_state.assert_called_once_with(self._pod)
self._has_vifs.assert_called_once_with(self._pod)
self._get_pod_labels.assert_not_called()
self._set_pod_labels.assert_not_called()
def test_on_present_no_labels(self):
self._has_pod_state.return_value = True
self._has_vifs.return_value = True
self._get_pod_labels.return_value = None
p_label.PodLabelHandler.on_present(self._handler, self._pod)
self._has_pod_state.assert_called_once_with(self._pod)
self._has_vifs.assert_called_once_with(self._pod)
self._get_pod_labels.assert_called_once_with(self._pod)
self._set_pod_labels.assert_not_called()
def test_on_present_no_changes(self):
self._has_pod_state.return_value = True
self._has_vifs.return_value = True
pod_with_label = self._pod.copy()
pod_with_label['metadata']['labels'] = {'test1': 'test'}
self._get_pod_labels.return_value = {'test1': 'test'}
p_label.PodLabelHandler.on_present(self._handler, pod_with_label)
self._has_pod_state.assert_called_once_with(pod_with_label)
self._has_vifs.assert_called_once_with(pod_with_label)
self._get_pod_labels.assert_called_once_with(pod_with_label)
self._set_pod_labels.assert_not_called()

View File

@ -16,6 +16,7 @@
from unittest import mock
from os_vif import objects as os_obj
from oslo_serialization import jsonutils
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drivers
@ -43,15 +44,30 @@ class TestVIFHandler(test_base.TestCase):
self._pod_version = mock.sentinel.pod_version
self._pod_link = mock.sentinel.pod_link
self._pod_namespace = mock.sentinel.namespace
self._pod_uid = mock.sentinel.pod_uid
self._pod_name = 'pod1'
self._pod = {
'metadata': {'resourceVersion': self._pod_version,
'selfLink': self._pod_link,
'name': self._pod_name,
'namespace': self._pod_namespace},
'status': {'phase': k_const.K8S_POD_STATUS_PENDING},
'spec': {'hostNetwork': False,
'nodeName': 'hostname'}
}
self._kp_version = mock.sentinel.kp_version
self._kp_link = mock.sentinel.kp_link
self._kp = {'apiVersion': 'openstack.org/v1',
'kind': 'KuryrPort',
'metadata': {'resourceVersion': self._kp_version,
'selfLink': mock.sentinel.kp_link,
'namespace': self._pod_namespace,
'labels': mock.ANY},
'spec': {'podUid': self._pod_uid,
'podNodeName': 'hostname',
'vifs': {}}}
self._handler = mock.MagicMock(spec=h_vif.VIFHandler)
self._handler._drv_project = mock.Mock(spec=drivers.PodProjectDriver)
self._handler._drv_subnets = mock.Mock(spec=drivers.PodSubnetsDriver)
@ -68,7 +84,7 @@ class TestVIFHandler(test_base.TestCase):
self._request_vif = self._handler._drv_vif_pool.request_vif
self._release_vif = self._handler._drv_vif_pool.release_vif
self._activate_vif = self._handler._drv_vif_pool.activate_vif
self._set_pod_state = self._handler._set_pod_state
self._matc = self._handler._move_annotations_to_crd
self._is_pod_scheduled = self._handler._is_pod_scheduled
self._is_pod_completed = self._handler._is_pod_completed
self._request_additional_vifs = \
@ -152,224 +168,183 @@ class TestVIFHandler(test_base.TestCase):
self.assertTrue(h_vif.VIFHandler._is_pod_completed({'status': {'phase':
k_const.K8S_POD_STATUS_FAILED}}))
@mock.patch('oslo_config.cfg.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'update_port_pci_info')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present(self, m_get_pod_state, m_host_network, m_update_pci,
m_conf):
m_get_pod_state.return_value = self._state
m_host_network.return_value = False
self._vif.plugin = 'sriov'
m_conf.sriov.enable_node_annotations = True
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
m_update_pci.assert_called_once_with(self._pod, self._vif)
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
self._activate_vif.assert_not_called()
self._set_pod_state.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_host_network(self, m_get_pod_state, m_host_network):
m_get_pod_state.return_value = self._state
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_host_network(self, m_get_kuryrport, m_host_network):
m_get_kuryrport.return_value = self._kp
m_host_network.return_value = True
self._matc.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once()
self._matc.assert_called_once_with(self._pod)
m_get_kuryrport.assert_called_once()
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
self._activate_vif.assert_not_called()
self._set_pod_state.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_not_pending(self, m_get_pod_state, m_host_network):
m_get_pod_state.return_value = self._state
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_not_pending(self, m_get_kuryrport, m_host_network):
m_get_kuryrport.return_value = self._kp
m_host_network.return_value = False
self._is_pod_scheduled.return_value = False
self._matc.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once()
self._matc.assert_called_once_with(self._pod)
m_get_kuryrport.assert_called_once()
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
self._activate_vif.assert_not_called()
self._set_pod_state.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_on_completed_with_annotation(self, m_get_pod_state):
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_on_completed_with_annotation(self, m_get_kuryrport):
self._is_pod_completed.return_value = True
m_get_pod_state.return_value = self._state
m_get_kuryrport.return_value = self._kp
self._matc.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._handler.on_deleted.assert_called_once_with(self._pod)
self._set_pod_state.assert_called_once_with(self._pod, None)
self._matc.assert_called_once_with(self._pod)
self._handler.on_finalize.assert_called_once_with(self._pod)
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
self._activate_vif.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_on_completed_without_annotation(self, m_get_pod_state):
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_on_completed_without_annotation(self, m_get_kuryrport):
self._is_pod_completed.return_value = True
m_get_pod_state.return_value = None
m_get_kuryrport.return_value = None
self._matc.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._handler.on_deleted.assert_not_called()
self._set_pod_state.assert_not_called()
self._matc.assert_called_once_with(self._pod)
self._handler.on_finalize.assert_not_called()
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
self._activate_vif.assert_not_called()
@mock.patch('oslo_config.cfg.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.'
'update_port_pci_info')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_activate(self, m_get_pod_state, m_host_network,
m_get_services, m_update_pci, m_conf):
m_get_pod_state.return_value = self._state
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_create(self, m_get_kuryrport, m_host_network,
m_get_k8s_client):
m_get_kuryrport.return_value = None
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
self._vif.active = False
self._vif.plugin = 'sriov'
m_conf.sriov.enable_node_annotations = True
self._matc.return_value = False
k8s = mock.MagicMock()
m_get_k8s_client.return_value = k8s
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
m_update_pci.assert_called_once_with(self._pod, self._vif)
self._activate_vif.assert_called_once_with(self._vif)
self._set_pod_state.assert_called_once_with(self._pod, self._state)
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
m_get_kuryrport.assert_called_once_with(self._pod)
self._matc.assert_called_once_with(self._pod)
self._handler._add_kuryrport_crd.assert_called_once_with(self._pod)
k8s.add_finalizer.assert_called_once_with(self._pod,
k_const.POD_FINALIZER)
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_create(self, m_get_pod_state, m_host_network):
m_get_pod_state.return_value = None
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_update(self, m_get_kuryrport, m_host_network):
m_get_kuryrport.return_value = self._kp
m_host_network.return_value = False
self._matc.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._request_additional_vifs.assert_called_once_with(
self._pod, self._project_id, self._security_groups)
self._set_pod_state.assert_called_once_with(self._pod, self._state)
self._activate_vif.assert_not_called()
@mock.patch('oslo_config.cfg.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_create_with_additional_vifs(self, m_get_pod_state,
m_host_network, m_conf):
m_get_pod_state.return_value = None
m_host_network.return_value = False
ifname_prefix = 'baz'
m_conf.kubernetes.additional_ifname_prefix = ifname_prefix
additional_vif = os_obj.vif.VIFBase()
self._state.additional_vifs = {ifname_prefix+'1': additional_vif}
self._request_additional_vifs.return_value = [additional_vif]
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._request_additional_vifs.assert_called_once_with(
self._pod, self._project_id, self._security_groups)
self._set_pod_state.assert_called_once_with(self._pod, self._state)
self._activate_vif.assert_not_called()
self._matc.assert_called_once_with(self._pod)
m_get_kuryrport.assert_called_once_with(self._pod)
self._handler._add_kuryrport_crd.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_present_rollback(self, m_get_pod_state, m_host_network):
m_get_pod_state.return_value = None
m_host_network.return_value = False
self._set_pod_state.side_effect = k_exc.K8sClientException
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._request_additional_vifs.assert_called_once_with(
self._pod, self._project_id, self._security_groups)
self._set_pod_state.assert_called_once_with(self._pod, self._state)
self._release_vif.assert_called_once_with(self._pod, self._vif,
self._project_id,
self._security_groups)
self._activate_vif.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted(self, m_get_pod_state, m_host_network, m_get_services):
m_get_pod_state.return_value = self._state
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
self._release_vif.assert_called_once_with(self._pod, self._vif,
self._project_id,
self._security_groups)
@mock.patch('oslo_config.cfg.CONF')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted_with_additional_vifs(self, m_get_pod_state,
m_host_network, m_get_services,
m_conf):
additional_vif = os_obj.vif.VIFBase()
ifname_prefix = 'bar'
m_conf.kubernetes.additional_ifname_prefix = ifname_prefix
self._state.additional_vifs = {ifname_prefix+'1': additional_vif}
m_get_pod_state.return_value = self._state
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._release_vif.assert_any_call(self._pod, self._vif,
self._project_id,
self._security_groups)
self._release_vif.assert_any_call(self._pod, additional_vif,
self._project_id,
self._security_groups)
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted_host_network(self, m_get_pod_state, m_host_network):
m_get_pod_state.return_value = self._state
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_present_upgrade(self, m_get_kuryrport, m_host_network):
m_get_kuryrport.return_value = self._kp
m_host_network.return_value = True
self._matc.return_value = True
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
h_vif.VIFHandler.on_present(self._handler, self._pod)
m_get_pod_state.assert_not_called()
self._release_vif.assert_not_called()
self._matc.assert_called_once_with(self._pod)
m_get_kuryrport.assert_not_called()
self._request_vif.assert_not_called()
self._request_additional_vifs.assert_not_called()
self._activate_vif.assert_not_called()
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_services')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.is_host_network')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_state')
def test_on_deleted_no_annotation(self, m_get_pod_state, m_host_network,
m_get_services):
m_get_pod_state.return_value = None
m_host_network.return_value = False
m_get_services.return_value = {"items": []}
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_finalize_crd(self, m_get_kuryrport, m_get_k8s_client):
m_get_kuryrport.return_value = self._kp
k8s = mock.MagicMock()
m_get_k8s_client.return_value = k8s
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
h_vif.VIFHandler.on_finalize(self._handler, self._pod)
m_get_pod_state.assert_called_once_with(self._pod)
self._release_vif.assert_not_called()
k8s.delete.assert_called_once_with(
h_vif.KURYRPORT_URI.format(
ns=self._pod["metadata"]["namespace"],
crd=self._pod["metadata"]["name"]))
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_finalize_crd_exception(self, m_get_kuryrport,
m_get_k8s_client):
m_get_kuryrport.return_value = self._kp
k8s = mock.MagicMock()
m_get_k8s_client.return_value = k8s
k8s.delete.side_effect = k_exc.K8sClientException
self.assertRaises(k_exc.ResourceNotReady, h_vif.VIFHandler
.on_finalize, self._handler, self._pod)
k8s.delete.assert_called_once_with(
h_vif.KURYRPORT_URI.format(
ns=self._pod["metadata"]["namespace"],
crd=self._pod["metadata"]["name"]))
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
@mock.patch('kuryr_kubernetes.controller.drivers.utils.get_kuryrport')
def test_on_finalize_crd_not_found(self, m_get_kuryrport,
m_get_k8s_client):
m_get_kuryrport.return_value = self._kp
k8s = mock.MagicMock()
m_get_k8s_client.return_value = k8s
k8s.delete.side_effect = k_exc.K8sResourceNotFound(self._pod)
h_vif.VIFHandler.on_finalize(self._handler, self._pod)
k8s.delete.assert_called_once_with(
h_vif.KURYRPORT_URI.format(
ns=self._pod["metadata"]["namespace"],
crd=self._pod["metadata"]["name"]))
(k8s.remove_finalizer
.assert_called_once_with(self._pod, k_const.POD_FINALIZER))
def test_move_annotations_to_crd_no_annotations(self):
res = h_vif.VIFHandler._move_annotations_to_crd(self._handler,
self._pod)
self.assertFalse(res)
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_move_annotations_to_crd_with_annotations(self, m_get_k8s_client):
vifobj = os_obj.vif.VIFOpenVSwitch()
state = vif.PodState(default_vif=vifobj)
annotation = jsonutils.dumps(state.obj_to_primitive())
self._pod['metadata']['annotations'] = {
k_const.K8S_ANNOTATION_VIF: annotation}
vifs = {'eth0': {'default': True, 'vif': vifobj.obj_to_primitive()}}
k8s = mock.MagicMock()
m_get_k8s_client.return_value = k8s
res = h_vif.VIFHandler._move_annotations_to_crd(self._handler,
self._pod)
self.assertTrue(res)
self._handler._add_kuryrport_crd.assert_called_once_with(self._pod,
vifs)
m_get_k8s_client.assert_called_once()
k8s.remove_annotations.assert_called_once_with(
self._pod['metadata']['selfLink'], k_const.K8S_ANNOTATION_VIF)

View File

@ -97,15 +97,15 @@ def convert_netns(netns):
return netns
def get_pod_unique_name(pod):
"""Returns a unique name for the pod.
def get_res_unique_name(resource):
"""Returns a unique name for the resource like pod or CRD.
It returns a pod unique name for the pod composed of its name and the
It returns a unique name for the resource composed of its name and the
namespace it is running on.
:returns: String with namespace/name of the pod
:returns: String with namespace/name of the resource
"""
return "%(namespace)s/%(name)s" % pod['metadata']
return "%(namespace)s/%(name)s" % resource['metadata']
def check_suitable_multi_pool_driver_opt(pool_driver, pod_driver):
@ -252,6 +252,15 @@ def extract_pod_annotation(annotation):
return obj
def get_vifs_from_crd(crd):
result = {}
for ifname in crd['spec']['vifs']:
result[ifname] = (objects.base.VersionedObject
.obj_from_primitive(crd['spec']['vifs']
[ifname]['vif']))
return result
def has_limit(quota):
NO_LIMIT = -1
return quota['limit'] != NO_LIMIT

View File

@ -107,6 +107,7 @@ kuryr_kubernetes.controller.handlers =
kuryrnetwork = kuryr_kubernetes.controller.handlers.kuryrnetwork:KuryrNetworkHandler
kuryrnetwork_population = kuryr_kubernetes.controller.handlers.kuryrnetwork_population:KuryrNetworkPopulationHandler
test_handler = kuryr_kubernetes.tests.unit.controller.handlers.test_fake_handler:TestHandler
kuryrport = kuryr_kubernetes.controller.handlers.kuryrport:KuryrPortHandler
kuryr_kubernetes.controller.drivers.multi_vif =
noop = kuryr_kubernetes.controller.drivers.multi_vif:NoopMultiVIFDriver

View File

@ -37,6 +37,7 @@ sudo chown ${USER}:${USER} ${HOME}/.kube/config
/usr/local/bin/kubectl --kubeconfig=${HOME}/.kube/config get kuryrnetworks -o yaml --all-namespaces >> ${K8S_LOG_DIR}/kuryrnetworks_crds.txt
/usr/local/bin/kubectl --kubeconfig=${HOME}/.kube/config get endpoints -o yaml --all-namespaces >> ${K8S_LOG_DIR}/endpoints.txt
/usr/local/bin/kubectl --kubeconfig=${HOME}/.kube/config get kuryrnetpolicy -o yaml --all-namespaces >> ${K8S_LOG_DIR}/kuryrnetpolicy_crds.txt
/usr/local/bin/kubectl --kubeconfig=${HOME}/.kube/config get kuryrport -o yaml --all-namespaces >> ${K8S_LOG_DIR}/kuryrport_crds.txt
# Kubernetes pods logs
mkdir -p ${K8S_LOG_DIR}/pod_logs
while read -r line