Migrationg to pykube from python-k8sclient

Config changes:
* new `insecure` flag in `kubernetes` group
* `ca-certs` -> `ca-cert`

Change-Id: I4dd91828e7276dc781baea4c6df2c01b2f1312e0
This commit is contained in:
Andrey 2016-09-04 15:58:24 +00:00 committed by Andrey Pavlov
parent 498df5bc6a
commit a069ddb9ac
9 changed files with 204 additions and 177 deletions

View File

@ -1,12 +1,12 @@
import logging
import time
from k8sclient.client import rest
from keystoneauth1 import exceptions as keystoneauth_exceptions
from keystoneauth1.identity import v3
from keystoneauth1 import session as keystone_session
from neutronclient.v2_0 import client as neutron_client
from novaclient import client as nova_client
import pykube
from fuel_ccp.common import utils
from fuel_ccp import config
@ -131,15 +131,11 @@ def _cleanup_openstack_environment(configs, auth_url=None):
LOG.info('OpenStack cleanup has been finished successfully.')
def _wait_for_namespace_delete(k8s_api):
def _wait_for_namespace_delete(namespace):
attempts = 60
while attempts > 0:
try:
k8s_api.read_namespaced_namespace(CONF.kubernetes.namespace)
except rest.ApiException as e:
if e.status == 404:
return
raise e
if not namespace.exists():
return
time.sleep(3)
attempts -= 1
raise RuntimeError(
@ -147,19 +143,17 @@ def _wait_for_namespace_delete(k8s_api):
def _cleanup_kubernetes_objects():
k8s_api = kubernetes.get_v1_api(kubernetes.get_client())
k8s_api = kubernetes.get_client()
ns = pykube.Namespace.objects(k8s_api).get_or_none(
name=CONF.kubernetes.namespace)
if ns:
LOG.info('Starting Kubernetes objects cleanup')
ns.delete()
else:
LOG.info('Kubernetes namespace not found')
return
try:
k8s_api.read_namespaced_namespace(CONF.kubernetes.namespace)
except rest.ApiException as e:
if e.status == 404:
LOG.info('Kubernetes namespace not found')
return
raise e
LOG.info('Starting Kubernetes objects cleanup')
k8s_api.delete_namespaced_namespace({}, CONF.kubernetes.namespace)
_wait_for_namespace_delete(k8s_api)
_wait_for_namespace_delete(ns)
LOG.info('Kubernetes objects cleanup has been finished successfully.')

View File

@ -1,3 +1,4 @@
import itertools
import logging
import os
import pkg_resources
@ -95,6 +96,5 @@ def get_deployed_components():
deployed_daemonsets = kubernetes.list_cluster_daemonsets()
deployed_deployments = kubernetes.list_cluster_deployments()
deployed_components = set(kubernetes.get_object_names(
deployed_daemonsets + deployed_deployments))
itertools.chain(deployed_daemonsets, deployed_deployments)))
return deployed_components

View File

@ -4,17 +4,19 @@ from oslo_config import cfg
CONF = cfg.CONF
kubernetes_opts = [
cfg.StrOpt('server',
default='127.0.0.1:8080',
help='Addres and port for kube-apiserver'),
default='http://localhost:8080',
help='The URL for the Kubernetes API server'),
cfg.StrOpt('namespace',
default='ccp',
help='The name of the namespace'),
cfg.StrOpt('ca-certs',
help='The location of the CA certificate files'),
cfg.StrOpt('ca-cert',
help='The location of the CA certificate file'),
cfg.StrOpt('key-file',
help='The location of the key file'),
cfg.StrOpt('cert-file',
help='The location of the certificate file')
help='The location of the certificate file'),
cfg.BoolOpt('insecure',
help='Disable certificate checking')
]
kubernetes_opt_group = cfg.OptGroup(name='kubernetes',
title='Kubernetes client')
@ -24,11 +26,12 @@ CONF.register_cli_opts(kubernetes_opts, kubernetes_opt_group)
DEFAULTS = {
'kubernetes': {
'server': '127.0.0.1:8080',
'server': 'http://localhost:8080',
'namespace': 'ccp',
'ca_certs': None,
'ca_cert': None,
'key_file': None,
'cert_file': None,
'insecure': None,
},
}
@ -39,9 +42,10 @@ SCHEMA = {
'properties': {
'server': {'type': 'string'},
'namespace': {'type': 'string'},
'ca_certs': {'anyOf': [{'type': 'string'}, {'type': 'null'}]},
'ca_cert': {'anyOf': [{'type': 'string'}, {'type': 'null'}]},
'key_file': {'anyOf': [{'type': 'string'}, {'type': 'null'}]},
'cert_file': {'anyOf': [{'type': 'string'}, {'type': 'null'}]},
'insecure': {'anyOf': [{'type': 'string'}, {'type': 'null'}]},
},
},
}

