Merge "Add ZooKeeper support in Vanilla cluster"

Authored by Zuul on 2017-12-07 20:54:53 +00:00; committed by Gerrit Code Review
commit b3ff28609a
15 changed files with 274 additions and 7 deletions

View File

@@ -53,6 +53,7 @@ PORTS_MAP = {
"oozie": [11000],
"hiveserver": [9999, 10000],
"spark history server": [18080],
"zookeeper": [2181, 2888, 3888]
}
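(The three ports listed for "zookeeper" are ZooKeeper's standard ones: 2181 for client connections, 2888 for quorum traffic between followers and the leader, and 3888 for leader election.)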
@@ -66,12 +67,52 @@ def configure_cluster(pctx, cluster):
instances = utils.get_instances(cluster)
configure_instances(pctx, instances)
configure_topology_data(pctx, cluster)
configure_zookeeper(cluster)
configure_spark(cluster)
def configure_zookeeper(cluster, instances=None):
zk_servers = vu.get_zk_servers(cluster)
if zk_servers:
zk_conf = c_helper.generate_zk_basic_config(cluster)
zk_conf += _form_zk_servers_to_quorum(cluster, instances)
_push_zk_configs_to_nodes(cluster, zk_conf, instances)
def _form_zk_servers_to_quorum(cluster, to_delete_instances=None):
quorum = []
instances = map(vu.get_instance_hostname, vu.get_zk_servers(cluster))
if to_delete_instances:
delete_instances = map(vu.get_instance_hostname, to_delete_instances)
reserve_instances = list(set(instances) - set(delete_instances))
# keep the original order of instances
reserve_instances.sort(key=instances.index)
else:
reserve_instances = instances
for index, instance in enumerate(reserve_instances):
quorum.append("server.%s=%s:2888:3888" % (index, instance))
return '\n'.join(quorum)
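As an illustration (the hostnames are hypothetical, not part of the change), a cluster with three ZooKeeper instances would get a quorum block like:

server.0=zk-host-1:2888:3888
server.1=zk-host-2:2888:3888
server.2=zk-host-3:2888:3888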
def _push_zk_configs_to_nodes(cluster, zk_conf, to_delete_instances=None):
instances = vu.get_zk_servers(cluster)
if to_delete_instances:
for instance in to_delete_instances:
if instance in instances:
instances.remove(instance)
for index, instance in enumerate(instances):
with instance.remote() as r:
r.write_file_to('/opt/zookeeper/conf/zoo.cfg', zk_conf,
run_as_root=True)
r.execute_command(
'sudo su - -c "echo %s > /var/zookeeper/myid" hadoop' % index)
def configure_spark(cluster):
- extra = _extract_spark_configs_to_extra(cluster)
- _push_spark_configs_to_node(cluster, extra)
+ spark_servers = vu.get_spark_history_server(cluster)
+ if spark_servers:
+ extra = _extract_spark_configs_to_extra(cluster)
+ _push_spark_configs_to_node(cluster, extra)
def _push_spark_configs_to_node(cluster, extra):

View File

@@ -26,7 +26,6 @@ from sahara.utils import types
CONF = cfg.CONF
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
HIDDEN_CONFS = [
'dfs.hosts',
'dfs.hosts.exclude',
@@ -132,6 +131,33 @@ SPARK_CONFS = {
}
}
ZOOKEEPER_CONFS = {
"ZooKeeper": {
"OPTIONS": [
{
'name': 'tickTime',
'description': 'The number of milliseconds of each tick',
'default': 2000,
'priority': 2,
},
{
'name': 'initLimit',
'description': 'The number of ticks that the initial'
' synchronization phase can take',
'default': 10,
'priority': 2,
},
{
'name': 'syncLimit',
'description': 'The number of ticks that can pass between'
' sending a request and getting an acknowledgement',
'default': 5,
'priority': 2,
},
]
}
}
# for now we do not have many cluster-wide configs,
# so let's consider all of them high priority
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
@@ -305,3 +331,17 @@ def generate_job_cleanup_config(cluster):
def get_spark_home(cluster):
return utils.get_config_value_or_default("Spark", "Spark home", cluster)
def generate_zk_basic_config(cluster):
args = {
'ticktime': utils.get_config_value_or_default(
"ZooKeeper", "tickTime", cluster),
'initlimit': utils.get_config_value_or_default(
"ZooKeeper", "initLimit", cluster),
'synclimit': utils.get_config_value_or_default(
"ZooKeeper", "syncLimit", cluster)
}
zoo_cfg = f.get_file_text(
'plugins/vanilla/hadoop2/resources/zoo_sample.cfg')
return zoo_cfg.format(**args)
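With the defaults declared in ZOOKEEPER_CONFS (tickTime=2000, initLimit=10, syncLimit=5), the rendered file would read, comments omitted:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/zookeeper
clientPort=2181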

View File

@@ -0,0 +1,29 @@
# The number of milliseconds of each tick
tickTime={ticktime}
# The number of ticks that the initial
# synchronization phase can take
initLimit={initlimit}
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit={synclimit}
# the directory where the snapshot is stored.
# do not use /tmp for storage; /tmp here is just
# an example.
dataDir=/var/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

View File

@@ -115,6 +115,46 @@ def start_spark_history_server(master):
sp_home, 'sbin/start-history-server.sh'))
def start_zk_server(instances):
cpo.add_provisioning_step(
instances[0].cluster_id,
pu.start_process_event_message("ZooKeeper"),
len(instances))
with context.ThreadGroup() as tg:
for instance in instances:
with context.set_current_instance_id(instance.instance_id):
tg.spawn('ZK-start-processes-%s' % instance.instance_name,
_start_zk_processes, instance, 'start')
def refresh_zk_servers(cluster, to_delete_instances=None):
instances = vu.get_zk_servers(cluster)
if to_delete_instances:
for instance in to_delete_instances:
if instance in instances:
instances.remove(instance)
cpo.add_provisioning_step(
cluster.id,
pu.start_process_event_message("ZooKeeper"),
len(instances))
with context.ThreadGroup() as tg:
for instance in instances:
with context.set_current_instance_id(instance.instance_id):
tg.spawn('ZK-restart-processes-%s' % instance.instance_name,
_start_zk_processes, instance, 'restart')
@cpo.event_wrapper(True)
def _start_zk_processes(instance, operation):
with instance.remote() as r:
r.execute_command(
'sudo su - -c "bash /opt/zookeeper/bin/zkServer.sh %s"'
' hadoop' % operation)
def format_namenode(instance):
instance.remote().execute_command(
'sudo su - -c "hdfs namenode -format" hadoop')

View File

@@ -39,6 +39,8 @@ def scale_cluster(pctx, cluster, instances):
config.configure_topology_data(pctx, cluster)
run.start_dn_nm_processes(instances)
swift_helper.install_ssl_certs(instances)
config.configure_zookeeper(cluster)
run.refresh_zk_servers(cluster)
def _get_instances_with_service(instances, service):
@@ -87,6 +89,9 @@ def decommission_nodes(pctx, cluster, instances):
run.refresh_hadoop_nodes(cluster)
config.configure_topology_data(pctx, cluster)
config.configure_zookeeper(cluster, instances)
# TODO(shuyingya): should find a way to restart the leader node last
run.refresh_zk_servers(cluster, instances)
def _update_exclude_files(cluster, instances):
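A rough sketch of how the two new calls behave in each scaling direction (names as introduced above):

# scale out: rewrite zoo.cfg on every ZooKeeper node, then restart them all
config.configure_zookeeper(cluster)
run.refresh_zk_servers(cluster)

# scale in: rewrite zoo.cfg without the departing instances, then restart
# only the ZooKeeper nodes that remain
config.configure_zookeeper(cluster, instances)
run.refresh_zk_servers(cluster, instances)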

View File

@@ -77,3 +77,9 @@ def start_spark(cluster):
spark = vu.get_spark_history_server(cluster)
if spark:
run.start_spark_history_server(spark)
def start_zookeeper(cluster):
zk_servers = vu.get_zk_servers(cluster)
if zk_servers:
run.start_zk_server(zk_servers)

View File

@@ -83,6 +83,12 @@ def validate_cluster_creating(pctx, cluster):
raise ex.InvalidComponentCountException('hive', _('0 or 1'),
hive_count)
zk_count = _get_inst_count(cluster, 'zookeeper')
if zk_count > 0 and (zk_count % 2) != 1:
raise ex.InvalidComponentCountException(
'zookeeper', _('odd'), zk_count, _('Number of zookeeper nodes'
' should be odd.'))
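For example, a cluster defined with two 'zookeeper' processes is rejected at creation time, while zero, one, or three pass this check.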
def validate_additional_ng_scaling(cluster, additional):
rm = vu.get_resourcemanager(cluster)
@@ -125,8 +131,27 @@ def validate_existing_ng_scaling(pctx, cluster, existing):
cluster.name, msg % rep_factor)
def validate_zookeeper_node_count(zk_ng, existing, additional):
zk_amount = 0
for ng in zk_ng:
if ng.id in existing:
zk_amount += existing[ng.id]
else:
zk_amount += ng.count
for ng_id in additional:
ng = gu.get_by_id(zk_ng, ng_id)
if "zookeeper" in ng.node_processes:
zk_amount += ng.count
if (zk_amount % 2) != 1:
msg = _("Vanilla plugin cannot scale cluster because it must keep"
" zookeeper service in odd.")
raise ex.ClusterCannotBeScaled(zk_ng[0].cluster.name, msg)
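For example (counts only, the node groups themselves are hypothetical): shrinking a three-node zookeeper group to two via existing={ng.id: 2} yields zk_amount = 2, which is even, so ClusterCannotBeScaled is raised; resizing it to five would pass.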
def _get_scalable_processes():
- return ['datanode', 'nodemanager']
+ return ['datanode', 'nodemanager', 'zookeeper']
def _get_inst_count(cluster, process):

View File

@@ -60,6 +60,10 @@ def get_instance_hostname(instance):
return instance.hostname() if instance else None
def get_zk_servers(cluster):
return u.get_instances(cluster, 'zookeeper')
def generate_random_password():
password = uuidutils.generate_uuid()
return castellan.store_secret(password)

View File

@@ -79,6 +79,7 @@ def _init_all_configs():
configs.extend(PLUGIN_ENV_CONFIGS)
configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
configs.extend(_get_spark_configs())
configs.extend(_get_zookeeper_configs())
return configs
@@ -96,6 +97,20 @@ def _get_spark_configs():
return spark_configs
def _get_zookeeper_configs():
zk_configs = []
for service, config_items in six.iteritems(c_helper.ZOOKEEPER_CONFS):
for item in config_items['OPTIONS']:
cfg = p.Config(name=item["name"],
description=item["description"],
default_value=item["default"],
applicable_target=service,
scope="cluster", is_optional=True,
priority=item["priority"])
zk_configs.append(cfg)
return zk_configs
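For instance, the tickTime entry of ZOOKEEPER_CONFS is expanded by this loop into roughly the following provisioning Config (a sketch, not a verbatim excerpt of the change):

p.Config(name='tickTime',
         description='The number of milliseconds of each tick',
         default_value=2000,
         applicable_target='ZooKeeper',
         scope='cluster', is_optional=True, priority=2)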
PLUGIN_CONFIGS = _init_all_configs()

View File

@@ -56,7 +56,8 @@ class VersionHandler(avm.AbstractVersionHandler):
"YARN": ["resourcemanager", "nodemanager"],
"JobFlow": ["oozie"],
"Hive": ["hiveserver"],
"Spark": ["spark history server"]
"Spark": ["spark history server"],
"ZooKeeper": ["zookeeper"]
}
def validate(self, cluster):
@@ -81,6 +82,7 @@ class VersionHandler(avm.AbstractVersionHandler):
s_scripts.start_historyserver(cluster)
s_scripts.start_oozie(self.pctx, cluster)
s_scripts.start_hiveserver(self.pctx, cluster)
s_scripts.start_zookeeper(cluster)
swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))
@@ -93,6 +95,9 @@ class VersionHandler(avm.AbstractVersionHandler):
def validate_scaling(self, cluster, existing, additional):
vl.validate_additional_ng_scaling(cluster, additional)
vl.validate_existing_ng_scaling(self.pctx, cluster, existing)
zk_ng = utils.get_node_groups(cluster, "zookeeper")
if zk_ng:
vl.validate_zookeeper_node_count(zk_ng, existing, additional)
def scale_cluster(self, cluster, instances):
keypairs.provision_keypairs(cluster, instances)

View File

@@ -191,3 +191,19 @@ class TestConfigHelper(base.SaharaTestCase):
get_config_value_or_default.assert_called_once_with('Spark',
'Spark home',
self.cluster)
@mock.patch('sahara.utils.files.get_file_text')
@mock.patch('sahara.plugins.utils.get_config_value_or_default')
def test_generate_zk_basic_config(self, get_config_value_or_default,
get_file_text):
key = ('tickTime={ticktime}\n' +
'initLimit={initlimit}\n' +
'syncLimit={synclimit}\n')
actual = 'tickTime=5\ninitLimit=5\nsyncLimit=5\n'
get_config_value_or_default.return_value = 5
get_file_text.return_value = key
ret = c_helper.generate_zk_basic_config(self.cluster)
self.assertEqual(get_config_value_or_default.call_count, 3)
self.assertEqual(ret, actual)

View File

@@ -39,6 +39,8 @@ class ScalingTest(base.SaharaTestCase):
@mock.patch('sahara.swift.swift_helper.install_ssl_certs')
@mock.patch('sahara.plugins.vanilla.utils.get_resourcemanager')
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_zk_servers')
@mock.patch(PLUGINS_PATH + 'config.configure_zookeeper')
@mock.patch(PLUGINS_PATH + 'run_scripts.start_dn_nm_processes')
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_yarn_nodes')
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_hadoop_nodes')
@@ -51,6 +53,7 @@ class ScalingTest(base.SaharaTestCase):
refresh_hadoop_nodes,
refresh_yarn_nodes,
start_dn_nm_processes,
configure_zk, refresh_zk,
get_resourcemanager,
install_ssl_certs):
get_resourcemanager.return_value = 'node1'
@@ -64,6 +67,9 @@ class ScalingTest(base.SaharaTestCase):
configure_topology_data.assert_called_once_with(pctx, self.cluster)
start_dn_nm_processes.assert_called_once_with(self.instances)
install_ssl_certs.assert_called_once_with(self.instances)
configure_topology_data.assert_called_once_with(pctx, self.cluster)
configure_zk.assert_called_once_with(self.cluster)
refresh_zk.assert_called_once_with(self.cluster)
def test_get_instances_with_service(self):
ins_1 = mock.Mock()
@@ -115,6 +121,8 @@ class ScalingTest(base.SaharaTestCase):
self.r.execute_command.assert_has_calls(command_calls, any_order=True)
@mock.patch('sahara.plugins.vanilla.utils.get_resourcemanager')
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_zk_servers')
@mock.patch(PLUGINS_PATH + 'config.configure_zookeeper')
@mock.patch(PLUGINS_PATH + 'config.configure_topology_data')
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_yarn_nodes')
@mock.patch(PLUGINS_PATH + 'run_scripts.refresh_hadoop_nodes')
@@ -130,6 +138,7 @@ class ScalingTest(base.SaharaTestCase):
_update_include_files, _clear_exclude_files,
_update_exclude_files, refresh_hadoop_nodes,
refresh_yarn_nodes, configure_topology_data,
configure_zk, refresh_zk,
get_resourcemanager):
data = 'test_data'
_get_instances_with_service.return_value = data
@@ -152,6 +161,8 @@ class ScalingTest(base.SaharaTestCase):
self.instances)
_clear_exclude_files.assert_called_once_with(self.cluster)
configure_topology_data.assert_called_once_with(pctx, self.cluster)
configure_zk.assert_called_once_with(self.cluster, self.instances)
refresh_zk.assert_called_once_with(self.cluster, self.instances)
@mock.patch(PLUGINS_PATH + 'scaling._get_instances_with_service')
@mock.patch('sahara.plugins.utils.generate_fqdn_host_names')

View File

@@ -89,6 +89,13 @@ class ValidationTest(base.SaharaTestCase):
with testtools.ExpectedException(ex.InvalidComponentCountException):
self._validate_case(1, 1, 0, 0, 3, 0, 0, 0, 2)
self.ng.append(tu.make_ng_dict("zk", "f1", ["zookeeper"], 0))
self._validate_case(1, 1, 0, 0, 3, 0, 0, 1, 1, 3)
with testtools.ExpectedException(ex.InvalidComponentCountException):
self._validate_case(1, 1, 0, 0, 3, 0, 0, 1, 1, 2)
def _create_cluster(self, *args, **kwargs):
lst = []
for i in range(0, len(args)):

View File

@@ -33,7 +33,10 @@ class TestConfigHelper(base.SaharaTestCase):
@mock.patch(plugin_path + 'config_helper.PLUGIN_ENV_CONFIGS')
@mock.patch(plugin_path + 'config_helper.PLUGIN_XML_CONFIGS')
@mock.patch(plugin_path + 'config_helper._get_spark_configs')
- def test_init_all_configs(self, _get_spark_configs,
+ @mock.patch(plugin_path + 'config_helper._get_zookeeper_configs')
+ def test_init_all_configs(self,
+ _get_zk_configs,
+ _get_spark_configs,
PLUGIN_XML_CONFIGS,
PLUGIN_ENV_CONFIGS,
PLUGIN_GENERAL_CONFIGS):
@@ -42,6 +45,7 @@ class TestConfigHelper(base.SaharaTestCase):
configs.extend(PLUGIN_ENV_CONFIGS)
configs.extend(PLUGIN_GENERAL_CONFIGS)
configs.extend(_get_spark_configs())
configs.extend(_get_zk_configs())
init_configs = v_helper._init_all_configs()
self.assertEqual(init_configs, configs)

View File

@@ -15,7 +15,10 @@
import mock
import six
import testtools
from sahara.conductor import resource as r
from sahara.plugins import exceptions as ex
from sahara.plugins.vanilla.v2_7_1.edp_engine import EdpOozieEngine
from sahara.plugins.vanilla.v2_7_1.edp_engine import EdpSparkEngine
from sahara.plugins.vanilla.v2_7_1 import versionhandler as v_h
@@ -110,11 +113,12 @@ class VersionHandlerTest(base.SaharaTestCase):
cluster,
instances)
@mock.patch('sahara.utils.general.get_by_id')
@mock.patch(plugin_hadoop2_path +
'validation.validate_additional_ng_scaling')
@mock.patch(plugin_hadoop2_path +
'validation.validate_existing_ng_scaling')
- def test_validate_scaling(self, vls, vla):
+ def test_validate_scaling(self, vls, vla, get_by_id):
self.vh.pctx['all_confs'] = [TestConfig('HDFS', 'dfs.replication', -1)]
ng1 = testutils.make_ng_dict('ng1', '40', ['namenode'], 1)
ng2 = testutils.make_ng_dict('ng2', '41', ['datanode'], 2)
@@ -127,6 +131,21 @@ class VersionHandlerTest(base.SaharaTestCase):
vla.assert_called_once_with(cluster, additional)
vls.assert_called_once_with(self.vh.pctx, cluster, existing)
ng4 = testutils.make_ng_dict('ng4', '43', ['datanode', 'zookeeper'], 3)
ng5 = testutils.make_ng_dict('ng5', '44', ['datanode', 'zookeeper'], 1)
existing = {ng4['id']: 2}
additional = {ng5['id']}
cluster = testutils.create_cluster('test-cluster', 'tenant1', 'fake',
'0.1', [ng1, ng4])
with testtools.ExpectedException(ex.ClusterCannotBeScaled):
self.vh.validate_scaling(cluster, existing, {})
get_by_id.return_value = r.NodeGroupResource(ng5)
with testtools.ExpectedException(ex.ClusterCannotBeScaled):
self.vh.validate_scaling(cluster, {}, additional)
@mock.patch(plugin_hadoop2_path + 'scaling.scale_cluster')
@mock.patch(plugin_hadoop2_path + 'keypairs.provision_keypairs')
def test_scale_cluster(self, provision_keypairs, scale_cluster):