Merge "Refactor rest of CDH plugin code"

Jenkins 2017-04-21 10:17:38 +00:00 committed by Gerrit Code Review
commit 8c0cf3058e
28 changed files with 1082 additions and 3388 deletions

View File

@ -21,6 +21,7 @@ from sahara import conductor
from sahara import context
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import health
from sahara.plugins import kerberos
@six.add_metaclass(abc.ABCMeta)
@ -90,10 +91,55 @@ class BaseVersionHandler(AbstractVersionHandler):
self.validation = None # to validate
def get_plugin_configs(self):
return self.config_helper.get_plugin_configs()
result = self.config_helper.get_plugin_configs()
result.extend(kerberos.get_config_list())
return result
def get_node_processes(self):
raise NotImplementedError()
return {
"CLOUDERA": ['CLOUDERA_MANAGER'],
"HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
"YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
"OOZIE": ['OOZIE_SERVER'],
"HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['ZOOKEEPER_SERVER'],
"HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
"FLUME": ['FLUME_AGENT'],
"IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
"KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"SENTRY": ['SENTRY_SERVER'],
"KMS": ['KMS'],
"KAFKA": ['KAFKA_BROKER'],
"YARN_GATEWAY": [],
"RESOURCEMANAGER": [],
"NODEMANAGER": [],
"JOBHISTORY": [],
"HDFS_GATEWAY": [],
'DATANODE': [],
'NAMENODE': [],
'SECONDARYNAMENODE': [],
'JOURNALNODE': [],
'REGIONSERVER': [],
'MASTER': [],
'HIVEMETASTORE': [],
'HIVESERVER': [],
'WEBCAT': [],
'CATALOGSERVER': [],
'STATESTORE': [],
'IMPALAD': [],
'Kerberos': [],
}
def validate(self, cluster):
self.validation.validate_cluster_creating(cluster)
@ -133,12 +179,16 @@ class BaseVersionHandler(AbstractVersionHandler):
def get_edp_engine(self, cluster, job_type):
oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
if job_type in oozie_type:
return self.edp_engine.EdpOozieEngine(cluster)
if job_type in spark_type:
return self.edp_engine.EdpSparkEngine(cluster)
return None
def get_edp_job_types(self):
return self.edp_engine.EdpOozieEngine.get_supported_job_types()
return (self.edp_engine.EdpOozieEngine.get_supported_job_types() +
self.edp_engine.EdpSparkEngine.get_supported_job_types())
def get_edp_config_hints(self, job_type):
return self.edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
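A brief usage sketch of the refactored handler above (the handler instance and cluster object are illustrative placeholders, not part of this change): plugin configs now carry the shared Kerberos options, and EDP jobs are routed to either the Oozie or the Spark engine.
# Hypothetical caller; `handler` is a concrete version handler, `cluster` a Sahara cluster.
configs = handler.get_plugin_configs()             # plugin configs + kerberos.get_config_list()
engine = handler.get_edp_engine(cluster, 'Spark')  # EdpSparkEngine for Spark job types
job_types = handler.get_edp_job_types()            # Oozie job types + Spark job types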

View File

@ -15,22 +15,41 @@
import functools
from oslo_log import log as logging
import six
from sahara import context
from sahara.i18n import _
from sahara.plugins.cdh.client import api_client
from sahara.plugins.cdh.client import services
from sahara.plugins.cdh import db_helper
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import kerberos
from sahara.swift import swift_helper
from sahara.topology import topology_helper as t_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import poll_utils
from sahara.utils import xmlutils
LOG = logging.getLogger(__name__)
HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'
def cloudera_cmd(f):
@ -51,7 +70,7 @@ def cloudera_cmd(f):
class ClouderaUtils(object):
CM_DEFAULT_USERNAME = 'admin'
CM_DEFAULT_PASSWD = 'admin'
CM_API_VERSION = 6
CM_API_VERSION = 8
HDFS_SERVICE_NAME = 'hdfs01'
YARN_SERVICE_NAME = 'yarn01'
@ -62,8 +81,20 @@ class ClouderaUtils(object):
ZOOKEEPER_SERVICE_NAME = 'zookeeper01'
HBASE_SERVICE_NAME = 'hbase01'
FLUME_SERVICE_NAME = 'flume01'
SOLR_SERVICE_NAME = 'solr01'
SQOOP_SERVICE_NAME = 'sqoop01'
KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
IMPALA_SERVICE_NAME = 'impala01'
SENTRY_SERVICE_NAME = 'sentry01'
KMS_SERVICE_NAME = 'kms01'
KAFKA_SERVICE_NAME = 'kafka01'
NAME_SERVICE = 'nameservice01'
def __init__(self):
self.pu = plugin_utils.AbstractPluginUtils()
self.validator = validation.Validator
self.c_helper = None
def get_api_client_by_default_password(self, cluster):
manager_ip = self.pu.get_manager(cluster).management_ip
@ -74,7 +105,7 @@ class ClouderaUtils(object):
def get_api_client(self, cluster, api_version=None):
manager_ip = self.pu.get_manager(cluster).management_ip
cm_password = db_helper.get_cm_password(cluster)
cm_password = dh.get_cm_password(cluster)
version = self.CM_API_VERSION if not api_version else api_version
return api_client.ApiResource(manager_ip,
username=self.CM_DEFAULT_USERNAME,
@ -84,7 +115,7 @@ class ClouderaUtils(object):
def update_cloudera_password(self, cluster):
api = self.get_api_client_by_default_password(cluster)
user = api.get_user(self.CM_DEFAULT_USERNAME)
user.password = db_helper.get_cm_password(cluster)
user.password = dh.get_cm_password(cluster)
api.update_user(user)
def get_cloudera_cluster(self, cluster):
@ -223,7 +254,6 @@ class ClouderaUtils(object):
cm.hosts_start_roles([hostname])
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
elif instance:
@ -249,11 +279,86 @@ class ClouderaUtils(object):
return cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
elif role in ['MASTER', 'REGIONSERVER']:
return cm_cluster.get_service(self.HBASE_SERVICE_NAME)
elif role in ['AGENT']:
return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
elif role in ['SENTRY_SERVER']:
return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
elif role in ['SQOOP_SERVER']:
return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
elif role in ['SOLR_SERVER']:
return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
elif role in ['HBASE_INDEXER']:
return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
elif role in ['KMS']:
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
elif role in ['JOURNALNODE']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif role in ['YARN_STANDBYRM']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
elif role in ['KAFKA_BROKER']:
return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
else:
raise ValueError(
_("Process %(process)s is not supported by CDH plugin") %
{'process': role})
@cpo.event_wrapper(
True, step=_("First run cluster"), param=('cluster', 1))
@cloudera_cmd
def first_run(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
yield cm_cluster.first_run()
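first_run and the HA methods below are generators decorated with cloudera_cmd; the decorator's body is not part of this hunk, but its assumed shape is that it drains the generator and waits on each Cloudera Manager command it yields. A hedged sketch of that pattern:
import functools

def cloudera_cmd(f):  # sketch of the assumed decorator behaviour, not the real body
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        for cmd in f(*args, **kwargs):   # e.g. cm_cluster.first_run()
            result = cmd.wait()          # block until the ApiCommand finishes
            if not result.success:
                raise RuntimeError(result.resultMessage)  # placeholder error type
    return wrapper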
@cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
def create_services(self, cluster):
api = self.get_api_client(cluster)
cm_cluster = api.create_cluster(cluster.name,
fullVersion=cluster.hadoop_version)
if len(self.pu.get_zookeepers(cluster)) > 0:
cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
ZOOKEEPER_SERVICE_TYPE)
cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
if self.pu.get_hive_metastore(cluster):
cm_cluster.create_service(self.HIVE_SERVICE_NAME,
HIVE_SERVICE_TYPE)
if self.pu.get_hue(cluster):
cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
if self.pu.get_spark_historyserver(cluster):
cm_cluster.create_service(self.SPARK_SERVICE_NAME,
SPARK_SERVICE_TYPE)
if self.pu.get_hbase_master(cluster):
cm_cluster.create_service(self.HBASE_SERVICE_NAME,
HBASE_SERVICE_TYPE)
if len(self.pu.get_flumes(cluster)) > 0:
cm_cluster.create_service(self.FLUME_SERVICE_NAME,
FLUME_SERVICE_TYPE)
if self.pu.get_sentry(cluster):
cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
SENTRY_SERVICE_TYPE)
if len(self.pu.get_solrs(cluster)) > 0:
cm_cluster.create_service(self.SOLR_SERVICE_NAME,
SOLR_SERVICE_TYPE)
if self.pu.get_sqoop(cluster):
cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
SQOOP_SERVICE_TYPE)
if len(self.pu.get_hbase_indexers(cluster)) > 0:
cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
KS_INDEXER_SERVICE_TYPE)
if self.pu.get_catalogserver(cluster):
cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
IMPALA_SERVICE_TYPE)
if self.pu.get_kms(cluster):
cm_cluster.create_service(self.KMS_SERVICE_NAME,
KMS_SERVICE_TYPE)
if len(self.pu.get_kafka_brokers(cluster)) > 0:
cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
KAFKA_SERVICE_TYPE)
def _agents_connected(self, instances, api):
hostnames = [i.fqdn() for i in instances]
hostnames_to_manager = [h.hostname for h in
@ -271,6 +376,91 @@ class ClouderaUtils(object):
_("Await Cloudera agents"), 5, {
'instances': instances, 'api': api})
def await_agents(self, cluster, instances):
self._await_agents(cluster, instances,
self.c_helper.AWAIT_AGENTS_TIMEOUT)
@cpo.event_wrapper(
True, step=_("Configure services"), param=('cluster', 1))
def configure_services(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
if len(self.pu.get_zookeepers(cluster)) > 0:
zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
cluster=cluster))
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
cluster=cluster))
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
cluster=cluster))
oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hive_metastore(cluster):
hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hue(cluster):
hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_spark_historyserver(cluster):
spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hbase_master(cluster):
hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_flumes(cluster)) > 0:
flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sentry(cluster):
sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_solrs(cluster)) > 0:
solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sqoop(cluster):
sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_hbase_indexers(cluster)) > 0:
ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
ks_indexer.update_config(
self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))
if self.pu.get_catalogserver(cluster):
impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_kms(cluster):
kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_kafka_brokers(cluster)) > 0:
kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
cluster=cluster))
def configure_instances(self, instances, cluster=None):
# instances non-empty
cpo.add_provisioning_step(
@ -399,11 +589,243 @@ class ClouderaUtils(object):
'Cloudera Manager': {
'Web UI': 'http://%s:7180' % mng.get_ip_or_dns_name(),
'Username': 'admin',
'Password': db_helper.get_cm_password(cluster)
'Password': dh.get_cm_password(cluster)
}
}
return info
@cpo.event_wrapper(
True, step=_("Enable NameNode HA"), param=('cluster', 1))
@cloudera_cmd
def enable_namenode_ha(self, cluster):
standby_nn = self.pu.get_secondarynamenode(cluster)
standby_nn_host_name = standby_nn.fqdn()
jns = self.pu.get_jns(cluster)
jn_list = []
for index, jn in enumerate(jns):
jn_host_name = jn.fqdn()
jn_list.append({'jnHostId': jn_host_name,
'jnName': 'JN%i' % index,
'jnEditsDir': '/dfs/jn'
})
cm_cluster = self.get_cloudera_cluster(cluster)
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
nn = hdfs.get_roles_by_type('NAMENODE')[0]
yield hdfs.enable_nn_ha(active_name=nn.name,
standby_host_id=standby_nn_host_name,
nameservice=self.NAME_SERVICE, jns=jn_list
)
@cpo.event_wrapper(
True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
@cloudera_cmd
def enable_resourcemanager_ha(self, cluster):
new_rm = self.pu.get_stdb_rm(cluster)
new_rm_host_name = new_rm.fqdn()
cm_cluster = self.get_cloudera_cluster(cluster)
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)
def _get_configs(self, service, cluster=None, instance=None):
# Defined in derived class.
return
def get_hadoop_dirs(mount_points, suffix):
return ','.join([x + suffix for x in mount_points])
all_confs = {}
if cluster:
zk_count = self.validator.get_inst_count(cluster,
'ZOOKEEPER_SERVER')
hbm_count = self.validator.get_inst_count(cluster, 'HBASE_MASTER')
snt_count = self.validator.get_inst_count(cluster,
'SENTRY_SERVER')
ks_count =\
self.validator.get_inst_count(cluster,
'KEY_VALUE_STORE_INDEXER')
kms_count = self.validator.get_inst_count(cluster, 'KMS')
imp_count =\
self.validator.get_inst_count(cluster,
'IMPALA_CATALOGSERVER')
hive_count = self.validator.get_inst_count(cluster,
'HIVE_METASTORE')
slr_count = self.validator.get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = self.validator.get_inst_count(cluster, 'SQOOP_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()
confs = {c['name']: c['value'] for c in configs}
core_site_safety_valve = xmlutils.create_elements_xml(confs)
all_confs = {
'HDFS': {
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'dfs_block_local_path_access_user':
'impala' if imp_count else '',
'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
'core_site_safety_valve': core_site_safety_valve
},
'HIVE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'OOZIE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'hive_service':
self.HIVE_SERVICE_NAME if hive_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'YARN': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'HUE': {
'hive_service': self.HIVE_SERVICE_NAME,
'oozie_service': self.OOZIE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'impala_service':
self.IMPALA_SERVICE_NAME if imp_count else '',
'sqoop_service':
self.SQOOP_SERVICE_NAME if sqp_count else ''
},
'SPARK_ON_YARN': {
'yarn_service': self.YARN_SERVICE_NAME
},
'HBASE': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
'hbase_enable_indexing': 'true' if ks_count else 'false',
'hbase_enable_replication':
'true' if ks_count else 'false'
},
'FLUME': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else ''
},
'SENTRY': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'sentry_server_config_safety_valve': (
self.c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
if imp_count else '')
},
'SOLR': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
},
'SQOOP': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME
},
'KS_INDEXER': {
'hbase_service': self.HBASE_SERVICE_NAME,
'solr_service': self.SOLR_SERVICE_NAME
},
'IMPALA': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'hive_service': self.HIVE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
}
}
hive_confs = {
'HIVE': {
'hive_metastore_database_type': 'postgresql',
'hive_metastore_database_host':
self.pu.get_manager(cluster).internal_ip,
'hive_metastore_database_port': '7432',
'hive_metastore_database_password':
dh.get_hive_db_password(cluster)
}
}
hue_confs = {
'HUE': {
'hue_webhdfs': self.pu.get_role_name(
self.pu.get_namenode(cluster), 'NAMENODE')
}
}
sentry_confs = {
'SENTRY': {
'sentry_server_database_type': 'postgresql',
'sentry_server_database_host':
self.pu.get_manager(cluster).internal_ip,
'sentry_server_database_port': '7432',
'sentry_server_database_password':
dh.get_sentry_db_password(cluster)
}
}
kafka_confs = {
'KAFKA': {
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
}
}
all_confs = s_cfg.merge_configs(all_confs, hue_confs)
all_confs = s_cfg.merge_configs(all_confs, hive_confs)
all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
if instance:
snt_count = self.validator.get_inst_count(instance.cluster,
'SENTRY_SERVER')
paths = instance.storage_paths()
instance_default_confs = {
'NAMENODE': {
'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
},
'SECONDARYNAMENODE': {
'fs_checkpoint_dir_list':
get_hadoop_dirs(paths, '/fs/snn')
},
'DATANODE': {
'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
'dfs_datanode_data_dir_perm': 755,
'dfs_datanode_handler_count': 30
},
'NODEMANAGER': {
'yarn_nodemanager_local_dirs':
get_hadoop_dirs(paths, '/yarn/local'),
'container_executor_allowed_system_users':
"nobody,impala,hive,llama,hdfs,yarn,mapred,"
"spark,oozie",
"container_executor_banned_users": "bin"
},
'SERVER': {
'maxSessionTimeout': 60000
},
'HIVESERVER2': {
'hiveserver2_enable_impersonation':
'false' if snt_count else 'true',
'hive_hs2_config_safety_valve': (
self.c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
if snt_count else '')
},
'HIVEMETASTORE': {
'hive_metastore_config_safety_valve': (
self.c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
if snt_count else '')
}
}
ng_user_confs = self.pu.convert_process_configs(
instance.node_group.node_configs)
all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
return all_confs.get(service, {})
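The lookup at the end of _get_configs depends on sahara.utils.configs.merge_configs performing a recursive, per-service dictionary merge; a small hedged illustration with made-up values:
from sahara.utils import configs as s_cfg

defaults = {'HIVE': {'mapreduce_yarn_service': 'yarn01'}}
user_overrides = {'HIVE': {'hive_metastore_database_port': '7432'}}
merged = s_cfg.merge_configs(defaults, user_overrides)
# merged['HIVE'] now holds both keys; _get_configs('HIVE', cluster=...) returns
# that inner dict, while an unknown service name falls back to {}.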

View File

@ -38,22 +38,18 @@ class ConfigHelper(object):
'CM5 repo key URL (for debian-based only)', 'general', 'cluster',
priority=1, default_value="")
ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
config_type='bool', priority=1,
default_value=True)
ENABLE_HBASE_COMMON_LIB = p.Config(
'Enable HBase Common Lib', 'general', 'cluster', config_type='bool',
priority=1, default_value=True)
ENABLE_SWIFT = p.Config(
'Enable Swift', 'general', 'cluster',
config_type='bool', priority=1, default_value=True)
DEFAULT_SWIFT_LIB_URL = (
'https://repository.cloudera.com/artifactory/repo/org'
'/apache/hadoop/hadoop-openstack/2.3.0-cdh5.0.0'
'/hadoop-openstack-2.3.0-cdh5.0.0.jar')
DEFAULT_EXTJS_LIB_URL = (
'http://tarballs.openstack.org/sahara/dist/common-artifacts/'
'ext-2.2.zip')
'/apache/hadoop/hadoop-openstack/2.6.0-cdh5.5.0'
'/hadoop-openstack-2.6.0-cdh5.5.0.jar')
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
@ -61,12 +57,40 @@ class ConfigHelper(object):
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
DEFAULT_EXTJS_LIB_URL = (
'http://tarballs.openstack.org/sahara/dist/common-artifacts/'
'ext-2.2.zip')
EXTJS_LIB_URL = p.Config(
"ExtJS library URL", 'general', 'cluster', priority=1,
default_value=DEFAULT_EXTJS_LIB_URL,
description=("Ext 2.2 library is required for Oozie Web Console. "
"The file will be downloaded by VMs with oozie."))
_default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2,
default_value=_default_executor_classpath,
description='Value for spark.executor.extraClassPath in '
'spark-defaults.conf (default: %s)'
% _default_executor_classpath)
KMS_REPO_URL = p.Config(
'KMS repo list URL', 'general', 'cluster', priority=1,
default_value="")
KMS_REPO_KEY_URL = p.Config(
'KMS repo key URL (for debian-based only)', 'general',
'cluster',
priority=1, default_value="")
REQUIRE_ANTI_AFFINITY = p.Config(
'Require Anti Affinity', 'general', 'cluster',
config_type='bool', priority=2, default_value=True)
AWAIT_AGENTS_TIMEOUT = p.Config(
'Await Cloudera agents timeout', 'general', 'cluster',
config_type='int', priority=1, default_value=300, is_optional=True,
@ -103,7 +127,6 @@ class ConfigHelper(object):
return json.loads(data)
def _init_ng_configs(self, confs, app_target, scope):
prepare_value = lambda x: x.replace('\n', ' ') if x else ""
cfgs = []
for cfg in confs:
@ -115,6 +138,113 @@ class ConfigHelper(object):
return cfgs
def _init_all_ng_plugin_configs(self):
self.hdfs_confs = self._load_and_init_configs(
'hdfs-service.json', 'HDFS', 'cluster')
self.namenode_confs = self._load_and_init_configs(
'hdfs-namenode.json', 'NAMENODE', 'node')
self.datanode_confs = self._load_and_init_configs(
'hdfs-datanode.json', 'DATANODE', 'node')
self.secnamenode_confs = self._load_and_init_configs(
'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
self.hdfs_gateway_confs = self._load_and_init_configs(
'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
self.journalnode_confs = self._load_and_init_configs(
'hdfs-journalnode.json', 'JOURNALNODE', 'node')
self.yarn_confs = self._load_and_init_configs(
'yarn-service.json', 'YARN', 'cluster')
self.resourcemanager_confs = self._load_and_init_configs(
'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
self.nodemanager_confs = self._load_and_init_configs(
'yarn-nodemanager.json', 'NODEMANAGER', 'node')
self.jobhistory_confs = self._load_and_init_configs(
'yarn-jobhistory.json', 'JOBHISTORY', 'node')
self.yarn_gateway_conf = self._load_and_init_configs(
'yarn-gateway.json', 'YARN_GATEWAY', 'node')
self.oozie_service_confs = self._load_and_init_configs(
'oozie-service.json', 'OOZIE', 'cluster')
self.oozie_role_confs = self._load_and_init_configs(
'oozie-oozie_server.json', 'OOZIE', 'node')
self.hive_service_confs = self._load_and_init_configs(
'hive-service.json', 'HIVE', 'cluster')
self.hive_metastore_confs = self._load_and_init_configs(
'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
self.hive_hiveserver_confs = self._load_and_init_configs(
'hive-hiveserver2.json', 'HIVESERVER', 'node')
self.hive_webhcat_confs = self._load_and_init_configs(
'hive-webhcat.json', 'WEBHCAT', 'node')
self.hue_service_confs = self._load_and_init_configs(
'hue-service.json', 'HUE', 'cluster')
self.hue_role_confs = self._load_and_init_configs(
'hue-hue_server.json', 'HUE', 'node')
self.spark_service_confs = self._load_and_init_configs(
'spark-service.json', 'SPARK_ON_YARN', 'cluster')
self.spark_role_confs = self._load_and_init_configs(
'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')
self.zookeeper_server_confs = self._load_and_init_configs(
'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
self.zookeeper_service_confs = self._load_and_init_configs(
'zookeeper-server.json', 'ZOOKEEPER', 'node')
self.hbase_confs = self._load_and_init_configs(
'hbase-service.json', 'HBASE', 'cluster')
self.master_confs = self._load_and_init_configs(
'hbase-master.json', 'MASTER', 'node')
self.regionserver_confs = self._load_and_init_configs(
'hbase-regionserver.json', 'REGIONSERVER', 'node')
self.flume_service_confs = self._load_and_init_configs(
'flume-service.json', 'FLUME', 'cluster')
self.flume_agent_confs = self._load_and_init_configs(
'flume-agent.json', 'FLUME', 'node')
self.sentry_service_confs = self._load_and_init_configs(
'sentry-service.json', 'SENTRY', 'cluster')
self.sentry_server_confs = self._load_and_init_configs(
'sentry-sentry_server.json', 'SENTRY', 'node')
self.solr_service_confs = self._load_and_init_configs(
'solr-service.json', 'SOLR', 'cluster')
self.solr_server_confs = self._load_and_init_configs(
'solr-solr_server.json', 'SOLR', 'node')
self.sqoop_service_confs = self._load_and_init_configs(
'sqoop-service.json', 'SQOOP', 'cluster')
self.sqoop_server_confs = self._load_and_init_configs(
'sqoop-sqoop_server.json', 'SQOOP', 'node')
self.ks_indexer_service_confs = self._load_and_init_configs(
'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
self.ks_indexer_role_confs = self._load_and_init_configs(
'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')
self.impala_service_confs = self._load_and_init_configs(
'impala-service.json', 'IMPALA', 'cluster')
self.impala_catalogserver_confs = self._load_and_init_configs(
'impala-catalogserver.json', 'CATALOGSERVER', 'node')
self.impala_impalad_confs = self._load_and_init_configs(
'impala-impalad.json', 'IMPALAD', 'node')
self.impala_statestore_confs = self._load_and_init_configs(
'impala-statestore.json', 'STATESTORE', 'node')
self.kms_service_confs = self._load_and_init_configs(
'kms-service.json', 'KMS', 'cluster')
self.kms_kms_confs = self._load_and_init_configs(
'kms-kms.json', 'KMS', 'node')
self.kafka_service = self._load_and_init_configs(
'kafka-service.json', 'KAFKA', 'cluster')
self.kafka_kafka_broker = self._load_and_init_configs(
'kafka-kafka_broker.json', 'KAFKA', 'node')
self.kafka_kafka_mirror_maker = self._load_and_init_configs(
'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')
def _load_and_init_configs(self, filename, app_target, scope):
confs = self._load_json(self.path_to_config + filename)
cfgs = self._init_ng_configs(confs, app_target, scope)
@ -127,10 +257,11 @@ class ConfigHelper(object):
def _get_cluster_plugin_configs(self):
return [self.CDH5_REPO_URL, self.CDH5_REPO_KEY_URL, self.CM5_REPO_URL,
self.CM5_REPO_KEY_URL, self.ENABLE_SWIFT,
self.ENABLE_HBASE_COMMON_LIB, self.SWIFT_LIB_URL,
self.EXTJS_LIB_URL, self.AWAIT_MANAGER_STARTING_TIMEOUT,
self.AWAIT_AGENTS_TIMEOUT]
self.CM5_REPO_KEY_URL, self.ENABLE_SWIFT, self.SWIFT_LIB_URL,
self.ENABLE_HBASE_COMMON_LIB, self.EXTJS_LIB_URL,
self.AWAIT_MANAGER_STARTING_TIMEOUT, self.AWAIT_AGENTS_TIMEOUT,
self.EXECUTOR_EXTRA_CLASSPATH, self.KMS_REPO_URL,
self.KMS_REPO_KEY_URL, self.REQUIRE_ANTI_AFFINITY]
def get_plugin_configs(self):
cluster_wide = self._get_cluster_plugin_configs()
@ -165,3 +296,9 @@ class ConfigHelper(object):
def get_extjs_lib_url(self, cluster):
return self._get_config_value(cluster, self.EXTJS_LIB_URL)
def get_kms_key_url(self, cluster):
return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)
def get_required_anti_affinity(self, cluster):
return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)
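Each of these options follows the same pattern: a provisioning Config object declares the knob and a small getter reads the cluster-supplied value, falling back to the default. A hedged sketch of reading the new options (`helper` and `cluster` are placeholders):
# Sketch only; `helper` is a concrete ConfigHelper subclass, `cluster` a Sahara cluster.
if helper.get_required_anti_affinity(cluster):   # REQUIRE_ANTI_AFFINITY, default True
    pass  # validation logic would act on the flag here (details not shown in this diff)
kms_key_url = helper.get_kms_key_url(cluster)    # '' unless the user set KMS_REPO_KEY_URL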

View File

@ -14,6 +14,48 @@
from sahara.plugins import kerberos
PACKAGES = [
'cloudera-manager-agent',
'cloudera-manager-daemons',
'cloudera-manager-server',
'cloudera-manager-server-db-2',
'flume-ng',
'hadoop-hdfs-datanode',
'hadoop-hdfs-namenode',
'hadoop-hdfs-secondarynamenode',
'hadoop-kms',
'hadoop-mapreduce',
'hadoop-mapreduce-historyserver',
'hadoop-yarn-nodemanager',
'hadoop-yarn-resourcemanager',
'hbase',
'hbase-solr',
'hive-hcatalog',
'hive-metastore',
'hive-server2',
'hive-webhcat-server',
'hue',
'impala',
'impala-server',
'impala-state-store',
'impala-catalog',
'impala-shell',
'kafka',
'kafka-server',
'keytrustee-keyprovider',
'oozie',
'oracle-j2sdk1.7',
'sentry',
'solr-server',
'solr-doc',
'search',
'spark-history-server',
'sqoop2',
'unzip',
'zookeeper'
]
def setup_kerberos_for_cluster(cluster, cloudera_utils):
if kerberos.is_kerberos_security_enabled(cluster):
manager = cloudera_utils.pu.get_manager(cluster)
@ -40,3 +82,43 @@ def prepare_scaling_kerberized_cluster(cluster, cloudera_utils, instances):
kerberos.create_keytabs_for_map(
cluster,
{'hdfs': cloudera_utils.pu.get_hdfs_nodes(cluster, instances)})
def get_open_ports(node_group):
ports = [9000] # for CM agent
ports_map = {
'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
'HDFS_SECONDARYNAMENODE': [50090, 50495],
'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
'YARN_NODEMANAGER': [8040, 8041, 8042],
'YARN_JOBHISTORY': [10020, 19888],
'HIVE_METASTORE': [9083],
'HIVE_SERVER2': [10000],
'HUE_SERVER': [8888],
'OOZIE_SERVER': [11000, 11001],
'SPARK_YARN_HISTORY_SERVER': [18088],
'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
'HBASE_MASTER': [60000],
'HBASE_REGIONSERVER': [60020],
'FLUME_AGENT': [41414],
'SENTRY_SERVER': [8038],
'SOLR_SERVER': [8983, 8984],
'SQOOP_SERVER': [8005, 12000],
'KEY_VALUE_STORE_INDEXER': [],
'IMPALA_CATALOGSERVER': [25020, 26000],
'IMPALA_STATESTORE': [25010, 24000],
'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
'KMS': [16000, 16001],
'JOURNALNODE': [8480, 8481, 8485]
}
for process in node_group.node_processes:
if process in ports_map:
ports.extend(ports_map[process])
return ports
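get_open_ports simply unions the fixed CM agent port with the per-process lists for whatever processes the node group runs; a quick illustration with a mocked node group:
import collections

FakeNodeGroup = collections.namedtuple('FakeNodeGroup', ['node_processes'])
ng = FakeNodeGroup(node_processes=['HDFS_NAMENODE', 'YARN_JOBHISTORY'])
print(get_open_ports(ng))
# [9000, 8020, 8022, 50070, 50470, 10020, 19888]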

View File

@ -46,8 +46,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
return 'http://%s:11000/oozie' % oozie_ip
def get_name_node_uri(self, cluster):
namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
else:
namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
def get_resource_manager_uri(self, cluster):
resourcemanager = self.cloudera_utils.pu.get_resourcemanager(cluster)
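The effect of the change above: once journal nodes are present (NameNode HA), HDFS is addressed through the logical nameservice rather than a single NameNode host. Illustrative values only (the hostname is invented):
# HA cluster (journal nodes present):  'hdfs://nameservice01'
# Single-NameNode cluster:             'hdfs://namenode-001.example.org:8020'
uri = engine.get_name_node_uri(cluster)   # `engine` and `cluster` are placeholders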

View File

@ -79,19 +79,26 @@ class CDHPluginAutoConfigsProvider(ru.HadoopAutoConfigsProvider):
class AbstractPluginUtils(object):
def __init__(self):
# c_helper and db_helper will be defined in derived classes.
# c_helper will be defined in derived classes.
self.c_helper = None
def get_role_name(self, instance, service):
# NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
shortcuts = {
'AGENT': 'A',
'ALERTPUBLISHER': 'AP',
'CATALOGSERVER': 'ICS',
'DATANODE': 'DN',
'EVENTSERVER': 'ES',
'HBASE_INDEXER': 'LHBI',
'HIVEMETASTORE': 'HVM',
'HIVESERVER2': 'HVS',
'HOSTMONITOR': 'HM',
'IMPALAD': 'ID',
'JOBHISTORY': 'JS',
'JOURNALNODE': 'JN',
'KAFKA_BROKER': 'KB',
'KMS': 'KMS',
'MASTER': 'M',
'NAMENODE': 'NN',
'NODEMANAGER': 'NM',
@ -99,10 +106,16 @@ class AbstractPluginUtils(object):
'REGIONSERVER': 'RS',
'RESOURCEMANAGER': 'RM',
'SECONDARYNAMENODE': 'SNN',
'SENTRY_SERVER': 'SNT',
'SERVER': 'S',
'SERVICEMONITOR': 'SM',
'SOLR_SERVER': 'SLR',
'SPARK_YARN_HISTORY_SERVER': 'SHS',
'WEBHCAT': 'WHC'
'SQOOP_SERVER': 'S2S',
'STATESTORE': 'ISS',
'WEBHCAT': 'WHC',
'HDFS_GATEWAY': 'HG',
'YARN_GATEWAY': 'YG'
}
return '%s_%s' % (shortcuts.get(service, service),
instance.hostname().replace('-', '_'))
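get_role_name combines the shortcut table with the sanitized hostname to build Cloudera Manager role names; a standalone sketch of that behaviour (hostnames invented):
shortcuts = {'DATANODE': 'DN', 'KAFKA_BROKER': 'KB'}   # excerpt of the table above

def role_name(process, hostname):
    return '%s_%s' % (shortcuts.get(process, process), hostname.replace('-', '_'))

print(role_name('DATANODE', 'worker-001'))       # DN_worker_001
print(role_name('KAFKA_BROKER', 'broker-2'))     # KB_broker_2
print(role_name('UNLISTED_ROLE', 'worker-001'))  # UNLISTED_ROLE_worker_001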
@ -155,6 +168,42 @@ class AbstractPluginUtils(object):
def get_hbase_master(self, cluster):
return u.get_instance(cluster, 'HBASE_MASTER')
def get_sentry(self, cluster):
return u.get_instance(cluster, 'SENTRY_SERVER')
def get_flumes(self, cluster):
return u.get_instances(cluster, 'FLUME_AGENT')
def get_solrs(self, cluster):
return u.get_instances(cluster, 'SOLR_SERVER')
def get_sqoop(self, cluster):
return u.get_instance(cluster, 'SQOOP_SERVER')
def get_hbase_indexers(self, cluster):
return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')
def get_catalogserver(self, cluster):
return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')
def get_statestore(self, cluster):
return u.get_instance(cluster, 'IMPALA_STATESTORE')
def get_impalads(self, cluster):
return u.get_instances(cluster, 'IMPALAD')
def get_kms(self, cluster):
return u.get_instances(cluster, 'KMS')
def get_jns(self, cluster):
return u.get_instances(cluster, 'HDFS_JOURNALNODE')
def get_stdb_rm(self, cluster):
return u.get_instance(cluster, 'YARN_STANDBYRM')
def get_kafka_brokers(self, cluster):
return u.get_instances(cluster, 'KAFKA_BROKER')
def convert_process_configs(self, configs):
p_dict = {
"CLOUDERA": ['MANAGER'],
@ -173,9 +222,21 @@ class AbstractPluginUtils(object):
"ZOOKEEPER": ['SERVER'],
"MASTER": ['MASTER'],
"REGIONSERVER": ['REGIONSERVER'],
'YARN_GATEWAY': ['YARN_GATEWAY'],
'HDFS_GATEWAY': ['HDFS_GATEWAY']
"FLUME": ['AGENT'],
"CATALOGSERVER": ['CATALOGSERVER'],
"STATESTORE": ['STATESTORE'],
"IMPALAD": ['IMPALAD'],
"KS_INDEXER": ['HBASE_INDEXER'],
"SENTRY": ['SENTRY_SERVER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"KMS": ['KMS'],
"YARN_GATEWAY": ['YARN_GATEWAY'],
"HDFS_GATEWAY": ['HDFS_GATEWAY'],
"JOURNALNODE": ['JOURNALNODE'],
"KAFKA": ['KAFKA_BROKER']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()
for k in configs.keys():
@ -275,6 +336,11 @@ class AbstractPluginUtils(object):
r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
swift_lib_remote_url, HADOOP_LIB_DIR))
def configure_sentry(self, cluster):
manager = self.get_manager(cluster)
with manager.remote() as r:
dh.create_sentry_database(cluster, r)
def put_hive_hdfs_xml(self, cluster):
servers = self.get_hive_servers(cluster)
with servers[0].remote() as r:
@ -339,31 +405,38 @@ class AbstractPluginUtils(object):
instance=instance.instance_name))
cluster = instance.cluster
cdh5_key = self.c_helper.get_cdh5_key_url(cluster)
cm5_key = self.c_helper.get_cm5_key_url(cluster)
with instance.remote() as r:
if cmd.is_ubuntu_os(r):
cdh5_key = (cdh5_key or
self.c_helper.DEFAULT_CDH5_UBUNTU_REPO_KEY_URL)
cm5_key = (cm5_key or
self.c_helper.DEFAULT_CM5_UBUNTU_REPO_KEY_URL)
cdh5_key = (
self.c_helper.get_cdh5_key_url(cluster) or
self.c_helper.DEFAULT_CDH5_UBUNTU_REPO_KEY_URL)
cm5_key = (
self.c_helper.get_cm5_key_url(cluster) or
self.c_helper.DEFAULT_CM5_UBUNTU_REPO_KEY_URL)
kms_key = (
self.c_helper.get_kms_key_url(cluster) or
self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
cdh5_repo_content = self.c_helper.CDH5_UBUNTU_REPO
cm5_repo_content = self.c_helper.CM5_UBUNTU_REPO
kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
cmd.write_ubuntu_repository(r, cdh5_repo_content, 'cdh')
cmd.add_apt_key(r, cdh5_key)
cmd.write_ubuntu_repository(r, cm5_repo_content, 'cm')
cmd.add_apt_key(r, cm5_key)
cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
cmd.add_apt_key(r, kms_key)
cmd.update_repository(r)
if cmd.is_centos_os(r):
cdh5_repo_content = self.c_helper.CDH5_CENTOS_REPO
cm5_repo_content = self.c_helper.CM5_CENTOS_REPO
kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
cmd.write_centos_repository(r, cdh5_repo_content, 'cdh')
cmd.write_centos_repository(r, cm5_repo_content, 'cm')
cmd.add_centos_repository(r, kms_repo_url, 'kms')
cmd.update_repository(r)
def _get_config_value(self, service, name, configs, cluster=None):

View File

@ -13,460 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh.v5_5_0 import config_helper
from sahara.plugins.cdh.v5_5_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_5_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import xmlutils
HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'
c_helper = config_helper.ConfigHelperV550()
class ClouderaUtilsV550(cu.ClouderaUtils):
FLUME_SERVICE_NAME = 'flume01'
SOLR_SERVICE_NAME = 'solr01'
SQOOP_SERVICE_NAME = 'sqoop01'
KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
IMPALA_SERVICE_NAME = 'impala01'
SENTRY_SERVICE_NAME = 'sentry01'
KMS_SERVICE_NAME = 'kms01'
CM_API_VERSION = 8
NAME_SERVICE = 'nameservice01'
KAFKA_SERVICE_NAME = 'kafka01'
def __init__(self):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV550()
self.validator = validation.ValidatorV550
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
elif instance:
cm_cluster = self.get_cloudera_cluster(instance.cluster)
else:
raise ValueError(_("'cluster' or 'instance' argument missed"))
if role in ['AGENT']:
return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
elif role in ['SENTRY_SERVER']:
return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
elif role in ['SQOOP_SERVER']:
return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
elif role in ['SOLR_SERVER']:
return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
elif role in ['HBASE_INDEXER']:
return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
elif role in ['KMS']:
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
elif role in ['JOURNALNODE']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif role in ['YARN_STANDBYRM']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
elif role in ['KAFKA_BROKER']:
return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
else:
return super(ClouderaUtilsV550, self).get_service_by_role(
role, cluster, instance)
@cpo.event_wrapper(
True, step=_("First run cluster"), param=('cluster', 1))
@cu.cloudera_cmd
def first_run(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
yield cm_cluster.first_run()
@cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
def create_services(self, cluster):
api = self.get_api_client(cluster)
cm_cluster = api.create_cluster(cluster.name,
fullVersion=cluster.hadoop_version)
if len(self.pu.get_zookeepers(cluster)) > 0:
cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
ZOOKEEPER_SERVICE_TYPE)
cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
if self.pu.get_hive_metastore(cluster):
cm_cluster.create_service(self.HIVE_SERVICE_NAME,
HIVE_SERVICE_TYPE)
if self.pu.get_hue(cluster):
cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
if self.pu.get_spark_historyserver(cluster):
cm_cluster.create_service(self.SPARK_SERVICE_NAME,
SPARK_SERVICE_TYPE)
if self.pu.get_hbase_master(cluster):
cm_cluster.create_service(self.HBASE_SERVICE_NAME,
HBASE_SERVICE_TYPE)
if len(self.pu.get_flumes(cluster)) > 0:
cm_cluster.create_service(self.FLUME_SERVICE_NAME,
FLUME_SERVICE_TYPE)
if self.pu.get_sentry(cluster):
cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
SENTRY_SERVICE_TYPE)
if len(self.pu.get_solrs(cluster)) > 0:
cm_cluster.create_service(self.SOLR_SERVICE_NAME,
SOLR_SERVICE_TYPE)
if self.pu.get_sqoop(cluster):
cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
SQOOP_SERVICE_TYPE)
if len(self.pu.get_hbase_indexers(cluster)) > 0:
cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
KS_INDEXER_SERVICE_TYPE)
if self.pu.get_catalogserver(cluster):
cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
IMPALA_SERVICE_TYPE)
if self.pu.get_kms(cluster):
cm_cluster.create_service(self.KMS_SERVICE_NAME,
KMS_SERVICE_TYPE)
if len(self.pu.get_kafka_brokers(cluster)) > 0:
cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
KAFKA_SERVICE_TYPE)
def await_agents(self, cluster, instances):
self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)
@cpo.event_wrapper(
True, step=_("Configure services"), param=('cluster', 1))
def configure_services(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
if len(self.pu.get_zookeepers(cluster)) > 0:
zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
cluster=cluster))
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
cluster=cluster))
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
cluster=cluster))
oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hive_metastore(cluster):
hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hue(cluster):
hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_spark_historyserver(cluster):
spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hbase_master(cluster):
hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_flumes(cluster)) > 0:
flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sentry(cluster):
sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_solrs(cluster)) > 0:
solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sqoop(cluster):
sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_hbase_indexers(cluster)) > 0:
ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
ks_indexer.update_config(
self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))
if self.pu.get_catalogserver(cluster):
impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_kms(cluster):
kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_kafka_brokers(cluster)) > 0:
kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
cluster=cluster))
def _get_configs(self, service, cluster=None, instance=None):
def get_hadoop_dirs(mount_points, suffix):
return ','.join([x + suffix for x in mount_points])
all_confs = {}
if cluster:
zk_count = self.validator._get_inst_count(cluster,
'ZOOKEEPER_SERVER')
hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = self.validator._get_inst_count(cluster,
'SENTRY_SERVER')
ks_count =\
self.validator._get_inst_count(cluster,
'KEY_VALUE_STORE_INDEXER')
kms_count = self.validator._get_inst_count(cluster, 'KMS')
imp_count =\
self.validator._get_inst_count(cluster,
'IMPALA_CATALOGSERVER')
hive_count = self.validator._get_inst_count(cluster,
'HIVE_METASTORE')
slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()
confs = {c['name']: c['value'] for c in configs}
core_site_safety_valve = xmlutils.create_elements_xml(confs)
all_confs = {
'HDFS': {
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'dfs_block_local_path_access_user':
'impala' if imp_count else '',
'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
'core_site_safety_valve': core_site_safety_valve
},
'HIVE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'OOZIE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'hive_service':
self.HIVE_SERVICE_NAME if hive_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'YARN': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'HUE': {
'hive_service': self.HIVE_SERVICE_NAME,
'oozie_service': self.OOZIE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'impala_service':
self.IMPALA_SERVICE_NAME if imp_count else '',
'sqoop_service':
self.SQOOP_SERVICE_NAME if sqp_count else ''
},
'SPARK_ON_YARN': {
'yarn_service': self.YARN_SERVICE_NAME
},
'HBASE': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
'hbase_enable_indexing': 'true' if ks_count else 'false',
'hbase_enable_replication':
'true' if ks_count else 'false'
},
'FLUME': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else ''
},
'SENTRY': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'sentry_server_config_safety_valve': (
c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
if imp_count else '')
},
'SOLR': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
},
'SQOOP': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME
},
'KS_INDEXER': {
'hbase_service': self.HBASE_SERVICE_NAME,
'solr_service': self.SOLR_SERVICE_NAME
},
'IMPALA': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'hive_service': self.HIVE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
}
}
hive_confs = {
'HIVE': {
'hive_metastore_database_type': 'postgresql',
'hive_metastore_database_host':
self.pu.get_manager(cluster).internal_ip,
'hive_metastore_database_port': '7432',
'hive_metastore_database_password':
dh.get_hive_db_password(cluster)
}
}
hue_confs = {
'HUE': {
'hue_webhdfs': self.pu.get_role_name(
self.pu.get_namenode(cluster), 'NAMENODE')
}
}
sentry_confs = {
'SENTRY': {
'sentry_server_database_type': 'postgresql',
'sentry_server_database_host':
self.pu.get_manager(cluster).internal_ip,
'sentry_server_database_port': '7432',
'sentry_server_database_password':
dh.get_sentry_db_password(cluster)
}
}
kafka_confs = {
'KAFKA': {
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
}
}
all_confs = s_cfg.merge_configs(all_confs, hue_confs)
all_confs = s_cfg.merge_configs(all_confs, hive_confs)
all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
if instance:
snt_count = self.validator._get_inst_count(instance.cluster,
'SENTRY_SERVER')
paths = instance.storage_paths()
instance_default_confs = {
'NAMENODE': {
'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
},
'SECONDARYNAMENODE': {
'fs_checkpoint_dir_list':
get_hadoop_dirs(paths, '/fs/snn')
},
'DATANODE': {
'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
'dfs_datanode_data_dir_perm': 755,
'dfs_datanode_handler_count': 30
},
'NODEMANAGER': {
'yarn_nodemanager_local_dirs':
get_hadoop_dirs(paths, '/yarn/local'),
'container_executor_allowed_system_users':
"nobody,impala,hive,llama,hdfs,yarn,mapred,"
"spark,oozie",
"container_executor_banned_users": "bin"
},
'SERVER': {
'maxSessionTimeout': 60000
},
'HIVESERVER2': {
'hiveserver2_enable_impersonation':
'false' if snt_count else 'true',
'hive_hs2_config_safety_valve': (
c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
if snt_count else '')
},
'HIVEMETASTORE': {
'hive_metastore_config_safety_valve': (
c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
if snt_count else '')
}
}
ng_user_confs = self.pu.convert_process_configs(
instance.node_group.node_configs)
all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
return all_confs.get(service, {})
@cpo.event_wrapper(
True, step=_("Enable NameNode HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_namenode_ha(self, cluster):
standby_nn = self.pu.get_secondarynamenode(cluster)
standby_nn_host_name = standby_nn.fqdn()
jns = self.pu.get_jns(cluster)
jn_list = []
for index, jn in enumerate(jns):
jn_host_name = jn.fqdn()
jn_list.append({'jnHostId': jn_host_name,
'jnName': 'JN%i' % index,
'jnEditsDir': '/dfs/jn'
})
cm_cluster = self.get_cloudera_cluster(cluster)
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
nn = hdfs.get_roles_by_type('NAMENODE')[0]
yield hdfs.enable_nn_ha(active_name=nn.name,
standby_host_id=standby_nn_host_name,
nameservice=self.NAME_SERVICE, jns=jn_list
)
@cpo.event_wrapper(
True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_resourcemanager_ha(self, cluster):
new_rm = self.pu.get_stdb_rm(cluster)
new_rm_host_name = new_rm.fqdn()
cm_cluster = self.get_cloudera_cluster(cluster)
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)
self.c_helper = config_helper.ConfigHelperV550()

View File

@ -65,10 +65,9 @@ class ConfigHelperV550(c_h.ConfigHelper):
'keytrustee/cloudera.list')
DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL = (
'http://archive.cloudera.com/'
'navigator-keytrustee5/ubuntu/'
'trusty/amd64/navigator-keytrustee'
'/archive.key')
'http://archive.cloudera.com/navigator-'
'keytrustee5/ubuntu/trusty/amd64/navigator-'
'keytrustee/archive.key')
KEY_TRUSTEE_CENTOS_REPO_URL = (
'http://archive.cloudera.com/navigator-'
@ -80,6 +79,12 @@ class ConfigHelperV550(c_h.ConfigHelper):
'/apache/hadoop/hadoop-openstack/2.6.0-cdh5.5.0'
'/hadoop-openstack-2.6.0-cdh5.5.0.jar')
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
default_value=DEFAULT_SWIFT_LIB_URL,
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
HIVE_SERVER2_SENTRY_SAFETY_VALVE = f.get_file_text(
path_to_config + 'hive-server2-sentry-safety.xml')
@ -89,162 +94,8 @@ class ConfigHelperV550(c_h.ConfigHelper):
SENTRY_IMPALA_CLIENT_SAFETY_VALVE = f.get_file_text(
path_to_config + 'sentry-impala-client-safety.xml')
_default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2,
default_value=_default_executor_classpath,
description='Value for spark.executor.extraClassPath in '
'spark-defaults.conf (default: %s)'
% _default_executor_classpath)
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
default_value=DEFAULT_SWIFT_LIB_URL,
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
KMS_REPO_URL = p.Config(
'KMS repo list URL', 'general', 'cluster', priority=1,
default_value="")
KMS_REPO_KEY_URL = p.Config(
'KMS repo key URL (for debian-based only)', 'general',
'cluster',
priority=1, default_value="")
REQUIRE_ANTI_AFFINITY = p.Config('Require Anti Affinity',
'general', 'cluster',
config_type='bool',
priority=2,
default_value=True)
def __init__(self):
super(ConfigHelperV550, self).__init__()
self.priority_one_confs = self._load_json(
self.path_to_config + 'priority-one-confs.json')
self._init_all_ng_plugin_configs()
def _get_cluster_plugin_configs(self):
confs = super(ConfigHelperV550, self)._get_ng_plugin_configs()
confs += [self.EXECUTOR_EXTRA_CLASSPATH,
self.KMS_REPO_URL,
self.KMS_REPO_KEY_URL,
self.REQUIRE_ANTI_AFFINITY]
return confs
def _init_all_ng_plugin_configs(self):
self.hdfs_confs = self._load_and_init_configs(
'hdfs-service.json', 'HDFS', 'cluster')
self.namenode_confs = self._load_and_init_configs(
'hdfs-namenode.json', 'NAMENODE', 'node')
self.datanode_confs = self._load_and_init_configs(
'hdfs-datanode.json', 'DATANODE', 'node')
self.secnamenode_confs = self._load_and_init_configs(
'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
self.hdfs_gateway_confs = self._load_and_init_configs(
'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
self.journalnode_confs = self._load_and_init_configs(
'hdfs-journalnode.json', 'JOURNALNODE', 'node')
self.yarn_confs = self._load_and_init_configs(
'yarn-service.json', 'YARN', 'cluster')
self.resourcemanager_confs = self._load_and_init_configs(
'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
self.nodemanager_confs = self._load_and_init_configs(
'yarn-nodemanager.json', 'NODEMANAGER', 'node')
self.jobhistory_confs = self._load_and_init_configs(
'yarn-jobhistory.json', 'JOBHISTORY', 'node')
self.yarn_gateway_conf = self._load_and_init_configs(
'yarn-gateway.json', 'YARN_GATEWAY', 'node')
self.oozie_service_confs = self._load_and_init_configs(
'oozie-service.json', 'OOZIE', 'cluster')
self.oozie_role_confs = self._load_and_init_configs(
'oozie-oozie_server.json', 'OOZIE', 'node')
self.hive_service_confs = self._load_and_init_configs(
'hive-service.json', 'HIVE', 'cluster')
self.hive_metastore_confs = self._load_and_init_configs(
'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
self.hive_hiveserver_confs = self._load_and_init_configs(
'hive-hiveserver2.json', 'HIVESERVER', 'node')
self.hive_webhcat_confs = self._load_and_init_configs(
'hive-webhcat.json', 'WEBHCAT', 'node')
self.hue_service_confs = self._load_and_init_configs(
'hue-service.json', 'HUE', 'cluster')
self.hue_role_confs = self._load_and_init_configs(
'hue-hue_server.json', 'HUE', 'node')
self.spark_service_confs = self._load_and_init_configs(
'spark-service.json', 'SPARK_ON_YARN', 'cluster')
self.spark_role_confs = self._load_and_init_configs(
'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')
self.zookeeper_server_confs = self._load_and_init_configs(
'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
self.zookeeper_service_confs = self._load_and_init_configs(
'zookeeper-server.json', 'ZOOKEEPER', 'node')
self.hbase_confs = self._load_and_init_configs(
'hbase-service.json', 'HBASE', 'cluster')
self.master_confs = self._load_and_init_configs(
'hbase-master.json', 'MASTER', 'node')
self.regionserver_confs = self._load_and_init_configs(
'hbase-regionserver.json', 'REGIONSERVER', 'node')
self.flume_service_confs = self._load_and_init_configs(
'flume-service.json', 'FLUME', 'cluster')
self.flume_agent_confs = self._load_and_init_configs(
'flume-agent.json', 'FLUME', 'node')
self.sentry_service_confs = self._load_and_init_configs(
'sentry-service.json', 'SENTRY', 'cluster')
self.sentry_server_confs = self._load_and_init_configs(
'sentry-sentry_server.json', 'SENTRY', 'node')
self.solr_service_confs = self._load_and_init_configs(
'solr-service.json', 'SOLR', 'cluster')
self.solr_server_confs = self._load_and_init_configs(
'solr-solr_server.json', 'SOLR', 'node')
self.sqoop_service_confs = self._load_and_init_configs(
'sqoop-service.json', 'SQOOP', 'cluster')
self.sqoop_server_confs = self._load_and_init_configs(
'sqoop-sqoop_server.json', 'SQOOP', 'node')
self.ks_indexer_service_confs = self._load_and_init_configs(
'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
self.ks_indexer_role_confs = self._load_and_init_configs(
'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')
self.impala_service_confs = self._load_and_init_configs(
'impala-service.json', 'IMPALA', 'cluster')
self.impala_catalogserver_confs = self._load_and_init_configs(
'impala-catalogserver.json', 'CATALOGSERVER', 'node')
self.impala_impalad_confs = self._load_and_init_configs(
'impala-impalad.json', 'IMPALAD', 'node')
self.impala_statestore_confs = self._load_and_init_configs(
'impala-statestore.json', 'STATESTORE', 'node')
self.kms_service_confs = self._load_and_init_configs(
'kms-service.json', 'KMS', 'cluster')
self.kms_kms_confs = self._load_and_init_configs(
'kms-kms.json', 'KMS', 'node')
self.kafka_service = self._load_and_init_configs(
'kafka-service.json', 'KAFKA', 'cluster')
self.kafka_kafka_broker = self._load_and_init_configs(
'kafka-kafka_broker.json', 'KAFKA', 'node')
self.kafka_kafka_mirror_maker = self._load_and_init_configs(
'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')
def get_required_anti_affinity(self, cluster):
return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)
def get_kms_key_url(self, cluster):
return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)

View File

@ -21,49 +21,10 @@ from sahara.plugins import utils as gu
from sahara.service.edp import hdfs_helper as h
from sahara.utils import cluster_progress_ops as cpo
PACKAGES = [
'cloudera-manager-agent',
'cloudera-manager-daemons',
'cloudera-manager-server',
'cloudera-manager-server-db-2',
'flume-ng',
'hadoop-hdfs-datanode',
'hadoop-hdfs-namenode',
'hadoop-hdfs-secondarynamenode',
'hadoop-kms',
'hadoop-mapreduce',
'hadoop-mapreduce-historyserver',
'hadoop-yarn-nodemanager',
'hadoop-yarn-resourcemanager',
'hbase',
'hbase-solr',
'hive-hcatalog',
'hive-metastore',
'hive-server2',
'hive-webhcat-server',
'hue',
'impala',
'impala-server',
'impala-state-store',
'impala-catalog',
'impala-shell',
'kafka',
'kafka-server',
'keytrustee-keyprovider',
'oozie',
'oracle-j2sdk1.7',
'sentry',
'solr-server',
'solr-doc',
'search',
'spark-history-server',
'sqoop2',
'unzip',
'zookeeper'
]
CU = cu.ClouderaUtilsV550()
PACKAGES = common_deploy.PACKAGES
def configure_cluster(cluster):
instances = gu.get_instances(cluster)
@ -115,7 +76,6 @@ def scale_cluster(cluster, instances):
CU.pu.configure_swift(cluster, instances)
_start_roles(cluster, instances)
CU.refresh_datanodes(cluster)
CU.refresh_yarn_nodes(cluster)
CU.restart_stale_services(cluster)
@ -204,40 +164,5 @@ def start_cluster(cluster):
def get_open_ports(node_group):
ports = [9000] # for CM agent
ports_map = {
'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
'HDFS_SECONDARYNAMENODE': [50090, 50495],
'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
'YARN_NODEMANAGER': [8040, 8041, 8042],
'YARN_JOBHISTORY': [10020, 19888],
'HIVE_METASTORE': [9083],
'HIVE_SERVER2': [10000],
'HUE_SERVER': [8888],
'OOZIE_SERVER': [11000, 11001],
'SPARK_YARN_HISTORY_SERVER': [18088],
'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
'HBASE_MASTER': [60000],
'HBASE_REGIONSERVER': [60020],
'FLUME_AGENT': [41414],
'SENTRY_SERVER': [8038],
'SOLR_SERVER': [8983, 8984],
'SQOOP_SERVER': [8005, 12000],
'KEY_VALUE_STORE_INDEXER': [],
'IMPALA_CATALOGSERVER': [25020, 26000],
'IMPALA_STATESTORE': [25010, 24000],
'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
'KMS': [16000, 16001],
'JOURNALNODE': [8480, 8481, 8485]
}
for process in node_group.node_processes:
if process in ports_map:
ports.extend(ports_map[process])
ports = common_deploy.get_open_ports(node_group)
return ports

View File

@ -26,13 +26,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
super(EdpOozieEngine, self).__init__(cluster)
self.cloudera_utils = cu.ClouderaUtilsV550()
def get_name_node_uri(self, cluster):
if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
else:
namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
@staticmethod
def get_possible_job_config(job_type):
if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):

View File

@ -13,154 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.conductor import resource as res
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils as pu
from sahara.plugins.cdh.v5_5_0 import config_helper
from sahara.plugins import utils as u
class PluginUtilsV550(pu.AbstractPluginUtils):
def __init__(self):
self.c_helper = config_helper.ConfigHelperV550()
def get_role_name(self, instance, service):
# NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
shortcuts = {
'AGENT': 'A',
'ALERTPUBLISHER': 'AP',
'CATALOGSERVER': 'ICS',
'DATANODE': 'DN',
'EVENTSERVER': 'ES',
'HBASE_INDEXER': 'LHBI',
'HIVEMETASTORE': 'HVM',
'HIVESERVER2': 'HVS',
'HOSTMONITOR': 'HM',
'IMPALAD': 'ID',
'JOBHISTORY': 'JS',
'JOURNALNODE': 'JN',
'KAFKA_BROKER': 'KB',
'KMS': 'KMS',
'MASTER': 'M',
'NAMENODE': 'NN',
'NODEMANAGER': 'NM',
'OOZIE_SERVER': 'OS',
'REGIONSERVER': 'RS',
'RESOURCEMANAGER': 'RM',
'SECONDARYNAMENODE': 'SNN',
'SENTRY_SERVER': 'SNT',
'SERVER': 'S',
'SERVICEMONITOR': 'SM',
'SOLR_SERVER': 'SLR',
'SPARK_YARN_HISTORY_SERVER': 'SHS',
'SQOOP_SERVER': 'S2S',
'STATESTORE': 'ISS',
'WEBHCAT': 'WHC',
'HDFS_GATEWAY': 'HG',
'YARN_GATEWAY': 'YG'
}
return '%s_%s' % (shortcuts.get(service, service),
instance.hostname().replace('-', '_'))
def get_sentry(self, cluster):
return u.get_instance(cluster, 'SENTRY_SERVER')
def get_flumes(self, cluster):
return u.get_instances(cluster, 'FLUME_AGENT')
def get_solrs(self, cluster):
return u.get_instances(cluster, 'SOLR_SERVER')
def get_sqoop(self, cluster):
return u.get_instance(cluster, 'SQOOP_SERVER')
def get_hbase_indexers(self, cluster):
return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')
def get_catalogserver(self, cluster):
return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')
def get_statestore(self, cluster):
return u.get_instance(cluster, 'IMPALA_STATESTORE')
def get_impalads(self, cluster):
return u.get_instances(cluster, 'IMPALAD')
def get_kms(self, cluster):
return u.get_instances(cluster, 'KMS')
def get_jns(self, cluster):
return u.get_instances(cluster, 'HDFS_JOURNALNODE')
def get_stdb_rm(self, cluster):
return u.get_instance(cluster, 'YARN_STANDBYRM')
def get_kafka_brokers(self, cluster):
return u.get_instances(cluster, 'KAFKA_BROKER')
def convert_process_configs(self, configs):
p_dict = {
"CLOUDERA": ['MANAGER'],
"NAMENODE": ['NAMENODE'],
"DATANODE": ['DATANODE'],
"SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
"RESOURCEMANAGER": ['RESOURCEMANAGER'],
"NODEMANAGER": ['NODEMANAGER'],
"JOBHISTORY": ['JOBHISTORY'],
"OOZIE": ['OOZIE_SERVER'],
"HIVESERVER": ['HIVESERVER2'],
"HIVEMETASTORE": ['HIVEMETASTORE'],
"WEBHCAT": ['WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['SERVER'],
"MASTER": ['MASTER'],
"REGIONSERVER": ['REGIONSERVER'],
"FLUME": ['AGENT'],
"CATALOGSERVER": ['CATALOGSERVER'],
"STATESTORE": ['STATESTORE'],
"IMPALAD": ['IMPALAD'],
"KS_INDEXER": ['HBASE_INDEXER'],
"SENTRY": ['SENTRY_SERVER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"KMS": ['KMS'],
"YARN_GATEWAY": ['YARN_GATEWAY'],
"HDFS_GATEWAY": ['HDFS_GATEWAY'],
"JOURNALNODE": ['JOURNALNODE'],
"KAFKA": ['KAFKA_BROKER']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()
for k in configs.keys():
if k in p_dict.keys():
item = configs[k]
del configs[k]
newkey = p_dict[k][0]
configs[newkey] = item
return res.Resource(configs)
def configure_sentry(self, cluster):
manager = self.get_manager(cluster)
with manager.remote() as r:
dh.create_sentry_database(cluster, r)
def _configure_repo_from_inst(self, instance):
super(PluginUtilsV550, self)._configure_repo_from_inst(instance)
cluster = instance.cluster
with instance.remote() as r:
if cmd.is_ubuntu_os(r):
kms_key = (
self.c_helper.get_kms_key_url(cluster) or
self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
cmd.add_apt_key(r, kms_key)
cmd.update_repository(r)
if cmd.is_centos_os(r):
kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
cmd.add_centos_repository(r, kms_repo_url, 'kms')
cmd.update_repository(r)
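The convert_process_configs helper above is essentially a key-renaming pass from Sahara-facing process names to the Cloudera Manager role names the API expects. A toy sketch of that pattern follows, using plain dicts instead of sahara.conductor Resource objects; PROCESS_MAP and rename_process_keys are made-up names for illustration only.

# Map a few Sahara process names to their CM role names.
PROCESS_MAP = {
    'HIVESERVER': 'HIVESERVER2',
    'ZOOKEEPER': 'SERVER',
    'FLUME': 'AGENT',
    'KS_INDEXER': 'HBASE_INDEXER',
}

def rename_process_keys(configs):
    # Build a new dict instead of mutating the input while iterating.
    return {PROCESS_MAP.get(key, key): value
            for key, value in configs.items()}

print(rename_process_keys({'HIVESERVER': {'hiveserver2_heapsize': '4g'}}))
# {'HIVESERVER2': {'hiveserver2_heapsize': '4g'}}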

View File

@ -13,220 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh.v5_5_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
class ValidatorV550(validation.Validator):
PU = pu.PluginUtilsV550()
@classmethod
def validate_cluster_creating(cls, cluster):
super(ValidatorV550, cls).validate_cluster_creating(cluster)
cls._hdfs_ha_validation(cluster)
cls._yarn_ha_validation(cluster)
cls._flume_validation(cluster)
cls._sentry_validation(cluster)
cls._solr_validation(cluster)
cls._sqoop_validation(cluster)
cls._hbase_indexer_validation(cluster)
cls._impala_validation(cluster)
cls._kms_validation(cluster)
@classmethod
def _hdfs_ha_validation(cls, cluster):
jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if jn_count > 0:
if jn_count < 3:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('not less than 3'),
jn_count)
if not jn_count % 2:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('be odd'), jn_count)
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HDFS HA')
if require_anti_affinity:
if 'HDFS_SECONDARYNAMENODE' not in\
cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_SECONDARYNAMENODE should be enabled '
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_NAMENODE should be enabled in anti_affinity.'))
@classmethod
def _yarn_ha_validation(cls, cluster):
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException(
'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='RM HA')
if require_anti_affinity:
if 'YARN_RESOURCEMANAGER' not in\
cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in '
'anti_affinity.'))
if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be'
' enabled in anti_affinity.'))
@classmethod
def _flume_validation(cls, cluster):
a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
@classmethod
def _sentry_validation(cls, cluster):
snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if snt_count > 1:
raise ex.InvalidComponentCountException(
'SENTRY_SERVER', _('0 or 1'), snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
@classmethod
def _solr_validation(cls, cluster):
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
@classmethod
def _sqoop_validation(cls, cluster):
s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
if s2s_count > 1:
raise ex.InvalidComponentCountException(
'SQOOP_SERVER', _('0 or 1'), s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
@classmethod
def _hbase_indexer_validation(cls, cluster):
lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
@classmethod
def _impala_validation(cls, cluster):
ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = cls._get_inst_count(cluster, 'IMPALAD')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
@classmethod
def _kms_validation(cls, cluster):
kms_count = cls._get_inst_count(cluster, 'KMS')
if kms_count > 1:
raise ex.InvalidComponentCountException('KMS',
_('0 or 1'), kms_count)
@classmethod
def _get_anti_affinity(cls, cluster):
return cluster.anti_affinity
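Every check in the validator above follows the same count-then-raise shape: count instances of a process, then raise when that count breaks the topology rule. A self-contained sketch of the pattern, with a plain ValueError standing in for the sahara plugin exceptions and validate_sentry as a hypothetical example name:

def validate_sentry(process_counts):
    snt_count = process_counts.get('SENTRY_SERVER', 0)
    dn_count = process_counts.get('HDFS_DATANODE', 0)
    if snt_count > 1:
        raise ValueError('SENTRY_SERVER count must be 0 or 1, got %d'
                         % snt_count)
    if snt_count == 1 and dn_count < 1:
        raise ValueError('SENTRY_SERVER requires at least one '
                         'HDFS_DATANODE')

validate_sentry({'SENTRY_SERVER': 1, 'HDFS_DATANODE': 3})  # passes
# validate_sentry({'SENTRY_SERVER': 2})  # would raise ValueError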

View File

@ -21,7 +21,6 @@ from sahara.plugins.cdh.v5_5_0 import deploy
from sahara.plugins.cdh.v5_5_0 import edp_engine
from sahara.plugins.cdh.v5_5_0 import plugin_utils
from sahara.plugins.cdh.v5_5_0 import validation
from sahara.plugins import kerberos
class VersionHandler(avm.BaseVersionHandler):
@ -34,67 +33,3 @@ class VersionHandler(avm.BaseVersionHandler):
self.deploy = deploy
self.edp_engine = edp_engine
self.validation = validation.ValidatorV550()
def get_plugin_configs(self):
result = super(VersionHandler, self).get_plugin_configs()
result.extend(kerberos.get_config_list())
return result
def get_node_processes(self):
return {
"CLOUDERA": ['CLOUDERA_MANAGER'],
"HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
"YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
"OOZIE": ['OOZIE_SERVER'],
"HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['ZOOKEEPER_SERVER'],
"HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
"FLUME": ['FLUME_AGENT'],
"IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
"KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"SENTRY": ['SENTRY_SERVER'],
"KMS": ['KMS'],
"KAFKA": ['KAFKA_BROKER'],
"YARN_GATEWAY": [],
"RESOURCEMANAGER": [],
"NODEMANAGER": [],
"JOBHISTORY": [],
"HDFS_GATEWAY": [],
'DATANODE': [],
'NAMENODE': [],
'SECONDARYNAMENODE': [],
'JOURNALNODE': [],
'REGIONSERVER': [],
'MASTER': [],
'HIVEMETASTORE': [],
'HIVESERVER': [],
'WEBCAT': [],
'CATALOGSERVER': [],
'STATESTORE': [],
'IMPALAD': [],
'Kerberos': [],
}
def get_edp_engine(self, cluster, job_type):
oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
if job_type in oozie_type:
return self.edp_engine.EdpOozieEngine(cluster)
if job_type in spark_type:
return self.edp_engine.EdpSparkEngine(cluster)
return None
def get_edp_job_types(self):
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
edp_engine.EdpSparkEngine.get_supported_job_types())

View File

@ -13,459 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh.v5_7_0 import config_helper
from sahara.plugins.cdh.v5_7_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_7_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import xmlutils
HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'
c_helper = config_helper.ConfigHelperV570()
class ClouderaUtilsV570(cu.ClouderaUtils):
FLUME_SERVICE_NAME = 'flume01'
SOLR_SERVICE_NAME = 'solr01'
SQOOP_SERVICE_NAME = 'sqoop01'
KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
IMPALA_SERVICE_NAME = 'impala01'
SENTRY_SERVICE_NAME = 'sentry01'
KMS_SERVICE_NAME = 'kms01'
KAFKA_SERVICE_NAME = 'kafka01'
CM_API_VERSION = 8
NAME_SERVICE = 'nameservice01'
def __init__(self):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV570()
self.validator = validation.ValidatorV570
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
elif instance:
cm_cluster = self.get_cloudera_cluster(instance.cluster)
else:
raise ValueError(_("'cluster' or 'instance' argument is missing"))
if role in ['AGENT']:
return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
elif role in ['SENTRY_SERVER']:
return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
elif role in ['SQOOP_SERVER']:
return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
elif role in ['SOLR_SERVER']:
return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
elif role in ['HBASE_INDEXER']:
return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
elif role in ['KMS']:
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
elif role in ['JOURNALNODE']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif role in ['YARN_STANDBYRM']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
elif role in ['KAFKA_BROKER']:
return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
else:
return super(ClouderaUtilsV570, self).get_service_by_role(
role, cluster, instance)
@cpo.event_wrapper(
True, step=_("First run cluster"), param=('cluster', 1))
@cu.cloudera_cmd
def first_run(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
yield cm_cluster.first_run()
@cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
def create_services(self, cluster):
api = self.get_api_client(cluster)
cm_cluster = api.create_cluster(cluster.name,
fullVersion=cluster.hadoop_version)
if len(self.pu.get_zookeepers(cluster)) > 0:
cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
ZOOKEEPER_SERVICE_TYPE)
cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
if self.pu.get_hive_metastore(cluster):
cm_cluster.create_service(self.HIVE_SERVICE_NAME,
HIVE_SERVICE_TYPE)
if self.pu.get_hue(cluster):
cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
if self.pu.get_spark_historyserver(cluster):
cm_cluster.create_service(self.SPARK_SERVICE_NAME,
SPARK_SERVICE_TYPE)
if self.pu.get_hbase_master(cluster):
cm_cluster.create_service(self.HBASE_SERVICE_NAME,
HBASE_SERVICE_TYPE)
if len(self.pu.get_flumes(cluster)) > 0:
cm_cluster.create_service(self.FLUME_SERVICE_NAME,
FLUME_SERVICE_TYPE)
if self.pu.get_sentry(cluster):
cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
SENTRY_SERVICE_TYPE)
if len(self.pu.get_solrs(cluster)) > 0:
cm_cluster.create_service(self.SOLR_SERVICE_NAME,
SOLR_SERVICE_TYPE)
if self.pu.get_sqoop(cluster):
cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
SQOOP_SERVICE_TYPE)
if len(self.pu.get_hbase_indexers(cluster)) > 0:
cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
KS_INDEXER_SERVICE_TYPE)
if self.pu.get_catalogserver(cluster):
cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
IMPALA_SERVICE_TYPE)
if self.pu.get_kms(cluster):
cm_cluster.create_service(self.KMS_SERVICE_NAME,
KMS_SERVICE_TYPE)
if len(self.pu.get_kafka_brokers(cluster)) > 0:
cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
KAFKA_SERVICE_TYPE)
def await_agents(self, cluster, instances):
self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)
@cpo.event_wrapper(
True, step=_("Configure services"), param=('cluster', 1))
def configure_services(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
if len(self.pu.get_zookeepers(cluster)) > 0:
zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
cluster=cluster))
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
cluster=cluster))
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
cluster=cluster))
oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hive_metastore(cluster):
hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hue(cluster):
hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_spark_historyserver(cluster):
spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hbase_master(cluster):
hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_flumes(cluster)) > 0:
flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sentry(cluster):
sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_solrs(cluster)) > 0:
solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sqoop(cluster):
sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_hbase_indexers(cluster)) > 0:
ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
ks_indexer.update_config(
self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))
if self.pu.get_catalogserver(cluster):
impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_kms(cluster):
kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_kafka_brokers(cluster)) > 0:
kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
cluster=cluster))
def _get_configs(self, service, cluster=None, instance=None):
def get_hadoop_dirs(mount_points, suffix):
return ','.join([x + suffix for x in mount_points])
all_confs = {}
if cluster:
zk_count = self.validator._get_inst_count(cluster,
'ZOOKEEPER_SERVER')
hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = self.validator._get_inst_count(cluster,
'SENTRY_SERVER')
ks_count =\
self.validator._get_inst_count(cluster,
'KEY_VALUE_STORE_INDEXER')
kms_count = self.validator._get_inst_count(cluster, 'KMS')
imp_count =\
self.validator._get_inst_count(cluster,
'IMPALA_CATALOGSERVER')
hive_count = self.validator._get_inst_count(cluster,
'HIVE_METASTORE')
slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()
confs = {c['name']: c['value'] for c in configs}
core_site_safety_valve = xmlutils.create_elements_xml(confs)
all_confs = {
'HDFS': {
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'dfs_block_local_path_access_user':
'impala' if imp_count else '',
'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
'core_site_safety_valve': core_site_safety_valve
},
'HIVE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'OOZIE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'hive_service':
self.HIVE_SERVICE_NAME if hive_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'YARN': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'HUE': {
'hive_service': self.HIVE_SERVICE_NAME,
'oozie_service': self.OOZIE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'impala_service':
self.IMPALA_SERVICE_NAME if imp_count else '',
'sqoop_service':
self.SQOOP_SERVICE_NAME if sqp_count else ''
},
'SPARK_ON_YARN': {
'yarn_service': self.YARN_SERVICE_NAME
},
'HBASE': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
'hbase_enable_indexing': 'true' if ks_count else 'false',
'hbase_enable_replication':
'true' if ks_count else 'false'
},
'FLUME': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else ''
},
'SENTRY': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'sentry_server_config_safety_valve': (
c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
if imp_count else '')
},
'SOLR': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
},
'SQOOP': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME
},
'KS_INDEXER': {
'hbase_service': self.HBASE_SERVICE_NAME,
'solr_service': self.SOLR_SERVICE_NAME
},
'IMPALA': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'hive_service': self.HIVE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
}
}
hive_confs = {
'HIVE': {
'hive_metastore_database_type': 'postgresql',
'hive_metastore_database_host':
self.pu.get_manager(cluster).internal_ip,
'hive_metastore_database_port': '7432',
'hive_metastore_database_password':
dh.get_hive_db_password(cluster)
}
}
hue_confs = {
'HUE': {
'hue_webhdfs': self.pu.get_role_name(
self.pu.get_namenode(cluster), 'NAMENODE')
}
}
sentry_confs = {
'SENTRY': {
'sentry_server_database_type': 'postgresql',
'sentry_server_database_host':
self.pu.get_manager(cluster).internal_ip,
'sentry_server_database_port': '7432',
'sentry_server_database_password':
dh.get_sentry_db_password(cluster)
}
}
kafka_confs = {
'KAFKA': {
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
}
}
all_confs = s_cfg.merge_configs(all_confs, hue_confs)
all_confs = s_cfg.merge_configs(all_confs, hive_confs)
all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
if instance:
snt_count = self.validator._get_inst_count(instance.cluster,
'SENTRY_SERVER')
paths = instance.storage_paths()
instance_default_confs = {
'NAMENODE': {
'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
},
'SECONDARYNAMENODE': {
'fs_checkpoint_dir_list':
get_hadoop_dirs(paths, '/fs/snn')
},
'DATANODE': {
'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
'dfs_datanode_data_dir_perm': 755,
'dfs_datanode_handler_count': 30
},
'NODEMANAGER': {
'yarn_nodemanager_local_dirs':
get_hadoop_dirs(paths, '/yarn/local'),
'container_executor_allowed_system_users':
"nobody,impala,hive,llama,hdfs,yarn,mapred,"
"spark,oozie",
"container_executor_banned_users": "bin"
},
'SERVER': {
'maxSessionTimeout': 60000
},
'HIVESERVER2': {
'hiveserver2_enable_impersonation':
'false' if snt_count else 'true',
'hive_hs2_config_safety_valve': (
c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
if snt_count else '')
},
'HIVEMETASTORE': {
'hive_metastore_config_safety_valve': (
c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
if snt_count else '')
}
}
ng_user_confs = self.pu.convert_process_configs(
instance.node_group.node_configs)
all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
return all_confs.get(service, {})
@cpo.event_wrapper(
True, step=_("Enable NameNode HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_namenode_ha(self, cluster):
standby_nn = self.pu.get_secondarynamenode(cluster)
standby_nn_host_name = standby_nn.fqdn()
jns = self.pu.get_jns(cluster)
jn_list = []
for index, jn in enumerate(jns):
jn_host_name = jn.fqdn()
jn_list.append({'jnHostId': jn_host_name,
'jnName': 'JN%i' % index,
'jnEditsDir': '/dfs/jn'
})
cm_cluster = self.get_cloudera_cluster(cluster)
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
nn = hdfs.get_roles_by_type('NAMENODE')[0]
yield hdfs.enable_nn_ha(active_name=nn.name,
standby_host_id=standby_nn_host_name,
nameservice=self.NAME_SERVICE, jns=jn_list
)
@cpo.event_wrapper(
True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_resourcemanager_ha(self, cluster):
new_rm = self.pu.get_stdb_rm(cluster)
new_rm_host_name = new_rm.fqdn()
cm_cluster = self.get_cloudera_cluster(cluster)
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)
self.c_helper = config_helper.ConfigHelperV570()
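The _get_configs method above builds the final per-service configuration by layering dictionaries: plugin defaults first, then Hive/Hue/Sentry/Kafka specifics, then user-supplied cluster configs, so later layers win. The sketch below mirrors only that layering idea with plain dicts; merge_configs here is a simplified stand-in, not the real two-argument sahara.utils.configs.merge_configs.

def merge_configs(*layers):
    merged = {}
    for layer in layers:
        for service, options in layer.items():
            merged.setdefault(service, {}).update(options)
    return merged

defaults = {'HDFS': {'zookeeper_service': 'zookeeper01'}}
user_overrides = {'HDFS': {'dfs_block_local_path_access_user': 'impala'}}

print(merge_configs(defaults, user_overrides))
# {'HDFS': {'zookeeper_service': 'zookeeper01',
#           'dfs_block_local_path_access_user': 'impala'}}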

View File

@ -65,10 +65,9 @@ class ConfigHelperV570(c_h.ConfigHelper):
'keytrustee/cloudera.list')
DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL = (
'http://archive.cloudera.com/'
'navigator-keytrustee5/ubuntu/'
'trusty/amd64/navigator-keytrustee'
'/archive.key')
'http://archive.cloudera.com/navigator-'
'keytrustee5/ubuntu/trusty/amd64/navigator-'
'keytrustee/archive.key')
KEY_TRUSTEE_CENTOS_REPO_URL = (
'http://archive.cloudera.com/navigator-'
@ -80,6 +79,12 @@ class ConfigHelperV570(c_h.ConfigHelper):
'/apache/hadoop/hadoop-openstack/2.6.0-cdh5.7.0'
'/hadoop-openstack-2.6.0-cdh5.7.0.jar')
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
default_value=DEFAULT_SWIFT_LIB_URL,
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
HIVE_SERVER2_SENTRY_SAFETY_VALVE = f.get_file_text(
path_to_config + 'hive-server2-sentry-safety.xml')
@ -89,162 +94,8 @@ class ConfigHelperV570(c_h.ConfigHelper):
SENTRY_IMPALA_CLIENT_SAFETY_VALVE = f.get_file_text(
path_to_config + 'sentry-impala-client-safety.xml')
_default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2,
default_value=_default_executor_classpath,
description='Value for spark.executor.extraClassPath in '
'spark-defaults.conf (default: %s)'
% _default_executor_classpath)
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
default_value=DEFAULT_SWIFT_LIB_URL,
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
KMS_REPO_URL = p.Config(
'KMS repo list URL', 'general', 'cluster', priority=1,
default_value="")
KMS_REPO_KEY_URL = p.Config(
'KMS repo key URL (for debian-based only)', 'general',
'cluster',
priority=1, default_value="")
REQUIRE_ANTI_AFFINITY = p.Config('Require Anti Affinity',
'general', 'cluster',
config_type='bool',
priority=2,
default_value=True)
def __init__(self):
super(ConfigHelperV570, self).__init__()
self.priority_one_confs = self._load_json(
self.path_to_config + 'priority-one-confs.json')
self._init_all_ng_plugin_configs()
def _get_cluster_plugin_configs(self):
confs = super(ConfigHelperV570, self)._get_ng_plugin_configs()
confs += [self.EXECUTOR_EXTRA_CLASSPATH,
self.KMS_REPO_URL,
self.KMS_REPO_KEY_URL,
self.REQUIRE_ANTI_AFFINITY]
return confs
def _init_all_ng_plugin_configs(self):
self.hdfs_confs = self._load_and_init_configs(
'hdfs-service.json', 'HDFS', 'cluster')
self.namenode_confs = self._load_and_init_configs(
'hdfs-namenode.json', 'NAMENODE', 'node')
self.datanode_confs = self._load_and_init_configs(
'hdfs-datanode.json', 'DATANODE', 'node')
self.secnamenode_confs = self._load_and_init_configs(
'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
self.hdfs_gateway_confs = self._load_and_init_configs(
'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
self.journalnode_confs = self._load_and_init_configs(
'hdfs-journalnode.json', 'JOURNALNODE', 'node')
self.yarn_confs = self._load_and_init_configs(
'yarn-service.json', 'YARN', 'cluster')
self.resourcemanager_confs = self._load_and_init_configs(
'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
self.nodemanager_confs = self._load_and_init_configs(
'yarn-nodemanager.json', 'NODEMANAGER', 'node')
self.jobhistory_confs = self._load_and_init_configs(
'yarn-jobhistory.json', 'JOBHISTORY', 'node')
self.yarn_gateway_conf = self._load_and_init_configs(
'yarn-gateway.json', 'YARN_GATEWAY', 'node')
self.oozie_service_confs = self._load_and_init_configs(
'oozie-service.json', 'OOZIE', 'cluster')
self.oozie_role_confs = self._load_and_init_configs(
'oozie-oozie_server.json', 'OOZIE', 'node')
self.hive_service_confs = self._load_and_init_configs(
'hive-service.json', 'HIVE', 'cluster')
self.hive_metastore_confs = self._load_and_init_configs(
'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
self.hive_hiveserver_confs = self._load_and_init_configs(
'hive-hiveserver2.json', 'HIVESERVER', 'node')
self.hive_webhcat_confs = self._load_and_init_configs(
'hive-webhcat.json', 'WEBHCAT', 'node')
self.hue_service_confs = self._load_and_init_configs(
'hue-service.json', 'HUE', 'cluster')
self.hue_role_confs = self._load_and_init_configs(
'hue-hue_server.json', 'HUE', 'node')
self.spark_service_confs = self._load_and_init_configs(
'spark-service.json', 'SPARK_ON_YARN', 'cluster')
self.spark_role_confs = self._load_and_init_configs(
'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')
self.zookeeper_server_confs = self._load_and_init_configs(
'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
self.zookeeper_service_confs = self._load_and_init_configs(
'zookeeper-server.json', 'ZOOKEEPER', 'node')
self.hbase_confs = self._load_and_init_configs(
'hbase-service.json', 'HBASE', 'cluster')
self.master_confs = self._load_and_init_configs(
'hbase-master.json', 'MASTER', 'node')
self.regionserver_confs = self._load_and_init_configs(
'hbase-regionserver.json', 'REGIONSERVER', 'node')
self.flume_service_confs = self._load_and_init_configs(
'flume-service.json', 'FLUME', 'cluster')
self.flume_agent_confs = self._load_and_init_configs(
'flume-agent.json', 'FLUME', 'node')
self.sentry_service_confs = self._load_and_init_configs(
'sentry-service.json', 'SENTRY', 'cluster')
self.sentry_server_confs = self._load_and_init_configs(
'sentry-sentry_server.json', 'SENTRY', 'node')
self.solr_service_confs = self._load_and_init_configs(
'solr-service.json', 'SOLR', 'cluster')
self.solr_server_confs = self._load_and_init_configs(
'solr-solr_server.json', 'SOLR', 'node')
self.sqoop_service_confs = self._load_and_init_configs(
'sqoop-service.json', 'SQOOP', 'cluster')
self.sqoop_server_confs = self._load_and_init_configs(
'sqoop-sqoop_server.json', 'SQOOP', 'node')
self.ks_indexer_service_confs = self._load_and_init_configs(
'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
self.ks_indexer_role_confs = self._load_and_init_configs(
'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')
self.impala_service_confs = self._load_and_init_configs(
'impala-service.json', 'IMPALA', 'cluster')
self.impala_catalogserver_confs = self._load_and_init_configs(
'impala-catalogserver.json', 'CATALOGSERVER', 'node')
self.impala_impalad_confs = self._load_and_init_configs(
'impala-impalad.json', 'IMPALAD', 'node')
self.impala_statestore_confs = self._load_and_init_configs(
'impala-statestore.json', 'STATESTORE', 'node')
self.kms_service_confs = self._load_and_init_configs(
'kms-service.json', 'KMS', 'cluster')
self.kms_kms_confs = self._load_and_init_configs(
'kms-kms.json', 'KMS', 'node')
self.kafka_service = self._load_and_init_configs(
'kafka-service.json', 'KAFKA', 'cluster')
self.kafka_kafka_broker = self._load_and_init_configs(
'kafka-kafka_broker.json', 'KAFKA', 'node')
self.kafka_kafka_mirror_maker = self._load_and_init_configs(
'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')
def get_required_anti_affinity(self, cluster):
return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)
def get_kms_key_url(self, cluster):
return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)

View File

@ -21,49 +21,10 @@ from sahara.plugins import utils as gu
from sahara.service.edp import hdfs_helper as h
from sahara.utils import cluster_progress_ops as cpo
PACKAGES = [
'cloudera-manager-agent',
'cloudera-manager-daemons',
'cloudera-manager-server',
'cloudera-manager-server-db-2',
'flume-ng',
'hadoop-hdfs-datanode',
'hadoop-hdfs-namenode',
'hadoop-hdfs-secondarynamenode',
'hadoop-kms',
'hadoop-mapreduce',
'hadoop-mapreduce-historyserver',
'hadoop-yarn-nodemanager',
'hadoop-yarn-resourcemanager',
'hbase',
'hbase-solr',
'hive-hcatalog',
'hive-metastore',
'hive-server2',
'hive-webhcat-server',
'hue',
'impala',
'impala-server',
'impala-state-store',
'impala-catalog',
'impala-shell',
'kafka',
'kafka-server',
'keytrustee-keyprovider',
'oozie',
'oracle-j2sdk1.7',
'sentry',
'solr-server',
'solr-doc',
'search',
'spark-history-server',
'sqoop2',
'unzip',
'zookeeper'
]
CU = cu.ClouderaUtilsV570()
PACKAGES = common_deploy.PACKAGES
def configure_cluster(cluster):
instances = gu.get_instances(cluster)
@ -110,8 +71,8 @@ def scale_cluster(cluster, instances):
CU.configure_rack_awareness(cluster)
CU.configure_instances(instances, cluster)
CU.update_configs(instances)
common_deploy.prepare_scaling_kerberized_cluster(cluster, CU,
instances)
common_deploy.prepare_scaling_kerberized_cluster(
cluster, CU, instances)
CU.pu.configure_swift(cluster, instances)
_start_roles(cluster, instances)
@ -203,40 +164,5 @@ def start_cluster(cluster):
def get_open_ports(node_group):
ports = [9000] # for CM agent
ports_map = {
'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
'HDFS_SECONDARYNAMENODE': [50090, 50495],
'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
'YARN_NODEMANAGER': [8040, 8041, 8042],
'YARN_JOBHISTORY': [10020, 19888],
'HIVE_METASTORE': [9083],
'HIVE_SERVER2': [10000],
'HUE_SERVER': [8888],
'OOZIE_SERVER': [11000, 11001],
'SPARK_YARN_HISTORY_SERVER': [18088],
'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
'HBASE_MASTER': [60000],
'HBASE_REGIONSERVER': [60020],
'FLUME_AGENT': [41414],
'SENTRY_SERVER': [8038],
'SOLR_SERVER': [8983, 8984],
'SQOOP_SERVER': [8005, 12000],
'KEY_VALUE_STORE_INDEXER': [],
'IMPALA_CATALOGSERVER': [25020, 26000],
'IMPALA_STATESTORE': [25010, 24000],
'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
'KMS': [16000, 16001],
'JOURNALNODE': [8480, 8481, 8485]
}
for process in node_group.node_processes:
if process in ports_map:
ports.extend(ports_map[process])
ports = common_deploy.get_open_ports(node_group)
return ports

View File

@ -26,13 +26,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
super(EdpOozieEngine, self).__init__(cluster)
self.cloudera_utils = cu.ClouderaUtilsV570()
def get_name_node_uri(self, cluster):
if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
else:
namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
@staticmethod
def get_possible_job_config(job_type):
if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):

View File

@ -13,154 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.conductor import resource as res
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils as pu
from sahara.plugins.cdh.v5_7_0 import config_helper
from sahara.plugins import utils as u
class PluginUtilsV570(pu.AbstractPluginUtils):
def __init__(self):
self.c_helper = config_helper.ConfigHelperV570()
def get_role_name(self, instance, service):
# NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
shortcuts = {
'AGENT': 'A',
'ALERTPUBLISHER': 'AP',
'CATALOGSERVER': 'ICS',
'DATANODE': 'DN',
'EVENTSERVER': 'ES',
'HBASE_INDEXER': 'LHBI',
'HIVEMETASTORE': 'HVM',
'HIVESERVER2': 'HVS',
'HOSTMONITOR': 'HM',
'IMPALAD': 'ID',
'JOBHISTORY': 'JS',
'JOURNALNODE': 'JN',
'KAFKA_BROKER': 'KB',
'KMS': 'KMS',
'MASTER': 'M',
'NAMENODE': 'NN',
'NODEMANAGER': 'NM',
'OOZIE_SERVER': 'OS',
'REGIONSERVER': 'RS',
'RESOURCEMANAGER': 'RM',
'SECONDARYNAMENODE': 'SNN',
'SENTRY_SERVER': 'SNT',
'SERVER': 'S',
'SERVICEMONITOR': 'SM',
'SOLR_SERVER': 'SLR',
'SPARK_YARN_HISTORY_SERVER': 'SHS',
'SQOOP_SERVER': 'S2S',
'STATESTORE': 'ISS',
'WEBHCAT': 'WHC',
'HDFS_GATEWAY': 'HG',
'YARN_GATEWAY': 'YG'
}
return '%s_%s' % (shortcuts.get(service, service),
instance.hostname().replace('-', '_'))
def get_sentry(self, cluster):
return u.get_instance(cluster, 'SENTRY_SERVER')
def get_flumes(self, cluster):
return u.get_instances(cluster, 'FLUME_AGENT')
def get_solrs(self, cluster):
return u.get_instances(cluster, 'SOLR_SERVER')
def get_sqoop(self, cluster):
return u.get_instance(cluster, 'SQOOP_SERVER')
def get_hbase_indexers(self, cluster):
return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')
def get_catalogserver(self, cluster):
return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')
def get_statestore(self, cluster):
return u.get_instance(cluster, 'IMPALA_STATESTORE')
def get_impalads(self, cluster):
return u.get_instances(cluster, 'IMPALAD')
def get_kms(self, cluster):
return u.get_instances(cluster, 'KMS')
def get_jns(self, cluster):
return u.get_instances(cluster, 'HDFS_JOURNALNODE')
def get_stdb_rm(self, cluster):
return u.get_instance(cluster, 'YARN_STANDBYRM')
def get_kafka_brokers(self, cluster):
return u.get_instances(cluster, 'KAFKA_BROKER')
def convert_process_configs(self, configs):
p_dict = {
"CLOUDERA": ['MANAGER'],
"NAMENODE": ['NAMENODE'],
"DATANODE": ['DATANODE'],
"SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
"RESOURCEMANAGER": ['RESOURCEMANAGER'],
"NODEMANAGER": ['NODEMANAGER'],
"JOBHISTORY": ['JOBHISTORY'],
"OOZIE": ['OOZIE_SERVER'],
"HIVESERVER": ['HIVESERVER2'],
"HIVEMETASTORE": ['HIVEMETASTORE'],
"WEBHCAT": ['WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['SERVER'],
"MASTER": ['MASTER'],
"REGIONSERVER": ['REGIONSERVER'],
"FLUME": ['AGENT'],
"CATALOGSERVER": ['CATALOGSERVER'],
"STATESTORE": ['STATESTORE'],
"IMPALAD": ['IMPALAD'],
"KS_INDEXER": ['HBASE_INDEXER'],
"SENTRY": ['SENTRY_SERVER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"KMS": ['KMS'],
"YARN_GATEWAY": ['YARN_GATEWAY'],
"HDFS_GATEWAY": ['HDFS_GATEWAY'],
"JOURNALNODE": ['JOURNALNODE'],
"KAFKA": ['KAFKA_BROKER']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()
for k in configs.keys():
if k in p_dict.keys():
item = configs[k]
del configs[k]
newkey = p_dict[k][0]
configs[newkey] = item
return res.Resource(configs)
def configure_sentry(self, cluster):
manager = self.get_manager(cluster)
with manager.remote() as r:
dh.create_sentry_database(cluster, r)
def _configure_repo_from_inst(self, instance):
super(PluginUtilsV570, self)._configure_repo_from_inst(instance)
cluster = instance.cluster
with instance.remote() as r:
if cmd.is_ubuntu_os(r):
kms_key = (
self.c_helper.get_kms_key_url(cluster) or
self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
cmd.add_apt_key(r, kms_key)
cmd.update_repository(r)
if cmd.is_centos_os(r):
kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
cmd.add_centos_repository(r, kms_repo_url, 'kms')
cmd.update_repository(r)

View File

@ -13,220 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh.v5_7_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
class ValidatorV570(validation.Validator):
PU = pu.PluginUtilsV570()
@classmethod
def validate_cluster_creating(cls, cluster):
super(ValidatorV570, cls).validate_cluster_creating(cluster)
cls._hdfs_ha_validation(cluster)
cls._yarn_ha_validation(cluster)
cls._flume_validation(cluster)
cls._sentry_validation(cluster)
cls._solr_validation(cluster)
cls._sqoop_validation(cluster)
cls._hbase_indexer_validation(cluster)
cls._impala_validation(cluster)
cls._kms_validation(cluster)
@classmethod
def _hdfs_ha_validation(cls, cluster):
jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if jn_count > 0:
if jn_count < 3:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('not less than 3'),
jn_count)
if not jn_count % 2:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('be odd'), jn_count)
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HDFS HA')
if require_anti_affinity:
if 'HDFS_SECONDARYNAMENODE' not in\
cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_SECONDARYNAMENODE should be enabled '
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_NAMENODE should be enabled in anti_affinity.'))
@classmethod
def _yarn_ha_validation(cls, cluster):
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException(
'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='RM HA')
if require_anti_affinity:
if 'YARN_RESOURCEMANAGER' not in\
cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in '
'anti_affinity.'))
if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be'
' enabled in anti_affinity.'))
@classmethod
def _flume_validation(cls, cluster):
a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
@classmethod
def _sentry_validation(cls, cluster):
snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if snt_count > 1:
raise ex.InvalidComponentCountException(
'SENTRY_SERVER', _('0 or 1'), snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
@classmethod
def _solr_validation(cls, cluster):
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
@classmethod
def _sqoop_validation(cls, cluster):
s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
if s2s_count > 1:
raise ex.InvalidComponentCountException(
'SQOOP_SERVER', _('0 or 1'), s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
@classmethod
def _hbase_indexer_validation(cls, cluster):
lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
@classmethod
def _impala_validation(cls, cluster):
ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = cls._get_inst_count(cluster, 'IMPALAD')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
@classmethod
def _kms_validation(cls, cluster):
kms_count = cls._get_inst_count(cluster, 'KMS')
if kms_count > 1:
raise ex.InvalidComponentCountException('KMS',
_('0 or 1'), kms_count)
@classmethod
def _get_anti_affinity(cls, cluster):
return cluster.anti_affinity

View File

@ -21,7 +21,6 @@ from sahara.plugins.cdh.v5_7_0 import deploy
from sahara.plugins.cdh.v5_7_0 import edp_engine
from sahara.plugins.cdh.v5_7_0 import plugin_utils
from sahara.plugins.cdh.v5_7_0 import validation
from sahara.plugins import kerberos
class VersionHandler(avm.BaseVersionHandler):
@ -34,67 +33,3 @@ class VersionHandler(avm.BaseVersionHandler):
self.deploy = deploy
self.edp_engine = edp_engine
self.validation = validation.ValidatorV570()
def get_plugin_configs(self):
result = super(VersionHandler, self).get_plugin_configs()
result.extend(kerberos.get_config_list())
return result
def get_node_processes(self):
return {
"CLOUDERA": ['CLOUDERA_MANAGER'],
"HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
"YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
"OOZIE": ['OOZIE_SERVER'],
"HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['ZOOKEEPER_SERVER'],
"HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
"FLUME": ['FLUME_AGENT'],
"IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
"KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"SENTRY": ['SENTRY_SERVER'],
"KMS": ['KMS'],
"KAFKA": ['KAFKA_BROKER'],
"YARN_GATEWAY": [],
"RESOURCEMANAGER": [],
"NODEMANAGER": [],
"JOBHISTORY": [],
"HDFS_GATEWAY": [],
'DATANODE': [],
'NAMENODE': [],
'SECONDARYNAMENODE': [],
'JOURNALNODE': [],
'REGIONSERVER': [],
'MASTER': [],
'HIVEMETASTORE': [],
'HIVESERVER': [],
'WEBCAT': [],
'CATALOGSERVER': [],
'STATESTORE': [],
'IMPALAD': [],
'Kerberos': [],
}
def get_edp_engine(self, cluster, job_type):
oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
if job_type in oozie_type:
return self.edp_engine.EdpOozieEngine(cluster)
if job_type in spark_type:
return self.edp_engine.EdpSparkEngine(cluster)
return None
def get_edp_job_types(self):
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
edp_engine.EdpSparkEngine.get_supported_job_types())

View File

@ -13,459 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh.v5_9_0 import config_helper
from sahara.plugins.cdh.v5_9_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_9_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import xmlutils
HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'
c_helper = config_helper.ConfigHelperV590()
class ClouderaUtilsV590(cu.ClouderaUtils):
FLUME_SERVICE_NAME = 'flume01'
SOLR_SERVICE_NAME = 'solr01'
SQOOP_SERVICE_NAME = 'sqoop01'
KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
IMPALA_SERVICE_NAME = 'impala01'
SENTRY_SERVICE_NAME = 'sentry01'
KMS_SERVICE_NAME = 'kms01'
KAFKA_SERVICE_NAME = 'kafka01'
CM_API_VERSION = 8
NAME_SERVICE = 'nameservice01'
def __init__(self):
cu.ClouderaUtils.__init__(self)
self.pu = pu.PluginUtilsV590()
self.validator = validation.ValidatorV590
def get_service_by_role(self, role, cluster=None, instance=None):
cm_cluster = None
if cluster:
cm_cluster = self.get_cloudera_cluster(cluster)
elif instance:
cm_cluster = self.get_cloudera_cluster(instance.cluster)
else:
            raise ValueError(_("'cluster' or 'instance' argument is missing"))
if role in ['AGENT']:
return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
elif role in ['SENTRY_SERVER']:
return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
elif role in ['SQOOP_SERVER']:
return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
elif role in ['SOLR_SERVER']:
return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
elif role in ['HBASE_INDEXER']:
return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
elif role in ['KMS']:
return cm_cluster.get_service(self.KMS_SERVICE_NAME)
elif role in ['JOURNALNODE']:
return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
elif role in ['YARN_STANDBYRM']:
return cm_cluster.get_service(self.YARN_SERVICE_NAME)
elif role in ['KAFKA_BROKER']:
return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
else:
return super(ClouderaUtilsV590, self).get_service_by_role(
role, cluster, instance)
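        # Illustrative lookup (sketch): get_service_by_role('IMPALAD',
        # cluster=cluster) resolves to the 'impala01' service created for
        # the IMPALA roles, per IMPALA_SERVICE_NAME above.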
@cpo.event_wrapper(
True, step=_("First run cluster"), param=('cluster', 1))
@cu.cloudera_cmd
def first_run(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
yield cm_cluster.first_run()
@cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
def create_services(self, cluster):
api = self.get_api_client(cluster)
cm_cluster = api.create_cluster(cluster.name,
fullVersion=cluster.hadoop_version)
if len(self.pu.get_zookeepers(cluster)) > 0:
cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
ZOOKEEPER_SERVICE_TYPE)
cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
if self.pu.get_hive_metastore(cluster):
cm_cluster.create_service(self.HIVE_SERVICE_NAME,
HIVE_SERVICE_TYPE)
if self.pu.get_hue(cluster):
cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
if self.pu.get_spark_historyserver(cluster):
cm_cluster.create_service(self.SPARK_SERVICE_NAME,
SPARK_SERVICE_TYPE)
if self.pu.get_hbase_master(cluster):
cm_cluster.create_service(self.HBASE_SERVICE_NAME,
HBASE_SERVICE_TYPE)
if len(self.pu.get_flumes(cluster)) > 0:
cm_cluster.create_service(self.FLUME_SERVICE_NAME,
FLUME_SERVICE_TYPE)
if self.pu.get_sentry(cluster):
cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
SENTRY_SERVICE_TYPE)
if len(self.pu.get_solrs(cluster)) > 0:
cm_cluster.create_service(self.SOLR_SERVICE_NAME,
SOLR_SERVICE_TYPE)
if self.pu.get_sqoop(cluster):
cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
SQOOP_SERVICE_TYPE)
if len(self.pu.get_hbase_indexers(cluster)) > 0:
cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
KS_INDEXER_SERVICE_TYPE)
if self.pu.get_catalogserver(cluster):
cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
IMPALA_SERVICE_TYPE)
if self.pu.get_kms(cluster):
cm_cluster.create_service(self.KMS_SERVICE_NAME,
KMS_SERVICE_TYPE)
if len(self.pu.get_kafka_brokers(cluster)) > 0:
cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
KAFKA_SERVICE_TYPE)
def await_agents(self, cluster, instances):
self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)
@cpo.event_wrapper(
True, step=_("Configure services"), param=('cluster', 1))
def configure_services(self, cluster):
cm_cluster = self.get_cloudera_cluster(cluster)
if len(self.pu.get_zookeepers(cluster)) > 0:
zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
cluster=cluster))
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
cluster=cluster))
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
cluster=cluster))
oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hive_metastore(cluster):
hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hue(cluster):
hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_spark_historyserver(cluster):
spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_hbase_master(cluster):
hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_flumes(cluster)) > 0:
flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sentry(cluster):
sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_solrs(cluster)) > 0:
solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_sqoop(cluster):
sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_hbase_indexers(cluster)) > 0:
ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
ks_indexer.update_config(
self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))
if self.pu.get_catalogserver(cluster):
impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
cluster=cluster))
if self.pu.get_kms(cluster):
kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
cluster=cluster))
if len(self.pu.get_kafka_brokers(cluster)) > 0:
kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
cluster=cluster))
def _get_configs(self, service, cluster=None, instance=None):
def get_hadoop_dirs(mount_points, suffix):
return ','.join([x + suffix for x in mount_points])
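        # Illustrative example (mount points hypothetical):
        #   get_hadoop_dirs(['/volumes/disk1', '/volumes/disk2'], '/fs/dn')
        #   returns '/volumes/disk1/fs/dn,/volumes/disk2/fs/dn'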
all_confs = {}
if cluster:
zk_count = self.validator._get_inst_count(cluster,
'ZOOKEEPER_SERVER')
hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
snt_count = self.validator._get_inst_count(cluster,
'SENTRY_SERVER')
ks_count =\
self.validator._get_inst_count(cluster,
'KEY_VALUE_STORE_INDEXER')
kms_count = self.validator._get_inst_count(cluster, 'KMS')
imp_count =\
self.validator._get_inst_count(cluster,
'IMPALA_CATALOGSERVER')
hive_count = self.validator._get_inst_count(cluster,
'HIVE_METASTORE')
slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
core_site_safety_valve = ''
if self.pu.c_helper.is_swift_enabled(cluster):
configs = swift_helper.get_swift_configs()
confs = {c['name']: c['value'] for c in configs}
core_site_safety_valve = xmlutils.create_elements_xml(confs)
all_confs = {
'HDFS': {
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'dfs_block_local_path_access_user':
'impala' if imp_count else '',
'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
'core_site_safety_valve': core_site_safety_valve
},
'HIVE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'OOZIE': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
'hive_service':
self.HIVE_SERVICE_NAME if hive_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'YARN': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
},
'HUE': {
'hive_service': self.HIVE_SERVICE_NAME,
'oozie_service': self.OOZIE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'impala_service':
self.IMPALA_SERVICE_NAME if imp_count else '',
'sqoop_service':
self.SQOOP_SERVICE_NAME if sqp_count else ''
},
'SPARK_ON_YARN': {
'yarn_service': self.YARN_SERVICE_NAME
},
'HBASE': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
'hbase_enable_indexing': 'true' if ks_count else 'false',
'hbase_enable_replication':
'true' if ks_count else 'false'
},
'FLUME': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'solr_service':
self.SOLR_SERVICE_NAME if slr_count else '',
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else ''
},
'SENTRY': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'sentry_server_config_safety_valve': (
c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
if imp_count else '')
},
'SOLR': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
},
'SQOOP': {
'mapreduce_yarn_service': self.YARN_SERVICE_NAME
},
'KS_INDEXER': {
'hbase_service': self.HBASE_SERVICE_NAME,
'solr_service': self.SOLR_SERVICE_NAME
},
'IMPALA': {
'hdfs_service': self.HDFS_SERVICE_NAME,
'hbase_service':
self.HBASE_SERVICE_NAME if hbm_count else '',
'hive_service': self.HIVE_SERVICE_NAME,
'sentry_service':
self.SENTRY_SERVICE_NAME if snt_count else '',
'zookeeper_service':
self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
}
}
hive_confs = {
'HIVE': {
'hive_metastore_database_type': 'postgresql',
'hive_metastore_database_host':
self.pu.get_manager(cluster).internal_ip,
'hive_metastore_database_port': '7432',
'hive_metastore_database_password':
dh.get_hive_db_password(cluster)
}
}
hue_confs = {
'HUE': {
'hue_webhdfs': self.pu.get_role_name(
self.pu.get_namenode(cluster), 'NAMENODE')
}
}
sentry_confs = {
'SENTRY': {
'sentry_server_database_type': 'postgresql',
'sentry_server_database_host':
self.pu.get_manager(cluster).internal_ip,
'sentry_server_database_port': '7432',
'sentry_server_database_password':
dh.get_sentry_db_password(cluster)
}
}
kafka_confs = {
'KAFKA': {
'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
}
}
all_confs = s_cfg.merge_configs(all_confs, hue_confs)
all_confs = s_cfg.merge_configs(all_confs, hive_confs)
all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)
if instance:
snt_count = self.validator._get_inst_count(instance.cluster,
'SENTRY_SERVER')
paths = instance.storage_paths()
instance_default_confs = {
'NAMENODE': {
'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
},
'SECONDARYNAMENODE': {
'fs_checkpoint_dir_list':
get_hadoop_dirs(paths, '/fs/snn')
},
'DATANODE': {
'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
'dfs_datanode_data_dir_perm': 755,
'dfs_datanode_handler_count': 30
},
'NODEMANAGER': {
'yarn_nodemanager_local_dirs':
get_hadoop_dirs(paths, '/yarn/local'),
'container_executor_allowed_system_users':
"nobody,impala,hive,llama,hdfs,yarn,mapred,"
"spark,oozie",
"container_executor_banned_users": "bin"
},
'SERVER': {
'maxSessionTimeout': 60000
},
'HIVESERVER2': {
'hiveserver2_enable_impersonation':
'false' if snt_count else 'true',
'hive_hs2_config_safety_valve': (
c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
if snt_count else '')
},
'HIVEMETASTORE': {
'hive_metastore_config_safety_valve': (
c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
if snt_count else '')
}
}
ng_user_confs = self.pu.convert_process_configs(
instance.node_group.node_configs)
all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)
return all_confs.get(service, {})
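        # Usage sketch: configure_services() above fetches service-level
        # configs as self._get_configs(HDFS_SERVICE_TYPE, cluster=cluster),
        # while role configuration passes instance=<instance> to pick up the
        # per-role defaults instead.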
@cpo.event_wrapper(
True, step=_("Enable NameNode HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_namenode_ha(self, cluster):
standby_nn = self.pu.get_secondarynamenode(cluster)
standby_nn_host_name = standby_nn.fqdn()
jns = self.pu.get_jns(cluster)
jn_list = []
for index, jn in enumerate(jns):
jn_host_name = jn.fqdn()
jn_list.append({'jnHostId': jn_host_name,
'jnName': 'JN%i' % index,
'jnEditsDir': '/dfs/jn'
})
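        # For illustration (host names hypothetical), jn_list ends up like:
        #   [{'jnHostId': 'node-2.example.com', 'jnName': 'JN0',
        #     'jnEditsDir': '/dfs/jn'}, ...]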
cm_cluster = self.get_cloudera_cluster(cluster)
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
nn = hdfs.get_roles_by_type('NAMENODE')[0]
yield hdfs.enable_nn_ha(active_name=nn.name,
standby_host_id=standby_nn_host_name,
nameservice=self.NAME_SERVICE, jns=jn_list
)
@cpo.event_wrapper(
True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
@cu.cloudera_cmd
def enable_resourcemanager_ha(self, cluster):
new_rm = self.pu.get_stdb_rm(cluster)
new_rm_host_name = new_rm.fqdn()
cm_cluster = self.get_cloudera_cluster(cluster)
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)
self.c_helper = config_helper.ConfigHelperV590()

View File

@ -65,10 +65,9 @@ class ConfigHelperV590(c_h.ConfigHelper):
'keytrustee/cloudera.list')
DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL = (
'http://archive.cloudera.com/'
'navigator-keytrustee5/ubuntu/'
'trusty/amd64/navigator-keytrustee'
'/archive.key')
'http://archive.cloudera.com/navigator-'
'keytrustee5/ubuntu/trusty/amd64/navigator-'
'keytrustee/archive.key')
KEY_TRUSTEE_CENTOS_REPO_URL = (
'http://archive.cloudera.com/navigator-'
@ -80,6 +79,12 @@ class ConfigHelperV590(c_h.ConfigHelper):
'/apache/hadoop/hadoop-openstack/2.6.0-cdh5.9.0'
'/hadoop-openstack-2.6.0-cdh5.9.0.jar')
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
default_value=DEFAULT_SWIFT_LIB_URL,
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
HIVE_SERVER2_SENTRY_SAFETY_VALVE = f.get_file_text(
path_to_config + 'hive-server2-sentry-safety.xml')
@ -89,162 +94,8 @@ class ConfigHelperV590(c_h.ConfigHelper):
SENTRY_IMPALA_CLIENT_SAFETY_VALVE = f.get_file_text(
path_to_config + 'sentry-impala-client-safety.xml')
_default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2,
default_value=_default_executor_classpath,
description='Value for spark.executor.extraClassPath in '
'spark-defaults.conf (default: %s)'
% _default_executor_classpath)
SWIFT_LIB_URL = p.Config(
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
default_value=DEFAULT_SWIFT_LIB_URL,
description=("Library that adds Swift support to CDH. The file"
" will be downloaded by VMs."))
KMS_REPO_URL = p.Config(
'KMS repo list URL', 'general', 'cluster', priority=1,
default_value="")
KMS_REPO_KEY_URL = p.Config(
'KMS repo key URL (for debian-based only)', 'general',
'cluster',
priority=1, default_value="")
REQUIRE_ANTI_AFFINITY = p.Config('Require Anti Affinity',
'general', 'cluster',
config_type='bool',
priority=2,
default_value=True)
def __init__(self):
super(ConfigHelperV590, self).__init__()
self.priority_one_confs = self._load_json(
self.path_to_config + 'priority-one-confs.json')
self._init_all_ng_plugin_configs()
def _get_cluster_plugin_configs(self):
confs = super(ConfigHelperV590, self)._get_ng_plugin_configs()
confs += [self.EXECUTOR_EXTRA_CLASSPATH,
self.KMS_REPO_URL,
self.KMS_REPO_KEY_URL,
self.REQUIRE_ANTI_AFFINITY]
return confs
def _init_all_ng_plugin_configs(self):
self.hdfs_confs = self._load_and_init_configs(
'hdfs-service.json', 'HDFS', 'cluster')
self.namenode_confs = self._load_and_init_configs(
'hdfs-namenode.json', 'NAMENODE', 'node')
self.datanode_confs = self._load_and_init_configs(
'hdfs-datanode.json', 'DATANODE', 'node')
self.secnamenode_confs = self._load_and_init_configs(
'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
self.hdfs_gateway_confs = self._load_and_init_configs(
'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
self.journalnode_confs = self._load_and_init_configs(
'hdfs-journalnode.json', 'JOURNALNODE', 'node')
self.yarn_confs = self._load_and_init_configs(
'yarn-service.json', 'YARN', 'cluster')
self.resourcemanager_confs = self._load_and_init_configs(
'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
self.nodemanager_confs = self._load_and_init_configs(
'yarn-nodemanager.json', 'NODEMANAGER', 'node')
self.jobhistory_confs = self._load_and_init_configs(
'yarn-jobhistory.json', 'JOBHISTORY', 'node')
self.yarn_gateway_conf = self._load_and_init_configs(
'yarn-gateway.json', 'YARN_GATEWAY', 'node')
self.oozie_service_confs = self._load_and_init_configs(
'oozie-service.json', 'OOZIE', 'cluster')
self.oozie_role_confs = self._load_and_init_configs(
'oozie-oozie_server.json', 'OOZIE', 'node')
self.hive_service_confs = self._load_and_init_configs(
'hive-service.json', 'HIVE', 'cluster')
self.hive_metastore_confs = self._load_and_init_configs(
'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
self.hive_hiveserver_confs = self._load_and_init_configs(
'hive-hiveserver2.json', 'HIVESERVER', 'node')
self.hive_webhcat_confs = self._load_and_init_configs(
'hive-webhcat.json', 'WEBHCAT', 'node')
self.hue_service_confs = self._load_and_init_configs(
'hue-service.json', 'HUE', 'cluster')
self.hue_role_confs = self._load_and_init_configs(
'hue-hue_server.json', 'HUE', 'node')
self.spark_service_confs = self._load_and_init_configs(
'spark-service.json', 'SPARK_ON_YARN', 'cluster')
self.spark_role_confs = self._load_and_init_configs(
'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')
self.zookeeper_server_confs = self._load_and_init_configs(
'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
self.zookeeper_service_confs = self._load_and_init_configs(
'zookeeper-server.json', 'ZOOKEEPER', 'node')
self.hbase_confs = self._load_and_init_configs(
'hbase-service.json', 'HBASE', 'cluster')
self.master_confs = self._load_and_init_configs(
'hbase-master.json', 'MASTER', 'node')
self.regionserver_confs = self._load_and_init_configs(
'hbase-regionserver.json', 'REGIONSERVER', 'node')
self.flume_service_confs = self._load_and_init_configs(
'flume-service.json', 'FLUME', 'cluster')
self.flume_agent_confs = self._load_and_init_configs(
'flume-agent.json', 'FLUME', 'node')
self.sentry_service_confs = self._load_and_init_configs(
'sentry-service.json', 'SENTRY', 'cluster')
self.sentry_server_confs = self._load_and_init_configs(
'sentry-sentry_server.json', 'SENTRY', 'node')
self.solr_service_confs = self._load_and_init_configs(
'solr-service.json', 'SOLR', 'cluster')
self.solr_server_confs = self._load_and_init_configs(
'solr-solr_server.json', 'SOLR', 'node')
self.sqoop_service_confs = self._load_and_init_configs(
'sqoop-service.json', 'SQOOP', 'cluster')
self.sqoop_server_confs = self._load_and_init_configs(
'sqoop-sqoop_server.json', 'SQOOP', 'node')
self.ks_indexer_service_confs = self._load_and_init_configs(
'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
self.ks_indexer_role_confs = self._load_and_init_configs(
'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')
self.impala_service_confs = self._load_and_init_configs(
'impala-service.json', 'IMPALA', 'cluster')
self.impala_catalogserver_confs = self._load_and_init_configs(
'impala-catalogserver.json', 'CATALOGSERVER', 'node')
self.impala_impalad_confs = self._load_and_init_configs(
'impala-impalad.json', 'IMPALAD', 'node')
self.impala_statestore_confs = self._load_and_init_configs(
'impala-statestore.json', 'STATESTORE', 'node')
self.kms_service_confs = self._load_and_init_configs(
'kms-service.json', 'KMS', 'cluster')
self.kms_kms_confs = self._load_and_init_configs(
'kms-kms.json', 'KMS', 'node')
self.kafka_service = self._load_and_init_configs(
'kafka-service.json', 'KAFKA', 'cluster')
self.kafka_kafka_broker = self._load_and_init_configs(
'kafka-kafka_broker.json', 'KAFKA', 'node')
self.kafka_kafka_mirror_maker = self._load_and_init_configs(
'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')
def get_required_anti_affinity(self, cluster):
return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)
def get_kms_key_url(self, cluster):
return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)

View File

@ -21,49 +21,10 @@ from sahara.plugins import utils as gu
from sahara.service.edp import hdfs_helper as h
from sahara.utils import cluster_progress_ops as cpo
PACKAGES = [
'cloudera-manager-agent',
'cloudera-manager-daemons',
'cloudera-manager-server',
'cloudera-manager-server-db-2',
'flume-ng',
'hadoop-hdfs-datanode',
'hadoop-hdfs-namenode',
'hadoop-hdfs-secondarynamenode',
    'hadoop-kms',
'hadoop-mapreduce',
'hadoop-mapreduce-historyserver',
'hadoop-yarn-nodemanager',
'hadoop-yarn-resourcemanager',
'hbase',
'hbase-solr',
'hive-hcatalog',
'hive-metastore',
'hive-server2',
'hive-webhcat-server',
'hue',
'impala',
'impala-server',
'impala-state-store',
'impala-catalog',
'impala-shell',
'kafka',
    'kafka-server',
'keytrustee-keyprovider',
'oozie',
'oracle-j2sdk1.7',
'sentry',
'solr-server',
'solr-doc',
'search',
'spark-history-server',
'sqoop2',
'unzip',
'zookeeper'
]
CU = cu.ClouderaUtilsV590()
PACKAGES = common_deploy.PACKAGES
def configure_cluster(cluster):
instances = gu.get_instances(cluster)
@ -110,8 +71,8 @@ def scale_cluster(cluster, instances):
CU.configure_rack_awareness(cluster)
CU.configure_instances(instances, cluster)
CU.update_configs(instances)
common_deploy.prepare_scaling_kerberized_cluster(cluster, CU,
instances)
common_deploy.prepare_scaling_kerberized_cluster(
cluster, CU, instances)
CU.pu.configure_swift(cluster, instances)
_start_roles(cluster, instances)
@ -203,40 +164,5 @@ def start_cluster(cluster):
def get_open_ports(node_group):
ports = [9000] # for CM agent
ports_map = {
'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
'HDFS_SECONDARYNAMENODE': [50090, 50495],
'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
'YARN_NODEMANAGER': [8040, 8041, 8042],
'YARN_JOBHISTORY': [10020, 19888],
'HIVE_METASTORE': [9083],
'HIVE_SERVER2': [10000],
'HUE_SERVER': [8888],
'OOZIE_SERVER': [11000, 11001],
'SPARK_YARN_HISTORY_SERVER': [18088],
'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
'HBASE_MASTER': [60000],
'HBASE_REGIONSERVER': [60020],
'FLUME_AGENT': [41414],
'SENTRY_SERVER': [8038],
'SOLR_SERVER': [8983, 8984],
'SQOOP_SERVER': [8005, 12000],
'KEY_VALUE_STORE_INDEXER': [],
'IMPALA_CATALOGSERVER': [25020, 26000],
'IMPALA_STATESTORE': [25010, 24000],
'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
'KMS': [16000, 16001],
'JOURNALNODE': [8480, 8481, 8485]
}
for process in node_group.node_processes:
if process in ports_map:
ports.extend(ports_map[process])
ports = common_deploy.get_open_ports(node_group)
return ports

View File

@ -26,13 +26,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
super(EdpOozieEngine, self).__init__(cluster)
self.cloudera_utils = cu.ClouderaUtilsV590()
def get_name_node_uri(self, cluster):
if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
else:
namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
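        # Illustrative results: 'hdfs://nameservice01' when JournalNodes are
        # present (NameNode HA), otherwise something like
        # 'hdfs://master-001.example.com:8020' (host name hypothetical).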
@staticmethod
def get_possible_job_config(job_type):
if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):

View File

@ -13,154 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.conductor import resource as res
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils as pu
from sahara.plugins.cdh.v5_9_0 import config_helper
from sahara.plugins import utils as u
class PluginUtilsV590(pu.AbstractPluginUtils):
def __init__(self):
self.c_helper = config_helper.ConfigHelperV590()
def get_role_name(self, instance, service):
# NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
shortcuts = {
'AGENT': 'A',
'ALERTPUBLISHER': 'AP',
'CATALOGSERVER': 'ICS',
'DATANODE': 'DN',
'EVENTSERVER': 'ES',
'HBASE_INDEXER': 'LHBI',
'HIVEMETASTORE': 'HVM',
'HIVESERVER2': 'HVS',
'HOSTMONITOR': 'HM',
'IMPALAD': 'ID',
'JOBHISTORY': 'JS',
'JOURNALNODE': 'JN',
'KAFKA_BROKER': 'KB',
'KMS': 'KMS',
'MASTER': 'M',
'NAMENODE': 'NN',
'NODEMANAGER': 'NM',
'OOZIE_SERVER': 'OS',
'REGIONSERVER': 'RS',
'RESOURCEMANAGER': 'RM',
'SECONDARYNAMENODE': 'SNN',
'SENTRY_SERVER': 'SNT',
'SERVER': 'S',
'SERVICEMONITOR': 'SM',
'SOLR_SERVER': 'SLR',
'SPARK_YARN_HISTORY_SERVER': 'SHS',
'SQOOP_SERVER': 'S2S',
'STATESTORE': 'ISS',
'WEBHCAT': 'WHC',
'HDFS_GATEWAY': 'HG',
'YARN_GATEWAY': 'YG'
}
return '%s_%s' % (shortcuts.get(service, service),
instance.hostname().replace('-', '_'))
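        # Illustrative example (host name hypothetical):
        #   get_role_name(<instance 'worker-001'>, 'DATANODE') -> 'DN_worker_001'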
def get_sentry(self, cluster):
return u.get_instance(cluster, 'SENTRY_SERVER')
def get_flumes(self, cluster):
return u.get_instances(cluster, 'FLUME_AGENT')
def get_solrs(self, cluster):
return u.get_instances(cluster, 'SOLR_SERVER')
def get_sqoop(self, cluster):
return u.get_instance(cluster, 'SQOOP_SERVER')
def get_hbase_indexers(self, cluster):
return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')
def get_catalogserver(self, cluster):
return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')
def get_statestore(self, cluster):
return u.get_instance(cluster, 'IMPALA_STATESTORE')
def get_impalads(self, cluster):
return u.get_instances(cluster, 'IMPALAD')
def get_kms(self, cluster):
return u.get_instances(cluster, 'KMS')
def get_jns(self, cluster):
return u.get_instances(cluster, 'HDFS_JOURNALNODE')
def get_stdb_rm(self, cluster):
return u.get_instance(cluster, 'YARN_STANDBYRM')
def get_kafka_brokers(self, cluster):
return u.get_instances(cluster, 'KAFKA_BROKER')
def convert_process_configs(self, configs):
p_dict = {
"CLOUDERA": ['MANAGER'],
"NAMENODE": ['NAMENODE'],
"DATANODE": ['DATANODE'],
"SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
"RESOURCEMANAGER": ['RESOURCEMANAGER'],
"NODEMANAGER": ['NODEMANAGER'],
"JOBHISTORY": ['JOBHISTORY'],
"OOZIE": ['OOZIE_SERVER'],
"HIVESERVER": ['HIVESERVER2'],
"HIVEMETASTORE": ['HIVEMETASTORE'],
"WEBHCAT": ['WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['SERVER'],
"MASTER": ['MASTER'],
"REGIONSERVER": ['REGIONSERVER'],
"FLUME": ['AGENT'],
"CATALOGSERVER": ['CATALOGSERVER'],
"STATESTORE": ['STATESTORE'],
"IMPALAD": ['IMPALAD'],
"KS_INDEXER": ['HBASE_INDEXER'],
"SENTRY": ['SENTRY_SERVER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"KMS": ['KMS'],
"YARN_GATEWAY": ['YARN_GATEWAY'],
"HDFS_GATEWAY": ['HDFS_GATEWAY'],
"JOURNALNODE": ['JOURNALNODE'],
"KAFKA": ['KAFKA_BROKER']
}
if isinstance(configs, res.Resource):
configs = configs.to_dict()
for k in configs.keys():
if k in p_dict.keys():
item = configs[k]
del configs[k]
newkey = p_dict[k][0]
configs[newkey] = item
return res.Resource(configs)
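        # Illustrative example: {'HIVESERVER': {'some_key': 'v'}} comes back
        # as a Resource with the key renamed to 'HIVESERVER2'
        # ('some_key' is a hypothetical config name).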
def configure_sentry(self, cluster):
manager = self.get_manager(cluster)
with manager.remote() as r:
dh.create_sentry_database(cluster, r)
def _configure_repo_from_inst(self, instance):
super(PluginUtilsV590, self)._configure_repo_from_inst(instance)
cluster = instance.cluster
with instance.remote() as r:
if cmd.is_ubuntu_os(r):
kms_key = (
self.c_helper.get_kms_key_url(cluster) or
self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
cmd.add_apt_key(r, kms_key)
cmd.update_repository(r)
if cmd.is_centos_os(r):
kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
cmd.add_centos_repository(r, kms_repo_url, 'kms')
cmd.update_repository(r)

View File

@ -13,220 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.i18n import _
from sahara.plugins.cdh.v5_9_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
class ValidatorV590(validation.Validator):
PU = pu.PluginUtilsV590()
@classmethod
def validate_cluster_creating(cls, cluster):
super(ValidatorV590, cls).validate_cluster_creating(cluster)
cls._hdfs_ha_validation(cluster)
cls._yarn_ha_validation(cluster)
cls._flume_validation(cluster)
cls._sentry_validation(cluster)
cls._solr_validation(cluster)
cls._sqoop_validation(cluster)
cls._hbase_indexer_validation(cluster)
cls._impala_validation(cluster)
cls._kms_validation(cluster)
@classmethod
def _hdfs_ha_validation(cls, cluster):
jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if jn_count > 0:
if jn_count < 3:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('not less than 3'),
jn_count)
if not jn_count % 2:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('be odd'), jn_count)
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HDFS HA')
if require_anti_affinity:
if 'HDFS_SECONDARYNAMENODE' not in\
cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_SECONDARYNAMENODE should be enabled '
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_NAMENODE should be enabled in anti_affinity.'))
@classmethod
def _yarn_ha_validation(cls, cluster):
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException(
'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='RM HA')
if require_anti_affinity:
if 'YARN_RESOURCEMANAGER' not in\
cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in '
'anti_affinity.'))
if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be'
' enabled in anti_affinity.'))
@classmethod
def _flume_validation(cls, cluster):
a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
@classmethod
def _sentry_validation(cls, cluster):
snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if snt_count > 1:
raise ex.InvalidComponentCountException(
'SENTRY_SERVER', _('0 or 1'), snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
@classmethod
def _solr_validation(cls, cluster):
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
@classmethod
def _sqoop_validation(cls, cluster):
s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
if s2s_count > 1:
raise ex.InvalidComponentCountException(
'SQOOP_SERVER', _('0 or 1'), s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
@classmethod
def _hbase_indexer_validation(cls, cluster):
lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
@classmethod
def _impala_validation(cls, cluster):
ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = cls._get_inst_count(cluster, 'IMPALAD')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
@classmethod
def _kms_validation(cls, cluster):
kms_count = cls._get_inst_count(cluster, 'KMS')
if kms_count > 1:
raise ex.InvalidComponentCountException('KMS',
_('0 or 1'), kms_count)
@classmethod
def _get_anti_affinity(cls, cluster):
return cluster.anti_affinity

View File

@ -21,7 +21,6 @@ from sahara.plugins.cdh.v5_9_0 import deploy
from sahara.plugins.cdh.v5_9_0 import edp_engine
from sahara.plugins.cdh.v5_9_0 import plugin_utils
from sahara.plugins.cdh.v5_9_0 import validation
from sahara.plugins import kerberos
class VersionHandler(avm.BaseVersionHandler):
@ -34,67 +33,3 @@ class VersionHandler(avm.BaseVersionHandler):
self.deploy = deploy
self.edp_engine = edp_engine
self.validation = validation.ValidatorV590()
def get_plugin_configs(self):
result = super(VersionHandler, self).get_plugin_configs()
result.extend(kerberos.get_config_list())
return result
def get_node_processes(self):
return {
"CLOUDERA": ['CLOUDERA_MANAGER'],
"HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
"YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
"OOZIE": ['OOZIE_SERVER'],
"HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
"HUE": ['HUE_SERVER'],
"SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
"ZOOKEEPER": ['ZOOKEEPER_SERVER'],
"HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
"FLUME": ['FLUME_AGENT'],
"IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
"KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
"SOLR": ['SOLR_SERVER'],
"SQOOP": ['SQOOP_SERVER'],
"SENTRY": ['SENTRY_SERVER'],
"KMS": ['KMS'],
"KAFKA": ['KAFKA_BROKER'],
"YARN_GATEWAY": [],
"RESOURCEMANAGER": [],
"NODEMANAGER": [],
"JOBHISTORY": [],
"HDFS_GATEWAY": [],
'DATANODE': [],
'NAMENODE': [],
'SECONDARYNAMENODE': [],
'JOURNALNODE': [],
'REGIONSERVER': [],
'MASTER': [],
'HIVEMETASTORE': [],
'HIVESERVER': [],
'WEBCAT': [],
'CATALOGSERVER': [],
'STATESTORE': [],
'IMPALAD': [],
'Kerberos': [],
}
def get_edp_engine(self, cluster, job_type):
oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
if job_type in oozie_type:
return self.edp_engine.EdpOozieEngine(cluster)
if job_type in spark_type:
return self.edp_engine.EdpSparkEngine(cluster)
return None
def get_edp_job_types(self):
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
edp_engine.EdpSparkEngine.get_supported_job_types())

View File

@ -30,24 +30,35 @@ class Validator(object):
cls._hue_validation(cluster)
cls._hbase_validation(cluster)
cls._flume_validation(cluster)
cls._sentry_validation(cluster)
cls._solr_validation(cluster)
cls._sqoop_validation(cluster)
cls._hbase_indexer_validation(cluster)
cls._impala_validation(cluster)
cls._kms_validation(cluster)
cls._hdfs_ha_validation(cluster)
cls._yarn_ha_validation(cluster)
@classmethod
def _basic_validation(cls, cluster):
mng_count = cls._get_inst_count(cluster, 'CLOUDERA_MANAGER')
mng_count = cls.get_inst_count(cluster, 'CLOUDERA_MANAGER')
if mng_count != 1:
raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
1, mng_count)
nn_count = cls._get_inst_count(cluster, 'HDFS_NAMENODE')
nn_count = cls.get_inst_count(cluster, 'HDFS_NAMENODE')
if nn_count != 1:
raise ex.InvalidComponentCountException(
'HDFS_NAMENODE', 1, nn_count)
snn_count = cls._get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
snn_count = cls.get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
if snn_count != 1:
raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE',
1, snn_count)
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
replicas = cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.InvalidComponentCountException(
@ -55,12 +66,12 @@ class Validator(object):
_('Number of datanodes must be not'
' less than dfs_replication.'))
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if rm_count > 1:
raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
_('0 or 1'), rm_count)
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
hs_count = cls.get_inst_count(cluster, 'YARN_JOBHISTORY')
if hs_count > 1:
raise ex.InvalidComponentCountException('YARN_JOBHISTORY',
_('0 or 1'),
@ -70,7 +81,7 @@ class Validator(object):
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='YARN_RESOURCEMANAGER')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
nm_count = cls.get_inst_count(cluster, 'YARN_NODEMANAGER')
if rm_count == 0:
if nm_count > 0:
raise ex.RequiredServiceMissingException(
@ -79,10 +90,10 @@ class Validator(object):
@classmethod
def _oozie_validation(cls, cluster):
oo_count = cls._get_inst_count(cluster, 'OOZIE_SERVER')
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
oo_count = cls.get_inst_count(cluster, 'OOZIE_SERVER')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
nm_count = cls.get_inst_count(cluster, 'YARN_NODEMANAGER')
hs_count = cls.get_inst_count(cluster, 'YARN_JOBHISTORY')
if oo_count > 1:
raise ex.InvalidComponentCountException(
@ -103,10 +114,10 @@ class Validator(object):
@classmethod
def _hive_validation(cls, cluster):
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
hvs_count = cls._get_inst_count(cluster, 'HIVE_SERVER2')
whc_count = cls._get_inst_count(cluster, 'HIVE_WEBHCAT')
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
hms_count = cls.get_inst_count(cluster, 'HIVE_METASTORE')
hvs_count = cls.get_inst_count(cluster, 'HIVE_SERVER2')
whc_count = cls.get_inst_count(cluster, 'HIVE_WEBHCAT')
rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if hms_count and rm_count < 1:
raise ex.RequiredServiceMissingException(
@ -126,15 +137,15 @@ class Validator(object):
@classmethod
def _hue_validation(cls, cluster):
hue_count = cls._get_inst_count(cluster, 'HUE_SERVER')
hue_count = cls.get_inst_count(cluster, 'HUE_SERVER')
if hue_count > 1:
raise ex.InvalidComponentCountException(
'HUE_SERVER', _('0 or 1'), hue_count)
shs_count = cls._get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
oo_count = cls._get_inst_count(cluster, 'OOZIE_SERVER')
rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
shs_count = cls.get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
hms_count = cls.get_inst_count(cluster, 'HIVE_METASTORE')
oo_count = cls.get_inst_count(cluster, 'OOZIE_SERVER')
rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
if shs_count > 1:
raise ex.InvalidComponentCountException(
@ -155,9 +166,9 @@ class Validator(object):
@classmethod
def _hbase_validation(cls, cluster):
hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
hbr_count = cls._get_inst_count(cluster, 'HBASE_REGIONSERVER')
zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
hbm_count = cls.get_inst_count(cluster, 'HBASE_MASTER')
hbr_count = cls.get_inst_count(cluster, 'HBASE_REGIONSERVER')
zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if hbm_count == 1:
if zk_count < 1:
@ -208,7 +219,7 @@ class Validator(object):
ng.name,
msg % {'processes': ' '.join(ng.node_processes)})
dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
replicas = cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)
if dn_count < replicas:
raise ex.ClusterCannotBeScaled(
@ -220,5 +231,200 @@ class Validator(object):
return ['HDFS_DATANODE', 'YARN_NODEMANAGER']
@classmethod
def _get_inst_count(cls, cluster, process):
def _flume_validation(cls, cluster):
a_count = cls.get_inst_count(cluster, 'FLUME_AGENT')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
if a_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='FLUME_AGENT')
@classmethod
def _sentry_validation(cls, cluster):
snt_count = cls.get_inst_count(cluster, 'SENTRY_SERVER')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
if snt_count > 1:
raise ex.InvalidComponentCountException(
'SENTRY_SERVER', _('0 or 1'), snt_count)
if snt_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SENTRY_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SENTRY_SERVER')
@classmethod
def _solr_validation(cls, cluster):
slr_count = cls.get_inst_count(cluster, 'SOLR_SERVER')
zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
if slr_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SOLR_SERVER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='SOLR_SERVER')
@classmethod
def _sqoop_validation(cls, cluster):
s2s_count = cls.get_inst_count(cluster, 'SQOOP_SERVER')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
hs_count = cls.get_inst_count(cluster, 'YARN_JOBHISTORY')
nm_count = cls.get_inst_count(cluster, 'YARN_NODEMANAGER')
if s2s_count > 1:
raise ex.InvalidComponentCountException(
'SQOOP_SERVER', _('0 or 1'), s2s_count)
if s2s_count == 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='SQOOP_SERVER')
if nm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
if hs_count != 1:
raise ex.RequiredServiceMissingException(
'YARN_JOBHISTORY', required_by='SQOOP_SERVER')
@classmethod
def _hbase_indexer_validation(cls, cluster):
lhbi_count = cls.get_inst_count(cluster, 'HBASE_INDEXER')
zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
slr_count = cls.get_inst_count(cluster, 'SOLR_SERVER')
hbm_count = cls.get_inst_count(cluster, 'HBASE_MASTER')
if lhbi_count >= 1:
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='HBASE_INDEXER')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='HBASE_INDEXER')
if slr_count < 1:
raise ex.RequiredServiceMissingException(
'SOLR_SERVER', required_by='HBASE_INDEXER')
if hbm_count < 1:
raise ex.RequiredServiceMissingException(
'HBASE_MASTER', required_by='HBASE_INDEXER')
@classmethod
def _impala_validation(cls, cluster):
ics_count = cls.get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
iss_count = cls.get_inst_count(cluster, 'IMPALA_STATESTORE')
id_count = cls.get_inst_count(cluster, 'IMPALAD')
dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
hms_count = cls.get_inst_count(cluster, 'HIVE_METASTORE')
if ics_count > 1:
raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
_('0 or 1'), ics_count)
if iss_count > 1:
raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
_('0 or 1'), iss_count)
if ics_count == 1:
datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
impalad_ng = u.get_node_groups(cluster, "IMPALAD")
datanodes = set(ng.id for ng in datanode_ng)
impalads = set(ng.id for ng in impalad_ng)
if datanodes != impalads:
raise ex.InvalidClusterTopology(
_("IMPALAD must be installed on every HDFS_DATANODE"))
if iss_count != 1:
raise ex.RequiredServiceMissingException(
'IMPALA_STATESTORE', required_by='IMPALA')
if id_count < 1:
raise ex.RequiredServiceMissingException(
'IMPALAD', required_by='IMPALA')
if dn_count < 1:
raise ex.RequiredServiceMissingException(
'HDFS_DATANODE', required_by='IMPALA')
if hms_count < 1:
raise ex.RequiredServiceMissingException(
'HIVE_METASTORE', required_by='IMPALA')
@classmethod
def _kms_validation(cls, cluster):
kms_count = cls.get_inst_count(cluster, 'KMS')
if kms_count > 1:
raise ex.InvalidComponentCountException('KMS',
_('0 or 1'), kms_count)
@classmethod
def _hdfs_ha_validation(cls, cluster):
jn_count = cls.get_inst_count(cluster, 'HDFS_JOURNALNODE')
zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if jn_count > 0:
if jn_count < 3:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('not less than 3'),
jn_count)
if not jn_count % 2:
raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
_('be odd'), jn_count)
if zk_count < 1:
raise ex.RequiredServiceMissingException('ZOOKEEPER',
required_by='HDFS HA')
if require_anti_affinity:
if 'HDFS_SECONDARYNAMENODE' not in \
cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_SECONDARYNAMENODE should be enabled '
'in anti_affinity.'))
if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
raise ex.NameNodeHAConfigurationError(
_('HDFS_NAMENODE should be enabled in anti_affinity.'))
@classmethod
def _yarn_ha_validation(cls, cluster):
rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
stdb_rm_count = cls.get_inst_count(cluster, 'YARN_STANDBYRM')
require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
cluster)
if stdb_rm_count > 1:
raise ex.InvalidComponentCountException(
'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
if stdb_rm_count > 0:
if rm_count < 1:
raise ex.RequiredServiceMissingException(
'YARN_RESOURCEMANAGER', required_by='RM HA')
if zk_count < 1:
raise ex.RequiredServiceMissingException(
'ZOOKEEPER', required_by='RM HA')
if require_anti_affinity:
if 'YARN_RESOURCEMANAGER' not in \
cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_RESOURCEMANAGER should be enabled in '
'anti_affinity.'))
if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
raise ex.ResourceManagerHAConfigurationError(
_('YARN_STANDBYRM should be'
' enabled in anti_affinity.'))
@classmethod
def _get_anti_affinity(cls, cluster):
return cluster.anti_affinity
@classmethod
def get_inst_count(cls, cluster, process):
return sum([ng.count for ng in u.get_node_groups(cluster, process)])
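        # Illustrative example (counts hypothetical): two node groups running
        # HDFS_DATANODE with counts 3 and 2 give get_inst_count(...) == 5.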