Add support for deploying hadoop 2.7.5

Add a hadoop 2.7.5 deployment script to the vanilla plugin.

Change-Id: I8f3b4a447d8b76e5a1e3f88e1e2c7f009b433bb6
This commit is contained in:
Zhuang Changkun 2018-06-01 17:30:27 +08:00
parent 4f074856b6
commit 4c5fab8e37
17 changed files with 18274 additions and 0 deletions

View File

@ -0,0 +1,4 @@
---
features:
- |
    Added support for deploying Hadoop 2.7.5 with the vanilla plugin.

View File

@ -0,0 +1,151 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from oslo_config import cfg
import six
from sahara.plugins import provisioning as p
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.utils import xmlutils as x
CONF = cfg.CONF
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")

# Default Hadoop/Oozie/Hive configuration definitions, loaded from the XML
# resource files bundled with this plugin version (2.7.5).
CORE_DEFAULT = x.load_hadoop_xml_defaults(
    'plugins/vanilla/v2_7_5/resources/core-default.xml')
HDFS_DEFAULT = x.load_hadoop_xml_defaults(
    'plugins/vanilla/v2_7_5/resources/hdfs-default.xml')
MAPRED_DEFAULT = x.load_hadoop_xml_defaults(
    'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
YARN_DEFAULT = x.load_hadoop_xml_defaults(
    'plugins/vanilla/v2_7_5/resources/yarn-default.xml')
OOZIE_DEFAULT = x.load_hadoop_xml_defaults(
    'plugins/vanilla/v2_7_5/resources/oozie-default.xml')
HIVE_DEFAULT = x.load_hadoop_xml_defaults(
    'plugins/vanilla/v2_7_5/resources/hive-default.xml')

# Classpath for the swift connector jar matching this Hadoop version; kept
# as a join() over a list so additional entries can be appended easily.
_default_executor_classpath = ":".join(
    ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.5.jar'])

# Deep-copy the shared hadoop2 Spark confs before appending, so the
# version-specific option below does not mutate the shared structure.
SPARK_CONFS = copy.deepcopy(c_helper.SPARK_CONFS)

SPARK_CONFS['Spark']['OPTIONS'].append(
    {
        'name': 'Executor extra classpath',
        'description': 'Value for spark.executor.extraClassPath'
        ' in spark-defaults.conf'
        ' (default: %s)' % _default_executor_classpath,
        'default': '%s' % _default_executor_classpath,
        'priority': 2,
    }
)

# Service name -> list of XML default-config definitions.
XML_CONFS = {
    "Hadoop": [CORE_DEFAULT],
    "HDFS": [HDFS_DEFAULT],
    "YARN": [YARN_DEFAULT],
    "MapReduce": [MAPRED_DEFAULT],
    "JobFlow": [OOZIE_DEFAULT],
    "Hive": [HIVE_DEFAULT]
}

# Service name -> environment (heap size, in MB) defaults.
ENV_CONFS = {
    "YARN": {
        'ResourceManager Heap Size': 1024,
        'NodeManager Heap Size': 1024
    },
    "HDFS": {
        'NameNode Heap Size': 1024,
        'SecondaryNameNode Heap Size': 1024,
        'DataNode Heap Size': 1024
    },
    "MapReduce": {
        'JobHistoryServer Heap Size': 1024
    },
    "JobFlow": {
        'Oozie Heap Size': 1024
    }
}

# Initialise plugin Hadoop configurations
PLUGIN_XML_CONFIGS = c_helper.init_xml_configs(XML_CONFS)
PLUGIN_ENV_CONFIGS = c_helper.init_env_configs(ENV_CONFS)
def _init_all_configs():
    """Assemble the complete config list for this plugin version.

    Combines the XML-based, environment, general, Spark and ZooKeeper
    configurations into a single flat list.
    """
    return (list(PLUGIN_XML_CONFIGS)
            + list(PLUGIN_ENV_CONFIGS)
            + list(c_helper.PLUGIN_GENERAL_CONFIGS)
            + _get_spark_configs()
            + _get_zookeeper_configs())
def _get_spark_opt_default(opt_name):
    """Return the default value of the named Spark option, or None."""
    return next((item["default"]
                 for item in SPARK_CONFS["Spark"]["OPTIONS"]
                 if item["name"] == opt_name),
                None)
def _get_spark_configs():
    """Build provisioning.Config objects for every Spark option.

    :returns: list of p.Config built from SPARK_CONFS.
    """
    spark_configs = []
    for service, config_items in six.iteritems(SPARK_CONFS):
        for item in config_items['OPTIONS']:
            # NOTE: named ``config`` (not ``cfg``) so it does not shadow
            # the module-level ``oslo_config.cfg`` import.
            config = p.Config(name=item["name"],
                              description=item["description"],
                              default_value=item["default"],
                              applicable_target=service,
                              scope="cluster", is_optional=True,
                              priority=item["priority"])
            spark_configs.append(config)
    return spark_configs
def _get_zookeeper_configs():
    """Build provisioning.Config objects for every ZooKeeper option.

    :returns: list of p.Config built from c_helper.ZOOKEEPER_CONFS.
    """
    zk_configs = []
    for service, config_items in six.iteritems(c_helper.ZOOKEEPER_CONFS):
        for item in config_items['OPTIONS']:
            # NOTE: named ``config`` (not ``cfg``) so it does not shadow
            # the module-level ``oslo_config.cfg`` import.
            config = p.Config(name=item["name"],
                              description=item["description"],
                              default_value=item["default"],
                              applicable_target=service,
                              scope="cluster", is_optional=True,
                              priority=item["priority"])
            zk_configs.append(config)
    return zk_configs
# Full set of plugin configurations, computed once at import time.
PLUGIN_CONFIGS = _init_all_configs()


def get_plugin_configs():
    """Return all plugin configurations (XML, env, general, Spark, ZK)."""
    return PLUGIN_CONFIGS


def get_xml_configs():
    """Return only the XML-based plugin configurations."""
    return PLUGIN_XML_CONFIGS


def get_env_configs():
    """Return the raw ENV_CONFS dict (service -> heap sizes).

    Note this returns the raw dict, not the Config objects built from it
    (those are in PLUGIN_ENV_CONFIGS).
    """
    return ENV_CONFS

View File

@ -0,0 +1,81 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import utils as plugin_utils
from sahara.plugins.vanilla import confighints_helper as ch_helper
from sahara.plugins.vanilla.hadoop2 import edp_engine
from sahara.plugins.vanilla import utils as v_utils
from sahara.service.edp.spark import engine as edp_spark_engine
from sahara.utils import edp
class EdpOozieEngine(edp_engine.EdpOozieEngine):
    """Oozie EDP engine using the 2.7.5 resource files for config hints."""

    @staticmethod
    def get_possible_job_config(job_type):
        """Return config hints for *job_type*; delegate unknown types."""
        if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):
            hints = ch_helper.get_possible_hive_config_from(
                'plugins/vanilla/v2_7_5/resources/hive-default.xml')
            return {'job_config': hints}
        if edp.compare_job_type(job_type,
                                edp.JOB_TYPE_MAPREDUCE,
                                edp.JOB_TYPE_MAPREDUCE_STREAMING):
            hints = ch_helper.get_possible_mapreduce_config_from(
                'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
            return {'job_config': hints}
        if edp.compare_job_type(job_type, edp.JOB_TYPE_PIG):
            hints = ch_helper.get_possible_pig_config_from(
                'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
            return {'job_config': hints}
        # Anything else (Java, Shell, ...) falls through to the base class.
        return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
    """Spark EDP engine for clusters deployed by the vanilla 2.7.5 plugin."""

    edp_base_version = "2.7.5"

    def __init__(self, cluster):
        super(EdpSparkEngine, self).__init__(cluster)
        # Spark jobs are submitted from the "spark history server" instance.
        self.master = plugin_utils.get_instance(cluster,
                                                "spark history server")
        self.plugin_params["spark-user"] = "sudo -u hadoop "
        self.plugin_params["spark-submit"] = os.path.join(
            plugin_utils.get_config_value_or_default(
                "Spark", "Spark home", self.cluster),
            "bin/spark-submit")
        self.plugin_params["deploy-mode"] = "cluster"
        self.plugin_params["master"] = "yarn"
        # NOTE(review): the option is named "Executor extra classpath" but
        # its value is also used as the driver classpath.
        driver_cp = plugin_utils.get_config_value_or_default(
            "Spark", "Executor extra classpath", self.cluster)
        self.plugin_params["driver-class-path"] = driver_cp

    @staticmethod
    def edp_supported(version):
        """Return True if *version* is at least ``edp_base_version``.

        Versions are compared component-by-component as integers; the
        previous plain string comparison would incorrectly rank e.g.
        "2.10.0" below "2.7.5".
        """
        def _to_tuple(ver):
            return tuple(int(part) for part in ver.split('.'))

        try:
            return (_to_tuple(version) >=
                    _to_tuple(EdpSparkEngine.edp_base_version))
        except ValueError:
            # Non-numeric version component: fall back to the original
            # lexicographic behaviour rather than crash.
            return version >= EdpSparkEngine.edp_base_version

    @staticmethod
    def job_type_supported(job_type):
        """Return True if *job_type* is one the Spark engine can run."""
        return (job_type in
                edp_spark_engine.SparkJobEngine.get_supported_job_types())

    def validate_job_execution(self, cluster, job, data):
        """Raise InvalidDataException unless the cluster can run Spark jobs.

        Requires both a supported hadoop_version and a running Spark
        history server instance in the cluster.
        """
        if (not self.edp_supported(cluster.hadoop_version) or
                not v_utils.get_spark_history_server(cluster)):
            raise ex.InvalidDataException(
                _('Spark {base} or higher required to run {type} jobs').format(
                    base=EdpSparkEngine.edp_base_version, type=job.type))
        super(EdpSparkEngine, self).validate_job_execution(cluster, job, data)

View File

@ -0,0 +1,27 @@
Apache Hadoop Configurations for Sahara
=======================================
This directory contains default XML configuration files:
* core-default.xml
* hdfs-default.xml
* mapred-default.xml
* yarn-default.xml
* oozie-default.xml
* hive-default.xml
These files are used by Sahara's vanilla plugin for Apache Hadoop version 2.7.5.
Files were taken from here:
* `core-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml>`_
* `hdfs-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml>`_
* `yarn-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml>`_
* `mapred-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml>`_
* `oozie-default.xml <https://github.com/apache/oozie/blob/release-4.3.0/core/src/main/resources/oozie-default.xml>`_
The XML configs are used to expose default Hadoop configurations to users
through Sahara's REST API. This allows users to override config values, which
are then pushed to the provisioned VMs running Hadoop services as part of the
appropriate XML config file.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,9 @@
-- Create and initialize the Hive metastore database on MySQL.
CREATE DATABASE metastore;
USE metastore;
-- Load the Hive 2.3.0 metastore schema shipped with the Hive installation.
SOURCE /opt/hive/scripts/metastore/upgrade/mysql/hive-schema-2.3.0.mysql.sql;
-- {{password}} is a template placeholder substituted before execution.
CREATE USER 'hive'@'localhost' IDENTIFIED BY '{{password}}';
-- Reset any pre-existing grants, then allow the hive user full access to
-- the metastore database from localhost and from remote hosts.
REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'hive'@'localhost';
GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' IDENTIFIED BY '{{password}}';
GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'%' IDENTIFIED BY '{{password}}';
FLUSH PRIVILEGES;
exit

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,170 @@
# Copyright (c) 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from sahara import conductor
from sahara import context
from sahara.plugins import utils
from sahara.plugins.vanilla import abstractversionhandler as avm
from sahara.plugins.vanilla.hadoop2 import config as c
from sahara.plugins.vanilla.hadoop2 import keypairs
from sahara.plugins.vanilla.hadoop2 import recommendations_utils as ru
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
from sahara.plugins.vanilla.hadoop2 import scaling as sc
from sahara.plugins.vanilla.hadoop2 import starting_scripts as s_scripts
from sahara.plugins.vanilla.hadoop2 import utils as u
from sahara.plugins.vanilla.hadoop2 import validation as vl
from sahara.plugins.vanilla import utils as vu
from sahara.plugins.vanilla.v2_7_5 import config_helper as c_helper
from sahara.plugins.vanilla.v2_7_5 import edp_engine
from sahara.swift import swift_helper
from sahara.utils import cluster as cluster_utils
# Rebind the imported module name to its API object; all later uses of
# ``conductor`` in this module go through conductor.API.
conductor = conductor.API
CONF = cfg.CONF
class VersionHandler(avm.AbstractVersionHandler):
    """Version handler for vanilla Hadoop 2.7.5 clusters.

    Delegates most of the work to the shared ``hadoop2`` helpers; this
    class mainly wires them to the 2.7.5-specific config helper and EDP
    engines.
    """

    def __init__(self):
        # Plugin context passed to the shared hadoop2 helpers: the raw
        # env (heap-size) configs and the full config list.
        self.pctx = {
            'env_confs': c_helper.get_env_configs(),
            'all_confs': c_helper.get_plugin_configs()
        }

    def get_plugin_configs(self):
        """Return all configs exposed by this plugin version."""
        return self.pctx['all_confs']

    def get_node_processes(self):
        """Return the supported node processes, grouped by service."""
        return {
            "Hadoop": [],
            "MapReduce": ["historyserver"],
            "HDFS": ["namenode", "datanode", "secondarynamenode"],
            "YARN": ["resourcemanager", "nodemanager"],
            "JobFlow": ["oozie"],
            "Hive": ["hiveserver"],
            "Spark": ["spark history server"],
            "ZooKeeper": ["zookeeper"]
        }

    def validate(self, cluster):
        """Validate the cluster layout before creation."""
        vl.validate_cluster_creating(self.pctx, cluster)

    def update_infra(self, cluster):
        # No infrastructure updates needed for this plugin version.
        pass

    def configure_cluster(self, cluster):
        """Push configuration to all cluster instances."""
        c.configure_cluster(self.pctx, cluster)

    def start_cluster(self, cluster):
        """Start all services in dependency order and publish cluster info.

        The order matters: HDFS (namenode/secondary), then YARN, then the
        datanode/nodemanager processes, then the dependent services.
        """
        keypairs.provision_keypairs(cluster)
        s_scripts.start_namenode(cluster)
        s_scripts.start_secondarynamenode(cluster)
        s_scripts.start_resourcemanager(cluster)
        run.start_dn_nm_processes(utils.get_instances(cluster))
        run.await_datanodes(cluster)
        s_scripts.start_historyserver(cluster)
        s_scripts.start_oozie(self.pctx, cluster)
        s_scripts.start_hiveserver(self.pctx, cluster)
        s_scripts.start_zookeeper(cluster)
        swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))
        self._set_cluster_info(cluster)
        s_scripts.start_spark(cluster)

    def decommission_nodes(self, cluster, instances):
        """Gracefully remove *instances* from the cluster."""
        sc.decommission_nodes(self.pctx, cluster, instances)

    def validate_scaling(self, cluster, existing, additional):
        """Validate a scaling request, including ZooKeeper node counts."""
        vl.validate_additional_ng_scaling(cluster, additional)
        vl.validate_existing_ng_scaling(self.pctx, cluster, existing)
        zk_ng = utils.get_node_groups(cluster, "zookeeper")
        if zk_ng:
            vl.validate_zookeeper_node_count(zk_ng, existing, additional)

    def scale_cluster(self, cluster, instances):
        """Provision keypairs on the new instances, then scale."""
        keypairs.provision_keypairs(cluster, instances)
        sc.scale_cluster(self.pctx, cluster, instances)

    def _set_cluster_info(self, cluster):
        """Store service web UI / endpoint URLs in the cluster info dict."""
        nn = vu.get_namenode(cluster)
        rm = vu.get_resourcemanager(cluster)
        hs = vu.get_historyserver(cluster)
        oo = vu.get_oozie(cluster)
        sp = vu.get_spark_history_server(cluster)
        info = {}
        if rm:
            info['YARN'] = {
                'Web UI': 'http://%s:%s' % (rm.get_ip_or_dns_name(), '8088'),
                'ResourceManager': 'http://%s:%s' % (
                    rm.get_ip_or_dns_name(), '8032')
            }
        if nn:
            info['HDFS'] = {
                'Web UI': 'http://%s:%s' % (nn.get_ip_or_dns_name(), '50070'),
                'NameNode': 'hdfs://%s:%s' % (nn.hostname(), '9000')
            }
        if oo:
            info['JobFlow'] = {
                'Oozie': 'http://%s:%s' % (oo.get_ip_or_dns_name(), '11000')
            }
        if hs:
            info['MapReduce JobHistory Server'] = {
                'Web UI': 'http://%s:%s' % (hs.get_ip_or_dns_name(), '19888')
            }
        if sp:
            info['Apache Spark'] = {
                'Spark UI': 'http://%s:%s' % (sp.management_ip, '4040'),
                'Spark History Server UI':
                    'http://%s:%s' % (sp.management_ip, '18080')
            }
        ctx = context.ctx()
        conductor.cluster_update(ctx, cluster, {'info': info})

    def get_edp_engine(self, cluster, job_type):
        """Return an EDP engine able to run *job_type*, or None."""
        if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
            return edp_engine.EdpOozieEngine(cluster)
        if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
            return edp_engine.EdpSparkEngine(cluster)
        return None

    def get_edp_job_types(self):
        """Return all EDP job types supported by this plugin version."""
        return (edp_engine.EdpOozieEngine.get_supported_job_types() +
                edp_engine.EdpSparkEngine.get_supported_job_types())

    def get_edp_config_hints(self, job_type):
        """Return possible job config hints for *job_type*."""
        return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)

    def on_terminate_cluster(self, cluster):
        """Clean up cluster-scoped secrets on termination."""
        u.delete_oozie_password(cluster)
        keypairs.drop_key(cluster)

    def get_open_ports(self, node_group):
        """Return the ports to open for *node_group*."""
        return c.get_open_ports(node_group)

    def recommend_configs(self, cluster, scaling):
        """Apply automatic configuration recommendations."""
        ru.recommend_configs(cluster, self.get_plugin_configs(), scaling)

