Add support for dynamic topology

This patch removes support for hardcoded node selectors.

Instead, it allows specifying a dynamic node selection policy via
configs.

If a service is not specified in the topology config, its deployment is
skipped.

Selectors work via the Kubernetes affinity feature.
Change-Id: I1f8defd90169cd08fd2ac191c3b186efae5da010
Author: Sergey Reshetnyak  2016-07-06 18:23:41 +03:00
parent eaa74c67ef
commit e04cc55b1b
7 changed files with 177 additions and 14 deletions

etc/topology-example.yaml (new file)

@@ -0,0 +1,44 @@
+nodes:
+  node1:
+    roles:
+      - controller
+      - lma
+      - lma-controller
+      - openvswitch
+  node[2-3]:
+    roles:
+      - compute
+      - lma
+      - openvswitch
+roles:
+  controller:
+    - etcd
+    - glance-api
+    - glance-registry
+    - horizon
+    - keystone
+    - mariadb
+    - memcached
+    - neutron-dhcp-agent
+    - neutron-l3-agent
+    - neutron-metadata-agent
+    - neutron-server
+    - nova-api
+    - nova-conductor
+    - nova-consoleauth
+    - nova-novncproxy
+    - nova-scheduler
+    - rabbitmq
+  compute:
+    - nova-compute
+    - nova-libvirt
+  openvswitch:
+    - neutron-openvswitch-agent
+    - openvswitch-db
+    - openvswitch-vswitchd
+  lma-controller:
+    - elasticsearch
+    - influxdb
+    - kibana
+  lma:
+    - heka
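
Note that the node[2-3] key is a pattern rather than a literal node name: at deploy time it is matched as a regular expression against the names of the cluster's Kubernetes nodes (see _make_topology below). A minimal sketch of that expansion, assuming a hypothetical three-node cluster:

import re

# Hypothetical node names, as list_k8s_nodes() might return them.
k8s_nodes = ["node1", "node2", "node3"]


def find_match(glob):
    # Same approach as _make_topology below: treat the config key as a
    # regular expression and keep every node name it matches.
    matcher = re.compile(glob)
    return [node for node in k8s_nodes if matcher.match(node)]


print(find_match("node[2-3]"))  # ['node2', 'node3']
print(find_match("node1"))      # ['node1']; note re.match anchors only at
                                # the start, so "node1" would also match a
                                # hypothetical "node10"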


@@ -243,7 +243,7 @@ def _get_config():
     if CONF.registry.address:
         cfg['namespace'] = '%s/%s' % (CONF.registry.address, cfg['namespace'])
-    cfg.update(utils.get_global_parameters('versions'))
+    cfg.update(utils.get_global_parameters('versions')["versions"])
     return cfg
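
The extra ["versions"] index follows from the utils change below: get_global_parameters now returns a mapping keyed by group name instead of the group's contents. Roughly (contents hypothetical):

# Before: the group's contents were returned directly.
#     get_global_parameters('versions')  ->  {'openstack_version': 'mitaka'}
# After: the result is keyed by group, so callers pick out what they need.
#     get_global_parameters('versions')  ->  {'versions': {'openstack_version': 'mitaka'}}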


@@ -23,7 +23,7 @@ def get_resource_path(path):
     return pkg_resources.resource_filename(fuel_ccp.version_info.package, path)


-def get_global_parameters(config_group):
+def get_global_parameters(*config_groups):
     cfg = {}
     components = list(CONF.repositories.names)
     paths = []
@@ -44,7 +44,10 @@ def get_global_parameters(config_group):
         if os.path.isfile(path):
             LOG.debug("Adding parameters from \"%s\"", path)
             with open(path, "r") as f:
-                cfg.update(yaml.load(f).get(config_group, {}))
+                data = yaml.load(f)
+                for group in config_groups:
+                    cfg.setdefault(group, {})
+                    cfg[group].update(data.get(group, {}))
         else:
             LOG.warning("\"%s\" not found, skipping", path)
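
A single call can now pull several groups at once, which deploy_components below relies on. A sketch of the merge semantics, assuming two hypothetical config files:

# File A: {"configs": {"db_name": "ccp"}, "nodes": {"node1": {"roles": ["controller"]}}}
# File B: {"configs": {"db_user": "ccp"}}
#
# get_global_parameters("configs", "nodes", "roles") would then return:
#     {"configs": {"db_name": "ccp", "db_user": "ccp"},
#      "nodes": {"node1": {"roles": ["controller"]}},
#      "roles": {}}
# Every requested group is present in the result (via setdefault), even
# when no file defines it.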


@@ -35,6 +35,10 @@ def _expand_files(service, files):
 def parse_role(service_dir, role, config):
     service = role["service"]
+    if service["name"] not in config.get("topology", {}):
+        LOG.info("Service %s not in topology config, skipping deploy",
+                 service["name"])
+        return
     LOG.info("Using service %s", service["name"])
     _expand_files(service, role.get("files"))
@@ -52,14 +56,17 @@ def parse_role(service_dir, role, config):
         _create_post_jobs(service, cont)
     cont_spec = templates.serialize_daemon_pod_spec(service)
+    affinity = templates.serialize_affinity(service, config["topology"])
     if service.get("daemonset", False):
-        obj = templates.serialize_daemonset(service["name"], cont_spec)
+        obj = templates.serialize_daemonset(service["name"], cont_spec,
+                                            affinity)
     else:
-        obj = templates.serialize_deployment(service["name"], cont_spec)
+        obj = templates.serialize_deployment(service["name"], cont_spec,
+                                             affinity)
     kubernetes.create_object_from_definition(obj)
-    _create_service(service, config)
+    _create_service(service, config["configs"])


 def _parse_workflows(service):
@@ -264,6 +271,44 @@ def deploy_component(component, config):
         parse_role(service_dir, role_obj, config)


+def _make_topology(nodes, roles):
+    failed = False
+    # TODO(sreshetniak): move it to validation
+    if not nodes:
+        LOG.error("Nodes section is not specified in configs")
+        failed = True
+    if not roles:
+        LOG.error("Roles section is not specified in configs")
+        failed = True
+    if failed:
+        raise RuntimeError("Failed to create topology for services")
+
+    # TODO(sreshetniak): add validation
+    k8s_nodes = kubernetes.list_k8s_nodes()
+
+    def find_match(glob):
+        matcher = re.compile(glob)
+        nodes = []
+        for node in k8s_nodes:
+            match = matcher.match(node)
+            if match:
+                nodes.append(match.group(0))
+        return nodes
+
+    roles_to_node = {}
+    for node in nodes.keys():
+        matched_nodes = find_match(node)
+        for role in nodes[node]["roles"]:
+            roles_to_node.setdefault(role, [])
+            roles_to_node[role].extend(matched_nodes)
+    service_to_node = {}
+    for role in roles.keys():
+        for svc in roles[role]:
+            service_to_node.setdefault(svc, [])
+            service_to_node[svc].extend(roles_to_node[role])
+    return service_to_node
+
+
 def _create_namespace(namespace):
     if CONF.action.dry_run:
         return
@@ -284,12 +329,15 @@ def _create_namespace(namespace):
 def deploy_components(components=None):
     if components is None:
         components = CONF.repositories.names

-    namespace = CONF.kubernetes.namespace
+    config = utils.get_global_parameters("configs", "nodes", "roles")
+    config["topology"] = _make_topology(config.get("nodes"),
+                                        config.get("roles"))
+
+    namespace = CONF.kubernetes.namespace
     _create_namespace(namespace)
-    config = utils.get_global_parameters('configs')
-    _create_globals_configmap(config)
+    _create_globals_configmap(config["configs"])
     _create_start_script_configmap()

     for component in components:
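
Taken together: deploy_components builds the service-to-nodes map once, parse_role skips any service missing from it, and serialize_affinity pins each remaining service to its nodes. For the example topology file above, the map would look roughly like this (node names as matched on a hypothetical node1..node3 cluster):

# config["topology"] derived from etc/topology-example.yaml:
#     {"etcd": ["node1"],
#      "keystone": ["node1"],
#      ...
#      "nova-compute": ["node2", "node3"],
#      "heka": ["node1", "node2", "node3"]}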


@@ -73,6 +73,15 @@ def get_v1_api(client):
     return apiv_api.ApivApi(client)


+def list_k8s_nodes():
+    api = get_v1_api(get_client())
+    resp = api.list_namespaced_node()
+    nodes = []
+    for node in resp.items:
+        nodes.append(node.metadata.name)
+    return nodes
+
+
 def handle_exists(fct, *args, **kwargs):
     try:
         fct(*args, **kwargs)
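
Despite its name, list_namespaced_node is the generated client's call for listing cluster nodes (nodes are not namespaced); the helper just flattens the response into plain names. A hypothetical result:

# list_k8s_nodes() on a three-node cluster:
#     ['node1', 'node2', 'node3']
# These are the names the topology patterns (e.g. "node[2-3]") are
# matched against in _make_topology.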


@@ -1,4 +1,4 @@
-import copy
+import json

 from oslo_config import cfg
@@ -133,8 +133,6 @@ def serialize_daemon_pod_spec(service):
     if service.get("host-net"):
         cont_spec["hostNetwork"] = True
-    if service.get("node-selector"):
-        cont_spec["nodeSelector"] = copy.deepcopy(service["node-selector"])
     return cont_spec
@@ -240,7 +238,7 @@ def serialize_job(name, spec):
     }


-def serialize_deployment(name, spec):
+def serialize_deployment(name, spec, affinity):
     return {
         "apiVersion": "extensions/v1beta1",
         "kind": "Deployment",
@@ -251,6 +249,7 @@ def serialize_deployment(name, spec):
             "replicas": 1,
             "template": {
                 "metadata": {
+                    "annotations": affinity,
                     "labels": {
                         "mcp": "true",
                         "app": name
@@ -262,7 +261,7 @@ def serialize_deployment(name, spec):
     }


-def serialize_daemonset(name, spec):
+def serialize_daemonset(name, spec, affinity):
     return {
         "apiVersion": "extensions/v1beta1",
         "kind": "DaemonSet",
@@ -272,6 +271,7 @@ def serialize_daemonset(name, spec):
         "spec": {
             "template": {
                 "metadata": {
+                    "annotations": affinity,
                     "labels": {
                         "mcp": "true",
                         "app": name
@@ -283,6 +283,23 @@ def serialize_daemonset(name, spec):
     }


+def serialize_affinity(service, topology):
+    policy = {
+        "nodeAffinity": {
+            "requiredDuringSchedulingIgnoredDuringExecution": {
+                "nodeSelectorTerms": [{
+                    "matchExpressions": [{
+                        "key": "kubernetes.io/hostname",
+                        "operator": "In",
+                        "values": topology[service["name"]]
+                    }]
+                }]
+            }
+        }
+    }
+    return {"scheduler.alpha.kubernetes.io/affinity": json.dumps(policy)}
+
+
 def serialize_service(name, ports):
     ports_spec = []
     for port in ports:
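
To make the annotation concrete, here is a minimal check of serialize_affinity's output, assuming the module lives at fuel_ccp.templates (service and node names hypothetical):

import json

from fuel_ccp import templates

ann = templates.serialize_affinity({"name": "keystone"},
                                   {"keystone": ["node1"]})
# The pod-template annotation carries the JSON-encoded alpha affinity
# policy that Kubernetes schedulers of this era understood.
policy = json.loads(ann["scheduler.alpha.kubernetes.io/affinity"])
terms = policy["nodeAffinity"][
    "requiredDuringSchedulingIgnoredDuringExecution"]["nodeSelectorTerms"]
assert terms[0]["matchExpressions"][0]["values"] == ["node1"]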


@@ -266,3 +266,45 @@ class TestDeployParseWorkflow(base.TestCase):
             }
         }
         self.assertDictEqual(expected_workflows, workflow)


+class TestDeployMakeTopology(base.TestCase):
+    def test_make_empty_topology(self):
+        self.assertRaises(RuntimeError,
+                          deploy._make_topology, None, None)
+        self.assertRaises(RuntimeError,
+                          deploy._make_topology, None, {"spam": "eggs"})
+        self.assertRaises(RuntimeError,
+                          deploy._make_topology, {"spam": "eggs"}, None)
+
+    def test_make_topology(self):
+        nodes = {
+            "node1": {
+                "roles": ["controller"]
+            },
+            "node[2-3]": {
+                "roles": ["compute"]
+            }
+        }
+        roles = {
+            "controller": [
+                "mysql",
+                "keystone"
+            ],
+            "compute": [
+                "nova-compute",
+                "libvirtd"
+            ]
+        }
+        node_list = ["node1", "node2", "node3"]
+        expected_topology = {
+            "mysql": ["node1"],
+            "keystone": ["node1"],
+            "nova-compute": ["node2", "node3"],
+            "libvirtd": ["node2", "node3"]
+        }
+        with mock.patch("fuel_ccp.kubernetes.list_k8s_nodes") as p:
+            p.return_value = node_list
+            topology = deploy._make_topology(nodes, roles)
+            self.assertDictEqual(expected_topology, topology)