View File

@ -103,8 +103,7 @@ def _fill_cmd(workflow, cmd):
def _create_workflow(workflow, name):
configmap_name = "%s-%s" % (name, templates.ROLE_CONFIG)
template = templates.serialize_configmap(configmap_name, workflow)
kubernetes.handle_exists(
kubernetes.create_object_from_definition, template)
kubernetes.create_object_from_definition(template)
def _create_service(service):
@ -215,7 +214,7 @@ def _create_globals_configmap(config):
templates.GLOBAL_CONFIG: json.dumps(config, sort_keys=True)
}
cm = templates.serialize_configmap(templates.GLOBAL_CONFIG, data)
kubernetes.handle_exists(kubernetes.create_object_from_definition, cm)
kubernetes.create_object_from_definition(cm)
def _create_start_script_configmap():
@ -230,7 +229,7 @@ def _create_start_script_configmap():
templates.SCRIPT_CONFIG: start_scr_data
}
cm = templates.serialize_configmap(templates.SCRIPT_CONFIG, data)
kubernetes.handle_exists(kubernetes.create_object_from_definition, cm)
kubernetes.create_object_from_definition(cm)
def _create_files_configmap(service_dir, service_name, configs):
@ -243,8 +242,7 @@ def _create_files_configmap(service_dir, service_name, configs):
data[filename] = f.read()
data["placeholder"] = ""
template = templates.serialize_configmap(configmap_name, data)
kubernetes.handle_exists(
kubernetes.create_object_from_definition, template)
kubernetes.create_object_from_definition(template)
def _create_meta_configmap(service):
@ -255,8 +253,7 @@ def _create_meta_configmap(service):
"host-net": service.get("host-net", False)}, sort_keys=True)
}
template = templates.serialize_configmap(configmap_name, data)
kubernetes.handle_exists(
kubernetes.create_object_from_definition, template)
kubernetes.create_object_from_definition(template)
def _make_topology(nodes, roles):
@ -304,18 +301,9 @@ def _make_topology(nodes, roles):
def _create_namespace(namespace):
if CONF.action.dry_run:
return
client = kubernetes.get_client()
api = kubernetes.get_v1_api(client)
# TODO(sreshetniak): add selector??
namespaces = api.list_namespaced_namespace().items
for ns in namespaces:
if ns.metadata.name == namespace:
LOG.info("Namespace \"%s\" exists", namespace)
break
else:
LOG.info("Create namespace \"%s\"", namespace)
api.create_namespaced_namespace(
body={"metadata": {"name": namespace}})
template = templates.serialize_namespace(namespace)
kubernetes.create_object_from_definition(template)
def _create_openrc(config, namespace):

View File

