# nodepool/nodepool/driver/kubernetes/provider.py

# Copyright 2018 Red Hat
# Copyright 2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import base64
import logging
import math
import time

import urllib3
from kubernetes import client as k8s_client

from nodepool import exceptions, stats
from nodepool.driver import Provider
from nodepool.driver.kubernetes import handler
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.utils_k8s import get_client

urllib3.disable_warnings()


class KubernetesProvider(Provider, QuotaSupport):
    log = logging.getLogger("nodepool.driver.kubernetes.KubernetesProvider")

    def __init__(self, provider, *args):
        super().__init__()
        self.provider = provider
        self._zk = None
        self._statsd = stats.get_client()
        self.ready = False
        _, _, self.k8s_client, self.rbac_client = get_client(
            self.log, provider.context, k8s_client.RbacAuthorizationV1Api)
        self.namespace_names = set()
        for pool in provider.pools.values():
            self.namespace_names.add(pool.name)

    def start(self, zk_conn):
        self.log.debug("Starting")
        self._zk = zk_conn
        if self.ready or not self.k8s_client or not self.rbac_client:
            return
        self.ready = True

    def stop(self):
        self.log.debug("Stopping")
        self.ready = False

    def idle(self):
        pass

    def listNodes(self):
        servers = []

        class FakeServer:
            def __init__(self, namespace, valid_names):
                self.id = namespace.metadata.name
                self.name = namespace.metadata.name
                self.metadata = {}

                if any(namespace.metadata.name.startswith("%s-" % valid_name)
                       for valid_name in valid_names):
                    node_id = namespace.metadata.name.split('-')[-1]
                    try:
                        # Make sure last component of name is an id
                        int(node_id)
                        self.metadata = namespace.metadata.labels
                    except Exception:
                        # Probably not a managed namespace, let's skip
                        # metadata
                        pass

            def get(self, name, default=None):
                return getattr(self, name, default)

        if self.ready:
            for namespace in self.k8s_client.list_namespace().items:
                servers.append(FakeServer(namespace, self.namespace_names))
        return servers
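
    # Naming sketch (hypothetical values): a pool named 'main' produces
    # namespaces like 'main-0000000042', where the trailing component is
    # the nodepool node id.  That is what FakeServer checks above:
    #
    #   name = 'main-0000000042'
    #   node_id = name.split('-')[-1]  # '0000000042'
    #   int(node_id)                   # parses, so namespace labels are
    #                                  # exposed as node metadata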

    def labelReady(self, name):
        # Labels are always ready
        return True

    def join(self):
        pass

    def cleanupLeakedResources(self):
        '''
        Delete any leaked server instances.

        Remove any servers found in this provider that are not recorded in
        the ZooKeeper data.
        '''
        for server in self.listNodes():
            meta = server.get('metadata', {})
            if 'nodepool_provider_name' not in meta:
                continue
            if meta['nodepool_provider_name'] != self.provider.name:
                # Another launcher, sharing this provider but configured
                # with a different name, owns this.
                continue
            if not self._zk.getNode(meta['nodepool_node_id']):
                self.log.warning(
                    "Deleting leaked instance %s (%s) in %s "
                    "(unknown node id %s)",
                    server.name, server.id, self.provider.name,
                    meta['nodepool_node_id'])
                self.cleanupNode(server.id)
                if self._statsd:
                    key = ('nodepool.provider.%s.leaked.nodes'
                           % self.provider.name)
                    self._statsd.incr(key)
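
    # Leak detection sketch: a managed namespace carries the bookkeeping
    # labels set in _getK8sLabels(), e.g. (hypothetical values):
    #
    #   {'nodepool_node_id': '0000000042',
    #    'nodepool_provider_name': 'k8s-provider',
    #    'nodepool_pool_name': 'main',
    #    'nodepool_node_label': 'pod-fedora'}
    #
    # If ZooKeeper no longer has a node with that id, the namespace is
    # deleted as leaked.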

    def startNodeCleanup(self, node):
        t = NodeDeleter(self._zk, self, node)
        t.start()
        return t

    def cleanupNode(self, server_id):
        if not self.ready:
            return
        self.log.debug("%s: removing namespace", server_id)
        delete_body = {
            "apiVersion": "v1",
            "kind": "DeleteOptions",
            "propagationPolicy": "Background"
        }
        try:
            self.k8s_client.delete_namespace(server_id, body=delete_body)
            self.log.info("%s: namespace removed", server_id)
        except Exception:
            # TODO: implement better exception handling
            self.log.exception("Couldn't remove namespace %s", server_id)

    def waitForNodeCleanup(self, server_id):
        # Poll for up to five minutes; read_namespace raises once the
        # namespace is fully deleted, which signals completion.
        for retry in range(300):
            try:
                self.k8s_client.read_namespace(server_id)
            except Exception:
                break
            time.sleep(1)
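
    # Deletion flow, roughly: the NodeDeleter thread returned by
    # startNodeCleanup() calls cleanupNode() and then waitForNodeCleanup().
    # The wait loop above treats an exception from read_namespace()
    # (typically a 404 ApiException once the namespace is gone) as
    # completion:
    #
    #   provider.cleanupNode('main-0000000042')
    #   provider.waitForNodeCleanup('main-0000000042')  # returns when gone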

    def createNamespace(
        self, node, pool, label, request, restricted_access=False
    ):
        name = node.id
        namespace = "%s-%s" % (pool, name)
        user = "zuul-worker"
        self.log.debug("%s: creating namespace", namespace)
        k8s_labels = self._getK8sLabels(label, node, pool, request)

        # Create the namespace
        ns_body = {
            'apiVersion': 'v1',
            'kind': 'Namespace',
            'metadata': {
                'name': namespace,
                'labels': k8s_labels,
            }
        }
        proj = self.k8s_client.create_namespace(ns_body)
        node.external_id = namespace

        # Create the service account
        sa_body = {
            'apiVersion': 'v1',
            'kind': 'ServiceAccount',
            'metadata': {'name': user}
        }
        self.k8s_client.create_namespaced_service_account(namespace, sa_body)

        # Create a token secret for the service account
        secret_body = {
            'apiVersion': 'v1',
            'kind': 'Secret',
            'type': 'kubernetes.io/service-account-token',
            'metadata': {
                'name': user,
                'annotations': {
                    'kubernetes.io/service-account.name': user
                }
            }
        }
        self.k8s_client.create_namespaced_secret(namespace, secret_body)

        # Wait for the token to be created
        for retry in range(30):
            secret = self.k8s_client.read_namespaced_secret(user, namespace)
            ca_crt = None
            token = None
            if secret.data:
                token = secret.data.get('token')
                ca_crt = secret.data.get('ca.crt')
                if token and ca_crt:
                    # The token is stored base64-encoded in the secret
                    token = base64.b64decode(
                        token.encode('utf-8')).decode('utf-8')
                    break
            time.sleep(1)
        if not token or not ca_crt:
            raise exceptions.LaunchNodepoolException(
                "%s: couldn't find token for secret %s" %
                (namespace, secret))

        # Create the service account role
        all_verbs = ["create", "delete", "get", "list", "patch",
                     "update", "watch"]
        if restricted_access:
            role_name = "zuul-restricted"
            role_body = {
                'kind': 'Role',
                'apiVersion': 'rbac.authorization.k8s.io/v1',
                'metadata': {
                    'name': role_name,
                },
                'rules': [{
                    'apiGroups': [""],
                    'resources': ["pods"],
                    'verbs': ["get", "list"],
                }, {
                    'apiGroups': [""],
                    'resources': ["pods/exec"],
                    'verbs': all_verbs
                }, {
                    'apiGroups': [""],
                    'resources': ["pods/logs"],
                    'verbs': all_verbs
                }, {
                    'apiGroups': [""],
                    'resources': ["pods/portforward"],
                    'verbs': all_verbs
                }]
            }
        else:
            role_name = "zuul"
            role_body = {
                'kind': 'Role',
                'apiVersion': 'rbac.authorization.k8s.io/v1',
                'metadata': {
                    'name': role_name,
                },
                'rules': [{
                    'apiGroups': [""],
                    'resources': ["pods", "pods/exec", "pods/log",
                                  "pods/portforward", "services",
                                  "endpoints", "crontabs", "jobs",
                                  "deployments", "replicasets",
                                  "configmaps", "secrets"],
                    'verbs': all_verbs,
                }]
            }
        self.rbac_client.create_namespaced_role(namespace, role_body)

        # Bind the service account to the role
        role_binding_body = {
            'apiVersion': 'rbac.authorization.k8s.io/v1',
            'kind': 'RoleBinding',
            'metadata': {'name': 'zuul-role'},
            'roleRef': {
                'apiGroup': 'rbac.authorization.k8s.io',
                'kind': 'Role',
                'name': role_name,
            },
            'subjects': [{
                'kind': 'ServiceAccount',
                'name': user,
                'namespace': namespace,
            }],
            'userNames': ['system:serviceaccount:%s:zuul-worker' % namespace]
        }
        self.rbac_client.create_namespaced_role_binding(
            namespace, role_binding_body)

        resource = {
            'name': proj.metadata.name,
            'namespace': namespace,
            'host': self.k8s_client.api_client.configuration.host,
            'skiptls': not self.k8s_client.api_client.configuration.verify_ssl,
            'token': token,
            'user': user,
        }
        if not resource['skiptls']:
            # Add the CA cert so consumers can verify the API server
            resource['ca_crt'] = ca_crt

        self.log.info("%s: namespace created", namespace)
        return resource
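
    # For illustration, the returned resource dict looks roughly like this
    # (host and credential values are made up):
    #
    #   {
    #       'name': 'main-0000000042',
    #       'namespace': 'main-0000000042',
    #       'host': 'https://k8s.example.com:6443',
    #       'skiptls': False,
    #       'token': '<decoded service-account token>',
    #       'user': 'zuul-worker',
    #       'ca_crt': '<base64-encoded CA data>',  # only when verifying TLS
    #   }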

    def createPod(self, node, pool, label, request):
        if label.spec:
            pod_body = self.getPodBodyCustom(node, pool, label, request)
        else:
            pod_body = self.getPodBodyNodepool(node, pool, label, request)
        resource = self.createNamespace(node, pool, label, request,
                                        restricted_access=True)
        namespace = resource['namespace']
        self.k8s_client.create_namespaced_pod(namespace, pod_body)

        # Wait up to five minutes for the pod to reach the Running phase
        for retry in range(300):
            pod = self.k8s_client.read_namespaced_pod(label.name, namespace)
            if pod.status.phase == "Running":
                break
            self.log.debug("%s: pod status is %s", namespace,
                           pod.status.phase)
            time.sleep(1)
        else:
            raise exceptions.LaunchNodepoolException(
                "%s: pod failed to initialize (%s)" % (
                    namespace, pod.status.phase))
        resource["pod"] = label.name
        node.host_id = pod.spec.node_name
        return resource
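
    # createPod() above is the pod-label launch path end to end: build the
    # manifest, create a namespace with restricted RBAC, submit the pod,
    # and poll (up to 300s) for the Running phase.  A minimal caller
    # sketch:
    #
    #   resource = provider.createPod(node, pool.name, label, request)
    #   # resource now carries 'pod': label.name alongside the namespace
    #   # connection details shown above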

    def getPodBodyCustom(self, node, pool, label, request):
        k8s_labels = self._getK8sLabels(label, node, pool, request)
        k8s_annotations = {}
        if label.annotations:
            k8s_annotations.update(label.annotations)

        pod_body = {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {
                'name': label.name,
                'labels': k8s_labels,
                'annotations': k8s_annotations,
            },
            'spec': label.spec,
            'restartPolicy': 'Never',
        }
        return pod_body

    def getPodBodyNodepool(self, node, pool, label, request):
        container_body = {
            'name': label.name,
            'image': label.image,
            'imagePullPolicy': label.image_pull,
            'command': ["/bin/sh", "-c"],
            'args': ["while true; do sleep 30; done;"],
            'env': label.env,
        }

        requests = {}
        limits = {}
        if label.cpu:
            requests['cpu'] = int(label.cpu)
        if label.memory:
            requests['memory'] = '%dMi' % int(label.memory)
        if label.storage:
            requests['ephemeral-storage'] = '%dM' % int(label.storage)
        if label.cpu_limit:
            limits['cpu'] = int(label.cpu_limit)
        if label.memory_limit:
            limits['memory'] = '%dMi' % int(label.memory_limit)
        if label.storage_limit:
            limits['ephemeral-storage'] = '%dM' % int(label.storage_limit)
        if label.gpu_resource and label.gpu:
            requests[label.gpu_resource] = '%.2f' % label.gpu
            limits[label.gpu_resource] = '%.2f' % label.gpu
        resources = {}
        if requests:
            resources['requests'] = requests
        if limits:
            resources['limits'] = limits

        if resources:
            container_body['resources'] = resources
        if label.volume_mounts:
            container_body['volumeMounts'] = label.volume_mounts
        if label.privileged is not None:
            container_body['securityContext'] = {
                'privileged': label.privileged,
            }

        spec_body = {
            'containers': [container_body],
            # restartPolicy is a PodSpec field
            'restartPolicy': 'Never',
        }
        if label.node_selector:
            spec_body['nodeSelector'] = label.node_selector
        if label.scheduler_name:
            spec_body['schedulerName'] = label.scheduler_name
        if label.volumes:
            spec_body['volumes'] = label.volumes

        k8s_labels = self._getK8sLabels(label, node, pool, request)
        k8s_annotations = {}
        if label.annotations:
            k8s_annotations.update(label.annotations)

        pod_body = {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {
                'name': label.name,
                'labels': k8s_labels,
                'annotations': k8s_annotations,
            },
            'spec': spec_body,
        }
        return pod_body
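
    # Given a hypothetical label with image='ubuntu:jammy', cpu=2 and
    # memory=4096, the method above yields a manifest along these lines
    # (some keys elided):
    #
    #   {
    #       'apiVersion': 'v1',
    #       'kind': 'Pod',
    #       'metadata': {'name': ..., 'labels': {...}, 'annotations': {}},
    #       'spec': {
    #           'containers': [{
    #               'name': ...,
    #               'image': 'ubuntu:jammy',
    #               'command': ['/bin/sh', '-c'],
    #               'args': ['while true; do sleep 30; done;'],
    #               'resources': {'requests': {'cpu': 2,
    #                                          'memory': '4096Mi'}},
    #           }],
    #           'restartPolicy': 'Never',
    #       },
    #   }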

    def getRequestHandler(self, poolworker, request):
        return handler.KubernetesNodeRequestHandler(poolworker, request)

    def getProviderLimits(self):
        # TODO: query the API to get real limits
        return QuotaInformation(
            cores=math.inf,
            instances=math.inf,
            ram=math.inf,
            default=math.inf)

    def quotaNeededByLabel(self, ntype, pool):
        provider_label = pool.labels[ntype]
        resources = {}
        if provider_label.cpu:
            resources["cores"] = provider_label.cpu
        if provider_label.memory:
            resources["ram"] = provider_label.memory
        if provider_label.storage:
            resources["ephemeral-storage"] = provider_label.storage
        if provider_label.gpu and provider_label.gpu_resource:
            resources[provider_label.gpu_resource] = provider_label.gpu
        resources.update(provider_label.extra_resources)
        return QuotaInformation(instances=1, **resources)
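
    # Example: a label with cpu=2 and memory=4096 (MiB), no GPU and no
    # extra resources, costs
    #
    #   QuotaInformation(instances=1, cores=2, ram=4096)
    #
    # which the launcher weighs against the (currently unbounded) limits
    # reported by getProviderLimits().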

    def unmanagedQuotaUsed(self):
        # TODO: return real quota information about unmanaged usage
        return QuotaInformation()

    def _getK8sLabels(self, label, node, pool, request):
        k8s_labels = {}
        if label.labels:
            k8s_labels.update(label.labels)

        for k, v in label.dynamic_labels.items():
            try:
                k8s_labels[k] = v.format(request=request.getSafeAttributes())
            except Exception:
                self.log.exception("Error formatting tag %s", k)

        k8s_labels.update({
            'nodepool_node_id': node.id,
            'nodepool_provider_name': self.provider.name,
            'nodepool_pool_name': pool,
            'nodepool_node_label': label.name,
        })
        return k8s_labels
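
    # Dynamic label sketch (hypothetical configuration, and assuming the
    # request's safe attributes expose an 'id' field):
    #
    #   dynamic-labels:
    #     request-id: "{request.id}"
    #
    # would render through the format() call above to something like
    # {'request-id': '100-0000000001'}, merged with the fixed nodepool_*
    # bookkeeping labels.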