Reorganized heat template generation code

* Extracted heat template generation from
  'utils/openstack/heat.py' to 'service/heat/templates.py'.
  Now heat.py is for heat client only.
* Moved heat template resources from 'resources' to
  'service/heat/resources'
* Separated tests for heat templates and heat client.

Change-Id: I4c7a561f1648f34e71574a556cbc07ed2cf9b173
Closes-Bug: #1373075
This commit is contained in:
Andrew Lazarev 2015-02-17 15:27:30 -08:00
parent 60c633ce2b
commit c945a9886b
17 changed files with 534 additions and 493 deletions

View File

@ -43,7 +43,7 @@ include sahara/plugins/mapr/versions/v3_1_1/resources/*.xml
include sahara/plugins/spark/resources/*.xml
include sahara/plugins/spark/resources/*.sh
include sahara/plugins/spark/resources/*.template
include sahara/resources/*.heat
include sahara/service/heat/resources/*.heat
include sahara/service/edp/resources/*.xml
include sahara/service/edp/resources/*.jar
include sahara/service/edp/resources/launch_command.py

View File

View File

@ -23,6 +23,7 @@ from sahara.i18n import _
from sahara.i18n import _LI
from sahara.i18n import _LW
from sahara.service import engine as e
from sahara.service.heat import templates as ht
from sahara.service import volumes
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import general as g
@ -202,7 +203,7 @@ class _CreateLauncher(HeatEngine):
@cpo.event_wrapper(
True, step=_('Create Heat stack'), param=('cluster', 1))
def create_instances(self, cluster, target_count):
tmpl = heat.ClusterTemplate(cluster)
tmpl = ht.ClusterTemplate(cluster)
self._configure_template(tmpl, cluster, target_count)
stack = tmpl.instantiate(update_existing=self.UPDATE_STACK,

View File

@ -0,0 +1,314 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from oslo_log import log as logging
import six
from sahara.utils import files as f
from sahara.utils import general as g
from sahara.utils.openstack import heat as h
from sahara.utils.openstack import neutron
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
SSH_PORT = 22
def _get_inst_name(cluster_name, ng_name, index):
return g.generate_instance_name(cluster_name, ng_name, index + 1)
def _get_aa_group_name(cluster_name):
return g.generate_aa_group_name(cluster_name)
def _get_port_name(inst_name):
return '%s-port' % inst_name
def _get_floating_name(inst_name):
return '%s-floating' % inst_name
def _get_floating_assoc_name(inst_name):
return '%s-floating-assoc' % inst_name
def _get_volume_name(inst_name, volume_idx):
return '%s-volume-%i' % (inst_name, volume_idx)
def _get_volume_attach_name(inst_name, volume_idx):
return '%s-volume-attachment-%i' % (inst_name, volume_idx)
def _load_template(template_name, fields):
template_file = f.get_file_text(
'service/heat/resources/%s' % template_name)
return template_file.rstrip() % fields
def _prepare_userdata(userdata):
"""Converts userdata as a text into format consumable by heat template."""
userdata = userdata.replace('"', '\\"')
lines = userdata.splitlines()
return '"' + '",\n"'.join(lines) + '"'
class ClusterTemplate(object):
def __init__(self, cluster):
self.cluster = cluster
self.node_groups_extra = {}
def add_node_group_extra(self, node_group_id, node_count,
gen_userdata_func):
self.node_groups_extra[node_group_id] = {
'node_count': node_count,
'gen_userdata_func': gen_userdata_func
}
# Consider using a single Jinja template for all this
def instantiate(self, update_existing, disable_rollback=True):
main_tmpl = _load_template('main.heat',
{'resources': self._serialize_resources()})
heat = h.client()
kwargs = {
'stack_name': self.cluster.name,
'timeout_mins': 180,
'disable_rollback': disable_rollback,
'parameters': {},
'template': json.loads(main_tmpl)}
if not update_existing:
heat.stacks.create(**kwargs)
else:
for stack in heat.stacks.list():
if stack.stack_name == self.cluster.name:
stack.update(**kwargs)
break
return ClusterStack(self, h.get_stack(self.cluster.name))
def _need_aa_server_group(self, node_group):
for node_process in node_group.node_processes:
if node_process in self.cluster.anti_affinity:
return True
return False
def _get_anti_affinity_scheduler_hints(self, node_group):
if not self._need_aa_server_group(node_group):
return ''
return ('"scheduler_hints" : %s,' %
json.dumps({"group": {"Ref": _get_aa_group_name(
self.cluster.name)}}))
def _serialize_resources(self):
resources = []
if self.cluster.anti_affinity:
resources.extend(self._serialize_aa_server_group())
for ng in self.cluster.node_groups:
if ng.auto_security_group:
resources.extend(self._serialize_auto_security_group(ng))
for idx in range(0, self.node_groups_extra[ng.id]['node_count']):
resources.extend(self._serialize_instance(ng, idx))
return ',\n'.join(resources)
def _serialize_auto_security_group(self, ng):
fields = {
'security_group_name': g.generate_auto_security_group_name(ng),
'security_group_description':
"Auto security group created by Sahara for Node Group "
"'%s' of cluster '%s'." % (ng.name, ng.cluster.name),
'rules': self._serialize_auto_security_group_rules(ng)}
yield _load_template('security_group.heat', fields)
def _serialize_auto_security_group_rules(self, ng):
create_rule = lambda cidr, proto, from_port, to_port: {
"CidrIp": cidr,
"IpProtocol": proto,
"FromPort": six.text_type(from_port),
"ToPort": six.text_type(to_port)}
rules = []
for port in ng.open_ports:
rules.append(create_rule('0.0.0.0/0', 'tcp', port, port))
rules.append(create_rule('0.0.0.0/0', 'tcp', SSH_PORT, SSH_PORT))
# open all traffic for private networks
if CONF.use_neutron:
for cidr in neutron.get_private_network_cidrs(ng.cluster):
for protocol in ['tcp', 'udp']:
rules.append(create_rule(cidr, protocol, 1, 65535))
rules.append(create_rule(cidr, 'icmp', -1, -1))
return json.dumps(rules)
def _serialize_instance(self, ng, idx):
inst_name = _get_inst_name(self.cluster.name, ng.name, idx)
nets = ''
security_groups = ''
if CONF.use_neutron:
port_name = _get_port_name(inst_name)
yield self._serialize_port(port_name,
self.cluster.neutron_management_network,
self._get_security_groups(ng))
nets = '"networks" : [{ "port" : { "Ref" : "%s" }}],' % port_name
if ng.floating_ip_pool:
yield self._serialize_neutron_floating(inst_name, port_name,
ng.floating_ip_pool)
else:
if ng.floating_ip_pool:
yield self._serialize_nova_floating(inst_name,
ng.floating_ip_pool)
if ng.security_groups:
security_groups = (
'"security_groups": %s,' % json.dumps(
self._get_security_groups(ng)))
# Check if cluster contains user key-pair and include it to template.
key_name = ''
if self.cluster.user_keypair_id:
key_name = '"key_name" : "%s",' % self.cluster.user_keypair_id
gen_userdata_func = self.node_groups_extra[ng.id]['gen_userdata_func']
userdata = gen_userdata_func(ng, inst_name)
availability_zone = ''
if ng.availability_zone:
# Use json.dumps to escape ng.availability_zone
# (in case it contains quotes)
availability_zone = ('"availability_zone" : %s,' %
json.dumps(ng.availability_zone))
fields = {'instance_name': inst_name,
'flavor_id': ng.flavor_id,
'image_id': ng.get_image_id(),
'image_username': ng.image_username,
'network_interfaces': nets,
'key_name': key_name,
'userdata': _prepare_userdata(userdata),
'scheduler_hints':
self._get_anti_affinity_scheduler_hints(ng),
'security_groups': security_groups,
'availability_zone': availability_zone}
yield _load_template('instance.heat', fields)
for idx in range(0, ng.volumes_per_node):
yield self._serialize_volume(inst_name, idx, ng.volumes_size,
ng.volumes_availability_zone,
ng.volume_type)
def _serialize_port(self, port_name, fixed_net_id, security_groups):
fields = {'port_name': port_name,
'fixed_net_id': fixed_net_id,
'security_groups': ('"security_groups": %s,' % json.dumps(
security_groups) if security_groups else '')}
return _load_template('neutron-port.heat', fields)
def _serialize_neutron_floating(self, inst_name, port_name,
floating_net_id):
fields = {'floating_ip_name': _get_floating_name(inst_name),
'floating_net_id': floating_net_id,
'port_name': port_name}
return _load_template('neutron-floating.heat', fields)
def _serialize_nova_floating(self, inst_name, floating_pool_name):
fields = {
'floating_ip_name': _get_floating_name(inst_name),
'floating_ip_assoc_name': _get_floating_assoc_name(inst_name),
'instance_name': inst_name,
'pool': floating_pool_name
}
return _load_template('nova-floating.heat', fields)
def _serialize_volume_type(self, volume_type):
property = '"volume_type" : %s'
if volume_type is None:
return property % 'null'
else:
return property % ('"%s"' % volume_type)
def _serialize_volume(self, inst_name, volume_idx, volumes_size,
volumes_availability_zone, volume_type):
fields = {'volume_name': _get_volume_name(inst_name, volume_idx),
'volumes_size': volumes_size,
'volume_attach_name': _get_volume_attach_name(inst_name,
volume_idx),
'availability_zone': '',
'instance_name': inst_name,
'volume_type': self._serialize_volume_type(volume_type)}
if volumes_availability_zone:
# Use json.dumps to escape volumes_availability_zone
# (in case it contains quotes)
fields['availability_zone'] = (
'"availability_zone": %s,' %
json.dumps(volumes_availability_zone))
return _load_template('volume.heat', fields)
def _get_security_groups(self, node_group):
if not node_group.auto_security_group:
return node_group.security_groups
return (list(node_group.security_groups or []) +
[{"Ref": g.generate_auto_security_group_name(node_group)}])
def _serialize_aa_server_group(self):
fields = {'server_group_name': _get_aa_group_name(self.cluster.name)}
yield _load_template('aa_server_group.heat', fields)
class ClusterStack(object):
def __init__(self, tmpl, heat_stack):
self.tmpl = tmpl
self.heat_stack = heat_stack
def get_node_group_instances(self, node_group):
insts = []
count = self.tmpl.node_groups_extra[node_group.id]['node_count']
heat = h.client()
for i in range(0, count):
name = _get_inst_name(self.tmpl.cluster.name, node_group.name, i)
res = heat.resources.get(self.heat_stack.id, name)
insts.append((name, res.physical_resource_id))
return insts

View File

@ -0,0 +1,215 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import testtools
from sahara.service.heat import templates as h
from sahara.tests.unit import base
from sahara.tests.unit import testutils as tu
from sahara.utils import files as f
class TestHeat(testtools.TestCase):
def test_gets(self):
inst_name = "cluster-worker-001"
self.assertEqual(h._get_inst_name("cluster", "worker", 0), inst_name)
self.assertEqual(h._get_inst_name("CLUSTER", "WORKER", 0), inst_name)
self.assertEqual(h._get_port_name(inst_name),
"cluster-worker-001-port")
self.assertEqual(h._get_floating_name(inst_name),
"cluster-worker-001-floating")
self.assertEqual(h._get_floating_assoc_name(inst_name),
"cluster-worker-001-floating-assoc")
self.assertEqual(h._get_volume_name(inst_name, 1),
"cluster-worker-001-volume-1")
self.assertEqual(h._get_volume_attach_name(inst_name, 1),
"cluster-worker-001-volume-attachment-1")
def test_prepare_user_data(self):
userdata = "line1\nline2"
self.assertEqual(h._prepare_userdata(userdata), '"line1",\n"line2"')
class TestClusterTemplate(base.SaharaWithDbTestCase):
"""Checks valid structure of Resources section in generated Heat templates.
1. It checks templates generation with different OpenStack
network installations: Neutron, NovaNetwork with floating Ip auto
assignment set to True or False.
2. Cinder volume attachments.
3. Basic instances creations with multi line user data provided.
4. Anti-affinity feature with proper nova scheduler hints included
into Heat templates.
"""
def _make_node_groups(self, floating_ip_pool=None, volume_type=None):
ng1 = tu.make_ng_dict('master', 42, ['namenode'], 1,
floating_ip_pool=floating_ip_pool, image_id=None,
volumes_per_node=0, volumes_size=0, id=1,
image_username='root', volume_type=None)
ng2 = tu.make_ng_dict('worker', 42, ['datanode'], 1,
floating_ip_pool=floating_ip_pool, image_id=None,
volumes_per_node=2, volumes_size=10, id=2,
image_username='root', volume_type=volume_type)
return ng1, ng2
def _make_cluster(self, mng_network, ng1, ng2, anti_affinity=[]):
return tu.create_cluster("cluster", "tenant1", "general",
"1.2.1", [ng1, ng2],
user_keypair_id='user_key',
neutron_management_network=mng_network,
default_image_id='1', image_id=None,
anti_affinity=anti_affinity)
def _make_heat_template(self, cluster, ng1, ng2):
heat_template = h.ClusterTemplate(cluster)
heat_template.add_node_group_extra(ng1['id'], 1,
get_ud_generator('line1\nline2'))
heat_template.add_node_group_extra(ng2['id'], 1,
get_ud_generator('line2\nline3'))
return heat_template
def test_get_anti_affinity_scheduler_hints(self):
ng1, ng2 = self._make_node_groups('floating')
cluster = self._make_cluster('private_net', ng1, ng2,
anti_affinity=["datanode"])
heat_template = self._make_heat_template(cluster, ng1, ng2)
ng1 = [ng for ng in cluster.node_groups if ng.name == "master"][0]
ng2 = [ng for ng in cluster.node_groups if ng.name == "worker"][0]
expected = ('"scheduler_hints" : '
'{"group": {"Ref": "cluster-aa-group"}},')
actual = heat_template._get_anti_affinity_scheduler_hints(ng2)
self.assertEqual(expected, actual)
expected = ''
actual = heat_template._get_anti_affinity_scheduler_hints(ng1)
self.assertEqual(expected, actual)
def test_load_template_use_neutron(self):
"""Test for Heat cluster template with Neutron enabled.
Two NodeGroups used: 'master' with Ephemeral drive attached and
'worker' with 2 attached volumes 10GB size each
"""
ng1, ng2 = self._make_node_groups('floating', 'vol_type')
cluster = self._make_cluster('private_net', ng1, ng2)
heat_template = self._make_heat_template(cluster, ng1, ng2)
self.override_config("use_neutron", True)
main_template = h._load_template(
'main.heat', {'resources':
heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_use_neutron.heat")))
def test_load_template_use_nova_network_without_autoassignment(self):
"""Checks Heat cluster template with Nova Network enabled.
Nova Network checked without autoassignment of floating ip.
Two NodeGroups used: 'master' with Ephemeral drive attached and
'worker' with 2 attached volumes 10GB size each
"""
ng1, ng2 = self._make_node_groups('floating')
cluster = self._make_cluster(None, ng1, ng2)
heat_template = self._make_heat_template(cluster, ng1, ng2)
self.override_config("use_neutron", False)
main_template = h._load_template(
'main.heat', {'resources':
heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_use_nn_without_autoassignment.heat"))
)
def test_load_template_use_nova_network_with_autoassignment(self):
"""Checks Heat cluster template with Nova Network enabled.
Nova Network checked with autoassignment of floating ip.
Two NodeGroups used: 'master' with Ephemeral drive attached and
'worker' with 2 attached volumes 10GB size each
"""
ng1, ng2 = self._make_node_groups()
cluster = self._make_cluster(None, ng1, ng2)
heat_template = self._make_heat_template(cluster, ng1, ng2)
self.override_config("use_neutron", False)
main_template = h._load_template(
'main.heat', {'resources':
heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_use_nn_with_autoassignment.heat"))
)
def test_load_template_with_anti_affinity_single_ng(self):
"""Checks Heat cluster template with Neutron enabled.
Checks also anti-affinity feature enabled for single node process
in single node group.
"""
ng1 = tu.make_ng_dict('master', 42, ['namenode'], 1,
floating_ip_pool='floating', image_id=None,
volumes_per_node=0, volumes_size=0, id=1,
image_username='root')
ng2 = tu.make_ng_dict('worker', 42, ['datanode'], 2,
floating_ip_pool='floating', image_id=None,
volumes_per_node=0, volumes_size=0, id=2,
image_username='root')
cluster = tu.create_cluster("cluster", "tenant1", "general",
"1.2.1", [ng1, ng2],
user_keypair_id='user_key',
neutron_management_network='private_net',
default_image_id='1',
anti_affinity=['datanode'], image_id=None)
aa_heat_template = h.ClusterTemplate(cluster)
aa_heat_template.add_node_group_extra(ng1['id'], 1,
get_ud_generator('line1\nline2'))
aa_heat_template.add_node_group_extra(ng2['id'], 2,
get_ud_generator('line2\nline3'))
self.override_config("use_neutron", True)
main_template = h._load_template(
'main.heat', {'resources':
aa_heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_aa.heat")))
def get_ud_generator(s):
def generator(*args, **kwargs):
return s
return generator

View File

@ -13,204 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
import testtools
from sahara import exceptions as ex
from sahara.tests.unit import base
from sahara.tests.unit import testutils as tu
from sahara.utils import files as f
from sahara.utils.openstack import heat as h
class TestHeat(testtools.TestCase):
def test_gets(self):
inst_name = "cluster-worker-001"
self.assertEqual(h._get_inst_name("cluster", "worker", 0), inst_name)
self.assertEqual(h._get_inst_name("CLUSTER", "WORKER", 0), inst_name)
self.assertEqual(h._get_port_name(inst_name),
"cluster-worker-001-port")
self.assertEqual(h._get_floating_name(inst_name),
"cluster-worker-001-floating")
self.assertEqual(h._get_floating_assoc_name(inst_name),
"cluster-worker-001-floating-assoc")
self.assertEqual(h._get_volume_name(inst_name, 1),
"cluster-worker-001-volume-1")
self.assertEqual(h._get_volume_attach_name(inst_name, 1),
"cluster-worker-001-volume-attachment-1")
def test_prepare_user_data(self):
userdata = "line1\nline2"
self.assertEqual(h._prepare_userdata(userdata), '"line1",\n"line2"')
class TestClusterTemplate(base.SaharaWithDbTestCase):
"""Checks valid structure of Resources section in generated Heat templates.
1. It checks templates generation with different OpenStack
network installations: Neutron, NovaNetwork with floating Ip auto
assignment set to True or False.
2. Cinder volume attachments.
3. Basic instances creations with multi line user data provided.
4. Anti-affinity feature with proper nova scheduler hints included
into Heat templates.
"""
def _make_node_groups(self, floating_ip_pool=None, volume_type=None):
ng1 = tu.make_ng_dict('master', 42, ['namenode'], 1,
floating_ip_pool=floating_ip_pool, image_id=None,
volumes_per_node=0, volumes_size=0, id=1,
image_username='root', volume_type=None)
ng2 = tu.make_ng_dict('worker', 42, ['datanode'], 1,
floating_ip_pool=floating_ip_pool, image_id=None,
volumes_per_node=2, volumes_size=10, id=2,
image_username='root', volume_type=volume_type)
return ng1, ng2
def _make_cluster(self, mng_network, ng1, ng2, anti_affinity=[]):
return tu.create_cluster("cluster", "tenant1", "general",
"1.2.1", [ng1, ng2],
user_keypair_id='user_key',
neutron_management_network=mng_network,
default_image_id='1', image_id=None,
anti_affinity=anti_affinity)
def _make_heat_template(self, cluster, ng1, ng2):
heat_template = h.ClusterTemplate(cluster)
heat_template.add_node_group_extra(ng1['id'], 1,
get_ud_generator('line1\nline2'))
heat_template.add_node_group_extra(ng2['id'], 1,
get_ud_generator('line2\nline3'))
return heat_template
def test_get_anti_affinity_scheduler_hints(self):
ng1, ng2 = self._make_node_groups('floating')
cluster = self._make_cluster('private_net', ng1, ng2,
anti_affinity=["datanode"])
heat_template = self._make_heat_template(cluster, ng1, ng2)
ng1 = [ng for ng in cluster.node_groups if ng.name == "master"][0]
ng2 = [ng for ng in cluster.node_groups if ng.name == "worker"][0]
expected = ('"scheduler_hints" : '
'{"group": {"Ref": "cluster-aa-group"}},')
actual = heat_template._get_anti_affinity_scheduler_hints(ng2)
self.assertEqual(expected, actual)
expected = ''
actual = heat_template._get_anti_affinity_scheduler_hints(ng1)
self.assertEqual(expected, actual)
def test_load_template_use_neutron(self):
"""Test for Heat cluster template with Neutron enabled.
Two NodeGroups used: 'master' with Ephemeral drive attached and
'worker' with 2 attached volumes 10GB size each
"""
ng1, ng2 = self._make_node_groups('floating', 'vol_type')
cluster = self._make_cluster('private_net', ng1, ng2)
heat_template = self._make_heat_template(cluster, ng1, ng2)
self.override_config("use_neutron", True)
main_template = h._load_template(
'main.heat', {'resources':
heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_use_neutron.heat")))
def test_load_template_use_nova_network_without_autoassignment(self):
"""Checks Heat cluster template with Nova Network enabled.
Nova Network checked without autoassignment of floating ip.
Two NodeGroups used: 'master' with Ephemeral drive attached and
'worker' with 2 attached volumes 10GB size each
"""
ng1, ng2 = self._make_node_groups('floating')
cluster = self._make_cluster(None, ng1, ng2)
heat_template = self._make_heat_template(cluster, ng1, ng2)
self.override_config("use_neutron", False)
main_template = h._load_template(
'main.heat', {'resources':
heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_use_nn_without_autoassignment.heat"))
)
def test_load_template_use_nova_network_with_autoassignment(self):
"""Checks Heat cluster template with Nova Network enabled.
Nova Network checked with autoassignment of floating ip.
Two NodeGroups used: 'master' with Ephemeral drive attached and
'worker' with 2 attached volumes 10GB size each
"""
ng1, ng2 = self._make_node_groups()
cluster = self._make_cluster(None, ng1, ng2)
heat_template = self._make_heat_template(cluster, ng1, ng2)
self.override_config("use_neutron", False)
main_template = h._load_template(
'main.heat', {'resources':
heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_use_nn_with_autoassignment.heat"))
)
def test_load_template_with_anti_affinity_single_ng(self):
"""Checks Heat cluster template with Neutron enabled.
Checks also anti-affinity feature enabled for single node process
in single node group.
"""
ng1 = tu.make_ng_dict('master', 42, ['namenode'], 1,
floating_ip_pool='floating', image_id=None,
volumes_per_node=0, volumes_size=0, id=1,
image_username='root')
ng2 = tu.make_ng_dict('worker', 42, ['datanode'], 2,
floating_ip_pool='floating', image_id=None,
volumes_per_node=0, volumes_size=0, id=2,
image_username='root')
cluster = tu.create_cluster("cluster", "tenant1", "general",
"1.2.1", [ng1, ng2],
user_keypair_id='user_key',
neutron_management_network='private_net',
default_image_id='1',
anti_affinity=['datanode'], image_id=None)
aa_heat_template = h.ClusterTemplate(cluster)
aa_heat_template.add_node_group_extra(ng1['id'], 1,
get_ud_generator('line1\nline2'))
aa_heat_template.add_node_group_extra(ng2['id'], 2,
get_ud_generator('line2\nline3'))
self.override_config("use_neutron", True)
main_template = h._load_template(
'main.heat', {'resources':
aa_heat_template._serialize_resources()})
self.assertEqual(
json.loads(main_template),
json.loads(f.get_file_text(
"tests/unit/resources/"
"test_serialize_resources_aa.heat")))
class TestClusterStack(testtools.TestCase):
@mock.patch("sahara.context.sleep", return_value=None)
def test_wait_completion(self, _):
@ -244,9 +53,3 @@ class FakeHeatStack(object):
def status(self):
s = self.stack_status
return s[s.index('_') + 1:]
def get_ud_generator(s):
def generator(*args, **kwargs):
return s
return generator

View File

@ -13,20 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from heatclient import client as heat_client
from oslo_config import cfg
from oslo_log import log as logging
import six
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.utils import files as f
from sahara.utils import general as g
from sahara.utils.openstack import base
from sahara.utils.openstack import neutron
opts = [
cfg.BoolOpt('api_insecure',
@ -44,10 +38,6 @@ CONF = cfg.CONF
CONF.register_group(heat_group)
CONF.register_opts(opts, group=heat_group)
LOG = logging.getLogger(__name__)
SSH_PORT = 22
def client():
ctx = context.current()
@ -76,285 +66,3 @@ def wait_stack_completion(stack):
if stack.status != 'COMPLETE':
raise ex.HeatStackException(stack.stack_status)
def _get_inst_name(cluster_name, ng_name, index):
return g.generate_instance_name(cluster_name, ng_name, index + 1)
def _get_aa_group_name(cluster_name):
return g.generate_aa_group_name(cluster_name)
def _get_port_name(inst_name):
return '%s-port' % inst_name
def _get_floating_name(inst_name):
return '%s-floating' % inst_name
def _get_floating_assoc_name(inst_name):
return '%s-floating-assoc' % inst_name
def _get_volume_name(inst_name, volume_idx):
return '%s-volume-%i' % (inst_name, volume_idx)
def _get_volume_attach_name(inst_name, volume_idx):
return '%s-volume-attachment-%i' % (inst_name, volume_idx)
def _load_template(template_name, fields):
template_file = f.get_file_text('resources/%s' % template_name)
return template_file.rstrip() % fields
def _prepare_userdata(userdata):
"""Converts userdata as a text into format consumable by heat template."""
userdata = userdata.replace('"', '\\"')
lines = userdata.splitlines()
return '"' + '",\n"'.join(lines) + '"'
class ClusterTemplate(object):
def __init__(self, cluster):
self.cluster = cluster
self.node_groups_extra = {}
def add_node_group_extra(self, node_group_id, node_count,
gen_userdata_func):
self.node_groups_extra[node_group_id] = {
'node_count': node_count,
'gen_userdata_func': gen_userdata_func
}
# Consider using a single Jinja template for all this
def instantiate(self, update_existing, disable_rollback=True):
main_tmpl = _load_template('main.heat',
{'resources': self._serialize_resources()})
heat = client()
kwargs = {
'stack_name': self.cluster.name,
'timeout_mins': 180,
'disable_rollback': disable_rollback,
'parameters': {},
'template': json.loads(main_tmpl)}
if not update_existing:
heat.stacks.create(**kwargs)
else:
for stack in heat.stacks.list():
if stack.stack_name == self.cluster.name:
stack.update(**kwargs)
break
return ClusterStack(self, get_stack(self.cluster.name))
def _need_aa_server_group(self, node_group):
for node_process in node_group.node_processes:
if node_process in self.cluster.anti_affinity:
return True
return False
def _get_anti_affinity_scheduler_hints(self, node_group):
if not self._need_aa_server_group(node_group):
return ''
return ('"scheduler_hints" : %s,' %
json.dumps({"group": {"Ref": _get_aa_group_name(
self.cluster.name)}}))
def _serialize_resources(self):
resources = []
if self.cluster.anti_affinity:
resources.extend(self._serialize_aa_server_group())
for ng in self.cluster.node_groups:
if ng.auto_security_group:
resources.extend(self._serialize_auto_security_group(ng))
for idx in range(0, self.node_groups_extra[ng.id]['node_count']):
resources.extend(self._serialize_instance(ng, idx))
return ',\n'.join(resources)
def _serialize_auto_security_group(self, ng):
fields = {
'security_group_name': g.generate_auto_security_group_name(ng),
'security_group_description':
"Auto security group created by Sahara for Node Group "
"'%s' of cluster '%s'." % (ng.name, ng.cluster.name),
'rules': self._serialize_auto_security_group_rules(ng)}
yield _load_template('security_group.heat', fields)
def _serialize_auto_security_group_rules(self, ng):
create_rule = lambda cidr, proto, from_port, to_port: {
"CidrIp": cidr,
"IpProtocol": proto,
"FromPort": six.text_type(from_port),
"ToPort": six.text_type(to_port)}
rules = []
for port in ng.open_ports:
rules.append(create_rule('0.0.0.0/0', 'tcp', port, port))
rules.append(create_rule('0.0.0.0/0', 'tcp', SSH_PORT, SSH_PORT))
# open all traffic for private networks
if CONF.use_neutron:
for cidr in neutron.get_private_network_cidrs(ng.cluster):
for protocol in ['tcp', 'udp']:
rules.append(create_rule(cidr, protocol, 1, 65535))
rules.append(create_rule(cidr, 'icmp', -1, -1))
return json.dumps(rules)
def _serialize_instance(self, ng, idx):
inst_name = _get_inst_name(self.cluster.name, ng.name, idx)
nets = ''
security_groups = ''
if CONF.use_neutron:
port_name = _get_port_name(inst_name)
yield self._serialize_port(port_name,
self.cluster.neutron_management_network,
self._get_security_groups(ng))
nets = '"networks" : [{ "port" : { "Ref" : "%s" }}],' % port_name
if ng.floating_ip_pool:
yield self._serialize_neutron_floating(inst_name, port_name,
ng.floating_ip_pool)
else:
if ng.floating_ip_pool:
yield self._serialize_nova_floating(inst_name,
ng.floating_ip_pool)
if ng.security_groups:
security_groups = (
'"security_groups": %s,' % json.dumps(
self._get_security_groups(ng)))
# Check if cluster contains user key-pair and include it to template.
key_name = ''
if self.cluster.user_keypair_id:
key_name = '"key_name" : "%s",' % self.cluster.user_keypair_id
gen_userdata_func = self.node_groups_extra[ng.id]['gen_userdata_func']
userdata = gen_userdata_func(ng, inst_name)
availability_zone = ''
if ng.availability_zone:
# Use json.dumps to escape ng.availability_zone
# (in case it contains quotes)
availability_zone = ('"availability_zone" : %s,' %
json.dumps(ng.availability_zone))
fields = {'instance_name': inst_name,
'flavor_id': ng.flavor_id,
'image_id': ng.get_image_id(),
'image_username': ng.image_username,
'network_interfaces': nets,
'key_name': key_name,
'userdata': _prepare_userdata(userdata),
'scheduler_hints':
self._get_anti_affinity_scheduler_hints(ng),
'security_groups': security_groups,
'availability_zone': availability_zone}
yield _load_template('instance.heat', fields)
for idx in range(0, ng.volumes_per_node):
yield self._serialize_volume(inst_name, idx, ng.volumes_size,
ng.volumes_availability_zone,
ng.volume_type)
def _serialize_port(self, port_name, fixed_net_id, security_groups):
fields = {'port_name': port_name,
'fixed_net_id': fixed_net_id,
'security_groups': ('"security_groups": %s,' % json.dumps(
security_groups) if security_groups else '')}
return _load_template('neutron-port.heat', fields)
def _serialize_neutron_floating(self, inst_name, port_name,
floating_net_id):
fields = {'floating_ip_name': _get_floating_name(inst_name),
'floating_net_id': floating_net_id,
'port_name': port_name}
return _load_template('neutron-floating.heat', fields)
def _serialize_nova_floating(self, inst_name, floating_pool_name):
fields = {
'floating_ip_name': _get_floating_name(inst_name),
'floating_ip_assoc_name': _get_floating_assoc_name(inst_name),
'instance_name': inst_name,
'pool': floating_pool_name
}
return _load_template('nova-floating.heat', fields)
def _serialize_volume_type(self, volume_type):
property = '"volume_type" : %s'
if volume_type is None:
return property % 'null'
else:
return property % ('"%s"' % volume_type)
def _serialize_volume(self, inst_name, volume_idx, volumes_size,
volumes_availability_zone, volume_type):
fields = {'volume_name': _get_volume_name(inst_name, volume_idx),
'volumes_size': volumes_size,
'volume_attach_name': _get_volume_attach_name(inst_name,
volume_idx),
'availability_zone': '',
'instance_name': inst_name,
'volume_type': self._serialize_volume_type(volume_type)}
if volumes_availability_zone:
# Use json.dumps to escape volumes_availability_zone
# (in case it contains quotes)
fields['availability_zone'] = (
'"availability_zone": %s,' %
json.dumps(volumes_availability_zone))
return _load_template('volume.heat', fields)
def _get_security_groups(self, node_group):
if not node_group.auto_security_group:
return node_group.security_groups
return (list(node_group.security_groups or []) +
[{"Ref": g.generate_auto_security_group_name(node_group)}])
def _serialize_aa_server_group(self):
fields = {'server_group_name': _get_aa_group_name(self.cluster.name)}
yield _load_template('aa_server_group.heat', fields)
class ClusterStack(object):
def __init__(self, tmpl, heat_stack):
self.tmpl = tmpl
self.heat_stack = heat_stack
def get_node_group_instances(self, node_group):
insts = []
count = self.tmpl.node_groups_extra[node_group.id]['node_count']
heat = client()
for i in range(0, count):
name = _get_inst_name(self.tmpl.cluster.name, node_group.name, i)
res = heat.resources.get(self.heat_stack.id, name)
insts.append((name, res.physical_resource_id))
return insts

View File

@ -47,7 +47,7 @@ sahara.cluster.plugins =
sahara.infrastructure.engine =
direct = sahara.service.direct_engine:DirectEngine
heat = sahara.service.heat_engine:HeatEngine
heat = sahara.service.heat.heat_engine:HeatEngine
sahara.remote =
ssh = sahara.utils.ssh_remote:SshRemoteDriver