fuel-ccp/fuel_ccp/action.py

446 lines
15 KiB
Python

import copy
import json
import logging
import os
import uuid
from pykube import exceptions as pykube_exc
import yaml
from fuel_ccp.common import jinja_utils
from fuel_ccp.common import utils
from fuel_ccp import config
from fuel_ccp.config import images as config_images
from fuel_ccp import exceptions
from fuel_ccp import kubernetes
from fuel_ccp import templates
CONF = config.CONF
LOG = logging.getLogger(__name__)
RESTART_POLICY_ALWAYS = "always"
RESTART_POLICY_NEVER = "never"
class Action(object):
def __init__(self, **kwargs):
self.name = kwargs.pop("name")
self.component = kwargs.pop("component")
self.component_dir = kwargs.pop("component_dir")
self.image = kwargs.pop("image")
self.command = kwargs.pop("command")
self.dependencies = kwargs.pop("dependencies", ())
self.files = kwargs.pop("files", ())
self.parameters = kwargs.pop("parameters", ())
self.restart_policy = kwargs.pop("restart_policy",
RESTART_POLICY_NEVER)
if kwargs:
key_names = ", ".join(kwargs.keys())
raise ValueError("Invalid keys '%s' for '%s' action" % (
key_names, self.name))
@property
def k8s_name(self):
if not hasattr(self, "_k8s_name"):
self._k8s_name = "%s-%s" % (self.name, str(uuid.uuid4())[:8])
return self._k8s_name
def validate(self):
pass
def run(self, user_parameters=None):
if user_parameters:
self.user_parameters = user_parameters
else:
self.user_parameters = ()
self._process_dependencies()
self._create_configmap()
self._create_action()
return self.k8s_name
def _process_dependencies(self):
services_map = utils.get_deploy_components_info()
deps_map = utils.get_dependencies_map(services_map)
new_deps = []
for dep in self.dependencies:
new_deps.extend(utils.extend_dependency(dep, deps_map, {}, {}))
self.dependencies = new_deps
# configmap methods
def _create_configmap(self):
CONF.configs._update(action_parameters=self._get_custom_parameters())
data = {
"config": CONF.configs._json(sort_keys=True),
"secret-config": CONF.secret_configs._json(sort_keys=True),
"nodes-config": utils.get_nodes_config(CONF.nodes),
"workflow": self._get_workflow()
}
data.update(self._get_file_templates())
cm = templates.serialize_configmap(self.k8s_name, data)
kubernetes.process_object(cm)
def _get_workflow(self):
wf = {
"name": self.name,
"dependencies": self.dependencies,
"job": {
"command": self.command
},
"files": []
}
for f in self.files:
wf["files"].append({
"name": f["content"],
"path": f["path"],
"perm": f.get("perm"),
"user": f.get("user")
})
return json.dumps({"workflow": wf})
def _get_custom_parameters(self):
parameters = {}
# add defaults
for param in self.parameters:
parameters[param["key"]] = param["default_value"]
# process user params
for param in self.user_parameters:
key, sep, value = param.partition("=")
if key in parameters:
parameters[key] = value
else:
raise Exception("Parameter %s is not allowed for %s action",
key, self.name)
return parameters
def _get_file_templates(self):
exports_map = utils.get_repositories_exports()
exports_template = jinja_utils.generate_jinja_imports(exports_map)
data = {}
for f in self.files:
template_path = os.path.join(self.component_dir,
"service", "files",
f["content"])
with open(template_path) as filedata:
data[f["content"]] = exports_template + "\n" + filedata.read()
for ex_item in exports_map.values():
data[ex_item["name"]] = ex_item["body"]
return data
# job methods
def _create_action(self):
cont_spec = {
"name": self.k8s_name,
"image": config_images.image_spec(self.image),
"imagePullPolicy": CONF.kubernetes.image_pull_policy,
"command": templates.get_start_cmd(self.name),
"volumeMounts": [
{
"name": "config-volume",
"mountPath": "/etc/ccp"
},
{
"name": "start-script",
"mountPath": "/opt/ccp_start_script/bin"
}
],
"env": templates.serialize_env_variables({}),
"restartPolicy": "Never"
}
config_volume_items = [
{
"key": "config",
"path": "globals/globals.json"
},
{
"key": "secret-config",
"path": "global-secrets/global-secrets.json"
},
{
"key": "nodes-config",
"path": "nodes-config/nodes-config.json"
},
{
"key": "workflow",
"path": "role/%s.json" % self.name
}
]
for f in self.files:
config_volume_items.append({
"key": f["content"],
"path": "files/%s" % f["content"]
})
for ex_item in utils.get_repositories_exports().values():
config_volume_items.append({
"key": ex_item["name"],
"path": "exports/%s" % ex_item["name"]
})
pod_spec = {
"metadata": {
"name": self.k8s_name,
"labels": {
"app": self.name,
"ccp": "true",
"ccp-action": "true",
"ccp-component": self.component
}
},
"spec": {
"containers": [cont_spec],
"restartPolicy": "Never",
"volumes": [
{
"name": "config-volume",
"configMap": {
"name": self.k8s_name,
"items": config_volume_items
}
},
{
"name": "start-script",
"configMap": {
"name": templates.SCRIPT_CONFIG,
"items": [
{
"key": templates.SCRIPT_CONFIG,
"path": "start_script.py"
}
]
}
}
]
}
}
if self.restart_policy == RESTART_POLICY_NEVER:
self._create_pod(pod_spec)
elif self.restart_policy == RESTART_POLICY_ALWAYS:
self._create_job(pod_spec)
else:
raise ValueError("Restart policy %s is not supported" % (
self.restart_policy))
def _create_pod(self, pod_spec):
spec = copy.deepcopy(pod_spec)
spec["metadata"].setdefault("labels", {})
spec.update({
"kind": "Pod",
"apiVersion": "v1"})
kubernetes.process_object(spec)
def _create_job(self, pod_spec):
job_spec = templates.serialize_job(
name=self.k8s_name,
spec=pod_spec,
component_name=self.component,
app_name=self.name)
job_spec["metadata"]["labels"].update({"ccp-action": "true"})
if kubernetes.process_object(job_spec):
LOG.info('%s: action "%s" has been successfully run',
self.component, self.k8s_name)
class ActionStatus(object):
@classmethod
def get_actions(cls, action_name=None):
selector = "ccp-action=true"
if action_name:
selector += "," + "app=%s" % action_name
actions = []
job_names = []
for job in kubernetes.list_cluster_jobs(selector=selector):
actions.append(cls(job))
job_names.append(job.name)
for pod in kubernetes.list_cluster_pods(selector=selector):
job_name = pod.labels.get("job-name")
if job_name and job_name not in job_names:
actions.append(cls(pod, job_name))
elif not job_name:
actions.append(cls(pod))
return actions
def __init__(self, k8s_spec, overridden_name=None):
self._spec = k8s_spec
self.name = k8s_spec.name
if overridden_name:
self.name = overridden_name
self.component = k8s_spec.labels["ccp-component"]
self.date = k8s_spec.obj["metadata"]["creationTimestamp"]
self.terminating = k8s_spec.obj["metadata"].get("deletionTimestamp",
False)
if k8s_spec.kind == "Job":
self.restarts = k8s_spec.obj["status"].get("failed", 0)
self.active = k8s_spec.obj["status"].get("active", 0)
self.failed = False
else:
phase = k8s_spec.obj["status"]["phase"]
self.restarts = 0
self.active = 1 if phase not in {"Failed",
"Completed",
"Succeeded"} else 0
self.failed = phase == "Failed"
@property
def status(self):
if self.terminating:
return "terminating"
if self.failed:
return "fail"
if self.active:
return "wip"
return "ok"
def log(self):
if self._spec.kind == "Pod":
return self._spec.logs()
else:
pod_selector = "job-name=%s" % self._spec.name
pods = kubernetes.list_cluster_pods(raw_selector=pod_selector)
for pod in pods:
if pod.obj['status']['phase'] == "Failed":
continue
return pod.logs()\
@classmethod
def delete(cls, action_name):
delete_configmap_status = False
delete_action_status = cls.delete_action(action_name)
if delete_action_status:
delete_configmap_status = cls.delete_configmap(action_name)
return {'action_status': delete_action_status,
'configmap_status': delete_configmap_status}
@staticmethod
def delete_action(action_name):
try:
action = kubernetes.list_cluster_jobs(name=action_name)
except pykube_exc.ObjectDoesNotExist:
try:
action = kubernetes.list_cluster_pods(name=action_name)
except pykube_exc.ObjectDoesNotExist:
LOG.error('Action with name %s not found', action_name)
return False
try:
action.delete()
except pykube_exc.HTTPError as ex:
LOG.error(ex.message)
return False
LOG.info('Action %s is terminating', action_name)
return True
@staticmethod
def delete_configmap(action_name):
try:
configmap = kubernetes.get_configmap(action_name)
configmap.delete()
except pykube_exc.ObjectDoesNotExist:
pass
except pykube_exc.HTTPError as ex:
LOG.error(ex.message)
return False
return True
def list_actions():
"""List of available actions.
:returns: list -- list of all available actions
"""
actions = []
for repo in utils.get_repositories_paths():
component_name = utils.get_component_name_from_repo_path(repo)
action_path = os.path.join(repo, "service", "actions")
if not os.path.isdir(action_path):
continue
for filename in os.listdir(action_path):
if filename.endswith(".yaml"):
action_file = os.path.join(action_path, filename)
conf = utils.get_rendering_config()
data = jinja_utils.jinja_render(action_file, conf._dict)
for action_dict in yaml.load(data).get("actions", ()):
actions.append(Action(component=component_name,
component_dir=repo,
**action_dict))
return actions
def get_action(action_name):
"""Get action by name.
:returns: Action -- action object
:raises: fuel_ccp.exceptions.NotFoundException
"""
for action in list_actions():
if action_name == action.name:
return action
raise exceptions.NotFoundException("Action with name '%s' not found" % (
action_name))
def run_action(action_name, user_parameters=None):
"""Run action.
:returns: str -- action name
:raises: fuel_ccp.exceptions.NotFoundException
"""
action = get_action(action_name)
action.validate()
return action.run(user_parameters)
def list_action_status(action_type=None):
return ActionStatus.get_actions(action_type)
def get_action_status_by_name(action_name):
for action in list_action_status():
if action.name == action_name:
return action
raise exceptions.NotFoundException("Action with name \"%s\" not found" % (
action_name))
def get_action_statuses_by_names(action_names):
action_names = set(action_names)
actions = []
for action in list_action_status():
if action.name in action_names:
action_names.remove(action.name)
actions.append(action)
if action_names:
raise exceptions.NotFoundException(
"Action(s) with name(s) %s not found" % (
", ".join(action_names)))
return actions
def delete_action(action_names):
"""Delete action.
:raises: fuel_ccp.exceptions.NotFoundException
"""
not_removed = []
configmap_not_removed = []
for action_name in action_names:
action_status = ActionStatus.delete(action_name)
if not action_status.get('action_status'):
not_removed.append(action_name)
if not action_status.get('configmap_status'):
configmap_not_removed.append(action_name)
if not_removed:
raise exceptions.NotFoundException(
'The following actions were not removed: %s'
% ','.join(not_removed)
)
if configmap_not_removed:
raise exceptions.NotFoundException(
'Configmaps for the following actions were not removed: %s'
% ','.join(configmap_not_removed)
)