@ -1,124 +1,127 @@
import logging
import os
import pykube
import yaml
from k8sclient.client import api_client
from k8sclient.client.apis import apisbatchv_api
from k8sclient.client.apis import apisextensionsvbeta_api
from k8sclient.client.apis import apiv_api
import k8sclient.client.rest
from fuel_ccp import config
CONF = config.CONF
LOG = logging.getLogger(__name__)
def get_client(kube_apiserver=None, key_file=None, cert_file=None,
ca_certs=None):
ca_cert=None, insecure=None):
kube_apiserver = kube_apiserver or CONF.kubernetes.server
key_file = key_file or CONF.kubernetes.key_file
cert_file = cert_file or CONF.kubernetes.cert_file
ca_certs = ca_certs or CONF.kubernetes.ca_certs
ca_cert = ca_cert or CONF.kubernetes.ca_cert
insecure = insecure or CONF.kubernetes.insecure
return api_client.ApiClient(host=kube_apiserver, key_file=key_file,
cert_file=cert_file, ca_certs=ca_certs)
cluster = {"server": kube_apiserver}
if ca_cert:
cluster["certificate-authority"] = ca_cert
elif insecure:
cluster['insecure-skip-tls-verify'] = insecure
user = {}
if cert_file and key_file:
user["client-certificate"] = cert_file
user["client-key"] = key_file
config = {
"clusters": [
{
"name": "ccp",
"cluster": cluster
}
],
"users": [
{
"name": "ccp",
"user": user
}
],
"contexts": [
{
"name": "ccp",
"context": {
"cluster": "ccp",
"user": "ccp"
},
}
],
"current-context": "ccp"
}
return pykube.HTTPClient(pykube.KubeConfig(config))
def export_object(object_dict):
file_name = '%s-%s.yaml' % (
object_dict['metadata']['name'], object_dict['kind'].lower())
if object_dict['kind'] == 'ConfigMap':
file_path = os.path.join(
CONF.action.export_dir, 'configmaps', file_name)
else:
file_path = os.path.join(CONF.action.export_dir, file_name)
with open(file_path, 'w') as object_file:
object_file.write(yaml.dump(
object_dict, default_flow_style=False))
def create_object_from_definition(object_dict, namespace=None, client=None):
LOG.debug("Deploying %s: \"%s\"",
object_dict["kind"], object_dict["metadata"]["name"])
if CONF.action.export_dir:
file_name = '%s-%s.yaml' % (
object_dict['metadata']['name'], object_dict['kind'].lower())
if object_dict['kind'] == 'ConfigMap':
file_path = os.path.join(
CONF.action.export_dir, 'configmaps', file_name)
else:
file_path = os.path.join(CONF.action.export_dir, file_name)
with open(file_path, 'w') as object_file:
object_file.write(yaml.dump(
object_dict, default_flow_style=False))
if not object_dict['kind'] == 'Namespace':
if CONF.action.export_dir:
export_object(object_dict)
if CONF.action.dry_run:
LOG.info(yaml.dump(object_dict, default_flow_style=False))
return
if CONF.action.dry_run:
LOG.info(yaml.dump(object_dict, default_flow_style=False))
return
object_dict['metadata']['namespace'] = (
namespace or CONF.kubernetes.namespace)
namespace = namespace or CONF.kubernetes.namespace
client = client or get_client()
if object_dict['kind'] == 'Deployment':
api = apisextensionsvbeta_api.ApisextensionsvbetaApi(client)
resp = api.create_namespaced_deployment(
body=object_dict, namespace=namespace)
elif object_dict['kind'] == 'DaemonSet':
api = apisextensionsvbeta_api.ApisextensionsvbetaApi(client)
resp = api.create_namespaced_daemon_set(
body=object_dict, namespace=namespace)
elif object_dict['kind'] == 'Service':
api = apiv_api.ApivApi(client)
resp = api.create_namespaced_service(
body=object_dict, namespace=namespace)
elif object_dict['kind'] == 'Pod':
api = apiv_api.ApivApi(client)
resp = api.create_namespaced_pod(
body=object_dict, namespace=namespace)
elif object_dict["kind"] == "Job":
api = apisbatchv_api.ApisbatchvApi(client)
resp = api.create_namespaced_job(
body=object_dict, namespace=namespace)
elif object_dict["kind"] == "ConfigMap":
api = apiv_api.ApivApi(client)
resp = api.create_namespaced_config_map(
body=object_dict, namespace=namespace)
else:
obj_class = getattr(pykube, object_dict["kind"], None)
if not obj_class:
LOG.warning('"%s" object is not supported, skipping.'
% object_dict['kind'])
return
LOG.debug('%s "%s" has been created' % (
object_dict['kind'], object_dict['metadata']['name']))
return resp
def get_v1_api(client):
return apiv_api.ApivApi(client)
client = client or get_client()
obj = obj_class(client, object_dict)
if obj.exists():
LOG.debug('%s "%s" already exists', object_dict['kind'],
object_dict['metadata']['name'])
return obj
obj.create()
LOG.debug('%s "%s" has been created', object_dict['kind'],
object_dict['metadata']['name'])
return obj
def list_k8s_nodes():
api = get_v1_api(get_client())
return api.list_namespaced_node().items
client = get_client()
return pykube.Node.objects(client).all()
def list_cluster_daemonsets():
client = get_client()
api = apisextensionsvbeta_api.ApisextensionsvbetaApi(client)
return api.list_namespaced_daemon_set(
return pykube.DaemonSet.objects(client).filter(
namespace=CONF.kubernetes.namespace,
label_selector="ccp=true").items
selector="ccp=true")
def list_cluster_deployments():
client = get_client()
api = apisextensionsvbeta_api.ApisextensionsvbetaApi(client)
return api.list_namespaced_deployment(
return pykube.Deployment.objects(client).filter(
namespace=CONF.kubernetes.namespace,
label_selector="ccp=true").items
selector="ccp=true")
def get_object_names(items):
names = []
for item in items:
names.append(item.metadata.name)
names.append(item.name)
return names
def handle_exists(fct, *args, **kwargs):
try:
fct(*args, **kwargs)
except k8sclient.client.rest.ApiException as e:
if e.status == 409:
LOG.debug("Resource exists")
else:
raise e