View File

@ -0,0 +1,73 @@
# Copyright (c) 2017 EasyStack Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins import provisioning as p
from sahara.plugins.vanilla.v2_7_5 import config_helper as v_helper
from sahara.tests.unit import base
class TestConfigHelper(base.SaharaTestCase):
    """Unit tests for the v2_7_5 config_helper module."""

    plugin_path = 'sahara.plugins.vanilla.v2_7_5.'
    plugin_hadoop_path = 'sahara.plugins.vanilla.hadoop2.'

    def setUp(self):
        super(TestConfigHelper, self).setUp()

    # NOTE: mock.patch decorators are applied bottom-up, so the first
    # mock argument corresponds to the last decorator.
    @mock.patch(plugin_hadoop_path + 'config_helper.PLUGIN_GENERAL_CONFIGS')
    @mock.patch(plugin_path + 'config_helper.PLUGIN_ENV_CONFIGS')
    @mock.patch(plugin_path + 'config_helper.PLUGIN_XML_CONFIGS')
    @mock.patch(plugin_path + 'config_helper._get_spark_configs')
    @mock.patch(plugin_path + 'config_helper._get_zookeeper_configs')
    def test_init_all_configs(self,
                              _get_zk_configs,
                              _get_spark_configs,
                              PLUGIN_XML_CONFIGS,
                              PLUGIN_ENV_CONFIGS,
                              PLUGIN_GENERAL_CONFIGS):
        # Rebuild the expected list from the same patched sources and
        # compare it with what _init_all_configs() produces.
        configs = []
        configs.extend(PLUGIN_XML_CONFIGS)
        configs.extend(PLUGIN_ENV_CONFIGS)
        configs.extend(PLUGIN_GENERAL_CONFIGS)
        configs.extend(_get_spark_configs())
        configs.extend(_get_zk_configs())
        init_configs = v_helper._init_all_configs()
        self.assertEqual(init_configs, configs)

    def test_get_spark_opt_default(self):
        """The default Spark executor classpath should be returned."""
        opt_name = 'Executor extra classpath'
        _default_executor_classpath = ":".join(
            ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.5.jar'])
        default = v_helper._get_spark_opt_default(opt_name)
        self.assertEqual(default, _default_executor_classpath)

    def test_get_spark_configs(self):
        # Every generated Spark config must be a provisioning.Config.
        spark_configs = v_helper._get_spark_configs()
        for i in spark_configs:
            self.assertIsInstance(i, p.Config)

    def test_get_plugin_configs(self):
        self.assertEqual(v_helper.get_plugin_configs(),
                         v_helper.PLUGIN_CONFIGS)

    def test_get_xml_configs(self):
        self.assertEqual(v_helper.get_xml_configs(),
                         v_helper.PLUGIN_XML_CONFIGS)

    def test_get_env_configs(self):
        self.assertEqual(v_helper.get_env_configs(),
                         v_helper.ENV_CONFS)

View File

@ -0,0 +1,96 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.vanilla.v2_7_5 import edp_engine
from sahara.tests.unit import base as sahara_base
from sahara.utils import edp
class Vanilla2ConfigHintsTest(sahara_base.SaharaTestCase):
    """Tests for EdpOozieEngine.get_possible_job_config per job type.

    Each test patches the confighints helper (or the base engine) and
    verifies both the delegation target and the returned hint dict.
    """

    @mock.patch(
        'sahara.plugins.vanilla.confighints_helper.'
        'get_possible_hive_config_from',
        return_value={})
    def test_get_possible_job_config_hive(
            self, get_possible_hive_config_from):
        # Hive jobs read hints from the bundled hive-default.xml.
        expected_config = {'job_config': {}}
        actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
            edp.JOB_TYPE_HIVE)
        get_possible_hive_config_from.assert_called_once_with(
            'plugins/vanilla/v2_7_5/resources/hive-default.xml')
        self.assertEqual(expected_config, actual_config)

    @mock.patch('sahara.plugins.vanilla.hadoop2.edp_engine.EdpOozieEngine')
    def test_get_possible_job_config_java(self, BaseVanillaEdpOozieEngine):
        # Java jobs are delegated to the base hadoop2 engine.
        expected_config = {'job_config': {}}
        BaseVanillaEdpOozieEngine.get_possible_job_config.return_value = (
            expected_config)
        actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
            edp.JOB_TYPE_JAVA)
        (BaseVanillaEdpOozieEngine.get_possible_job_config.
            assert_called_once_with(edp.JOB_TYPE_JAVA))
        self.assertEqual(expected_config, actual_config)

    @mock.patch(
        'sahara.plugins.vanilla.confighints_helper.'
        'get_possible_mapreduce_config_from',
        return_value={})
    def test_get_possible_job_config_mapreduce(
            self, get_possible_mapreduce_config_from):
        # MapReduce jobs read hints from the bundled mapred-default.xml.
        expected_config = {'job_config': {}}
        actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
            edp.JOB_TYPE_MAPREDUCE)
        get_possible_mapreduce_config_from.assert_called_once_with(
            'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
        self.assertEqual(expected_config, actual_config)

    @mock.patch(
        'sahara.plugins.vanilla.confighints_helper.'
        'get_possible_mapreduce_config_from',
        return_value={})
    def test_get_possible_job_config_mapreduce_streaming(
            self, get_possible_mapreduce_config_from):
        # Streaming MapReduce uses the same mapred-default.xml hints.
        expected_config = {'job_config': {}}
        actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
            edp.JOB_TYPE_MAPREDUCE_STREAMING)
        get_possible_mapreduce_config_from.assert_called_once_with(
            'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
        self.assertEqual(expected_config, actual_config)

    @mock.patch(
        'sahara.plugins.vanilla.confighints_helper.'
        'get_possible_pig_config_from',
        return_value={})
    def test_get_possible_job_config_pig(
            self, get_possible_pig_config_from):
        # Pig jobs also read hints from mapred-default.xml.
        expected_config = {'job_config': {}}
        actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
            edp.JOB_TYPE_PIG)
        get_possible_pig_config_from.assert_called_once_with(
            'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
        self.assertEqual(expected_config, actual_config)

    @mock.patch('sahara.plugins.vanilla.hadoop2.edp_engine.EdpOozieEngine')
    def test_get_possible_job_config_shell(self, BaseVanillaEdpOozieEngine):
        # Shell jobs are delegated to the base hadoop2 engine.
        expected_config = {'job_config': {}}
        BaseVanillaEdpOozieEngine.get_possible_job_config.return_value = (
            expected_config)
        actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
            edp.JOB_TYPE_SHELL)
        (BaseVanillaEdpOozieEngine.get_possible_job_config.
            assert_called_once_with(edp.JOB_TYPE_SHELL))
        self.assertEqual(expected_config, actual_config)

