Add support to deploy hadoop 2.7.5
Add hadoop 2.7.5 deployment script into vanilla plugin. Change-Id: I8f3b4a447d8b76e5a1e3f88e1e2c7f009b433bb6
This commit is contained in:
parent
4f074856b6
commit
4c5fab8e37
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Support deploy hadoop 2.7.5 with vanilla plugin.
|
|
@ -0,0 +1,151 @@
|
|||
# Copyright (c) 2015 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
|
||||
from sahara.utils import xmlutils as x
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
|
||||
|
||||
CORE_DEFAULT = x.load_hadoop_xml_defaults(
|
||||
'plugins/vanilla/v2_7_5/resources/core-default.xml')
|
||||
|
||||
HDFS_DEFAULT = x.load_hadoop_xml_defaults(
|
||||
'plugins/vanilla/v2_7_5/resources/hdfs-default.xml')
|
||||
|
||||
MAPRED_DEFAULT = x.load_hadoop_xml_defaults(
|
||||
'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
|
||||
|
||||
YARN_DEFAULT = x.load_hadoop_xml_defaults(
|
||||
'plugins/vanilla/v2_7_5/resources/yarn-default.xml')
|
||||
|
||||
OOZIE_DEFAULT = x.load_hadoop_xml_defaults(
|
||||
'plugins/vanilla/v2_7_5/resources/oozie-default.xml')
|
||||
|
||||
HIVE_DEFAULT = x.load_hadoop_xml_defaults(
|
||||
'plugins/vanilla/v2_7_5/resources/hive-default.xml')
|
||||
|
||||
_default_executor_classpath = ":".join(
|
||||
['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.5.jar'])
|
||||
|
||||
SPARK_CONFS = copy.deepcopy(c_helper.SPARK_CONFS)
|
||||
|
||||
SPARK_CONFS['Spark']['OPTIONS'].append(
|
||||
{
|
||||
'name': 'Executor extra classpath',
|
||||
'description': 'Value for spark.executor.extraClassPath'
|
||||
' in spark-defaults.conf'
|
||||
' (default: %s)' % _default_executor_classpath,
|
||||
'default': '%s' % _default_executor_classpath,
|
||||
'priority': 2,
|
||||
}
|
||||
)
|
||||
|
||||
XML_CONFS = {
|
||||
"Hadoop": [CORE_DEFAULT],
|
||||
"HDFS": [HDFS_DEFAULT],
|
||||
"YARN": [YARN_DEFAULT],
|
||||
"MapReduce": [MAPRED_DEFAULT],
|
||||
"JobFlow": [OOZIE_DEFAULT],
|
||||
"Hive": [HIVE_DEFAULT]
|
||||
}
|
||||
|
||||
ENV_CONFS = {
|
||||
"YARN": {
|
||||
'ResourceManager Heap Size': 1024,
|
||||
'NodeManager Heap Size': 1024
|
||||
},
|
||||
"HDFS": {
|
||||
'NameNode Heap Size': 1024,
|
||||
'SecondaryNameNode Heap Size': 1024,
|
||||
'DataNode Heap Size': 1024
|
||||
},
|
||||
"MapReduce": {
|
||||
'JobHistoryServer Heap Size': 1024
|
||||
},
|
||||
"JobFlow": {
|
||||
'Oozie Heap Size': 1024
|
||||
}
|
||||
}
|
||||
|
||||
# Initialise plugin Hadoop configurations
|
||||
PLUGIN_XML_CONFIGS = c_helper.init_xml_configs(XML_CONFS)
|
||||
PLUGIN_ENV_CONFIGS = c_helper.init_env_configs(ENV_CONFS)
|
||||
|
||||
|
||||
def _init_all_configs():
|
||||
configs = []
|
||||
configs.extend(PLUGIN_XML_CONFIGS)
|
||||
configs.extend(PLUGIN_ENV_CONFIGS)
|
||||
configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
|
||||
configs.extend(_get_spark_configs())
|
||||
configs.extend(_get_zookeeper_configs())
|
||||
return configs
|
||||
|
||||
|
||||
def _get_spark_opt_default(opt_name):
|
||||
for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
|
||||
if opt_name == opt["name"]:
|
||||
return opt["default"]
|
||||
return None
|
||||
|
||||
|
||||
def _get_spark_configs():
|
||||
spark_configs = []
|
||||
for service, config_items in six.iteritems(SPARK_CONFS):
|
||||
for item in config_items['OPTIONS']:
|
||||
cfg = p.Config(name=item["name"],
|
||||
description=item["description"],
|
||||
default_value=item["default"],
|
||||
applicable_target=service,
|
||||
scope="cluster", is_optional=True,
|
||||
priority=item["priority"])
|
||||
spark_configs.append(cfg)
|
||||
return spark_configs
|
||||
|
||||
|
||||
def _get_zookeeper_configs():
|
||||
zk_configs = []
|
||||
for service, config_items in six.iteritems(c_helper.ZOOKEEPER_CONFS):
|
||||
for item in config_items['OPTIONS']:
|
||||
cfg = p.Config(name=item["name"],
|
||||
description=item["description"],
|
||||
default_value=item["default"],
|
||||
applicable_target=service,
|
||||
scope="cluster", is_optional=True,
|
||||
priority=item["priority"])
|
||||
zk_configs.append(cfg)
|
||||
return zk_configs
|
||||
|
||||
|
||||
PLUGIN_CONFIGS = _init_all_configs()
|
||||
|
||||
|
||||
def get_plugin_configs():
|
||||
return PLUGIN_CONFIGS
|
||||
|
||||
|
||||
def get_xml_configs():
|
||||
return PLUGIN_XML_CONFIGS
|
||||
|
||||
|
||||
def get_env_configs():
|
||||
return ENV_CONFS
|
|
@ -0,0 +1,81 @@
|
|||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
|
||||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import utils as plugin_utils
|
||||
from sahara.plugins.vanilla import confighints_helper as ch_helper
|
||||
from sahara.plugins.vanilla.hadoop2 import edp_engine
|
||||
from sahara.plugins.vanilla import utils as v_utils
|
||||
from sahara.service.edp.spark import engine as edp_spark_engine
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
class EdpOozieEngine(edp_engine.EdpOozieEngine):
|
||||
@staticmethod
|
||||
def get_possible_job_config(job_type):
|
||||
if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):
|
||||
return {'job_config': ch_helper.get_possible_hive_config_from(
|
||||
'plugins/vanilla/v2_7_5/resources/hive-default.xml')}
|
||||
if edp.compare_job_type(job_type,
|
||||
edp.JOB_TYPE_MAPREDUCE,
|
||||
edp.JOB_TYPE_MAPREDUCE_STREAMING):
|
||||
return {'job_config': ch_helper.get_possible_mapreduce_config_from(
|
||||
'plugins/vanilla/v2_7_5/resources/mapred-default.xml')}
|
||||
if edp.compare_job_type(job_type, edp.JOB_TYPE_PIG):
|
||||
return {'job_config': ch_helper.get_possible_pig_config_from(
|
||||
'plugins/vanilla/v2_7_5/resources/mapred-default.xml')}
|
||||
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
|
||||
|
||||
|
||||
class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
|
||||
|
||||
edp_base_version = "2.7.5"
|
||||
|
||||
def __init__(self, cluster):
|
||||
super(EdpSparkEngine, self).__init__(cluster)
|
||||
self.master = plugin_utils.get_instance(cluster,
|
||||
"spark history server")
|
||||
self.plugin_params["spark-user"] = "sudo -u hadoop "
|
||||
self.plugin_params["spark-submit"] = os.path.join(
|
||||
plugin_utils.get_config_value_or_default(
|
||||
"Spark", "Spark home", self.cluster),
|
||||
"bin/spark-submit")
|
||||
self.plugin_params["deploy-mode"] = "cluster"
|
||||
self.plugin_params["master"] = "yarn"
|
||||
|
||||
driver_cp = plugin_utils.get_config_value_or_default(
|
||||
"Spark", "Executor extra classpath", self.cluster)
|
||||
self.plugin_params["driver-class-path"] = driver_cp
|
||||
|
||||
@staticmethod
|
||||
def edp_supported(version):
|
||||
return version >= EdpSparkEngine.edp_base_version
|
||||
|
||||
@staticmethod
|
||||
def job_type_supported(job_type):
|
||||
return (job_type in
|
||||
edp_spark_engine.SparkJobEngine.get_supported_job_types())
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
|
||||
if (not self.edp_supported(cluster.hadoop_version) or
|
||||
not v_utils.get_spark_history_server(cluster)):
|
||||
|
||||
raise ex.InvalidDataException(
|
||||
_('Spark {base} or higher required to run {type} jobs').format(
|
||||
base=EdpSparkEngine.edp_base_version, type=job.type))
|
||||
|
||||
super(EdpSparkEngine, self).validate_job_execution(cluster, job, data)
|
|
@ -0,0 +1,27 @@
|
|||
Apache Hadoop Configurations for Sahara
|
||||
=======================================
|
||||
|
||||
This directory contains default XML configuration files:
|
||||
|
||||
* core-default.xml
|
||||
* hdfs-default.xml
|
||||
* mapred-default.xml
|
||||
* yarn-default.xml
|
||||
* oozie-default.xml
|
||||
* hive-default.xml
|
||||
|
||||
These files are applied for Sahara's plugin of Apache Hadoop version 2.7.5
|
||||
|
||||
|
||||
Files were taken from here:
|
||||
|
||||
* `core-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml>`_
|
||||
* `hdfs-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml>`_
|
||||
* `yarn-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml>`_
|
||||
* `mapred-default.xml <https://github.com/apache/hadoop/tree/branch-2.7.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml>`_
|
||||
* `oozie-default.xml <https://github.com/apache/oozie/blob/release-4.3.0/core/src/main/resources/oozie-default.xml>`_
|
||||
|
||||
XML configs are used to expose default Hadoop configurations to the users
|
||||
through Sahara's REST API. It allows users to override some config values which
|
||||
will be pushed to the provisioned VMs running Hadoop services as part of
|
||||
appropriate xml config.
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,9 @@
|
|||
CREATE DATABASE metastore;
|
||||
USE metastore;
|
||||
SOURCE /opt/hive/scripts/metastore/upgrade/mysql/hive-schema-2.3.0.mysql.sql;
|
||||
CREATE USER 'hive'@'localhost' IDENTIFIED BY '{{password}}';
|
||||
REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'hive'@'localhost';
|
||||
GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'localhost' IDENTIFIED BY '{{password}}';
|
||||
GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'%' IDENTIFIED BY '{{password}}';
|
||||
FLUSH PRIVILEGES;
|
||||
exit
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,170 @@
|
|||
# Copyright (c) 2015 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from sahara import conductor
|
||||
from sahara import context
|
||||
from sahara.plugins import utils
|
||||
from sahara.plugins.vanilla import abstractversionhandler as avm
|
||||
from sahara.plugins.vanilla.hadoop2 import config as c
|
||||
from sahara.plugins.vanilla.hadoop2 import keypairs
|
||||
from sahara.plugins.vanilla.hadoop2 import recommendations_utils as ru
|
||||
from sahara.plugins.vanilla.hadoop2 import run_scripts as run
|
||||
from sahara.plugins.vanilla.hadoop2 import scaling as sc
|
||||
from sahara.plugins.vanilla.hadoop2 import starting_scripts as s_scripts
|
||||
from sahara.plugins.vanilla.hadoop2 import utils as u
|
||||
from sahara.plugins.vanilla.hadoop2 import validation as vl
|
||||
from sahara.plugins.vanilla import utils as vu
|
||||
from sahara.plugins.vanilla.v2_7_5 import config_helper as c_helper
|
||||
from sahara.plugins.vanilla.v2_7_5 import edp_engine
|
||||
from sahara.swift import swift_helper
|
||||
from sahara.utils import cluster as cluster_utils
|
||||
|
||||
|
||||
conductor = conductor.API
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class VersionHandler(avm.AbstractVersionHandler):
|
||||
def __init__(self):
|
||||
self.pctx = {
|
||||
'env_confs': c_helper.get_env_configs(),
|
||||
'all_confs': c_helper.get_plugin_configs()
|
||||
}
|
||||
|
||||
def get_plugin_configs(self):
|
||||
return self.pctx['all_confs']
|
||||
|
||||
def get_node_processes(self):
|
||||
return {
|
||||
"Hadoop": [],
|
||||
"MapReduce": ["historyserver"],
|
||||
"HDFS": ["namenode", "datanode", "secondarynamenode"],
|
||||
"YARN": ["resourcemanager", "nodemanager"],
|
||||
"JobFlow": ["oozie"],
|
||||
"Hive": ["hiveserver"],
|
||||
"Spark": ["spark history server"],
|
||||
"ZooKeeper": ["zookeeper"]
|
||||
}
|
||||
|
||||
def validate(self, cluster):
|
||||
vl.validate_cluster_creating(self.pctx, cluster)
|
||||
|
||||
def update_infra(self, cluster):
|
||||
pass
|
||||
|
||||
def configure_cluster(self, cluster):
|
||||
c.configure_cluster(self.pctx, cluster)
|
||||
|
||||
def start_cluster(self, cluster):
|
||||
keypairs.provision_keypairs(cluster)
|
||||
|
||||
s_scripts.start_namenode(cluster)
|
||||
s_scripts.start_secondarynamenode(cluster)
|
||||
s_scripts.start_resourcemanager(cluster)
|
||||
|
||||
run.start_dn_nm_processes(utils.get_instances(cluster))
|
||||
run.await_datanodes(cluster)
|
||||
|
||||
s_scripts.start_historyserver(cluster)
|
||||
s_scripts.start_oozie(self.pctx, cluster)
|
||||
s_scripts.start_hiveserver(self.pctx, cluster)
|
||||
s_scripts.start_zookeeper(cluster)
|
||||
|
||||
swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))
|
||||
|
||||
self._set_cluster_info(cluster)
|
||||
s_scripts.start_spark(cluster)
|
||||
|
||||
def decommission_nodes(self, cluster, instances):
|
||||
sc.decommission_nodes(self.pctx, cluster, instances)
|
||||
|
||||
def validate_scaling(self, cluster, existing, additional):
|
||||
vl.validate_additional_ng_scaling(cluster, additional)
|
||||
vl.validate_existing_ng_scaling(self.pctx, cluster, existing)
|
||||
zk_ng = utils.get_node_groups(cluster, "zookeeper")
|
||||
if zk_ng:
|
||||
vl.validate_zookeeper_node_count(zk_ng, existing, additional)
|
||||
|
||||
def scale_cluster(self, cluster, instances):
|
||||
keypairs.provision_keypairs(cluster, instances)
|
||||
sc.scale_cluster(self.pctx, cluster, instances)
|
||||
|
||||
def _set_cluster_info(self, cluster):
|
||||
nn = vu.get_namenode(cluster)
|
||||
rm = vu.get_resourcemanager(cluster)
|
||||
hs = vu.get_historyserver(cluster)
|
||||
oo = vu.get_oozie(cluster)
|
||||
sp = vu.get_spark_history_server(cluster)
|
||||
info = {}
|
||||
|
||||
if rm:
|
||||
info['YARN'] = {
|
||||
'Web UI': 'http://%s:%s' % (rm.get_ip_or_dns_name(), '8088'),
|
||||
'ResourceManager': 'http://%s:%s' % (
|
||||
rm.get_ip_or_dns_name(), '8032')
|
||||
}
|
||||
|
||||
if nn:
|
||||
info['HDFS'] = {
|
||||
'Web UI': 'http://%s:%s' % (nn.get_ip_or_dns_name(), '50070'),
|
||||
'NameNode': 'hdfs://%s:%s' % (nn.hostname(), '9000')
|
||||
}
|
||||
|
||||
if oo:
|
||||
info['JobFlow'] = {
|
||||
'Oozie': 'http://%s:%s' % (oo.get_ip_or_dns_name(), '11000')
|
||||
}
|
||||
|
||||
if hs:
|
||||
info['MapReduce JobHistory Server'] = {
|
||||
'Web UI': 'http://%s:%s' % (hs.get_ip_or_dns_name(), '19888')
|
||||
}
|
||||
|
||||
if sp:
|
||||
info['Apache Spark'] = {
|
||||
'Spark UI': 'http://%s:%s' % (sp.management_ip, '4040'),
|
||||
'Spark History Server UI':
|
||||
'http://%s:%s' % (sp.management_ip, '18080')
|
||||
}
|
||||
|
||||
ctx = context.ctx()
|
||||
conductor.cluster_update(ctx, cluster, {'info': info})
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
|
||||
return edp_engine.EdpOozieEngine(cluster)
|
||||
if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
|
||||
return edp_engine.EdpSparkEngine(cluster)
|
||||
|
||||
return None
|
||||
|
||||
def get_edp_job_types(self):
|
||||
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
|
||||
edp_engine.EdpSparkEngine.get_supported_job_types())
|
||||
|
||||
def get_edp_config_hints(self, job_type):
|
||||
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
|
||||
|
||||
def on_terminate_cluster(self, cluster):
|
||||
u.delete_oozie_password(cluster)
|
||||
keypairs.drop_key(cluster)
|
||||
|
||||
def get_open_ports(self, node_group):
|
||||
return c.get_open_ports(node_group)
|
||||
|
||||
def recommend_configs(self, cluster, scaling):
|
||||
ru.recommend_configs(cluster, self.get_plugin_configs(), scaling)
|
|
@ -0,0 +1,73 @@
|
|||
# Copyright (c) 2017 EasyStack Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins.vanilla.v2_7_5 import config_helper as v_helper
|
||||
from sahara.tests.unit import base
|
||||
|
||||
|
||||
class TestConfigHelper(base.SaharaTestCase):
|
||||
|
||||
plugin_path = 'sahara.plugins.vanilla.v2_7_5.'
|
||||
plugin_hadoop_path = 'sahara.plugins.vanilla.hadoop2.'
|
||||
|
||||
def setUp(self):
|
||||
super(TestConfigHelper, self).setUp()
|
||||
|
||||
@mock.patch(plugin_hadoop_path + 'config_helper.PLUGIN_GENERAL_CONFIGS')
|
||||
@mock.patch(plugin_path + 'config_helper.PLUGIN_ENV_CONFIGS')
|
||||
@mock.patch(plugin_path + 'config_helper.PLUGIN_XML_CONFIGS')
|
||||
@mock.patch(plugin_path + 'config_helper._get_spark_configs')
|
||||
@mock.patch(plugin_path + 'config_helper._get_zookeeper_configs')
|
||||
def test_init_all_configs(self,
|
||||
_get_zk_configs,
|
||||
_get_spark_configs,
|
||||
PLUGIN_XML_CONFIGS,
|
||||
PLUGIN_ENV_CONFIGS,
|
||||
PLUGIN_GENERAL_CONFIGS):
|
||||
configs = []
|
||||
configs.extend(PLUGIN_XML_CONFIGS)
|
||||
configs.extend(PLUGIN_ENV_CONFIGS)
|
||||
configs.extend(PLUGIN_GENERAL_CONFIGS)
|
||||
configs.extend(_get_spark_configs())
|
||||
configs.extend(_get_zk_configs())
|
||||
init_configs = v_helper._init_all_configs()
|
||||
self.assertEqual(init_configs, configs)
|
||||
|
||||
def test_get_spark_opt_default(self):
|
||||
opt_name = 'Executor extra classpath'
|
||||
_default_executor_classpath = ":".join(
|
||||
['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.5.jar'])
|
||||
default = v_helper._get_spark_opt_default(opt_name)
|
||||
self.assertEqual(default, _default_executor_classpath)
|
||||
|
||||
def test_get_spark_configs(self):
|
||||
spark_configs = v_helper._get_spark_configs()
|
||||
for i in spark_configs:
|
||||
self.assertIsInstance(i, p.Config)
|
||||
|
||||
def test_get_plugin_configs(self):
|
||||
self.assertEqual(v_helper.get_plugin_configs(),
|
||||
v_helper.PLUGIN_CONFIGS)
|
||||
|
||||
def test_get_xml_configs(self):
|
||||
self.assertEqual(v_helper.get_xml_configs(),
|
||||
v_helper.PLUGIN_XML_CONFIGS)
|
||||
|
||||
def test_get_env_configs(self):
|
||||
self.assertEqual(v_helper.get_env_configs(),
|
||||
v_helper.ENV_CONFS)
|
|
@ -0,0 +1,96 @@
|
|||
# Copyright (c) 2015 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from sahara.plugins.vanilla.v2_7_5 import edp_engine
|
||||
from sahara.tests.unit import base as sahara_base
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
class Vanilla2ConfigHintsTest(sahara_base.SaharaTestCase):
|
||||
@mock.patch(
|
||||
'sahara.plugins.vanilla.confighints_helper.'
|
||||
'get_possible_hive_config_from',
|
||||
return_value={})
|
||||
def test_get_possible_job_config_hive(
|
||||
self, get_possible_hive_config_from):
|
||||
expected_config = {'job_config': {}}
|
||||
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
|
||||
edp.JOB_TYPE_HIVE)
|
||||
get_possible_hive_config_from.assert_called_once_with(
|
||||
'plugins/vanilla/v2_7_5/resources/hive-default.xml')
|
||||
self.assertEqual(expected_config, actual_config)
|
||||
|
||||
@mock.patch('sahara.plugins.vanilla.hadoop2.edp_engine.EdpOozieEngine')
|
||||
def test_get_possible_job_config_java(self, BaseVanillaEdpOozieEngine):
|
||||
expected_config = {'job_config': {}}
|
||||
BaseVanillaEdpOozieEngine.get_possible_job_config.return_value = (
|
||||
expected_config)
|
||||
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
|
||||
edp.JOB_TYPE_JAVA)
|
||||
(BaseVanillaEdpOozieEngine.get_possible_job_config.
|
||||
assert_called_once_with(edp.JOB_TYPE_JAVA))
|
||||
self.assertEqual(expected_config, actual_config)
|
||||
|
||||
@mock.patch(
|
||||
'sahara.plugins.vanilla.confighints_helper.'
|
||||
'get_possible_mapreduce_config_from',
|
||||
return_value={})
|
||||
def test_get_possible_job_config_mapreduce(
|
||||
self, get_possible_mapreduce_config_from):
|
||||
expected_config = {'job_config': {}}
|
||||
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
|
||||
edp.JOB_TYPE_MAPREDUCE)
|
||||
get_possible_mapreduce_config_from.assert_called_once_with(
|
||||
'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
|
||||
self.assertEqual(expected_config, actual_config)
|
||||
|
||||
@mock.patch(
|
||||
'sahara.plugins.vanilla.confighints_helper.'
|
||||
'get_possible_mapreduce_config_from',
|
||||
return_value={})
|
||||
def test_get_possible_job_config_mapreduce_streaming(
|
||||
self, get_possible_mapreduce_config_from):
|
||||
expected_config = {'job_config': {}}
|
||||
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
|
||||
edp.JOB_TYPE_MAPREDUCE_STREAMING)
|
||||
get_possible_mapreduce_config_from.assert_called_once_with(
|
||||
'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
|
||||
self.assertEqual(expected_config, actual_config)
|
||||
|
||||
@mock.patch(
|
||||
'sahara.plugins.vanilla.confighints_helper.'
|
||||
'get_possible_pig_config_from',
|
||||
return_value={})
|
||||
def test_get_possible_job_config_pig(
|
||||
self, get_possible_pig_config_from):
|
||||
expected_config = {'job_config': {}}
|
||||
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
|
||||
edp.JOB_TYPE_PIG)
|
||||
get_possible_pig_config_from.assert_called_once_with(
|
||||
'plugins/vanilla/v2_7_5/resources/mapred-default.xml')
|
||||
self.assertEqual(expected_config, actual_config)
|
||||
|
||||
@mock.patch('sahara.plugins.vanilla.hadoop2.edp_engine.EdpOozieEngine')
|
||||
def test_get_possible_job_config_shell(self, BaseVanillaEdpOozieEngine):
|
||||
expected_config = {'job_config': {}}
|
||||
BaseVanillaEdpOozieEngine.get_possible_job_config.return_value = (
|
||||
expected_config)
|
||||
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
|
||||
edp.JOB_TYPE_SHELL)
|
||||
(BaseVanillaEdpOozieEngine.get_possible_job_config.
|
||||
assert_called_once_with(edp.JOB_TYPE_SHELL))
|
||||
self.assertEqual(expected_config, actual_config)
|
|
@ -0,0 +1,253 @@
|
|||
# Copyright (c) 2017 EasyStack Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from sahara.conductor import resource as r
|
||||
from sahara.plugins import exceptions as ex
|
||||
from sahara.plugins.vanilla.v2_7_5.edp_engine import EdpOozieEngine
|
||||
from sahara.plugins.vanilla.v2_7_5.edp_engine import EdpSparkEngine
|
||||
from sahara.plugins.vanilla.v2_7_5 import versionhandler as v_h
|
||||
from sahara.tests.unit import base
|
||||
from sahara.tests.unit import testutils
|
||||
|
||||
|
||||
class TestConfig(object):
|
||||
def __init__(self, applicable_target, name, default_value):
|
||||
self.applicable_target = applicable_target
|
||||
self.name = name
|
||||
self.default_value = default_value
|
||||
|
||||
|
||||
class VersionHandlerTest(base.SaharaTestCase):
|
||||
plugin_path = 'sahara.plugins.vanilla.'
|
||||
plugin_hadoop2_path = 'sahara.plugins.vanilla.hadoop2.'
|
||||
|
||||
def setUp(self):
|
||||
super(VersionHandlerTest, self).setUp()
|
||||
self.cluster = mock.Mock()
|
||||
self.vh = v_h.VersionHandler()
|
||||
|
||||
def test_get_plugin_configs(self):
|
||||
self.vh.pctx['all_confs'] = 'haha'
|
||||
conf = self.vh.get_plugin_configs()
|
||||
self.assertEqual(conf, 'haha')
|
||||
|
||||
def test_get_node_processes(self):
|
||||
processes = self.vh.get_node_processes()
|
||||
for k, v in six.iteritems(processes):
|
||||
for p in v:
|
||||
self.assertIsInstance(p, str)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path +
|
||||
'validation.validate_cluster_creating')
|
||||
def test_validate(self, validate_create):
|
||||
self.vh.pctx = mock.Mock()
|
||||
self.vh.validate(self.cluster)
|
||||
validate_create.assert_called_once_with(self.vh.pctx,
|
||||
self.cluster)
|
||||
|
||||
@mock.patch(plugin_path +
|
||||
'v2_7_5.versionhandler.VersionHandler.update_infra')
|
||||
def test_update_infra(self, update_infra):
|
||||
self.vh.update_infra(self.cluster)
|
||||
update_infra.assert_called_once_with(self.cluster)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path + 'config.configure_cluster')
|
||||
def test_configure_cluster(self, configure_cluster):
|
||||
self.vh.pctx = mock.Mock()
|
||||
self.vh.configure_cluster(self.cluster)
|
||||
configure_cluster.assert_called_once_with(self.vh.pctx, self.cluster)
|
||||
|
||||
@mock.patch(plugin_path + 'v2_7_5.versionhandler.run')
|
||||
@mock.patch(plugin_path + 'v2_7_5.versionhandler.s_scripts')
|
||||
@mock.patch('sahara.swift.swift_helper.install_ssl_certs')
|
||||
@mock.patch(plugin_hadoop2_path + 'keypairs.provision_keypairs')
|
||||
@mock.patch('sahara.plugins.utils.get_instances')
|
||||
@mock.patch('sahara.utils.cluster.get_instances')
|
||||
def test_start_cluster(self, c_get_instances, u_get_instances,
|
||||
provision_keypairs, install_ssl_certs,
|
||||
s_scripts, run):
|
||||
self.vh.pctx = mock.Mock()
|
||||
instances = mock.Mock()
|
||||
c_get_instances.return_value = instances
|
||||
u_get_instances.return_value = instances
|
||||
self.vh._set_cluster_info = mock.Mock()
|
||||
self.vh.start_cluster(self.cluster)
|
||||
provision_keypairs.assert_called_once_with(self.cluster)
|
||||
s_scripts.start_namenode.assert_called_once_with(self.cluster)
|
||||
s_scripts.start_secondarynamenode.assert_called_once_with(self.cluster)
|
||||
s_scripts.start_resourcemanager.assert_called_once_with(self.cluster)
|
||||
s_scripts.start_historyserver.assert_called_once_with(self.cluster)
|
||||
s_scripts.start_oozie.assert_called_once_with(self.vh.pctx,
|
||||
self.cluster)
|
||||
s_scripts.start_hiveserver.assert_called_once_with(self.vh.pctx,
|
||||
self.cluster)
|
||||
s_scripts.start_spark.assert_called_once_with(self.cluster)
|
||||
run.start_dn_nm_processes.assert_called_once_with(instances)
|
||||
run.await_datanodes.assert_called_once_with(self.cluster)
|
||||
install_ssl_certs.assert_called_once_with(instances)
|
||||
self.vh._set_cluster_info.assert_called_once_with(self.cluster)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path + 'scaling.decommission_nodes')
|
||||
def test_decommission_nodes(self, decommission_nodes):
|
||||
self.vh.pctx = mock.Mock()
|
||||
cluster = mock.Mock()
|
||||
instances = mock.Mock()
|
||||
self.vh.decommission_nodes(cluster, instances)
|
||||
decommission_nodes.assert_called_once_with(self.vh.pctx,
|
||||
cluster,
|
||||
instances)
|
||||
|
||||
@mock.patch('sahara.utils.general.get_by_id')
|
||||
@mock.patch(plugin_hadoop2_path +
|
||||
'validation.validate_additional_ng_scaling')
|
||||
@mock.patch(plugin_hadoop2_path +
|
||||
'validation.validate_existing_ng_scaling')
|
||||
def test_validate_scaling(self, vls, vla, get_by_id):
|
||||
self.vh.pctx['all_confs'] = [TestConfig('HDFS', 'dfs.replication', -1)]
|
||||
ng1 = testutils.make_ng_dict('ng1', '40', ['namenode'], 1)
|
||||
ng2 = testutils.make_ng_dict('ng2', '41', ['datanode'], 2)
|
||||
ng3 = testutils.make_ng_dict('ng3', '42', ['datanode'], 3)
|
||||
additional = [ng2['id'], ng3['id']]
|
||||
existing = {ng2['id']: 1}
|
||||
cluster = testutils.create_cluster('test-cluster', 'tenant1', 'fake',
|
||||
'0.1', [ng1, ng2, ng3])
|
||||
self.vh.validate_scaling(cluster, existing, additional)
|
||||
vla.assert_called_once_with(cluster, additional)
|
||||
vls.assert_called_once_with(self.vh.pctx, cluster, existing)
|
||||
|
||||
ng4 = testutils.make_ng_dict('ng4', '43', ['datanode', 'zookeeper'], 3)
|
||||
ng5 = testutils.make_ng_dict('ng5', '44', ['datanode', 'zookeeper'], 1)
|
||||
existing = {ng4['id']: 2}
|
||||
additional = {ng5['id']}
|
||||
cluster = testutils.create_cluster('test-cluster', 'tenant1', 'fake',
|
||||
'0.1', [ng1, ng4])
|
||||
|
||||
with testtools.ExpectedException(ex.ClusterCannotBeScaled):
|
||||
self.vh.validate_scaling(cluster, existing, {})
|
||||
|
||||
get_by_id.return_value = r.NodeGroupResource(ng5)
|
||||
|
||||
with testtools.ExpectedException(ex.ClusterCannotBeScaled):
|
||||
self.vh.validate_scaling(cluster, {}, additional)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path + 'scaling.scale_cluster')
|
||||
@mock.patch(plugin_hadoop2_path + 'keypairs.provision_keypairs')
|
||||
def test_scale_cluster(self, provision_keypairs, scale_cluster):
|
||||
self.vh.pctx = mock.Mock()
|
||||
instances = mock.Mock()
|
||||
self.vh.scale_cluster(self.cluster, instances)
|
||||
provision_keypairs.assert_called_once_with(self.cluster,
|
||||
instances)
|
||||
scale_cluster.assert_called_once_with(self.vh.pctx,
|
||||
self.cluster,
|
||||
instances)
|
||||
|
||||
@mock.patch("sahara.conductor.API.cluster_update")
|
||||
@mock.patch("sahara.context.ctx")
|
||||
@mock.patch(plugin_path + 'utils.get_namenode')
|
||||
@mock.patch(plugin_path + 'utils.get_resourcemanager')
|
||||
@mock.patch(plugin_path + 'utils.get_historyserver')
|
||||
@mock.patch(plugin_path + 'utils.get_oozie')
|
||||
@mock.patch(plugin_path + 'utils.get_spark_history_server')
|
||||
def test_set_cluster_info(self, get_spark_history_server, get_oozie,
|
||||
get_historyserver, get_resourcemanager,
|
||||
get_namenode, ctx, cluster_update):
|
||||
get_spark_history_server.return_value.management_ip = '1.2.3.0'
|
||||
get_oozie.return_value.get_ip_or_dns_name = mock.Mock(
|
||||
return_value='1.2.3.1')
|
||||
get_historyserver.return_value.get_ip_or_dns_name = mock.Mock(
|
||||
return_value='1.2.3.2')
|
||||
get_resourcemanager.return_value.get_ip_or_dns_name = mock.Mock(
|
||||
return_value='1.2.3.3')
|
||||
get_namenode.return_value.get_ip_or_dns_name = mock.Mock(
|
||||
return_value='1.2.3.4')
|
||||
get_namenode.return_value.hostname = mock.Mock(
|
||||
return_value='testnode')
|
||||
self.vh._set_cluster_info(self.cluster)
|
||||
info = {'YARN': {
|
||||
'Web UI': 'http://1.2.3.3:8088',
|
||||
'ResourceManager': 'http://1.2.3.3:8032'
|
||||
},
|
||||
'HDFS': {
|
||||
'Web UI': 'http://1.2.3.4:50070',
|
||||
'NameNode': 'hdfs://testnode:9000'
|
||||
},
|
||||
'JobFlow': {
|
||||
'Oozie': 'http://1.2.3.1:11000'
|
||||
},
|
||||
'MapReduce JobHistory Server': {
|
||||
'Web UI': 'http://1.2.3.2:19888'
|
||||
},
|
||||
'Apache Spark': {
|
||||
'Spark UI': 'http://1.2.3.0:4040',
|
||||
'Spark History Server UI': 'http://1.2.3.0:18080'
|
||||
}
|
||||
}
|
||||
cluster_update.assert_called_once_with(ctx(), self.cluster,
|
||||
{'info': info})
|
||||
|
||||
@mock.patch("sahara.service.edp.job_utils.get_plugin")
|
||||
@mock.patch('sahara.plugins.utils.get_instance')
|
||||
@mock.patch('os.path.join')
|
||||
def test_get_edp_engine(self, join, get_instance, get_plugin):
|
||||
job_type = ''
|
||||
ret = self.vh.get_edp_engine(self.cluster, job_type)
|
||||
self.assertIsNone(ret)
|
||||
|
||||
job_type = 'Java'
|
||||
ret = self.vh.get_edp_engine(self.cluster, job_type)
|
||||
self.assertIsInstance(ret, EdpOozieEngine)
|
||||
|
||||
job_type = 'Spark'
|
||||
ret = self.vh.get_edp_engine(self.cluster, job_type)
|
||||
self.assertIsInstance(ret, EdpSparkEngine)
|
||||
|
||||
def test_get_edp_job_types(self):
|
||||
job_types = ['Hive', 'Java', 'MapReduce',
|
||||
'MapReduce.Streaming', 'Pig', 'Shell', 'Spark']
|
||||
self.assertEqual(self.vh.get_edp_job_types(), job_types)
|
||||
|
||||
def test_get_edp_config_hints(self):
|
||||
job_type = 'Java'
|
||||
ret = {'job_config': {'args': [], 'configs': []}}
|
||||
self.assertEqual(self.vh.get_edp_config_hints(job_type), ret)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path + 'utils.delete_oozie_password')
|
||||
@mock.patch(plugin_hadoop2_path + 'keypairs.drop_key')
|
||||
def test_on_terminate_cluster(self, delete_oozie_password, drop_key):
|
||||
self.vh.on_terminate_cluster(self.cluster)
|
||||
delete_oozie_password.assert_called_once_with(self.cluster)
|
||||
drop_key.assert_called_once_with(self.cluster)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path + 'config.get_open_ports')
|
||||
def test_get_open_ports(self, get_open_ports):
|
||||
node_group = mock.Mock()
|
||||
self.vh.get_open_ports(node_group)
|
||||
get_open_ports.assert_called_once_with(node_group)
|
||||
|
||||
@mock.patch(plugin_hadoop2_path +
|
||||
'recommendations_utils.recommend_configs')
|
||||
def test_recommend_configs(self, recommend_configs):
|
||||
scaling = mock.Mock()
|
||||
configs = mock.Mock()
|
||||
self.vh.pctx['all_confs'] = configs
|
||||
self.vh.recommend_configs(self.cluster, scaling)
|
||||
recommend_configs.assert_called_once_with(self.cluster,
|
||||
configs,
|
||||
scaling)
|
Loading…
Reference in New Issue