Merge "Switched anti-affinity feature to server groups"

Jenkins 2014-09-05 07:13:02 +00:00 committed by Gerrit Code Review
commit 5921f1ae03
10 changed files with 191 additions and 127 deletions


@ -93,19 +93,10 @@ is not reliable because all replicas may turn up on one physical machine.
The anti-affinity feature provides the ability to explicitly tell Sahara to run specified processes on different compute nodes. This
is especially useful for the Hadoop datanode process, to make HDFS replicas reliable.
.. _`enable-anti-affinity`:
The Anti-Affinity feature requires certain scheduler filters to be enabled on Nova.
Edit your ``/etc/nova/nova.conf`` in the following way:
.. sourcecode:: cfg
[DEFAULT]
...
scheduler_driver=nova.scheduler.filter_scheduler.FilterScheduler
scheduler_default_filters=DifferentHostFilter,SameHostFilter
Starting with the Juno release, Sahara creates server groups with the
``anti-affinity`` policy to enable the anti-affinity feature. Sahara creates one
server group per cluster and assigns all instances with affected processes to
this server group. Refer to the Nova documentation for details on how server groups work.
This feature is supported by all plugins out of the box.
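For illustration, what Sahara does internally is roughly equivalent to creating the
server group with python-novaclient and booting instances into it. The sketch below
is not part of Sahara; the client setup, names and ids are placeholders:

.. sourcecode:: python

    from novaclient import client

    # Placeholder credentials and endpoint.
    nova = client.Client('2', 'user', 'password', 'project',
                         'http://keystone.example.org:5000/v2.0')

    # One server group per cluster, with the anti-affinity policy.
    group = nova.server_groups.create(name='demo-cluster-aa-group',
                                      policies=['anti-affinity'])

    # Instances booted with the 'group' scheduler hint are kept on
    # different compute hosts by the scheduler.
    for idx in (1, 2):
        nova.servers.create('demo-cluster-worker-%03d' % idx,
                            image='<image-id>', flavor='<flavor-id>',
                            scheduler_hints={'group': group.id})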


@ -156,9 +156,6 @@ To install into a virtual environment
Notes:
------
One of the :doc:`Sahara Features <features>`, Anti-Affinity, requires a Nova adjustment.
See :ref:`Enabling Anti-Affinity <enable-anti-affinity>` for details; this is purely optional.
Make sure that your operating system is not blocking the Sahara port (default: 8386).
You may need to configure iptables on CentOS and some other operating systems.


@ -50,3 +50,21 @@ in the documentation.
Note that this change breaks Sahara backward compatibility for clusters created
using the Heat infrastructure engine before the change. Such clusters will continue
to operate, but performing scale operations on them is not recommended.
Anti-affinity implementation changed
++++++++++++++++++++++++++++++++++++
Starting with the Juno release, the anti-affinity feature is implemented using
server groups. From the user's perspective Sahara's behaviour should not change
much, but there are internal changes:
1) A server group object is created if the anti-affinity feature is enabled.
2) The new implementation does not allow several affected instances on the same
host even if they have no processes in common. For example, if anti-affinity is
enabled for the 'datanode' and 'tasktracker' processes, the previous
implementation allowed an instance with the 'datanode' process and another
instance with the 'tasktracker' process to share one host; the new
implementation guarantees that such instances are placed on different hosts
(see the sketch after this note).
Note that the new implementation applies to new clusters only. The old
implementation is still used when a user scales a cluster created in Icehouse.
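For illustration only, the scheduler hints that Sahara passes to Nova change roughly
as follows; the ids below are placeholders, not actual Sahara output:

.. sourcecode:: python

    # Old (Icehouse) behaviour: per-process 'different_host' hints built from
    # already-started instances, so two instances whose aa-enabled processes
    # did not overlap could still end up on the same host.
    old_style_hints = {'different_host': ['<id-of-instance-with-datanode>']}

    # New (Juno) behaviour: every instance with an aa-enabled process is booted
    # into a single anti-affinity server group, so all such instances are
    # guaranteed to land on different hosts.
    new_style_hints = {'group': '<anti-affinity-server-group-id>'}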


@ -0,0 +1,7 @@
"%(server_group_name)s" : {
"Type" : "OS::Nova::ServerGroup",
"Properties" : {
"name" : "%(server_group_name)s",
"policies": ["anti-affinity"]
}
}


@ -134,6 +134,7 @@ class DirectEngine(e.Engine):
context.ctx(), cluster, {'rollback_info': rollback_info})
return cluster
# TODO(alazarev) remove when we fully switch to server groups
def _generate_anti_affinity_groups(self, cluster):
aa_groups = {}
@ -153,13 +154,52 @@ class DirectEngine(e.Engine):
cluster = self._create_auto_security_groups(cluster)
aa_groups = {}
aa_group = None
if cluster.anti_affinity:
aa_group = self._create_aa_server_group(cluster)
for node_group in cluster.node_groups:
count = node_group.count
conductor.node_group_update(ctx, node_group, {'count': 0})
for idx in six.moves.xrange(1, count + 1):
self._run_instance(cluster, node_group, idx, aa_groups)
self._run_instance(cluster, node_group, idx, aa_group=aa_group)
def _create_aa_server_group(self, cluster):
server_group_name = g.generate_aa_group_name(cluster.name)
client = nova.client().server_groups
if client.findall(name=server_group_name):
raise exc.InvalidDataException(
_("Server group with name %s is already exists")
% server_group_name)
server_group = client.create(name=server_group_name,
policies=['anti-affinity'])
return server_group.id
def _delete_aa_server_group(self, cluster):
if cluster.anti_affinity:
server_group_name = g.generate_aa_group_name(cluster.name)
client = nova.client().server_groups
server_groups = client.findall(name=server_group_name)
if len(server_groups) == 1:
client.delete(server_groups[0].id)
def _find_aa_server_group(self, cluster):
server_group_name = g.generate_aa_group_name(cluster.name)
server_groups = nova.client().server_groups.findall(
name=server_group_name)
if len(server_groups) > 1:
raise exc.IncorrectStateError(
_("Several server groups with name %s found")
% server_group_name)
if len(server_groups) == 1:
return server_groups[0].id
return None
def _create_auto_security_groups(self, cluster):
ctx = context.ctx()
@ -171,7 +211,14 @@ class DirectEngine(e.Engine):
def _scale_cluster_instances(self, cluster, node_group_id_map):
ctx = context.ctx()
aa_groups = self._generate_anti_affinity_groups(cluster)
aa_group = None
old_aa_groups = None
if cluster.anti_affinity:
aa_group = self._find_aa_server_group(cluster)
if not aa_group:
old_aa_groups = self._generate_anti_affinity_groups(cluster)
instances_to_delete = []
node_groups_to_enlarge = []
node_groups_to_delete = []
@ -207,8 +254,9 @@ class DirectEngine(e.Engine):
for node_group in node_groups_to_enlarge:
count = node_group_id_map[node_group.id]
for idx in six.moves.xrange(node_group.count + 1, count + 1):
instance_id = self._run_instance(cluster, node_group, idx,
aa_groups)
instance_id = self._run_instance(
cluster, node_group, idx,
aa_group=aa_group, old_aa_groups=old_aa_groups)
instances_to_add.append(instance_id)
return instances_to_add
@ -220,21 +268,26 @@ class DirectEngine(e.Engine):
return None
def _run_instance(self, cluster, node_group, idx, aa_groups):
def _run_instance(self, cluster, node_group, idx, aa_group=None,
old_aa_groups=None):
"""Create instance using nova client and persist them into DB."""
ctx = context.ctx()
name = g.generate_instance_name(cluster.name, node_group.name, idx)
userdata = self._generate_user_data_script(node_group, name)
# aa_groups: node process -> instance ids
aa_ids = []
for node_process in node_group.node_processes:
aa_ids += aa_groups.get(node_process) or []
if old_aa_groups:
# old_aa_groups: node process -> instance ids
aa_ids = []
for node_process in node_group.node_processes:
aa_ids += old_aa_groups.get(node_process) or []
# create instances only at hosts w/ no instances
# w/ aa-enabled processes
hints = {'different_host': sorted(set(aa_ids))} if aa_ids else None
# create instances only at hosts w/ no instances
# w/ aa-enabled processes
hints = {'different_host': sorted(set(aa_ids))} if aa_ids else None
else:
hints = {'group': aa_group} if (
aa_group and self._need_aa_server_group(node_group)) else None
if CONF.use_neutron:
net_id = cluster.neutron_management_network
@ -255,12 +308,14 @@ class DirectEngine(e.Engine):
instance_id = conductor.instance_add(ctx, node_group,
{"instance_id": nova_instance.id,
"instance_name": name})
# save instance id to aa_groups to support aa feature
for node_process in node_group.node_processes:
if node_process in cluster.anti_affinity:
aa_group_ids = aa_groups.get(node_process, [])
aa_group_ids.append(nova_instance.id)
aa_groups[node_process] = aa_group_ids
if old_aa_groups:
# save instance id to old_aa_groups to support the aa feature
for node_process in node_group.node_processes:
if node_process in cluster.anti_affinity:
aa_group_ids = old_aa_groups.get(node_process, [])
aa_group_ids.append(nova_instance.id)
old_aa_groups[node_process] = aa_group_ids
return instance_id
@ -288,6 +343,12 @@ class DirectEngine(e.Engine):
{"security_groups": security_groups})
return security_groups
def _need_aa_server_group(self, node_group):
for node_process in node_group.node_processes:
if node_process in node_group.cluster.anti_affinity:
return True
return False
def _assign_floating_ips(self, instances):
for instance in instances:
node_group = instance.node_group
@ -415,3 +476,4 @@ class DirectEngine(e.Engine):
"""Shutdown specified cluster and all related resources."""
self._shutdown_instances(cluster)
self._clean_job_executions(cluster)
self._delete_aa_server_group(cluster)


@ -3,6 +3,13 @@
"Description" : "Hadoop Cluster by Sahara",
"Resources" : {
"cluster-aa-group" : {
"Type" : "OS::Nova::ServerGroup",
"Properties" : {
"name" : "cluster-aa-group",
"policies": ["anti-affinity"]
}
},
"cluster-worker-001-port" : {
"Type" : "OS::Neutron::Port",
"Properties" : {
@ -25,6 +32,7 @@
"admin_user": "root",
"networks" : [{ "port" : { "Ref" : "cluster-worker-001-port" }}],
"key_name" : "user_key",
"scheduler_hints" : {"group": {"Ref": "cluster-aa-group"}},
"user_data": {
"Fn::Join" : ["\n", ["line2", "line3"]]
}
@ -52,7 +60,7 @@
"admin_user": "root",
"networks" : [{ "port" : { "Ref" : "cluster-worker-002-port" }}],
"key_name" : "user_key",
"scheduler_hints" : {"different_host": [{"Ref": "cluster-worker-001"}]},
"scheduler_hints" : {"group": {"Ref": "cluster-aa-group"}},
"user_data": {
"Fn::Join" : ["\n", ["line2", "line3"]]
}


@ -15,7 +15,6 @@
import mock
from novaclient import exceptions as nova_exceptions
import six
from sahara import conductor as cond
from sahara import context
@ -42,6 +41,7 @@ class AbstractInstanceTest(base.SaharaWithDbTestCase):
self.novaclient_patcher = mock.patch(
'sahara.utils.openstack.nova.client')
self.nova = _create_nova_mock(self.novaclient_patcher.start())
self.nova.server_groups.findall.return_value = []
self.get_userdata_patcher = mock.patch(
'sahara.utils.remote.get_userdata_template')
@ -89,24 +89,25 @@ class TestClusterRollBack(AbstractInstanceTest):
class NodePlacementTest(AbstractInstanceTest):
def test_one_node_groups_and_one_affinity_group(self):
self.nova.server_groups.create.return_value = mock.Mock(id='123')
node_groups = [_make_ng_dict('test_group', 'test_flavor',
['data node'], 2)]
cluster = _create_cluster_mock(node_groups, ["data node"])
self.engine._create_instances(cluster)
userdata = _generate_user_data_script(cluster)
self.nova.servers.create.assert_has_calls(
[mock.call("test_cluster-test_group-001",
"initial",
"test_flavor",
scheduler_hints=None,
scheduler_hints={'group': "123"},
userdata=userdata,
key_name='user_keypair',
security_groups=None),
mock.call("test_cluster-test_group-002",
"initial",
"test_flavor",
scheduler_hints={'different_host': ["1"]},
scheduler_hints={'group': "123"},
userdata=userdata,
key_name='user_keypair',
security_groups=None)],
@ -117,10 +118,13 @@ class NodePlacementTest(AbstractInstanceTest):
self.assertEqual(len(cluster_obj.node_groups[0].instances), 2)
def test_one_node_groups_and_no_affinity_group(self):
self.nova.server_groups.create.return_value = mock.Mock(id='123')
node_groups = [_make_ng_dict('test_group', 'test_flavor',
['data node', 'task tracker'], 2)]
cluster = _create_cluster_mock(node_groups, [])
self.engine._create_instances(cluster)
userdata = _generate_user_data_script(cluster)
@ -146,6 +150,8 @@ class NodePlacementTest(AbstractInstanceTest):
self.assertEqual(len(cluster_obj.node_groups[0].instances), 2)
def test_two_node_groups_and_one_affinity_group(self):
self.nova.server_groups.create.return_value = mock.Mock(id='123')
node_groups = [_make_ng_dict("test_group_1", "test_flavor",
["data node", "test tracker"], 2),
_make_ng_dict("test_group_2", "test_flavor",
@ -155,61 +161,25 @@ class NodePlacementTest(AbstractInstanceTest):
self.engine._create_instances(cluster)
userdata = _generate_user_data_script(cluster)
def _find_created_at(idx):
"""Find the #N instance creation call.
To determine which instance was created at position #N, we check the
scheduler hints. For example, the call with scheduler hint
different_host = ['1', '2'] is the third instance create call.
"""
different_hosts = []
for instance_id in six.moves.xrange(1, idx):
different_hosts.append(str(instance_id))
scheduler_hints = ({'different_host': different_hosts}
if different_hosts else None)
for call in self.nova.servers.create.mock_calls:
if call[2]['scheduler_hints'] == scheduler_hints:
return call[1][0]
self.fail("Couldn't find call with scheduler_hints='%s'"
% scheduler_hints)
# find instance names in instance create calls
instance_names = []
for idx in six.moves.xrange(1, 4):
instance_name = _find_created_at(idx)
if instance_name in instance_names:
self.fail("Create instance was called twice with the same "
"instance name='%s'" % instance_name)
instance_names.append(instance_name)
self.assertEqual(3, len(instance_names))
self.assertEqual(set(['test_cluster-test_group_1-001',
'test_cluster-test_group_1-002',
'test_cluster-test_group_2-001']),
set(instance_names))
self.nova.servers.create.assert_has_calls(
[mock.call(instance_names[0],
[mock.call('test_cluster-test_group_1-001',
"initial",
"test_flavor",
scheduler_hints=None,
scheduler_hints={'group': "123"},
userdata=userdata,
key_name='user_keypair',
security_groups=None),
mock.call(instance_names[1],
mock.call('test_cluster-test_group_1-002',
"initial",
"test_flavor",
scheduler_hints={'different_host': ["1"]},
scheduler_hints={'group': "123"},
userdata=userdata,
key_name='user_keypair',
security_groups=None),
mock.call(instance_names[2],
mock.call('test_cluster-test_group_2-001',
"initial",
"test_flavor",
scheduler_hints={'different_host': ["1", "2"]},
scheduler_hints={'group': "123"},
userdata=userdata,
key_name='user_keypair',
security_groups=None)],


@ -45,23 +45,6 @@ class TestHeat(testtools.TestCase):
userdata = "line1\nline2"
self.assertEqual(h._prepare_userdata(userdata), '"line1",\n"line2"')
def test_get_anti_affinity_scheduler_hints(self):
inst_names = ['i1', 'i2']
expected = ('"scheduler_hints" : {"different_host": '
'[{"Ref": "i1"}, {"Ref": "i2"}]},')
actual = h._get_anti_affinity_scheduler_hints(inst_names)
self.assertEqual(expected, actual)
inst_names = ['i1', 'i1']
expected = '"scheduler_hints" : {"different_host": [{"Ref": "i1"}]},'
actual = h._get_anti_affinity_scheduler_hints(inst_names)
self.assertEqual(expected, actual)
inst_names = []
expected = ''
actual = h._get_anti_affinity_scheduler_hints(inst_names)
self.assertEqual(expected, actual)
class TestClusterTemplate(base.SaharaWithDbTestCase):
"""Checks valid structure of Resources section in generated Heat templates.
@ -86,13 +69,13 @@ class TestClusterTemplate(base.SaharaWithDbTestCase):
image_username='root')
return ng1, ng2
def _make_cluster(self, mng_network, ng1, ng2):
def _make_cluster(self, mng_network, ng1, ng2, anti_affinity=[]):
return tu.create_cluster("cluster", "tenant1", "general",
"1.2.1", [ng1, ng2],
user_keypair_id='user_key',
neutron_management_network=mng_network,
default_image_id='1', anti_affinity=[],
image_id=None)
default_image_id='1', image_id=None,
anti_affinity=anti_affinity)
def _make_heat_template(self, cluster, ng1, ng2):
heat_template = h.ClusterTemplate(cluster)
@ -102,6 +85,24 @@ class TestClusterTemplate(base.SaharaWithDbTestCase):
get_ud_generator('line2\nline3'))
return heat_template
def test_get_anti_affinity_scheduler_hints(self):
ng1, ng2 = self._make_node_groups('floating')
cluster = self._make_cluster('private_net', ng1, ng2,
anti_affinity=["datanode"])
heat_template = self._make_heat_template(cluster, ng1, ng2)
ng1 = [ng for ng in cluster.node_groups if ng.name == "master"][0]
ng2 = [ng for ng in cluster.node_groups if ng.name == "worker"][0]
expected = ('"scheduler_hints" : '
'{"group": {"Ref": "cluster-aa-group"}},')
actual = heat_template._get_anti_affinity_scheduler_hints(ng2)
self.assertEqual(expected, actual)
expected = ''
actual = heat_template._get_anti_affinity_scheduler_hints(ng1)
self.assertEqual(expected, actual)
def test_load_template_use_neutron(self):
"""Test for Heat cluster template with Neutron enabled.


@ -123,3 +123,7 @@ def generate_instance_name(cluster_name, node_group_name, index):
def generate_auto_security_group_name(cluster_name, node_group_name):
return ("%s-%s" % (cluster_name, node_group_name)).lower()
def generate_aa_group_name(cluster_name):
return ("%s-aa-group" % cluster_name).lower()


@ -61,6 +61,10 @@ def _get_inst_name(cluster_name, ng_name, index):
return g.generate_instance_name(cluster_name, ng_name, index + 1)
def _get_aa_group_name(cluster_name):
return g.generate_aa_group_name(cluster_name)
def _get_port_name(inst_name):
return '%s-port' % inst_name
@ -81,16 +85,6 @@ def _get_volume_attach_name(inst_name, volume_idx):
return '%s-volume-attachment-%i' % (inst_name, volume_idx)
def _get_anti_affinity_scheduler_hints(instances_names):
if not instances_names:
return ''
aa_list = []
for instances_name in sorted(set(instances_names)):
aa_list.append({"Ref": instances_name})
return '"scheduler_hints" : %s,' % json.dumps({"different_host": aa_list})
def _load_template(template_name, fields):
template_file = f.get_file_text('resources/%s' % template_name)
return template_file.rstrip() % fields
@ -121,6 +115,7 @@ class ClusterTemplate(object):
def instantiate(self, update_existing, disable_rollback=True):
main_tmpl = _load_template('main.heat',
{'resources': self._serialize_resources()})
heat = client()
kwargs = {
@ -140,15 +135,31 @@ class ClusterTemplate(object):
return ClusterStack(self, get_stack(self.cluster.name))
def _need_aa_server_group(self, node_group):
for node_process in node_group.node_processes:
if node_process in self.cluster.anti_affinity:
return True
return False
def _get_anti_affinity_scheduler_hints(self, node_group):
if not self._need_aa_server_group(node_group):
return ''
return ('"scheduler_hints" : %s,' %
json.dumps({"group": {"Ref": _get_aa_group_name(
self.cluster.name)}}))
def _serialize_resources(self):
resources = []
aa_groups = {}
if self.cluster.anti_affinity:
resources.extend(self._serialize_aa_server_group())
for ng in self.cluster.node_groups:
if ng.auto_security_group:
resources.extend(self._serialize_auto_security_group(ng))
for idx in range(0, self.node_groups_extra[ng.id]['node_count']):
resources.extend(self._serialize_instance(ng, idx, aa_groups))
resources.extend(self._serialize_instance(ng, idx))
return ',\n'.join(resources)
@ -174,7 +185,7 @@ class ClusterTemplate(object):
return json.dumps(rules)
def _serialize_instance(self, ng, idx, aa_groups):
def _serialize_instance(self, ng, idx):
inst_name = _get_inst_name(self.cluster.name, ng.name, idx)
nets = ''
@ -200,10 +211,6 @@ class ClusterTemplate(object):
'"security_groups": %s,' % json.dumps(
self._get_security_groups(ng)))
aa_names = []
for node_process in ng.node_processes:
aa_names += aa_groups.get(node_process) or []
# Check if cluster contains user key-pair and include it to template.
key_name = ''
if self.cluster.user_keypair_id:
@ -219,16 +226,10 @@ class ClusterTemplate(object):
'network_interfaces': nets,
'key_name': key_name,
'userdata': _prepare_userdata(userdata),
'scheduler_hints': _get_anti_affinity_scheduler_hints(
aa_names),
'scheduler_hints':
self._get_anti_affinity_scheduler_hints(ng),
'security_groups': security_groups}
for node_process in ng.node_processes:
if node_process in self.cluster.anti_affinity:
aa_group_names = aa_groups.get(node_process, [])
aa_group_names.append(inst_name)
aa_groups[node_process] = aa_group_names
yield _load_template('instance.heat', fields)
for idx in range(0, ng.volumes_per_node):
@ -277,6 +278,11 @@ class ClusterTemplate(object):
[{"Ref": g.generate_auto_security_group_name(
node_group.cluster.name, node_group.name)}])
def _serialize_aa_server_group(self):
fields = {'server_group_name': _get_aa_group_name(self.cluster.name)}
yield _load_template('aa_server_group.heat', fields)
class ClusterStack(object):
def __init__(self, tmpl, heat_stack):