View File

@ -0,0 +1,253 @@
# Copyright (c) 2017 EasyStack Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
import testtools
from sahara.conductor import resource as r
from sahara.plugins import exceptions as ex
from sahara.plugins.vanilla.v2_7_5.edp_engine import EdpOozieEngine
from sahara.plugins.vanilla.v2_7_5.edp_engine import EdpSparkEngine
from sahara.plugins.vanilla.v2_7_5 import versionhandler as v_h
from sahara.tests.unit import base
from sahara.tests.unit import testutils
class TestConfig(object):
    """Lightweight stand-in for a plugin config object.

    Carries only the attributes the version handler reads from real
    provisioning.Config objects.
    """

    def __init__(self, applicable_target, name, default_value):
        for attr, value in (('applicable_target', applicable_target),
                            ('name', name),
                            ('default_value', default_value)):
            setattr(self, attr, value)
class VersionHandlerTest(base.SaharaTestCase):
    """Unit tests for the v2_7_5 VersionHandler."""

    plugin_path = 'sahara.plugins.vanilla.'
    plugin_hadoop2_path = 'sahara.plugins.vanilla.hadoop2.'

    def setUp(self):
        super(VersionHandlerTest, self).setUp()
        self.cluster = mock.Mock()
        self.vh = v_h.VersionHandler()

    def test_get_plugin_configs(self):
        # get_plugin_configs() simply exposes pctx['all_confs'].
        self.vh.pctx['all_confs'] = 'haha'
        conf = self.vh.get_plugin_configs()
        self.assertEqual(conf, 'haha')

    def test_get_node_processes(self):
        # Every advertised node process must be a plain string.
        processes = self.vh.get_node_processes()
        for k, v in six.iteritems(processes):
            for p in v:
                self.assertIsInstance(p, str)

    @mock.patch(plugin_hadoop2_path +
                'validation.validate_cluster_creating')
    def test_validate(self, validate_create):
        self.vh.pctx = mock.Mock()
        self.vh.validate(self.cluster)
        validate_create.assert_called_once_with(self.vh.pctx,
                                                self.cluster)

    # NOTE(review): this patches the very method under test, so it only
    # verifies the call wiring, not update_infra's behaviour.
    @mock.patch(plugin_path +
                'v2_7_5.versionhandler.VersionHandler.update_infra')
    def test_update_infra(self, update_infra):
        self.vh.update_infra(self.cluster)
        update_infra.assert_called_once_with(self.cluster)

    @mock.patch(plugin_hadoop2_path + 'config.configure_cluster')
    def test_configure_cluster(self, configure_cluster):
        self.vh.pctx = mock.Mock()
        self.vh.configure_cluster(self.cluster)
        configure_cluster.assert_called_once_with(self.vh.pctx, self.cluster)

    @mock.patch(plugin_path + 'v2_7_5.versionhandler.run')
    @mock.patch(plugin_path + 'v2_7_5.versionhandler.s_scripts')
    @mock.patch('sahara.swift.swift_helper.install_ssl_certs')
    @mock.patch(plugin_hadoop2_path + 'keypairs.provision_keypairs')
    @mock.patch('sahara.plugins.utils.get_instances')
    @mock.patch('sahara.utils.cluster.get_instances')
    def test_start_cluster(self, c_get_instances, u_get_instances,
                           provision_keypairs, install_ssl_certs,
                           s_scripts, run):
        # Verify start_cluster drives every starting script exactly once.
        self.vh.pctx = mock.Mock()
        instances = mock.Mock()
        c_get_instances.return_value = instances
        u_get_instances.return_value = instances
        self.vh._set_cluster_info = mock.Mock()
        self.vh.start_cluster(self.cluster)
        provision_keypairs.assert_called_once_with(self.cluster)
        s_scripts.start_namenode.assert_called_once_with(self.cluster)
        s_scripts.start_secondarynamenode.assert_called_once_with(self.cluster)
        s_scripts.start_resourcemanager.assert_called_once_with(self.cluster)
        s_scripts.start_historyserver.assert_called_once_with(self.cluster)
        s_scripts.start_oozie.assert_called_once_with(self.vh.pctx,
                                                      self.cluster)
        s_scripts.start_hiveserver.assert_called_once_with(self.vh.pctx,
                                                           self.cluster)
        s_scripts.start_spark.assert_called_once_with(self.cluster)
        run.start_dn_nm_processes.assert_called_once_with(instances)
        run.await_datanodes.assert_called_once_with(self.cluster)
        install_ssl_certs.assert_called_once_with(instances)
        self.vh._set_cluster_info.assert_called_once_with(self.cluster)

    @mock.patch(plugin_hadoop2_path + 'scaling.decommission_nodes')
    def test_decommission_nodes(self, decommission_nodes):
        self.vh.pctx = mock.Mock()
        cluster = mock.Mock()
        instances = mock.Mock()
        self.vh.decommission_nodes(cluster, instances)
        decommission_nodes.assert_called_once_with(self.vh.pctx,
                                                   cluster,
                                                   instances)

    @mock.patch('sahara.utils.general.get_by_id')
    @mock.patch(plugin_hadoop2_path +
                'validation.validate_additional_ng_scaling')
    @mock.patch(plugin_hadoop2_path +
                'validation.validate_existing_ng_scaling')
    def test_validate_scaling(self, vls, vla, get_by_id):
        # First part: a valid scaling request passes validation.
        self.vh.pctx['all_confs'] = [TestConfig('HDFS', 'dfs.replication', -1)]
        ng1 = testutils.make_ng_dict('ng1', '40', ['namenode'], 1)
        ng2 = testutils.make_ng_dict('ng2', '41', ['datanode'], 2)
        ng3 = testutils.make_ng_dict('ng3', '42', ['datanode'], 3)
        additional = [ng2['id'], ng3['id']]
        existing = {ng2['id']: 1}
        cluster = testutils.create_cluster('test-cluster', 'tenant1', 'fake',
                                           '0.1', [ng1, ng2, ng3])
        self.vh.validate_scaling(cluster, existing, additional)
        vla.assert_called_once_with(cluster, additional)
        vls.assert_called_once_with(self.vh.pctx, cluster, existing)
        # Second part: zookeeper node-count violations must raise.
        ng4 = testutils.make_ng_dict('ng4', '43', ['datanode', 'zookeeper'], 3)
        ng5 = testutils.make_ng_dict('ng5', '44', ['datanode', 'zookeeper'], 1)
        existing = {ng4['id']: 2}
        additional = {ng5['id']}
        cluster = testutils.create_cluster('test-cluster', 'tenant1', 'fake',
                                           '0.1', [ng1, ng4])
        with testtools.ExpectedException(ex.ClusterCannotBeScaled):
            self.vh.validate_scaling(cluster, existing, {})
        get_by_id.return_value = r.NodeGroupResource(ng5)
        with testtools.ExpectedException(ex.ClusterCannotBeScaled):
            self.vh.validate_scaling(cluster, {}, additional)

    @mock.patch(plugin_hadoop2_path + 'scaling.scale_cluster')
    @mock.patch(plugin_hadoop2_path + 'keypairs.provision_keypairs')
    def test_scale_cluster(self, provision_keypairs, scale_cluster):
        self.vh.pctx = mock.Mock()
        instances = mock.Mock()
        self.vh.scale_cluster(self.cluster, instances)
        provision_keypairs.assert_called_once_with(self.cluster,
                                                   instances)
        scale_cluster.assert_called_once_with(self.vh.pctx,
                                              self.cluster,
                                              instances)

    @mock.patch("sahara.conductor.API.cluster_update")
    @mock.patch("sahara.context.ctx")
    @mock.patch(plugin_path + 'utils.get_namenode')
    @mock.patch(plugin_path + 'utils.get_resourcemanager')
    @mock.patch(plugin_path + 'utils.get_historyserver')
    @mock.patch(plugin_path + 'utils.get_oozie')
    @mock.patch(plugin_path + 'utils.get_spark_history_server')
    def test_set_cluster_info(self, get_spark_history_server, get_oozie,
                              get_historyserver, get_resourcemanager,
                              get_namenode, ctx, cluster_update):
        # Give each service a distinct address, then check the info dict
        # stored via conductor.cluster_update.
        get_spark_history_server.return_value.management_ip = '1.2.3.0'
        get_oozie.return_value.get_ip_or_dns_name = mock.Mock(
            return_value='1.2.3.1')
        get_historyserver.return_value.get_ip_or_dns_name = mock.Mock(
            return_value='1.2.3.2')
        get_resourcemanager.return_value.get_ip_or_dns_name = mock.Mock(
            return_value='1.2.3.3')
        get_namenode.return_value.get_ip_or_dns_name = mock.Mock(
            return_value='1.2.3.4')
        get_namenode.return_value.hostname = mock.Mock(
            return_value='testnode')
        self.vh._set_cluster_info(self.cluster)
        info = {'YARN': {
            'Web UI': 'http://1.2.3.3:8088',
            'ResourceManager': 'http://1.2.3.3:8032'
            },
            'HDFS': {
                'Web UI': 'http://1.2.3.4:50070',
                'NameNode': 'hdfs://testnode:9000'
            },
            'JobFlow': {
                'Oozie': 'http://1.2.3.1:11000'
            },
            'MapReduce JobHistory Server': {
                'Web UI': 'http://1.2.3.2:19888'
            },
            'Apache Spark': {
                'Spark UI': 'http://1.2.3.0:4040',
                'Spark History Server UI': 'http://1.2.3.0:18080'
            }
        }
        cluster_update.assert_called_once_with(ctx(), self.cluster,
                                               {'info': info})

    @mock.patch("sahara.service.edp.job_utils.get_plugin")
    @mock.patch('sahara.plugins.utils.get_instance')
    @mock.patch('os.path.join')
    def test_get_edp_engine(self, join, get_instance, get_plugin):
        # Unknown type -> None; Java -> Oozie engine; Spark -> Spark engine.
        job_type = ''
        ret = self.vh.get_edp_engine(self.cluster, job_type)
        self.assertIsNone(ret)
        job_type = 'Java'
        ret = self.vh.get_edp_engine(self.cluster, job_type)
        self.assertIsInstance(ret, EdpOozieEngine)
        job_type = 'Spark'
        ret = self.vh.get_edp_engine(self.cluster, job_type)
        self.assertIsInstance(ret, EdpSparkEngine)

    def test_get_edp_job_types(self):
        job_types = ['Hive', 'Java', 'MapReduce',
                     'MapReduce.Streaming', 'Pig', 'Shell', 'Spark']
        self.assertEqual(self.vh.get_edp_job_types(), job_types)

    def test_get_edp_config_hints(self):
        job_type = 'Java'
        ret = {'job_config': {'args': [], 'configs': []}}
        self.assertEqual(self.vh.get_edp_config_hints(job_type), ret)

    @mock.patch(plugin_hadoop2_path + 'utils.delete_oozie_password')
    @mock.patch(plugin_hadoop2_path + 'keypairs.drop_key')
    def test_on_terminate_cluster(self, delete_oozie_password, drop_key):
        self.vh.on_terminate_cluster(self.cluster)
        delete_oozie_password.assert_called_once_with(self.cluster)
        drop_key.assert_called_once_with(self.cluster)

    @mock.patch(plugin_hadoop2_path + 'config.get_open_ports')
    def test_get_open_ports(self, get_open_ports):
        node_group = mock.Mock()
        self.vh.get_open_ports(node_group)
        get_open_ports.assert_called_once_with(node_group)

    @mock.patch(plugin_hadoop2_path +
                'recommendations_utils.recommend_configs')
    def test_recommend_configs(self, recommend_configs):
        scaling = mock.Mock()
        configs = mock.Mock()
        self.vh.pctx['all_confs'] = configs
        self.vh.recommend_configs(self.cluster, scaling)
        recommend_configs.assert_called_once_with(self.cluster,
                                                  configs,
                                                  scaling)