View File

@ -31,6 +31,16 @@ def _get_readiness_cmd(role_name):
return ["python", ENTRYPOINT_PATH, "status", role_name]
def serialize_namespace(name):
return {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": name
}
}
def serialize_configmap(name, data):
return {
"apiVersion": "v1",

View File

@ -1,7 +1,5 @@
import mock
from k8sclient.client import rest
from fuel_ccp import cleanup
from fuel_ccp.tests import base
@ -25,10 +23,10 @@ class TestCleanup(base.TestCase):
@mock.patch('time.sleep')
def test_wait_for_namespace_delete(self, m_sleep):
# namespace was deleted
k8s_api = mock.Mock()
k8s_api.read_namespaced_namespace.side_effect = [
'ns', 'ns', rest.ApiException(404)]
cleanup._wait_for_namespace_delete(k8s_api)
namespace = mock.Mock()
namespace.exists.side_effect = [
True, True, False]
cleanup._wait_for_namespace_delete(namespace)
self.assertEqual(2, m_sleep.call_count)
# namespace is still exists

View File

@ -1,64 +1,94 @@
import fixtures
import mock
import testscenarios
from fuel_ccp import kubernetes
from fuel_ccp.tests import base
class TestKubernetes(base.TestCase):
@mock.patch('k8sclient.client.api_client.ApiClient')
def test_get_client_with_conf(self, api_client):
class TestKubernetesClient(base.TestCase):
config = {
'contexts': [{
'name': 'ccp',
'context': {
'cluster': 'ccp',
'user': 'ccp'
}
}],
'clusters': [{
'cluster': {
'certificate-authority': 'ca.crt',
'server': 'http://localhost:8080'
},
'name': 'ccp'
}],
'users': [{
'name': 'ccp',
'user': {
'client-certificate': 'test.cert',
'client-key': 'test.key'
}
}],
'current-context': 'ccp'
}
@mock.patch('pykube.KubeConfig')
@mock.patch('pykube.HTTPClient')
def test_get_client_with_conf(self, m_client, m_config):
self.conf['kubernetes']._update(
key_file='test.key',
ca_certs='ca.crt',
ca_cert='ca.crt',
cert_file='test.cert',
)
kubernetes.get_client()
api_client.assert_called_once_with(
ca_certs='ca.crt', cert_file='test.cert', host='127.0.0.1:8080',
key_file='test.key')
@mock.patch('k8sclient.client.api_client.ApiClient')
def test_get_client(self, api_client):
self.conf['kubernetes']._update(
key_file='test.key',
ca_certs='ca.crt',
cert_file='test.cert',
)
m_config.assert_called_once_with(self.config)
m_client.assrt_called_once_with(m_config)
@mock.patch('pykube.KubeConfig')
@mock.patch('pykube.HTTPClient')
def test_get_client(self, m_client, m_config):
kubernetes.get_client(
kube_apiserver='1.2.3.4:8080', key_file='test.key',
cert_file='test.cert', ca_certs='ca.crt')
api_client.assert_called_once_with(
ca_certs='ca.crt', cert_file='test.cert', host='1.2.3.4:8080',
key_file='test.key')
kube_apiserver='http://localhost:8080', key_file='test.key',
cert_file='test.cert', ca_cert='ca.crt')
m_config.assert_called_once_with(self.config)
m_client.assrt_called_once_with(m_config)
@mock.patch(
'k8sclient.client.apis.apisextensionsvbeta_api.ApisextensionsvbetaApi')
def test_create_deployment(self, api_beta):
class TestKubernetesObjects(testscenarios.WithScenarios, base.TestCase):
scenarios = (
('ConfigMap', {'kind': 'ConfigMap'}),
('Deployment', {'kind': 'Deployment'}),
('DaemonSet', {'kind': 'DaemonSet'}),
('Job', {'kind': 'Job'}),
('Namespace', {'kind': 'Namespace'}),
('Service', {'kind': 'Service'})
)
def setUp(self):
super(TestKubernetesObjects, self).setUp()
self.conf.action.dry_run = False
self.conf.action.export_dir = False
api = mock.Mock()
api.create_namespaced_deployment = mock.Mock()
api_beta.return_value = api
deployment_dict = {'kind': 'Deployment', 'metadata': {'name': 'test'}}
def test_object_create(self):
obj_dict = {'kind': self.kind, 'metadata': {'name': 'test'}}
m_obj = mock.Mock(exists=mock.Mock(return_value=False))
m_class = self.useFixture(fixtures.MockPatch(
'pykube.{}'.format(self.kind), return_value=m_obj))
kubernetes.create_object_from_definition(
deployment_dict, client=mock.Mock())
api.create_namespaced_deployment.assert_called_once_with(
body=deployment_dict, namespace='ccp')
obj_dict, client=mock.Mock())
m_class.mock.assert_called_once_with(mock.ANY, obj_dict)
m_obj.create.assert_called_once_with()
@mock.patch('k8sclient.client.apis.apiv_api.ApivApi')
def test_create_service(self, api_v1):
self.conf.action.dry_run = False
self.conf.action.export_dir = False
api = mock.Mock()
api.create_namespaced_service = mock.Mock()
api_v1.return_value = api
def test_object_exists(self):
obj_dict = {'kind': self.kind, 'metadata': {'name': 'test'}}
m_obj = mock.Mock(exists=mock.Mock(return_value=True))
m_class = self.useFixture(fixtures.MockPatch(
'pykube.{}'.format(self.kind), return_value=m_obj))
service_dict = {'kind': 'Service', 'metadata': {'name': 'test'}}
kubernetes.create_object_from_definition(
service_dict, client=mock.Mock())
api.create_namespaced_service.assert_called_once_with(
body=service_dict, namespace='ccp')
obj_dict, client=mock.Mock())
m_class.mock.assert_called_once_with(mock.ANY, obj_dict)
m_obj.exists.assert_called_once_with()
m_obj.create.assert_not_called()

View File

@ -13,7 +13,7 @@ oslo.config>=3.9.0 # Apache-2.0
oslo.log>=1.14.0 # Apache-2.0
PyYAML>=3.1.0 # MIT
six>=1.9.0 # MIT
python-k8sclient
pykube
keystoneauth1>=2.7.0 # Apache-2.0
python-neutronclient>=4.2.0 # Apache-2.0
python-novaclient>=2.29.0,!=2.33.0 # Apache-2.0