# Copyright 2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import os import yaml from devops.helpers import helpers from fuel_ccp_tests.helpers import _subprocess_runner from fuel_ccp_tests.helpers import exceptions from fuel_ccp_tests.helpers import post_install_k8s_checks from fuel_ccp_tests import logger from fuel_ccp_tests.managers.k8s import cluster from fuel_ccp_tests import settings LOG = logger.logger class K8SManager(object): """docstring for K8SManager""" __config = None __underlay = None def __init__(self, config, underlay): self.__config = config self.__underlay = underlay self._api_client = None super(K8SManager, self).__init__() def mark_lvm_nodes(self, lvm_config): if lvm_config: lvm_mark = {"lvm": "on"} # Get nodes ips lvm_nodes_ips = [self.__underlay.host_by_node_name(node_name) for node_name in lvm_config] # Get only those K8sNodes, which has ips from lvm_nodes_ips lvm_nodes = [ node for node in self.api.nodes.list() if any( ip.address in lvm_nodes_ips for ip in node.status.addresses)] for node in lvm_nodes: node.add_labels(lvm_mark) def upload_lvm_plugin(self, node_name): LOG.info("Uploading LVM plugin to node '{}'".format(node_name)) if self.__underlay: with self.__underlay.remote(node_name=node_name) as remote: remote.upload(settings.LVM_PLUGIN_PATH, '/tmp/') with remote.get_sudo(remote): remote.check_call( 'mkdir -p {}'.format(settings.LVM_PLUGIN_DIR), verbose=True ) remote.check_call( "mv /tmp/{} {}".format(settings.LVM_FILENAME, settings.LVM_PLUGIN_DIR), verbose=True ) remote.check_call( "chmod +x {}/{}".format(settings.LVM_PLUGIN_DIR, settings.LVM_FILENAME), verbose=True ) def install_k8s(self, custom_yaml=None, env_var=None, k8s_admin_ip=None, k8s_slave_ips=None, expected_ec=None, verbose=True, lvm_config=None): """Action to deploy k8s by fuel-ccp-installer script Additional steps: Add vagrant user to docker group :param env: EnvManager :param kube_settings: Dict :param custom_yaml: False if deploy with kargo default, None if deploy with environment settings, or put you own :rtype: None """ LOG.info("Trying to install k8s") current_env = copy.deepcopy(os.environ) k8s_nodes = self.__underlay.node_names() if k8s_admin_ip is None: k8s_admin_ip = self.__underlay.host_by_node_name(k8s_nodes[0]) if k8s_slave_ips is None: k8s_slave_ips = [self.__underlay.host_by_node_name(k8s_node) for k8s_node in k8s_nodes] if lvm_config: LOG.info("uploading LVM plugin for k8s") for node_name in lvm_config: self.upload_lvm_plugin(node_name) environment_variables = { "SLAVE_IPS": " ".join(k8s_slave_ips), "ADMIN_IP": k8s_admin_ip, "KARGO_REPO": settings.KARGO_REPO, "KARGO_COMMIT": settings.KARGO_COMMIT } if custom_yaml: self.set_dns(custom_yaml) environment_variables.update( {"CUSTOM_YAML": yaml.safe_dump( custom_yaml, default_flow_style=False)} ) if env_var: environment_variables.update(env_var) # Return to original dict after moving to fuel-devops3.0.2 # current_env.update(dict=environment_variables) current_env = environment_variables # TODO(ddmitriev): replace with check_call(...,env=current_env) # when migrate to fuel-devops-3.0.2 environ_str = ';'.join([ "export {0}='{1}'".format(key, value) for key, value in current_env.items()]) cmd = environ_str + ' ; ' + settings.DEPLOY_SCRIPT LOG.info("Run k8s deployment") # Use Subprocess.execute instead of Subprocess.check_call until # check_call is not fixed (fuel-devops3.0.2) result = _subprocess_runner.Subprocess.execute( cmd, verbose=verbose, timeout=settings.KARGO_TIMEOUT) if expected_ec is None: expected_ec = [0] if result.exit_code not in expected_ec: raise exceptions.UnexpectedExitCode( cmd, result.exit_code, expected_ec, stdout=result.stdout_brief, stderr=result.stdout_brief) for node_name in k8s_nodes: with self.__underlay.remote(node_name=node_name) as remote: LOG.info("Add vagrant to docker group") remote.check_call('sudo usermod -aG docker vagrant') self.__config.k8s.kube_host = k8s_admin_ip self.mark_lvm_nodes(lvm_config) hkube_image_name = '{}:{}'.format( settings.HYPERKUBE_IMAGE_REPO, settings.HYPERKUBE_IMAGE_TAG ) post_install_k8s_checks.inspect_docker_containers( image_name=hkube_image_name, underlay=self.__underlay, host_ip=k8s_admin_ip) return result @property def api(self): if self._api_client is None: self._api_client = cluster.K8sCluster( user=self.__config.k8s.kube_admin_user, password=self.__config.k8s.kube_admin_pass, host=self.__config.k8s.kube_host, default_namespace='default') return self._api_client def create_registry(self): """Create Pod and SErvice for K8S registry""" registry_pod = os.getcwd() + '/fuel_ccp_tests/templates/' \ 'registry_templates/registry-pod.yaml' service_registry = os.getcwd() + '/fuel_ccp_tests/templates/' \ 'registry_templates/' \ 'service-registry.yaml' with open(registry_pod) as f: registry = yaml.load(f.read()) with open(service_registry) as f: service = yaml.load(f.read()) registry_pod = self.api.pods.create(body=registry, namespace='default') self.api.services.create(body=service, namespace='default') registry_pod.wait_running() def get_pod_phase(self, pod_name, namespace=None): return self.api.pods.get( name=pod_name, namespace=namespace).phase def wait_pod_phase(self, pod_name, phase, namespace=None, timeout=60): """Wait phase of pod_name from namespace while timeout :param str: pod_name :param str: namespace :param list or str: phase :param int: timeout :rtype: None """ if isinstance(phase, str): phase = [phase] def check(): return self.get_pod_phase(pod_name, namespace) in phase helpers.wait(check, timeout=timeout, timeout_msg='Timeout waiting, pod {pod_name} is not in ' '"{phase}" phase'.format( pod_name=pod_name, phase=phase)) def check_pod_create(self, body, namespace=None, timeout=300, interval=5): """Check creating sample pod :param k8s_pod: V1Pod :param namespace: str :rtype: V1Pod """ LOG.info("Creating pod in k8s cluster") LOG.debug( "POD spec to create:\n{}".format( yaml.dump(body, default_flow_style=False)) ) LOG.debug("Timeout for creation is set to {}".format(timeout)) LOG.debug("Checking interval is set to {}".format(interval)) pod = self.api.pods.create(body=body, namespace=namespace) pod.wait_running(timeout=300, interval=5) LOG.info("Pod '{}' is created".format(pod.metadata.name)) return self.api.pods.get(name=pod.metadata.name, namespace=namespace) def wait_pod_deleted(self, podname, timeout=60, interval=5): helpers.wait( lambda: podname not in [pod.name for pod in self.api.pods.list()], timeout=timeout, interval=interval, timeout_msg="Pod deletion timeout reached!" ) def check_pod_delete(self, k8s_pod, timeout=300, interval=5): """Deleting pod from k8s :param k8s_pod: fuel_ccp_tests.managers.k8s.nodes.K8sNode :param k8sclient: fuel_ccp_tests.managers.k8s.cluster.K8sCluster """ LOG.info("Deleting pod '{}'".format(k8s_pod.name)) LOG.debug("Pod status:\n{}".format(k8s_pod.status)) LOG.debug("Timeout for deletion is set to {}".format(timeout)) LOG.debug("Checking interval is set to {}".format(interval)) self.api.pods.delete(body=k8s_pod, name=k8s_pod.name) self.wait_pod_deleted(k8s_pod.name, timeout, interval) LOG.debug("Pod '{}' is deleted".format(k8s_pod.name)) def check_service_create(self, body, namespace=None): """Check creating k8s service :param body: dict, service spec :param namespace: str :rtype: K8sService object """ LOG.info("Creating service in k8s cluster") LOG.debug( "Service spec to create:\n{}".format( yaml.dump(body, default_flow_style=False)) ) service = self.api.services.create(body=body, namespace=namespace) LOG.info("Service '{}' is created".format(service.metadata.name)) return self.api.services.get(name=service.metadata.name) def check_ds_create(self, body, namespace=None): """Check creating k8s DaemonSet :param body: dict, DaemonSet spec :param namespace: str :rtype: K8sDaemonSet object """ LOG.info("Creating DaemonSet in k8s cluster") LOG.debug( "DaemonSet spec to create:\n{}".format( yaml.dump(body, default_flow_style=False)) ) ds = self.api.daemonsets.create(body=body, namespace=namespace) LOG.info("DaemonSet '{}' is created".format(ds.metadata.name)) return self.api.daemonsets.get(name=ds.metadata.name) def check_ds_ready(self, dsname, namespace=None): """Check if k8s DaemonSet is ready :param dsname: str, ds name :return: bool """ ds = self.api.daemonsets.get(name=dsname, namespace=namespace) return (ds.status.current_number_scheduled == ds.status.desired_number_scheduled) def wait_ds_ready(self, dsname, namespace=None, timeout=60, interval=5): """Wait until all pods are scheduled on nodes :param dsname: str, ds name :param timeout: int :param interval: int """ helpers.wait( lambda: self.check_ds_ready(dsname, namespace=namespace), timeout=timeout, interval=interval) def create_objects(self, path): if isinstance(path, str): path = [path] params = ' '.join(["-f {}".format(p) for p in path]) cmd = 'kubectl create {params}'.format(params=params) with self.__underlay.remote( host=self.__config.k8s.kube_host) as remote: LOG.info("Running command '{cmd}' on node {node}".format( cmd=cmd, node=remote.hostname) ) result = remote.check_call(cmd) LOG.info(result['stdout']) def set_dns(self, k8s_settings): if 'nameservers' not in k8s_settings and \ self.__config.underlay.nameservers: k8s_settings['nameservers'] = self.__config.underlay.nameservers LOG.info('Added custom DNS servers to the settings: ' '{0}'.format(k8s_settings['nameservers'])) if 'upstream_dns_servers' not in k8s_settings and \ self.__config.underlay.upstream_dns_servers: k8s_settings['upstream_dns_servers'] = \ self.__config.underlay.upstream_dns_servers LOG.info('Added custom upstream DNS servers (dnsmasq) to the ' 'settings: {0}'.format(k8s_settings['nameservers'])) def get_running_pods(self, pod_name, namespace=None): pods = [pod for pod in self.api.pods.list(namespace=namespace) if pod_name in pod.name and pod.status.phase == 'Running'] return pods def get_pods_number(self, pod_name, namespace=None): pods = self.get_running_pods(pod_name, namespace) return len(pods)