Allow to specify affinity for jobs

This is needed for backup jobs to be run on specific nodes that have
backup volumes mounted.
Jobs can provide a 'topology_key' value that will be looked up in
topology to find which nodes this job can run on. So for backup one
would need to do smth like this:

nodes:
  node1:
    - backup
roles:
  backup:
    - backup

And add "topology_key: backup" to backup jobs.

Change-Id: I3b51b7a957735873b0de098578e1b83c586f111a
This commit is contained in:
Yuriy Taraday 2016-11-17 00:58:41 +03:00
parent 592b873327
commit 0180721e78
2 changed files with 27 additions and 18 deletions

View File

@ -89,8 +89,8 @@ def parse_role(component, topology, configmaps):
daemon_cmd = cont["daemon"]
daemon_cmd["name"] = cont["name"]
yield _create_pre_jobs(service, cont, component_name)
yield _create_post_jobs(service, cont, component_name)
yield _create_pre_jobs(service, cont, component_name, topology)
yield _create_post_jobs(service, cont, component_name, topology)
cont['cm_version'] = cm_version
cont_spec = templates.serialize_daemon_pod_spec(service)
@ -224,21 +224,27 @@ def _is_single_job(job):
return job.get("type", "local") == "single"
def _create_pre_jobs(service, container, component_name):
def _create_pre_jobs(service, container, component_name, topology):
for job in container.get("pre", ()):
if _is_single_job(job):
yield _get_job(service, container, job, component_name)
yield _get_job(service, container, job, component_name, topology)
def _create_post_jobs(service, container, component_name):
def _create_post_jobs(service, container, component_name, topology):
for job in container.get("post", ()):
if _is_single_job(job):
yield _get_job(service, container, job, component_name)
yield _get_job(service, container, job, component_name, topology)
def _get_job(service, container, job, component_name):
def _get_job(service, container, job, component_name, topology):
if 'topology_key' in job:
affinity = templates.serialize_affinity(
{"name": job['topology_key']}, topology)
else:
affinity = {}
cont_spec = templates.serialize_job_container_spec(container, job)
pod_spec = templates.serialize_job_pod_spec(service, job, cont_spec)
pod_spec = templates.serialize_job_pod_spec(service, job, cont_spec,
affinity)
job_spec = templates.serialize_job(job["name"], pod_spec, component_name,
service["name"])
return job_spec
@ -428,7 +434,7 @@ def check_images_change(objects):
return False
def create_upgrade_jobs(component_name, upgrade_data, configmaps):
def create_upgrade_jobs(component_name, upgrade_data, configmaps, topology):
from_version = upgrade_data['_meta']['from']
to_version = upgrade_data['_meta']['to']
component = upgrade_data['_meta']['component']
@ -463,10 +469,9 @@ def create_upgrade_jobs(component_name, upgrade_data, configmaps):
step_type = step.get('type', 'single')
job_name = "{}-{}".format(prefix, step['name'])
job = {"name": job_name, "type": "single"}
if step.get('files'):
job['files'] = step['files']
if step.get('volumes'):
job['volumes'] = step['volumes']
for key in ['files', 'volumes', 'topology_key']:
if step.get(key):
job[key] = step[key]
jobs.append(job)
workflow = {
'name': job_name,
@ -500,7 +505,8 @@ def create_upgrade_jobs(component_name, upgrade_data, configmaps):
_create_workflow(workflows, prefix)
for job_spec in _create_pre_jobs(service, container, component_name):
job_specs = _create_pre_jobs(service, container, component_name, topology)
for job_spec in job_specs:
kubernetes.process_object(job_spec)
LOG.info("Upgrade of component %s successfuly scheduled", component_name)
@ -515,7 +521,8 @@ def version_diff(from_image, to_image):
def deploy_components(components_map, components):
topology = _make_topology(CONF.nodes, CONF.roles, CONF.replicas._dict)
components = components or set(topology.keys())
if not components:
components = set(topology.keys()) & set(components_map.keys())
deploy_validation.validate_requested_components(components, components_map)
@ -562,7 +569,8 @@ def deploy_components(components_map, components):
upgrading_components[component_name][service_name] = objects
for component_name, component_upg in upgrading_components.items():
create_upgrade_jobs(component_name, component_upg, configmaps)
create_upgrade_jobs(component_name, component_upg, configmaps,
topology)
if 'keystone' in components:
_create_openrc(CONF.configs)

View File

@ -159,10 +159,11 @@ def serialize_job_container_spec(container, job):
}
def serialize_job_pod_spec(service, job, cont_spec):
def serialize_job_pod_spec(service, job, cont_spec, affinity):
return {
"metadata": {
"name": job["name"]
"name": job["name"],
"annotations": affinity,
},
"spec": {
"containers": [cont_spec],