Merge "Refactor rest of CDH plugin code"
commit 8c0cf3058e
@@ -21,6 +21,7 @@ from sahara import conductor
from sahara import context
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import health
+from sahara.plugins import kerberos


@six.add_metaclass(abc.ABCMeta)
@@ -90,10 +91,55 @@ class BaseVersionHandler(AbstractVersionHandler):
        self.validation = None  # to validate

    def get_plugin_configs(self):
-        return self.config_helper.get_plugin_configs()
+        result = self.config_helper.get_plugin_configs()
+        result.extend(kerberos.get_config_list())
+        return result

    def get_node_processes(self):
-        raise NotImplementedError()
        return {
            "CLOUDERA": ['CLOUDERA_MANAGER'],
            "HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
                     'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
            "YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
                     'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['ZOOKEEPER_SERVER'],
            "HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
            "FLUME": ['FLUME_AGENT'],
            "IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
            "KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "SENTRY": ['SENTRY_SERVER'],
            "KMS": ['KMS'],
            "KAFKA": ['KAFKA_BROKER'],

            "YARN_GATEWAY": [],
            "RESOURCEMANAGER": [],
            "NODEMANAGER": [],
            "JOBHISTORY": [],

            "HDFS_GATEWAY": [],
            'DATANODE': [],
            'NAMENODE': [],
            'SECONDARYNAMENODE': [],
            'JOURNALNODE': [],

            'REGIONSERVER': [],
            'MASTER': [],

            'HIVEMETASTORE': [],
            'HIVESERVER': [],
            'WEBCAT': [],

            'CATALOGSERVER': [],
            'STATESTORE': [],
            'IMPALAD': [],
            'Kerberos': [],
        }

    def validate(self, cluster):
        self.validation.validate_cluster_creating(cluster)
@@ -133,12 +179,16 @@ class BaseVersionHandler(AbstractVersionHandler):

    def get_edp_engine(self, cluster, job_type):
        oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
+        spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
        if job_type in oozie_type:
            return self.edp_engine.EdpOozieEngine(cluster)
+        if job_type in spark_type:
+            return self.edp_engine.EdpSparkEngine(cluster)
        return None

    def get_edp_job_types(self):
-        return self.edp_engine.EdpOozieEngine.get_supported_job_types()
+        return (self.edp_engine.EdpOozieEngine.get_supported_job_types() +
+                self.edp_engine.EdpSparkEngine.get_supported_job_types())

    def get_edp_config_hints(self, job_type):
        return self.edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
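Editorial note: a minimal standalone sketch (not part of the patch) of what the reworked get_edp_job_types() returns once Spark job types are appended to the Oozie-backed ones. The stub classes and the job-type strings below are illustrative assumptions, not sahara's exact values.

class EdpOozieEngine(object):
    @staticmethod
    def get_supported_job_types():
        return ['Hive', 'Java', 'MapReduce', 'Pig']


class EdpSparkEngine(object):
    @staticmethod
    def get_supported_job_types():
        return ['Spark']


def get_edp_job_types():
    # Mirrors the patched method: Oozie-backed types plus Spark types.
    return (EdpOozieEngine.get_supported_job_types() +
            EdpSparkEngine.get_supported_job_types())


print(get_edp_job_types())  # ['Hive', 'Java', 'MapReduce', 'Pig', 'Spark']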
@@ -15,22 +15,41 @@

import functools

from oslo_log import log as logging
import six

from sahara import context
from sahara.i18n import _
from sahara.plugins.cdh.client import api_client
from sahara.plugins.cdh.client import services
-from sahara.plugins.cdh import db_helper
+from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import kerberos
from sahara.swift import swift_helper
from sahara.topology import topology_helper as t_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import poll_utils
from sahara.utils import xmlutils

LOG = logging.getLogger(__name__)

HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'


def cloudera_cmd(f):
@@ -51,7 +70,7 @@ def cloudera_cmd(f):
class ClouderaUtils(object):
    CM_DEFAULT_USERNAME = 'admin'
    CM_DEFAULT_PASSWD = 'admin'
-    CM_API_VERSION = 6
+    CM_API_VERSION = 8

    HDFS_SERVICE_NAME = 'hdfs01'
    YARN_SERVICE_NAME = 'yarn01'
@@ -62,8 +81,20 @@ class ClouderaUtils(object):
    ZOOKEEPER_SERVICE_NAME = 'zookeeper01'
    HBASE_SERVICE_NAME = 'hbase01'

    FLUME_SERVICE_NAME = 'flume01'
    SOLR_SERVICE_NAME = 'solr01'
    SQOOP_SERVICE_NAME = 'sqoop01'
    KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
    IMPALA_SERVICE_NAME = 'impala01'
    SENTRY_SERVICE_NAME = 'sentry01'
    KMS_SERVICE_NAME = 'kms01'
    KAFKA_SERVICE_NAME = 'kafka01'
    NAME_SERVICE = 'nameservice01'

    def __init__(self):
        self.pu = plugin_utils.AbstractPluginUtils()
        self.validator = validation.Validator
        self.c_helper = None

    def get_api_client_by_default_password(self, cluster):
        manager_ip = self.pu.get_manager(cluster).management_ip
@@ -74,7 +105,7 @@ class ClouderaUtils(object):

    def get_api_client(self, cluster, api_version=None):
        manager_ip = self.pu.get_manager(cluster).management_ip
-        cm_password = db_helper.get_cm_password(cluster)
+        cm_password = dh.get_cm_password(cluster)
        version = self.CM_API_VERSION if not api_version else api_version
        return api_client.ApiResource(manager_ip,
                                      username=self.CM_DEFAULT_USERNAME,
@@ -84,7 +115,7 @@ class ClouderaUtils(object):
    def update_cloudera_password(self, cluster):
        api = self.get_api_client_by_default_password(cluster)
        user = api.get_user(self.CM_DEFAULT_USERNAME)
-        user.password = db_helper.get_cm_password(cluster)
+        user.password = dh.get_cm_password(cluster)
        api.update_user(user)

    def get_cloudera_cluster(self, cluster):
@@ -223,7 +254,6 @@ class ClouderaUtils(object):
        cm.hosts_start_roles([hostname])

    def get_service_by_role(self, role, cluster=None, instance=None):
        cm_cluster = None
        if cluster:
            cm_cluster = self.get_cloudera_cluster(cluster)
        elif instance:
@@ -249,11 +279,86 @@ class ClouderaUtils(object):
            return cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
        elif role in ['MASTER', 'REGIONSERVER']:
            return cm_cluster.get_service(self.HBASE_SERVICE_NAME)
        elif role in ['AGENT']:
            return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
        elif role in ['SENTRY_SERVER']:
            return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
        elif role in ['SQOOP_SERVER']:
            return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
        elif role in ['SOLR_SERVER']:
            return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
        elif role in ['HBASE_INDEXER']:
            return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
        elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
            return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
        elif role in ['KMS']:
            return cm_cluster.get_service(self.KMS_SERVICE_NAME)
        elif role in ['JOURNALNODE']:
            return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        elif role in ['YARN_STANDBYRM']:
            return cm_cluster.get_service(self.YARN_SERVICE_NAME)
        elif role in ['KAFKA_BROKER']:
            return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
        else:
            raise ValueError(
                _("Process %(process)s is not supported by CDH plugin") %
                {'process': role})

    @cpo.event_wrapper(
        True, step=_("First run cluster"), param=('cluster', 1))
    @cloudera_cmd
    def first_run(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)
        yield cm_cluster.first_run()

    @cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
    def create_services(self, cluster):
        api = self.get_api_client(cluster)
        cm_cluster = api.create_cluster(cluster.name,
                                        fullVersion=cluster.hadoop_version)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
                                      ZOOKEEPER_SERVICE_TYPE)
        cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
        cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
        cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
        if self.pu.get_hive_metastore(cluster):
            cm_cluster.create_service(self.HIVE_SERVICE_NAME,
                                      HIVE_SERVICE_TYPE)
        if self.pu.get_hue(cluster):
            cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
        if self.pu.get_spark_historyserver(cluster):
            cm_cluster.create_service(self.SPARK_SERVICE_NAME,
                                      SPARK_SERVICE_TYPE)
        if self.pu.get_hbase_master(cluster):
            cm_cluster.create_service(self.HBASE_SERVICE_NAME,
                                      HBASE_SERVICE_TYPE)
        if len(self.pu.get_flumes(cluster)) > 0:
            cm_cluster.create_service(self.FLUME_SERVICE_NAME,
                                      FLUME_SERVICE_TYPE)
        if self.pu.get_sentry(cluster):
            cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
                                      SENTRY_SERVICE_TYPE)
        if len(self.pu.get_solrs(cluster)) > 0:
            cm_cluster.create_service(self.SOLR_SERVICE_NAME,
                                      SOLR_SERVICE_TYPE)
        if self.pu.get_sqoop(cluster):
            cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
                                      SQOOP_SERVICE_TYPE)
        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
                                      KS_INDEXER_SERVICE_TYPE)
        if self.pu.get_catalogserver(cluster):
            cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
                                      IMPALA_SERVICE_TYPE)
        if self.pu.get_kms(cluster):
            cm_cluster.create_service(self.KMS_SERVICE_NAME,
                                      KMS_SERVICE_TYPE)
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
                                      KAFKA_SERVICE_TYPE)

    def _agents_connected(self, instances, api):
        hostnames = [i.fqdn() for i in instances]
        hostnames_to_manager = [h.hostname for h in
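Editorial note: the role-to-service dispatch above is a long elif chain; an equivalent table-driven sketch (an illustration, not code from the patch) makes the mapping easier to scan. Service names mirror the class constants; the roles are grouped per service.

ROLE_TO_SERVICE = {
    'MASTER': 'hbase01', 'REGIONSERVER': 'hbase01',
    'AGENT': 'flume01',
    'SENTRY_SERVER': 'sentry01',
    'SQOOP_SERVER': 'sqoop01',
    'SOLR_SERVER': 'solr01',
    'HBASE_INDEXER': 'ks_indexer01',
    'CATALOGSERVER': 'impala01', 'STATESTORE': 'impala01',
    'IMPALAD': 'impala01', 'LLAMA': 'impala01',
    'KMS': 'kms01',
    'JOURNALNODE': 'hdfs01',
    'YARN_STANDBYRM': 'yarn01',
    'KAFKA_BROKER': 'kafka01',
}


def service_name_for_role(role):
    # Table lookup replacing the elif chain; unknown roles still fail loudly.
    try:
        return ROLE_TO_SERVICE[role]
    except KeyError:
        raise ValueError(
            "Process %(process)s is not supported by CDH plugin"
            % {'process': role})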
@@ -271,6 +376,91 @@ class ClouderaUtils(object):
            _("Await Cloudera agents"), 5, {
                'instances': instances, 'api': api})

    def await_agents(self, cluster, instances):
        self._await_agents(cluster, instances,
                           self.c_helper.AWAIT_AGENTS_TIMEOUT)

    @cpo.event_wrapper(
        True, step=_("Configure services"), param=('cluster', 1))
    def configure_services(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
            zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
                                                      cluster=cluster))

        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
                                             cluster=cluster))

        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
                                             cluster=cluster))

        oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
        oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
                                              cluster=cluster))

        if self.pu.get_hive_metastore(cluster):
            hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
            hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_hue(cluster):
            hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
            hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
                                                cluster=cluster))

        if self.pu.get_spark_historyserver(cluster):
            spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
            spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_hbase_master(cluster):
            hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
            hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_flumes(cluster)) > 0:
            flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
            flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_sentry(cluster):
            sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
            sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
                                                   cluster=cluster))

        if len(self.pu.get_solrs(cluster)) > 0:
            solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
            solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_sqoop(cluster):
            sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
            sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
            ks_indexer.update_config(
                self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))

        if self.pu.get_catalogserver(cluster):
            impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
            impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                                   cluster=cluster))

        if self.pu.get_kms(cluster):
            kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
            kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                                cluster=cluster))
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
            kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
                                                  cluster=cluster))

    def configure_instances(self, instances, cluster=None):
        # instances non-empty
        cpo.add_provisioning_step(
@@ -399,11 +589,243 @@
            'Cloudera Manager': {
                'Web UI': 'http://%s:7180' % mng.get_ip_or_dns_name(),
                'Username': 'admin',
-                'Password': db_helper.get_cm_password(cluster)
+                'Password': dh.get_cm_password(cluster)
            }
        }
        return info

    @cpo.event_wrapper(
        True, step=_("Enable NameNode HA"), param=('cluster', 1))
    @cloudera_cmd
    def enable_namenode_ha(self, cluster):
        standby_nn = self.pu.get_secondarynamenode(cluster)
        standby_nn_host_name = standby_nn.fqdn()
        jns = self.pu.get_jns(cluster)
        jn_list = []
        for index, jn in enumerate(jns):
            jn_host_name = jn.fqdn()
            jn_list.append({'jnHostId': jn_host_name,
                            'jnName': 'JN%i' % index,
                            'jnEditsDir': '/dfs/jn'
                            })
        cm_cluster = self.get_cloudera_cluster(cluster)
        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        nn = hdfs.get_roles_by_type('NAMENODE')[0]

        yield hdfs.enable_nn_ha(active_name=nn.name,
                                standby_host_id=standby_nn_host_name,
                                nameservice=self.NAME_SERVICE, jns=jn_list
                                )

    @cpo.event_wrapper(
        True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
    @cloudera_cmd
    def enable_resourcemanager_ha(self, cluster):
        new_rm = self.pu.get_stdb_rm(cluster)
        new_rm_host_name = new_rm.fqdn()
        cm_cluster = self.get_cloudera_cluster(cluster)
        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)

    def _get_configs(self, service, cluster=None, instance=None):
-        # Defined in derived class.
-        return
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

        all_confs = {}
        if cluster:
            zk_count = self.validator.get_inst_count(cluster,
                                                     'ZOOKEEPER_SERVER')
            hbm_count = self.validator.get_inst_count(cluster, 'HBASE_MASTER')
            snt_count = self.validator.get_inst_count(cluster,
                                                      'SENTRY_SERVER')
            ks_count =\
                self.validator.get_inst_count(cluster,
                                              'KEY_VALUE_STORE_INDEXER')
            kms_count = self.validator.get_inst_count(cluster, 'KMS')
            imp_count =\
                self.validator.get_inst_count(cluster,
                                              'IMPALA_CATALOGSERVER')
            hive_count = self.validator.get_inst_count(cluster,
                                                       'HIVE_METASTORE')
            slr_count = self.validator.get_inst_count(cluster, 'SOLR_SERVER')
            sqp_count = self.validator.get_inst_count(cluster, 'SQOOP_SERVER')
            core_site_safety_valve = ''
            if self.pu.c_helper.is_swift_enabled(cluster):
                configs = swift_helper.get_swift_configs()
                confs = {c['name']: c['value'] for c in configs}
                core_site_safety_valve = xmlutils.create_elements_xml(confs)
            all_confs = {
                'HDFS': {
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'dfs_block_local_path_access_user':
                        'impala' if imp_count else '',
                    'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
                    'core_site_safety_valve': core_site_safety_valve
                },
                'HIVE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'OOZIE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'hive_service':
                        self.HIVE_SERVICE_NAME if hive_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'YARN': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'HUE': {
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'oozie_service': self.OOZIE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'impala_service':
                        self.IMPALA_SERVICE_NAME if imp_count else '',
                    'sqoop_service':
                        self.SQOOP_SERVICE_NAME if sqp_count else ''
                },
                'SPARK_ON_YARN': {
                    'yarn_service': self.YARN_SERVICE_NAME
                },
                'HBASE': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
                    'hbase_enable_indexing': 'true' if ks_count else 'false',
                    'hbase_enable_replication':
                        'true' if ks_count else 'false'
                },
                'FLUME': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else ''
                },
                'SENTRY': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'sentry_server_config_safety_valve': (
                        self.c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
                        if imp_count else '')
                },
                'SOLR': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                },
                'SQOOP': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME
                },
                'KS_INDEXER': {
                    'hbase_service': self.HBASE_SERVICE_NAME,
                    'solr_service': self.SOLR_SERVICE_NAME
                },
                'IMPALA': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                }
            }
            hive_confs = {
                'HIVE': {
                    'hive_metastore_database_type': 'postgresql',
                    'hive_metastore_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'hive_metastore_database_port': '7432',
                    'hive_metastore_database_password':
                        dh.get_hive_db_password(cluster)
                }
            }
            hue_confs = {
                'HUE': {
                    'hue_webhdfs': self.pu.get_role_name(
                        self.pu.get_namenode(cluster), 'NAMENODE')
                }
            }
            sentry_confs = {
                'SENTRY': {
                    'sentry_server_database_type': 'postgresql',
                    'sentry_server_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'sentry_server_database_port': '7432',
                    'sentry_server_database_password':
                        dh.get_sentry_db_password(cluster)
                }
            }
            kafka_confs = {
                'KAFKA': {
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                }
            }
            all_confs = s_cfg.merge_configs(all_confs, hue_confs)
            all_confs = s_cfg.merge_configs(all_confs, hive_confs)
            all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
            all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

        if instance:
            snt_count = self.validator.get_inst_count(instance.cluster,
                                                      'SENTRY_SERVER')
            paths = instance.storage_paths()

            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },
                'SECONDARYNAMENODE': {
                    'fs_checkpoint_dir_list':
                        get_hadoop_dirs(paths, '/fs/snn')
                },
                'DATANODE': {
                    'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
                    'dfs_datanode_data_dir_perm': 755,
                    'dfs_datanode_handler_count': 30
                },
                'NODEMANAGER': {
                    'yarn_nodemanager_local_dirs':
                        get_hadoop_dirs(paths, '/yarn/local'),
                    'container_executor_allowed_system_users':
                        "nobody,impala,hive,llama,hdfs,yarn,mapred,"
                        "spark,oozie",
                    "container_executor_banned_users": "bin"
                },
                'SERVER': {
                    'maxSessionTimeout': 60000
                },
                'HIVESERVER2': {
                    'hiveserver2_enable_impersonation':
                        'false' if snt_count else 'true',
                    'hive_hs2_config_safety_valve': (
                        self.c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                },
                'HIVEMETASTORE': {
                    'hive_metastore_config_safety_valve': (
                        self.c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                }
            }

            ng_user_confs = self.pu.convert_process_configs(
                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})
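Editorial note: _get_configs() builds its answer by layering dictionaries with s_cfg.merge_configs, later layers overriding earlier ones. Below is a small self-contained sketch of that layering; the plain recursive dict merge stands in for sahara's helper, which is not reproduced exactly here.

def merge_configs(base, override):
    # Simplified stand-in for sahara.utils.configs.merge_configs:
    # nested dicts are merged, scalar values from `override` win.
    result = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = merge_configs(result[key], value)
        else:
            result[key] = value
    return result


defaults = {'HIVE': {'mapreduce_yarn_service': 'yarn01'}}
user = {'HIVE': {'hive_metastore_database_port': '7432'}}
print(merge_configs(defaults, user))
# {'HIVE': {'mapreduce_yarn_service': 'yarn01',
#           'hive_metastore_database_port': '7432'}}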
@@ -38,22 +38,18 @@ class ConfigHelper(object):
        'CM5 repo key URL (for debian-based only)', 'general', 'cluster',
        priority=1, default_value="")

-    ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
-                            config_type='bool', priority=1,
-                            default_value=True)
-
    ENABLE_HBASE_COMMON_LIB = p.Config(
        'Enable HBase Common Lib', 'general', 'cluster', config_type='bool',
        priority=1, default_value=True)

+    ENABLE_SWIFT = p.Config(
+        'Enable Swift', 'general', 'cluster',
+        config_type='bool', priority=1, default_value=True)
+
    DEFAULT_SWIFT_LIB_URL = (
        'https://repository.cloudera.com/artifactory/repo/org'
-        '/apache/hadoop/hadoop-openstack/2.3.0-cdh5.0.0'
-        '/hadoop-openstack-2.3.0-cdh5.0.0.jar')
-
-    DEFAULT_EXTJS_LIB_URL = (
-        'http://tarballs.openstack.org/sahara/dist/common-artifacts/'
-        'ext-2.2.zip')
+        '/apache/hadoop/hadoop-openstack/2.6.0-cdh5.5.0'
+        '/hadoop-openstack-2.6.0-cdh5.5.0.jar')

    SWIFT_LIB_URL = p.Config(
        'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
@@ -61,12 +57,40 @@ class ConfigHelper(object):
        description=("Library that adds Swift support to CDH. The file"
                     " will be downloaded by VMs."))

    DEFAULT_EXTJS_LIB_URL = (
        'http://tarballs.openstack.org/sahara/dist/common-artifacts/'
        'ext-2.2.zip')

    EXTJS_LIB_URL = p.Config(
        "ExtJS library URL", 'general', 'cluster', priority=1,
        default_value=DEFAULT_EXTJS_LIB_URL,
        description=("Ext 2.2 library is required for Oozie Web Console. "
                     "The file will be downloaded by VMs with oozie."))

    _default_executor_classpath = ":".join(
        ['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
         '/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])

    EXECUTOR_EXTRA_CLASSPATH = p.Config(
        'Executor extra classpath', 'Spark', 'cluster', priority=2,
        default_value=_default_executor_classpath,
        description='Value for spark.executor.extraClassPath in '
                    'spark-defaults.conf (default: %s)'
                    % _default_executor_classpath)

    KMS_REPO_URL = p.Config(
        'KMS repo list URL', 'general', 'cluster', priority=1,
        default_value="")

    KMS_REPO_KEY_URL = p.Config(
        'KMS repo key URL (for debian-based only)', 'general',
        'cluster',
        priority=1, default_value="")

    REQUIRE_ANTI_AFFINITY = p.Config(
        'Require Anti Affinity', 'general', 'cluster',
        config_type='bool', priority=2, default_value=True)

    AWAIT_AGENTS_TIMEOUT = p.Config(
        'Await Cloudera agents timeout', 'general', 'cluster',
        config_type='int', priority=1, default_value=300, is_optional=True,
@@ -103,7 +127,6 @@ class ConfigHelper(object):
        return json.loads(data)

    def _init_ng_configs(self, confs, app_target, scope):
-
        prepare_value = lambda x: x.replace('\n', ' ') if x else ""
        cfgs = []
        for cfg in confs:
@@ -115,6 +138,113 @@

        return cfgs

    def _init_all_ng_plugin_configs(self):
        self.hdfs_confs = self._load_and_init_configs(
            'hdfs-service.json', 'HDFS', 'cluster')
        self.namenode_confs = self._load_and_init_configs(
            'hdfs-namenode.json', 'NAMENODE', 'node')
        self.datanode_confs = self._load_and_init_configs(
            'hdfs-datanode.json', 'DATANODE', 'node')
        self.secnamenode_confs = self._load_and_init_configs(
            'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
        self.hdfs_gateway_confs = self._load_and_init_configs(
            'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
        self.journalnode_confs = self._load_and_init_configs(
            'hdfs-journalnode.json', 'JOURNALNODE', 'node')

        self.yarn_confs = self._load_and_init_configs(
            'yarn-service.json', 'YARN', 'cluster')
        self.resourcemanager_confs = self._load_and_init_configs(
            'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
        self.nodemanager_confs = self._load_and_init_configs(
            'yarn-nodemanager.json', 'NODEMANAGER', 'node')
        self.jobhistory_confs = self._load_and_init_configs(
            'yarn-jobhistory.json', 'JOBHISTORY', 'node')
        self.yarn_gateway_conf = self._load_and_init_configs(
            'yarn-gateway.json', 'YARN_GATEWAY', 'node')

        self.oozie_service_confs = self._load_and_init_configs(
            'oozie-service.json', 'OOZIE', 'cluster')
        self.oozie_role_confs = self._load_and_init_configs(
            'oozie-oozie_server.json', 'OOZIE', 'node')

        self.hive_service_confs = self._load_and_init_configs(
            'hive-service.json', 'HIVE', 'cluster')
        self.hive_metastore_confs = self._load_and_init_configs(
            'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
        self.hive_hiveserver_confs = self._load_and_init_configs(
            'hive-hiveserver2.json', 'HIVESERVER', 'node')
        self.hive_webhcat_confs = self._load_and_init_configs(
            'hive-webhcat.json', 'WEBHCAT', 'node')

        self.hue_service_confs = self._load_and_init_configs(
            'hue-service.json', 'HUE', 'cluster')
        self.hue_role_confs = self._load_and_init_configs(
            'hue-hue_server.json', 'HUE', 'node')

        self.spark_service_confs = self._load_and_init_configs(
            'spark-service.json', 'SPARK_ON_YARN', 'cluster')
        self.spark_role_confs = self._load_and_init_configs(
            'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')

        self.zookeeper_server_confs = self._load_and_init_configs(
            'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
        self.zookeeper_service_confs = self._load_and_init_configs(
            'zookeeper-server.json', 'ZOOKEEPER', 'node')

        self.hbase_confs = self._load_and_init_configs(
            'hbase-service.json', 'HBASE', 'cluster')
        self.master_confs = self._load_and_init_configs(
            'hbase-master.json', 'MASTER', 'node')
        self.regionserver_confs = self._load_and_init_configs(
            'hbase-regionserver.json', 'REGIONSERVER', 'node')

        self.flume_service_confs = self._load_and_init_configs(
            'flume-service.json', 'FLUME', 'cluster')
        self.flume_agent_confs = self._load_and_init_configs(
            'flume-agent.json', 'FLUME', 'node')

        self.sentry_service_confs = self._load_and_init_configs(
            'sentry-service.json', 'SENTRY', 'cluster')
        self.sentry_server_confs = self._load_and_init_configs(
            'sentry-sentry_server.json', 'SENTRY', 'node')

        self.solr_service_confs = self._load_and_init_configs(
            'solr-service.json', 'SOLR', 'cluster')
        self.solr_server_confs = self._load_and_init_configs(
            'solr-solr_server.json', 'SOLR', 'node')

        self.sqoop_service_confs = self._load_and_init_configs(
            'sqoop-service.json', 'SQOOP', 'cluster')
        self.sqoop_server_confs = self._load_and_init_configs(
            'sqoop-sqoop_server.json', 'SQOOP', 'node')

        self.ks_indexer_service_confs = self._load_and_init_configs(
            'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
        self.ks_indexer_role_confs = self._load_and_init_configs(
            'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')

        self.impala_service_confs = self._load_and_init_configs(
            'impala-service.json', 'IMPALA', 'cluster')
        self.impala_catalogserver_confs = self._load_and_init_configs(
            'impala-catalogserver.json', 'CATALOGSERVER', 'node')
        self.impala_impalad_confs = self._load_and_init_configs(
            'impala-impalad.json', 'IMPALAD', 'node')
        self.impala_statestore_confs = self._load_and_init_configs(
            'impala-statestore.json', 'STATESTORE', 'node')

        self.kms_service_confs = self._load_and_init_configs(
            'kms-service.json', 'KMS', 'cluster')
        self.kms_kms_confs = self._load_and_init_configs(
            'kms-kms.json', 'KMS', 'node')

        self.kafka_service = self._load_and_init_configs(
            'kafka-service.json', 'KAFKA', 'cluster')
        self.kafka_kafka_broker = self._load_and_init_configs(
            'kafka-kafka_broker.json', 'KAFKA', 'node')
        self.kafka_kafka_mirror_maker = self._load_and_init_configs(
            'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')

    def _load_and_init_configs(self, filename, app_target, scope):
        confs = self._load_json(self.path_to_config + filename)
        cfgs = self._init_ng_configs(confs, app_target, scope)
@@ -127,10 +257,11 @@ class ConfigHelper(object):

    def _get_cluster_plugin_configs(self):
        return [self.CDH5_REPO_URL, self.CDH5_REPO_KEY_URL, self.CM5_REPO_URL,
-                self.CM5_REPO_KEY_URL, self.ENABLE_SWIFT,
-                self.ENABLE_HBASE_COMMON_LIB, self.SWIFT_LIB_URL,
-                self.EXTJS_LIB_URL, self.AWAIT_MANAGER_STARTING_TIMEOUT,
-                self.AWAIT_AGENTS_TIMEOUT]
+                self.CM5_REPO_KEY_URL, self.ENABLE_SWIFT, self.SWIFT_LIB_URL,
+                self.ENABLE_HBASE_COMMON_LIB, self.EXTJS_LIB_URL,
+                self.AWAIT_MANAGER_STARTING_TIMEOUT, self.AWAIT_AGENTS_TIMEOUT,
+                self.EXECUTOR_EXTRA_CLASSPATH, self.KMS_REPO_URL,
+                self.KMS_REPO_KEY_URL, self.REQUIRE_ANTI_AFFINITY]

    def get_plugin_configs(self):
        cluster_wide = self._get_cluster_plugin_configs()
@@ -165,3 +296,9 @@ class ConfigHelper(object):

    def get_extjs_lib_url(self, cluster):
        return self._get_config_value(cluster, self.EXTJS_LIB_URL)

    def get_kms_key_url(self, cluster):
        return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)

    def get_required_anti_affinity(self, cluster):
        return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)
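Editorial note: each p.Config above carries a default_value, and _get_config_value() lets a cluster's user-supplied configs override it. A hedged sketch of that precedence follows; the helper and the config-store layout are illustrative assumptions, not the plugin's exact implementation.

def get_config_value(cluster_configs, config_name, default):
    # Cluster-level user configs (here keyed by applicable target,
    # e.g. 'general') win over the declared default_value.
    general = cluster_configs.get('general', {})
    return general.get(config_name, default)


cluster_configs = {'general': {'Require Anti Affinity': False}}
print(get_config_value(cluster_configs, 'Require Anti Affinity', True))  # False
print(get_config_value(cluster_configs, 'Enable Swift', True))           # True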
@@ -14,6 +14,48 @@
from sahara.plugins import kerberos


PACKAGES = [
    'cloudera-manager-agent',
    'cloudera-manager-daemons',
    'cloudera-manager-server',
    'cloudera-manager-server-db-2',
    'flume-ng',
    'hadoop-hdfs-datanode',
    'hadoop-hdfs-namenode',
    'hadoop-hdfs-secondarynamenode',
    'hadoop-kms',
    'hadoop-mapreduce',
    'hadoop-mapreduce-historyserver',
    'hadoop-yarn-nodemanager',
    'hadoop-yarn-resourcemanager',
    'hbase',
    'hbase-solr',
    'hive-hcatalog',
    'hive-metastore',
    'hive-server2',
    'hive-webhcat-server',
    'hue',
    'impala',
    'impala-server',
    'impala-state-store',
    'impala-catalog',
    'impala-shell',
    'kafka',
    'kafka-server',
    'keytrustee-keyprovider',
    'oozie',
    'oracle-j2sdk1.7',
    'sentry',
    'solr-server',
    'solr-doc',
    'search',
    'spark-history-server',
    'sqoop2',
    'unzip',
    'zookeeper'
]


def setup_kerberos_for_cluster(cluster, cloudera_utils):
    if kerberos.is_kerberos_security_enabled(cluster):
        manager = cloudera_utils.pu.get_manager(cluster)
@@ -40,3 +82,43 @@ def prepare_scaling_kerberized_cluster(cluster, cloudera_utils, instances):
        kerberos.create_keytabs_for_map(
            cluster,
            {'hdfs': cloudera_utils.pu.get_hdfs_nodes(cluster, instances)})


def get_open_ports(node_group):
    ports = [9000]  # for CM agent

    ports_map = {
        'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
                             9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
        'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
        'HDFS_SECONDARYNAMENODE': [50090, 50495],
        'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
        'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
        'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
        'YARN_NODEMANAGER': [8040, 8041, 8042],
        'YARN_JOBHISTORY': [10020, 19888],
        'HIVE_METASTORE': [9083],
        'HIVE_SERVER2': [10000],
        'HUE_SERVER': [8888],
        'OOZIE_SERVER': [11000, 11001],
        'SPARK_YARN_HISTORY_SERVER': [18088],
        'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
        'HBASE_MASTER': [60000],
        'HBASE_REGIONSERVER': [60020],
        'FLUME_AGENT': [41414],
        'SENTRY_SERVER': [8038],
        'SOLR_SERVER': [8983, 8984],
        'SQOOP_SERVER': [8005, 12000],
        'KEY_VALUE_STORE_INDEXER': [],
        'IMPALA_CATALOGSERVER': [25020, 26000],
        'IMPALA_STATESTORE': [25010, 24000],
        'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
        'KMS': [16000, 16001],
        'JOURNALNODE': [8480, 8481, 8485]
    }

    for process in node_group.node_processes:
        if process in ports_map:
            ports.extend(ports_map[process])

    return ports
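Editorial note: a quick illustration of how get_open_ports() composes a node group's firewall list. The node group below is a stand-in namedtuple, not sahara's real model; it only needs a node_processes attribute for the function above to work.

import collections

NodeGroup = collections.namedtuple('NodeGroup', ['node_processes'])

ng = NodeGroup(node_processes=['HDFS_DATANODE', 'YARN_NODEMANAGER'])
# Reusing get_open_ports() from the patch above:
print(get_open_ports(ng))
# [9000, 50010, 1004, 50075, 1006, 50020, 8040, 8041, 8042]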
@@ -46,8 +46,11 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
        return 'http://%s:11000/oozie' % oozie_ip

    def get_name_node_uri(self, cluster):
-        namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
-        return 'hdfs://%s:8020' % namenode_ip
+        if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
+            return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
+        else:
+            namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
+            return 'hdfs://%s:8020' % namenode_ip

    def get_resource_manager_uri(self, cluster):
        resourcemanager = self.cloudera_utils.pu.get_resourcemanager(cluster)
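Editorial note: the effect of the get_name_node_uri() change is that, once JournalNodes exist (i.e. NameNode HA is possible), clients address the logical nameservice rather than one NameNode. A small sketch of the two outcomes, with invented hostnames:

def name_node_uri(journal_nodes, namenode_fqdn, name_service='nameservice01'):
    # HA cluster: clients address the logical nameservice.
    if len(journal_nodes) > 0:
        return 'hdfs://%s' % name_service
    # Non-HA cluster: clients address the single NameNode directly.
    return 'hdfs://%s:8020' % namenode_fqdn


print(name_node_uri(['jn-1', 'jn-2', 'jn-3'], 'nn-1.example.com'))
# hdfs://nameservice01
print(name_node_uri([], 'nn-1.example.com'))
# hdfs://nn-1.example.com:8020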
@@ -79,19 +79,26 @@ class CDHPluginAutoConfigsProvider(ru.HadoopAutoConfigsProvider):
class AbstractPluginUtils(object):

    def __init__(self):
-        # c_helper and db_helper will be defined in derived classes.
+        # c_helper will be defined in derived classes.
        self.c_helper = None

    def get_role_name(self, instance, service):
        # NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
        shortcuts = {
            'AGENT': 'A',
            'ALERTPUBLISHER': 'AP',
            'CATALOGSERVER': 'ICS',
            'DATANODE': 'DN',
            'EVENTSERVER': 'ES',
            'HBASE_INDEXER': 'LHBI',
            'HIVEMETASTORE': 'HVM',
            'HIVESERVER2': 'HVS',
            'HOSTMONITOR': 'HM',
            'IMPALAD': 'ID',
            'JOBHISTORY': 'JS',
            'JOURNALNODE': 'JN',
            'KAFKA_BROKER': 'KB',
            'KMS': 'KMS',
            'MASTER': 'M',
            'NAMENODE': 'NN',
            'NODEMANAGER': 'NM',
@@ -99,10 +106,16 @@ class AbstractPluginUtils(object):
            'REGIONSERVER': 'RS',
            'RESOURCEMANAGER': 'RM',
            'SECONDARYNAMENODE': 'SNN',
            'SENTRY_SERVER': 'SNT',
            'SERVER': 'S',
            'SERVICEMONITOR': 'SM',
            'SOLR_SERVER': 'SLR',
            'SPARK_YARN_HISTORY_SERVER': 'SHS',
-            'WEBHCAT': 'WHC'
+            'SQOOP_SERVER': 'S2S',
+            'STATESTORE': 'ISS',
+            'WEBHCAT': 'WHC',
+            'HDFS_GATEWAY': 'HG',
+            'YARN_GATEWAY': 'YG'
        }
        return '%s_%s' % (shortcuts.get(service, service),
                          instance.hostname().replace('-', '_'))
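Editorial note: get_role_name() combines a shortcut with the instance hostname (dashes mapped to underscores so the result matches the role-name regexp above). For example, assuming a hypothetical instance named cluster-worker-001:

shortcuts = {'DATANODE': 'DN', 'SQOOP_SERVER': 'S2S', 'YARN_GATEWAY': 'YG'}


def role_name(service, hostname):
    # Same scheme as get_role_name(): "<shortcut>_<hostname with - -> _>";
    # unknown services fall back to the service name itself.
    return '%s_%s' % (shortcuts.get(service, service),
                      hostname.replace('-', '_'))


print(role_name('DATANODE', 'cluster-worker-001'))     # DN_cluster_worker_001
print(role_name('SOLR_SERVER', 'cluster-worker-001'))  # SOLR_SERVER_cluster_worker_001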
@@ -155,6 +168,42 @@ class AbstractPluginUtils(object):
    def get_hbase_master(self, cluster):
        return u.get_instance(cluster, 'HBASE_MASTER')

    def get_sentry(self, cluster):
        return u.get_instance(cluster, 'SENTRY_SERVER')

    def get_flumes(self, cluster):
        return u.get_instances(cluster, 'FLUME_AGENT')

    def get_solrs(self, cluster):
        return u.get_instances(cluster, 'SOLR_SERVER')

    def get_sqoop(self, cluster):
        return u.get_instance(cluster, 'SQOOP_SERVER')

    def get_hbase_indexers(self, cluster):
        return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')

    def get_catalogserver(self, cluster):
        return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')

    def get_statestore(self, cluster):
        return u.get_instance(cluster, 'IMPALA_STATESTORE')

    def get_impalads(self, cluster):
        return u.get_instances(cluster, 'IMPALAD')

    def get_kms(self, cluster):
        return u.get_instances(cluster, 'KMS')

    def get_jns(self, cluster):
        return u.get_instances(cluster, 'HDFS_JOURNALNODE')

    def get_stdb_rm(self, cluster):
        return u.get_instance(cluster, 'YARN_STANDBYRM')

    def get_kafka_brokers(self, cluster):
        return u.get_instances(cluster, 'KAFKA_BROKER')

    def convert_process_configs(self, configs):
        p_dict = {
            "CLOUDERA": ['MANAGER'],
@@ -173,9 +222,21 @@ class AbstractPluginUtils(object):
            "ZOOKEEPER": ['SERVER'],
            "MASTER": ['MASTER'],
            "REGIONSERVER": ['REGIONSERVER'],
-            'YARN_GATEWAY': ['YARN_GATEWAY'],
-            'HDFS_GATEWAY': ['HDFS_GATEWAY']
+            "FLUME": ['AGENT'],
+            "CATALOGSERVER": ['CATALOGSERVER'],
+            "STATESTORE": ['STATESTORE'],
+            "IMPALAD": ['IMPALAD'],
+            "KS_INDEXER": ['HBASE_INDEXER'],
+            "SENTRY": ['SENTRY_SERVER'],
+            "SOLR": ['SOLR_SERVER'],
+            "SQOOP": ['SQOOP_SERVER'],
+            "KMS": ['KMS'],
+            "YARN_GATEWAY": ['YARN_GATEWAY'],
+            "HDFS_GATEWAY": ['HDFS_GATEWAY'],
+            "JOURNALNODE": ['JOURNALNODE'],
+            "KAFKA": ['KAFKA_BROKER']
        }

        if isinstance(configs, res.Resource):
            configs = configs.to_dict()
        for k in configs.keys():
@@ -275,6 +336,11 @@ class AbstractPluginUtils(object):
        r.execute_command('sudo curl %s -o %s/hadoop-openstack.jar' % (
            swift_lib_remote_url, HADOOP_LIB_DIR))

    def configure_sentry(self, cluster):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            dh.create_sentry_database(cluster, r)

    def put_hive_hdfs_xml(self, cluster):
        servers = self.get_hive_servers(cluster)
        with servers[0].remote() as r:
@@ -339,31 +405,38 @@ class AbstractPluginUtils(object):
                instance=instance.instance_name))
        cluster = instance.cluster

-        cdh5_key = self.c_helper.get_cdh5_key_url(cluster)
-        cm5_key = self.c_helper.get_cm5_key_url(cluster)
-
        with instance.remote() as r:
            if cmd.is_ubuntu_os(r):
-                cdh5_key = (cdh5_key or
-                            self.c_helper.DEFAULT_CDH5_UBUNTU_REPO_KEY_URL)
-                cm5_key = (cm5_key or
-                           self.c_helper.DEFAULT_CM5_UBUNTU_REPO_KEY_URL)
+                cdh5_key = (
+                    self.c_helper.get_cdh5_key_url(cluster) or
+                    self.c_helper.DEFAULT_CDH5_UBUNTU_REPO_KEY_URL)
+                cm5_key = (
+                    self.c_helper.get_cm5_key_url(cluster) or
+                    self.c_helper.DEFAULT_CM5_UBUNTU_REPO_KEY_URL)
+                kms_key = (
+                    self.c_helper.get_kms_key_url(cluster) or
+                    self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)

                cdh5_repo_content = self.c_helper.CDH5_UBUNTU_REPO
                cm5_repo_content = self.c_helper.CM5_UBUNTU_REPO
+                kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL

                cmd.write_ubuntu_repository(r, cdh5_repo_content, 'cdh')
                cmd.add_apt_key(r, cdh5_key)
                cmd.write_ubuntu_repository(r, cm5_repo_content, 'cm')
                cmd.add_apt_key(r, cm5_key)
+                cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
+                cmd.add_apt_key(r, kms_key)
                cmd.update_repository(r)

            if cmd.is_centos_os(r):
                cdh5_repo_content = self.c_helper.CDH5_CENTOS_REPO
                cm5_repo_content = self.c_helper.CM5_CENTOS_REPO
+                kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL

                cmd.write_centos_repository(r, cdh5_repo_content, 'cdh')
                cmd.write_centos_repository(r, cm5_repo_content, 'cm')
+                cmd.add_centos_repository(r, kms_repo_url, 'kms')
                cmd.update_repository(r)

    def _get_config_value(self, service, name, configs, cluster=None):
@@ -13,460 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh.v5_5_0 import config_helper
from sahara.plugins.cdh.v5_5_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_5_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import xmlutils


HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'

c_helper = config_helper.ConfigHelperV550()


class ClouderaUtilsV550(cu.ClouderaUtils):
    FLUME_SERVICE_NAME = 'flume01'
    SOLR_SERVICE_NAME = 'solr01'
    SQOOP_SERVICE_NAME = 'sqoop01'
    KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
    IMPALA_SERVICE_NAME = 'impala01'
    SENTRY_SERVICE_NAME = 'sentry01'
    KMS_SERVICE_NAME = 'kms01'
    CM_API_VERSION = 8
    NAME_SERVICE = 'nameservice01'
    KAFKA_SERVICE_NAME = 'kafka01'

    def __init__(self):
        cu.ClouderaUtils.__init__(self)
        self.pu = pu.PluginUtilsV550()
        self.validator = validation.ValidatorV550

    def get_service_by_role(self, role, cluster=None, instance=None):
        cm_cluster = None
        if cluster:
            cm_cluster = self.get_cloudera_cluster(cluster)
        elif instance:
            cm_cluster = self.get_cloudera_cluster(instance.cluster)
        else:
            raise ValueError(_("'cluster' or 'instance' argument missed"))

        if role in ['AGENT']:
            return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
        elif role in ['SENTRY_SERVER']:
            return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
        elif role in ['SQOOP_SERVER']:
            return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
        elif role in ['SOLR_SERVER']:
            return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
        elif role in ['HBASE_INDEXER']:
            return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
        elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
            return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
        elif role in ['KMS']:
            return cm_cluster.get_service(self.KMS_SERVICE_NAME)
        elif role in ['JOURNALNODE']:
            return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        elif role in ['YARN_STANDBYRM']:
            return cm_cluster.get_service(self.YARN_SERVICE_NAME)
        elif role in ['KAFKA_BROKER']:
            return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
        else:
            return super(ClouderaUtilsV550, self).get_service_by_role(
                role, cluster, instance)

    @cpo.event_wrapper(
        True, step=_("First run cluster"), param=('cluster', 1))
    @cu.cloudera_cmd
    def first_run(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)
        yield cm_cluster.first_run()

    @cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
    def create_services(self, cluster):
        api = self.get_api_client(cluster)
        cm_cluster = api.create_cluster(cluster.name,
                                        fullVersion=cluster.hadoop_version)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
                                      ZOOKEEPER_SERVICE_TYPE)
        cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
        cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
        cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
        if self.pu.get_hive_metastore(cluster):
            cm_cluster.create_service(self.HIVE_SERVICE_NAME,
                                      HIVE_SERVICE_TYPE)
        if self.pu.get_hue(cluster):
            cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
        if self.pu.get_spark_historyserver(cluster):
            cm_cluster.create_service(self.SPARK_SERVICE_NAME,
                                      SPARK_SERVICE_TYPE)
        if self.pu.get_hbase_master(cluster):
            cm_cluster.create_service(self.HBASE_SERVICE_NAME,
                                      HBASE_SERVICE_TYPE)
        if len(self.pu.get_flumes(cluster)) > 0:
            cm_cluster.create_service(self.FLUME_SERVICE_NAME,
                                      FLUME_SERVICE_TYPE)
        if self.pu.get_sentry(cluster):
            cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
                                      SENTRY_SERVICE_TYPE)
        if len(self.pu.get_solrs(cluster)) > 0:
            cm_cluster.create_service(self.SOLR_SERVICE_NAME,
                                      SOLR_SERVICE_TYPE)
        if self.pu.get_sqoop(cluster):
            cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
                                      SQOOP_SERVICE_TYPE)
        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
                                      KS_INDEXER_SERVICE_TYPE)
        if self.pu.get_catalogserver(cluster):
            cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
                                      IMPALA_SERVICE_TYPE)
        if self.pu.get_kms(cluster):
            cm_cluster.create_service(self.KMS_SERVICE_NAME,
                                      KMS_SERVICE_TYPE)
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
                                      KAFKA_SERVICE_TYPE)

    def await_agents(self, cluster, instances):
        self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)

    @cpo.event_wrapper(
        True, step=_("Configure services"), param=('cluster', 1))
    def configure_services(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
            zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
                                                      cluster=cluster))

        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
                                             cluster=cluster))

        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
                                             cluster=cluster))

        oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
        oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
                                              cluster=cluster))

        if self.pu.get_hive_metastore(cluster):
            hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
            hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_hue(cluster):
            hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
            hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
                                                cluster=cluster))

        if self.pu.get_spark_historyserver(cluster):
            spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
            spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_hbase_master(cluster):
            hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
            hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_flumes(cluster)) > 0:
            flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
            flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_sentry(cluster):
            sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
            sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
                                                   cluster=cluster))

        if len(self.pu.get_solrs(cluster)) > 0:
            solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
            solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_sqoop(cluster):
            sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
            sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
            ks_indexer.update_config(
                self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))

        if self.pu.get_catalogserver(cluster):
            impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
            impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                                   cluster=cluster))

        if self.pu.get_kms(cluster):
            kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
            kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                                cluster=cluster))

        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
            kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
                                                  cluster=cluster))

    def _get_configs(self, service, cluster=None, instance=None):
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

        all_confs = {}
        if cluster:
            zk_count = self.validator._get_inst_count(cluster,
                                                      'ZOOKEEPER_SERVER')
            hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
            snt_count = self.validator._get_inst_count(cluster,
                                                       'SENTRY_SERVER')
            ks_count =\
                self.validator._get_inst_count(cluster,
                                               'KEY_VALUE_STORE_INDEXER')
            kms_count = self.validator._get_inst_count(cluster, 'KMS')
            imp_count =\
                self.validator._get_inst_count(cluster,
                                               'IMPALA_CATALOGSERVER')
            hive_count = self.validator._get_inst_count(cluster,
                                                        'HIVE_METASTORE')
            slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
            sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
            core_site_safety_valve = ''
            if self.pu.c_helper.is_swift_enabled(cluster):
                configs = swift_helper.get_swift_configs()
                confs = {c['name']: c['value'] for c in configs}
                core_site_safety_valve = xmlutils.create_elements_xml(confs)
            all_confs = {
                'HDFS': {
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'dfs_block_local_path_access_user':
                        'impala' if imp_count else '',
                    'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
                    'core_site_safety_valve': core_site_safety_valve
                },
                'HIVE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'OOZIE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'hive_service':
                        self.HIVE_SERVICE_NAME if hive_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'YARN': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'HUE': {
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'oozie_service': self.OOZIE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'impala_service':
                        self.IMPALA_SERVICE_NAME if imp_count else '',
                    'sqoop_service':
                        self.SQOOP_SERVICE_NAME if sqp_count else ''
                },
                'SPARK_ON_YARN': {
                    'yarn_service': self.YARN_SERVICE_NAME
                },
                'HBASE': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
                    'hbase_enable_indexing': 'true' if ks_count else 'false',
                    'hbase_enable_replication':
                        'true' if ks_count else 'false'
                },
                'FLUME': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else ''
                },
                'SENTRY': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'sentry_server_config_safety_valve': (
                        c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
                        if imp_count else '')
                },
                'SOLR': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                },
                'SQOOP': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME
                },
                'KS_INDEXER': {
                    'hbase_service': self.HBASE_SERVICE_NAME,
                    'solr_service': self.SOLR_SERVICE_NAME
                },
                'IMPALA': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                }
            }
            hive_confs = {
                'HIVE': {
                    'hive_metastore_database_type': 'postgresql',
                    'hive_metastore_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'hive_metastore_database_port': '7432',
                    'hive_metastore_database_password':
                        dh.get_hive_db_password(cluster)
                }
            }
            hue_confs = {
                'HUE': {
                    'hue_webhdfs': self.pu.get_role_name(
                        self.pu.get_namenode(cluster), 'NAMENODE')
                }
            }
            sentry_confs = {
                'SENTRY': {
                    'sentry_server_database_type': 'postgresql',
                    'sentry_server_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'sentry_server_database_port': '7432',
                    'sentry_server_database_password':
                        dh.get_sentry_db_password(cluster)
                }
            }
            kafka_confs = {
                'KAFKA': {
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                }
            }
            all_confs = s_cfg.merge_configs(all_confs, hue_confs)
            all_confs = s_cfg.merge_configs(all_confs, hive_confs)
            all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
            all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

        if instance:
            snt_count = self.validator._get_inst_count(instance.cluster,
                                                       'SENTRY_SERVER')
            paths = instance.storage_paths()

            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },
                'SECONDARYNAMENODE': {
                    'fs_checkpoint_dir_list':
                        get_hadoop_dirs(paths, '/fs/snn')
                },
                'DATANODE': {
                    'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
                    'dfs_datanode_data_dir_perm': 755,
                    'dfs_datanode_handler_count': 30
                },
                'NODEMANAGER': {
                    'yarn_nodemanager_local_dirs':
                        get_hadoop_dirs(paths, '/yarn/local'),
                    'container_executor_allowed_system_users':
                        "nobody,impala,hive,llama,hdfs,yarn,mapred,"
                        "spark,oozie",
                    "container_executor_banned_users": "bin"
                },
                'SERVER': {
                    'maxSessionTimeout': 60000
                },
                'HIVESERVER2': {
                    'hiveserver2_enable_impersonation':
                        'false' if snt_count else 'true',
                    'hive_hs2_config_safety_valve': (
                        c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                },
                'HIVEMETASTORE': {
                    'hive_metastore_config_safety_valve': (
                        c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                }
            }

            ng_user_confs = self.pu.convert_process_configs(
                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})

    @cpo.event_wrapper(
        True, step=_("Enable NameNode HA"), param=('cluster', 1))
    @cu.cloudera_cmd
    def enable_namenode_ha(self, cluster):
        standby_nn = self.pu.get_secondarynamenode(cluster)
        standby_nn_host_name = standby_nn.fqdn()
        jns = self.pu.get_jns(cluster)
        jn_list = []
        for index, jn in enumerate(jns):
            jn_host_name = jn.fqdn()
|
||||
jn_list.append({'jnHostId': jn_host_name,
|
||||
'jnName': 'JN%i' % index,
|
||||
'jnEditsDir': '/dfs/jn'
|
||||
})
|
||||
cm_cluster = self.get_cloudera_cluster(cluster)
|
||||
hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
|
||||
nn = hdfs.get_roles_by_type('NAMENODE')[0]
|
||||
|
||||
yield hdfs.enable_nn_ha(active_name=nn.name,
|
||||
standby_host_id=standby_nn_host_name,
|
||||
nameservice=self.NAME_SERVICE, jns=jn_list
|
||||
)
|
||||
|
||||
@cpo.event_wrapper(
|
||||
True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
|
||||
@cu.cloudera_cmd
|
||||
def enable_resourcemanager_ha(self, cluster):
|
||||
new_rm = self.pu.get_stdb_rm(cluster)
|
||||
new_rm_host_name = new_rm.fqdn()
|
||||
cm_cluster = self.get_cloudera_cluster(cluster)
|
||||
yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
|
||||
yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)
|
||||
self.c_helper = config_helper.ConfigHelperV550()
|
||||
|
|
|
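
Note on the layering above: the final per-service configuration is resolved by successive dict merges, where later layers win on key conflicts, and the user-supplied cluster.cluster_configs are merged last so they override generated defaults. A minimal standalone sketch of that semantics follows; merge_configs here is a simplified stand-in written for illustration, not the real sahara.utils.configs implementation:

def merge_configs(*configs):
    """Merge service->options dicts; later arguments override earlier ones."""
    result = {}
    for config in configs:
        if config:
            for service, options in config.items():
                merged = result.setdefault(service, {})
                merged.update(options)
    return result


# Hypothetical values, showing that the user-supplied layer wins.
generated = {'HIVE': {'hive_metastore_database_port': '7432'}}
user_supplied = {'HIVE': {'hive_metastore_database_port': '5432'}}

final = merge_configs(generated, user_supplied)
assert final['HIVE']['hive_metastore_database_port'] == '5432'
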
@@ -65,10 +65,9 @@ class ConfigHelperV550(c_h.ConfigHelper):
        'keytrustee/cloudera.list')

    DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL = (
        'http://archive.cloudera.com/'
        'navigator-keytrustee5/ubuntu/'
        'trusty/amd64/navigator-keytrustee'
        '/archive.key')
        'http://archive.cloudera.com/navigator-'
        'keytrustee5/ubuntu/trusty/amd64/navigator-'
        'keytrustee/archive.key')

    KEY_TRUSTEE_CENTOS_REPO_URL = (
        'http://archive.cloudera.com/navigator-'

@@ -80,6 +79,12 @@ class ConfigHelperV550(c_h.ConfigHelper):
        '/apache/hadoop/hadoop-openstack/2.6.0-cdh5.5.0'
        '/hadoop-openstack-2.6.0-cdh5.5.0.jar')

    SWIFT_LIB_URL = p.Config(
        'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
        default_value=DEFAULT_SWIFT_LIB_URL,
        description=("Library that adds Swift support to CDH. The file"
                     " will be downloaded by VMs."))

    HIVE_SERVER2_SENTRY_SAFETY_VALVE = f.get_file_text(
        path_to_config + 'hive-server2-sentry-safety.xml')

@@ -89,162 +94,8 @@ class ConfigHelperV550(c_h.ConfigHelper):
    SENTRY_IMPALA_CLIENT_SAFETY_VALVE = f.get_file_text(
        path_to_config + 'sentry-impala-client-safety.xml')

    _default_executor_classpath = ":".join(
        ['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
         '/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])

    EXECUTOR_EXTRA_CLASSPATH = p.Config(
        'Executor extra classpath', 'Spark', 'cluster', priority=2,
        default_value=_default_executor_classpath,
        description='Value for spark.executor.extraClassPath in '
                    'spark-defaults.conf (default: %s)'
                    % _default_executor_classpath)

    SWIFT_LIB_URL = p.Config(
        'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
        default_value=DEFAULT_SWIFT_LIB_URL,
        description=("Library that adds Swift support to CDH. The file"
                     " will be downloaded by VMs."))

    KMS_REPO_URL = p.Config(
        'KMS repo list URL', 'general', 'cluster', priority=1,
        default_value="")

    KMS_REPO_KEY_URL = p.Config(
        'KMS repo key URL (for debian-based only)', 'general',
        'cluster',
        priority=1, default_value="")

    REQUIRE_ANTI_AFFINITY = p.Config('Require Anti Affinity',
                                     'general', 'cluster',
                                     config_type='bool',
                                     priority=2,
                                     default_value=True)

    def __init__(self):
        super(ConfigHelperV550, self).__init__()
        self.priority_one_confs = self._load_json(
            self.path_to_config + 'priority-one-confs.json')
        self._init_all_ng_plugin_configs()

    def _get_cluster_plugin_configs(self):
        confs = super(ConfigHelperV550, self)._get_ng_plugin_configs()
        confs += [self.EXECUTOR_EXTRA_CLASSPATH,
                  self.KMS_REPO_URL,
                  self.KMS_REPO_KEY_URL,
                  self.REQUIRE_ANTI_AFFINITY]

        return confs

    def _init_all_ng_plugin_configs(self):
        self.hdfs_confs = self._load_and_init_configs(
            'hdfs-service.json', 'HDFS', 'cluster')
        self.namenode_confs = self._load_and_init_configs(
            'hdfs-namenode.json', 'NAMENODE', 'node')
        self.datanode_confs = self._load_and_init_configs(
            'hdfs-datanode.json', 'DATANODE', 'node')
        self.secnamenode_confs = self._load_and_init_configs(
            'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
        self.hdfs_gateway_confs = self._load_and_init_configs(
            'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
        self.journalnode_confs = self._load_and_init_configs(
            'hdfs-journalnode.json', 'JOURNALNODE', 'node')

        self.yarn_confs = self._load_and_init_configs(
            'yarn-service.json', 'YARN', 'cluster')
        self.resourcemanager_confs = self._load_and_init_configs(
            'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
        self.nodemanager_confs = self._load_and_init_configs(
            'yarn-nodemanager.json', 'NODEMANAGER', 'node')
        self.jobhistory_confs = self._load_and_init_configs(
            'yarn-jobhistory.json', 'JOBHISTORY', 'node')
        self.yarn_gateway_conf = self._load_and_init_configs(
            'yarn-gateway.json', 'YARN_GATEWAY', 'node')

        self.oozie_service_confs = self._load_and_init_configs(
            'oozie-service.json', 'OOZIE', 'cluster')
        self.oozie_role_confs = self._load_and_init_configs(
            'oozie-oozie_server.json', 'OOZIE', 'node')

        self.hive_service_confs = self._load_and_init_configs(
            'hive-service.json', 'HIVE', 'cluster')
        self.hive_metastore_confs = self._load_and_init_configs(
            'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
        self.hive_hiveserver_confs = self._load_and_init_configs(
            'hive-hiveserver2.json', 'HIVESERVER', 'node')
        self.hive_webhcat_confs = self._load_and_init_configs(
            'hive-webhcat.json', 'WEBHCAT', 'node')

        self.hue_service_confs = self._load_and_init_configs(
            'hue-service.json', 'HUE', 'cluster')
        self.hue_role_confs = self._load_and_init_configs(
            'hue-hue_server.json', 'HUE', 'node')

        self.spark_service_confs = self._load_and_init_configs(
            'spark-service.json', 'SPARK_ON_YARN', 'cluster')
        self.spark_role_confs = self._load_and_init_configs(
            'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')

        self.zookeeper_server_confs = self._load_and_init_configs(
            'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
        self.zookeeper_service_confs = self._load_and_init_configs(
            'zookeeper-server.json', 'ZOOKEEPER', 'node')

        self.hbase_confs = self._load_and_init_configs(
            'hbase-service.json', 'HBASE', 'cluster')
        self.master_confs = self._load_and_init_configs(
            'hbase-master.json', 'MASTER', 'node')
        self.regionserver_confs = self._load_and_init_configs(
            'hbase-regionserver.json', 'REGIONSERVER', 'node')

        self.flume_service_confs = self._load_and_init_configs(
            'flume-service.json', 'FLUME', 'cluster')
        self.flume_agent_confs = self._load_and_init_configs(
            'flume-agent.json', 'FLUME', 'node')

        self.sentry_service_confs = self._load_and_init_configs(
            'sentry-service.json', 'SENTRY', 'cluster')
        self.sentry_server_confs = self._load_and_init_configs(
            'sentry-sentry_server.json', 'SENTRY', 'node')

        self.solr_service_confs = self._load_and_init_configs(
            'solr-service.json', 'SOLR', 'cluster')
        self.solr_server_confs = self._load_and_init_configs(
            'solr-solr_server.json', 'SOLR', 'node')

        self.sqoop_service_confs = self._load_and_init_configs(
            'sqoop-service.json', 'SQOOP', 'cluster')
        self.sqoop_server_confs = self._load_and_init_configs(
            'sqoop-sqoop_server.json', 'SQOOP', 'node')

        self.ks_indexer_service_confs = self._load_and_init_configs(
            'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
        self.ks_indexer_role_confs = self._load_and_init_configs(
            'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')

        self.impala_service_confs = self._load_and_init_configs(
            'impala-service.json', 'IMPALA', 'cluster')
        self.impala_catalogserver_confs = self._load_and_init_configs(
            'impala-catalogserver.json', 'CATALOGSERVER', 'node')
        self.impala_impalad_confs = self._load_and_init_configs(
            'impala-impalad.json', 'IMPALAD', 'node')
        self.impala_statestore_confs = self._load_and_init_configs(
            'impala-statestore.json', 'STATESTORE', 'node')

        self.kms_service_confs = self._load_and_init_configs(
            'kms-service.json', 'KMS', 'cluster')
        self.kms_kms_confs = self._load_and_init_configs(
            'kms-kms.json', 'KMS', 'node')

        self.kafka_service = self._load_and_init_configs(
            'kafka-service.json', 'KAFKA', 'cluster')
        self.kafka_kafka_broker = self._load_and_init_configs(
            'kafka-kafka_broker.json', 'KAFKA', 'node')
        self.kafka_kafka_mirror_maker = self._load_and_init_configs(
            'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')

    def get_required_anti_affinity(self, cluster):
        return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)

    def get_kms_key_url(self, cluster):
        return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)

@@ -21,49 +21,10 @@ from sahara.plugins import utils as gu
from sahara.service.edp import hdfs_helper as h
from sahara.utils import cluster_progress_ops as cpo

PACKAGES = [
    'cloudera-manager-agent',
    'cloudera-manager-daemons',
    'cloudera-manager-server',
    'cloudera-manager-server-db-2',
    'flume-ng',
    'hadoop-hdfs-datanode',
    'hadoop-hdfs-namenode',
    'hadoop-hdfs-secondarynamenode',
    'hadoop-kms',
    'hadoop-mapreduce',
    'hadoop-mapreduce-historyserver',
    'hadoop-yarn-nodemanager',
    'hadoop-yarn-resourcemanager',
    'hbase',
    'hbase-solr',
    'hive-hcatalog',
    'hive-metastore',
    'hive-server2',
    'hive-webhcat-server',
    'hue',
    'impala',
    'impala-server',
    'impala-state-store',
    'impala-catalog',
    'impala-shell',
    'kafka',
    'kafka-server',
    'keytrustee-keyprovider',
    'oozie',
    'oracle-j2sdk1.7',
    'sentry',
    'solr-server',
    'solr-doc',
    'search',
    'spark-history-server',
    'sqoop2',
    'unzip',
    'zookeeper'
]

CU = cu.ClouderaUtilsV550()

PACKAGES = common_deploy.PACKAGES


def configure_cluster(cluster):
    instances = gu.get_instances(cluster)

@@ -115,7 +76,6 @@ def scale_cluster(cluster, instances):

    CU.pu.configure_swift(cluster, instances)
    _start_roles(cluster, instances)

    CU.refresh_datanodes(cluster)
    CU.refresh_yarn_nodes(cluster)
    CU.restart_stale_services(cluster)

@@ -204,40 +164,5 @@ def start_cluster(cluster):


def get_open_ports(node_group):
    ports = [9000]  # for CM agent

    ports_map = {
        'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
                             9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
        'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
        'HDFS_SECONDARYNAMENODE': [50090, 50495],
        'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
        'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
        'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
        'YARN_NODEMANAGER': [8040, 8041, 8042],
        'YARN_JOBHISTORY': [10020, 19888],
        'HIVE_METASTORE': [9083],
        'HIVE_SERVER2': [10000],
        'HUE_SERVER': [8888],
        'OOZIE_SERVER': [11000, 11001],
        'SPARK_YARN_HISTORY_SERVER': [18088],
        'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
        'HBASE_MASTER': [60000],
        'HBASE_REGIONSERVER': [60020],
        'FLUME_AGENT': [41414],
        'SENTRY_SERVER': [8038],
        'SOLR_SERVER': [8983, 8984],
        'SQOOP_SERVER': [8005, 12000],
        'KEY_VALUE_STORE_INDEXER': [],
        'IMPALA_CATALOGSERVER': [25020, 26000],
        'IMPALA_STATESTORE': [25010, 24000],
        'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
        'KMS': [16000, 16001],
        'JOURNALNODE': [8480, 8481, 8485]
    }

    for process in node_group.node_processes:
        if process in ports_map:
            ports.extend(ports_map[process])

    ports = common_deploy.get_open_ports(node_group)
    return ports
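
The removed get_open_ports body above simply unions the port lists of every process placed on a node group, always including the Cloudera Manager agent port. A standalone sketch of that lookup pattern; FakeNodeGroup and the trimmed ports_map are hypothetical stand-ins, not sahara objects:

import collections

# Hypothetical stand-in for a sahara node group; only the attribute the
# lookup reads is modeled here.
FakeNodeGroup = collections.namedtuple('FakeNodeGroup', ['node_processes'])

ports_map = {
    'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
    'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
}


def collect_ports(node_group):
    ports = [9000]  # CM agent port, always opened
    for process in node_group.node_processes:
        ports.extend(ports_map.get(process, []))
    return ports


ng = FakeNodeGroup(node_processes=['HDFS_NAMENODE', 'YARN_RESOURCEMANAGER'])
print(collect_ports(ng))  # [9000, 8020, 8022, 50070, 50470, 8030, ...]
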
@@ -26,13 +26,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
        super(EdpOozieEngine, self).__init__(cluster)
        self.cloudera_utils = cu.ClouderaUtilsV550()

    def get_name_node_uri(self, cluster):
        if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
            return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
        else:
            namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
            return 'hdfs://%s:8020' % namenode_ip

    @staticmethod
    def get_possible_job_config(job_type):
        if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):

@@ -13,154 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.conductor import resource as res
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils as pu
from sahara.plugins.cdh.v5_5_0 import config_helper
from sahara.plugins import utils as u


class PluginUtilsV550(pu.AbstractPluginUtils):

    def __init__(self):
        self.c_helper = config_helper.ConfigHelperV550()

    def get_role_name(self, instance, service):
        # NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
        shortcuts = {
            'AGENT': 'A',
            'ALERTPUBLISHER': 'AP',
            'CATALOGSERVER': 'ICS',
            'DATANODE': 'DN',
            'EVENTSERVER': 'ES',
            'HBASE_INDEXER': 'LHBI',
            'HIVEMETASTORE': 'HVM',
            'HIVESERVER2': 'HVS',
            'HOSTMONITOR': 'HM',
            'IMPALAD': 'ID',
            'JOBHISTORY': 'JS',
            'JOURNALNODE': 'JN',
            'KAFKA_BROKER': 'KB',
            'KMS': 'KMS',
            'MASTER': 'M',
            'NAMENODE': 'NN',
            'NODEMANAGER': 'NM',
            'OOZIE_SERVER': 'OS',
            'REGIONSERVER': 'RS',
            'RESOURCEMANAGER': 'RM',
            'SECONDARYNAMENODE': 'SNN',
            'SENTRY_SERVER': 'SNT',
            'SERVER': 'S',
            'SERVICEMONITOR': 'SM',
            'SOLR_SERVER': 'SLR',
            'SPARK_YARN_HISTORY_SERVER': 'SHS',
            'SQOOP_SERVER': 'S2S',
            'STATESTORE': 'ISS',
            'WEBHCAT': 'WHC',
            'HDFS_GATEWAY': 'HG',
            'YARN_GATEWAY': 'YG'
        }
        return '%s_%s' % (shortcuts.get(service, service),
                          instance.hostname().replace('-', '_'))

    def get_sentry(self, cluster):
        return u.get_instance(cluster, 'SENTRY_SERVER')

    def get_flumes(self, cluster):
        return u.get_instances(cluster, 'FLUME_AGENT')

    def get_solrs(self, cluster):
        return u.get_instances(cluster, 'SOLR_SERVER')

    def get_sqoop(self, cluster):
        return u.get_instance(cluster, 'SQOOP_SERVER')

    def get_hbase_indexers(self, cluster):
        return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')

    def get_catalogserver(self, cluster):
        return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')

    def get_statestore(self, cluster):
        return u.get_instance(cluster, 'IMPALA_STATESTORE')

    def get_impalads(self, cluster):
        return u.get_instances(cluster, 'IMPALAD')

    def get_kms(self, cluster):
        return u.get_instances(cluster, 'KMS')

    def get_jns(self, cluster):
        return u.get_instances(cluster, 'HDFS_JOURNALNODE')

    def get_stdb_rm(self, cluster):
        return u.get_instance(cluster, 'YARN_STANDBYRM')

    def get_kafka_brokers(self, cluster):
        return u.get_instances(cluster, 'KAFKA_BROKER')

    def convert_process_configs(self, configs):
        p_dict = {
            "CLOUDERA": ['MANAGER'],
            "NAMENODE": ['NAMENODE'],
            "DATANODE": ['DATANODE'],
            "SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
            "RESOURCEMANAGER": ['RESOURCEMANAGER'],
            "NODEMANAGER": ['NODEMANAGER'],
            "JOBHISTORY": ['JOBHISTORY'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVESERVER": ['HIVESERVER2'],
            "HIVEMETASTORE": ['HIVEMETASTORE'],
            "WEBHCAT": ['WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['SERVER'],
            "MASTER": ['MASTER'],
            "REGIONSERVER": ['REGIONSERVER'],
            "FLUME": ['AGENT'],
            "CATALOGSERVER": ['CATALOGSERVER'],
            "STATESTORE": ['STATESTORE'],
            "IMPALAD": ['IMPALAD'],
            "KS_INDEXER": ['HBASE_INDEXER'],
            "SENTRY": ['SENTRY_SERVER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "KMS": ['KMS'],
            "YARN_GATEWAY": ['YARN_GATEWAY'],
            "HDFS_GATEWAY": ['HDFS_GATEWAY'],
            "JOURNALNODE": ['JOURNALNODE'],
            "KAFKA": ['KAFKA_BROKER']
        }
        if isinstance(configs, res.Resource):
            configs = configs.to_dict()
        for k in configs.keys():
            if k in p_dict.keys():
                item = configs[k]
                del configs[k]
                newkey = p_dict[k][0]
                configs[newkey] = item
        return res.Resource(configs)

    def configure_sentry(self, cluster):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            dh.create_sentry_database(cluster, r)

    def _configure_repo_from_inst(self, instance):
        super(PluginUtilsV550, self)._configure_repo_from_inst(instance)

        cluster = instance.cluster
        with instance.remote() as r:
            if cmd.is_ubuntu_os(r):
                kms_key = (
                    self.c_helper.get_kms_key_url(cluster) or
                    self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
                kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
                cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
                cmd.add_apt_key(r, kms_key)
                cmd.update_repository(r)
            if cmd.is_centos_os(r):
                kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
                cmd.add_centos_repository(r, kms_repo_url, 'kms')
                cmd.update_repository(r)
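
The key-renaming loop in convert_process_configs above maps sahara-facing process group names onto Cloudera Manager role names. A minimal sketch of the same transformation on a plain dict; the trimmed p_dict and the sample config are hypothetical values for illustration:

p_dict = {
    'ZOOKEEPER': ['SERVER'],
    'FLUME': ['AGENT'],
    'KS_INDEXER': ['HBASE_INDEXER'],
}


def rename_keys(configs):
    # Iterate over a snapshot of the keys so deleting entries inside the
    # loop is safe on Python 3 as well.
    for key in list(configs):
        if key in p_dict:
            configs[p_dict[key][0]] = configs.pop(key)
    return configs


node_configs = {'ZOOKEEPER': {'maxSessionTimeout': 60000}}
print(rename_keys(node_configs))  # {'SERVER': {'maxSessionTimeout': 60000}}
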
@@ -13,220 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.i18n import _
from sahara.plugins.cdh.v5_5_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u


class ValidatorV550(validation.Validator):
    PU = pu.PluginUtilsV550()

    @classmethod
    def validate_cluster_creating(cls, cluster):
        super(ValidatorV550, cls).validate_cluster_creating(cluster)
        cls._hdfs_ha_validation(cluster)
        cls._yarn_ha_validation(cluster)
        cls._flume_validation(cluster)
        cls._sentry_validation(cluster)
        cls._solr_validation(cluster)
        cls._sqoop_validation(cluster)
        cls._hbase_indexer_validation(cluster)
        cls._impala_validation(cluster)
        cls._kms_validation(cluster)

    @classmethod
    def _hdfs_ha_validation(cls, cluster):
        jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if jn_count > 0:
            if jn_count < 3:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('not less than 3'),
                                                        jn_count)
            if not jn_count % 2:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('be odd'), jn_count)
            if zk_count < 1:
                raise ex.RequiredServiceMissingException('ZOOKEEPER',
                                                         required_by='HDFS HA')
            if require_anti_affinity:
                if 'HDFS_SECONDARYNAMENODE' not in\
                        cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_SECONDARYNAMENODE should be enabled '
                          'in anti_affinity.'))
                if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_NAMENODE should be enabled in anti_affinity.'))

    @classmethod
    def _yarn_ha_validation(cls, cluster):
        rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if stdb_rm_count > 1:
            raise ex.InvalidComponentCountException(
                'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
        if stdb_rm_count > 0:
            if rm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_RESOURCEMANAGER', required_by='RM HA')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='RM HA')
            if require_anti_affinity:
                if 'YARN_RESOURCEMANAGER' not in\
                        cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_RESOURCEMANAGER should be enabled in '
                          'anti_affinity.'))
                if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_STANDBYRM should be'
                          ' enabled in anti_affinity.'))

    @classmethod
    def _flume_validation(cls, cluster):
        a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')

        if a_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='FLUME_AGENT')

    @classmethod
    def _sentry_validation(cls, cluster):

        snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        if snt_count > 1:
            raise ex.InvalidComponentCountException(
                'SENTRY_SERVER', _('0 or 1'), snt_count)
        if snt_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SENTRY_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SENTRY_SERVER')

    @classmethod
    def _solr_validation(cls, cluster):
        slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')

        if slr_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SOLR_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SOLR_SERVER')

    @classmethod
    def _sqoop_validation(cls, cluster):

        s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
        nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')

        if s2s_count > 1:
            raise ex.InvalidComponentCountException(
                'SQOOP_SERVER', _('0 or 1'), s2s_count)
        if s2s_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SQOOP_SERVER')
            if nm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
            if hs_count != 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_JOBHISTORY', required_by='SQOOP_SERVER')

    @classmethod
    def _hbase_indexer_validation(cls, cluster):

        lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
        hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')

        if lhbi_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='HBASE_INDEXER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='HBASE_INDEXER')
            if slr_count < 1:
                raise ex.RequiredServiceMissingException(
                    'SOLR_SERVER', required_by='HBASE_INDEXER')
            if hbm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HBASE_MASTER', required_by='HBASE_INDEXER')

    @classmethod
    def _impala_validation(cls, cluster):
        ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
        iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
        id_count = cls._get_inst_count(cluster, 'IMPALAD')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')

        if ics_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
                                                    _('0 or 1'), ics_count)
        if iss_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
                                                    _('0 or 1'), iss_count)
        if ics_count == 1:
            datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
            impalad_ng = u.get_node_groups(cluster, "IMPALAD")
            datanodes = set(ng.id for ng in datanode_ng)
            impalads = set(ng.id for ng in impalad_ng)

            if datanodes != impalads:
                raise ex.InvalidClusterTopology(
                    _("IMPALAD must be installed on every HDFS_DATANODE"))

            if iss_count != 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALA_STATESTORE', required_by='IMPALA')
            if id_count < 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALAD', required_by='IMPALA')
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='IMPALA')
            if hms_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HIVE_METASTORE', required_by='IMPALA')

    @classmethod
    def _kms_validation(cls, cluster):

        kms_count = cls._get_inst_count(cluster, 'KMS')
        if kms_count > 1:
            raise ex.InvalidComponentCountException('KMS',
                                                    _('0 or 1'), kms_count)

    @classmethod
    def _get_anti_affinity(cls, cluster):
        return cluster.anti_affinity

@@ -21,7 +21,6 @@ from sahara.plugins.cdh.v5_5_0 import deploy
from sahara.plugins.cdh.v5_5_0 import edp_engine
from sahara.plugins.cdh.v5_5_0 import plugin_utils
from sahara.plugins.cdh.v5_5_0 import validation
from sahara.plugins import kerberos


class VersionHandler(avm.BaseVersionHandler):

@@ -34,67 +33,3 @@ class VersionHandler(avm.BaseVersionHandler):
        self.deploy = deploy
        self.edp_engine = edp_engine
        self.validation = validation.ValidatorV550()

    def get_plugin_configs(self):
        result = super(VersionHandler, self).get_plugin_configs()
        result.extend(kerberos.get_config_list())
        return result

    def get_node_processes(self):
        return {
            "CLOUDERA": ['CLOUDERA_MANAGER'],
            "HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
                     'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
            "YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
                     'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['ZOOKEEPER_SERVER'],
            "HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
            "FLUME": ['FLUME_AGENT'],
            "IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
            "KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "SENTRY": ['SENTRY_SERVER'],
            "KMS": ['KMS'],
            "KAFKA": ['KAFKA_BROKER'],

            "YARN_GATEWAY": [],
            "RESOURCEMANAGER": [],
            "NODEMANAGER": [],
            "JOBHISTORY": [],

            "HDFS_GATEWAY": [],
            'DATANODE': [],
            'NAMENODE': [],
            'SECONDARYNAMENODE': [],
            'JOURNALNODE': [],

            'REGIONSERVER': [],
            'MASTER': [],

            'HIVEMETASTORE': [],
            'HIVESERVER': [],
            'WEBCAT': [],

            'CATALOGSERVER': [],
            'STATESTORE': [],
            'IMPALAD': [],
            'Kerberos': [],
        }

    def get_edp_engine(self, cluster, job_type):
        oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
        spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
        if job_type in oozie_type:
            return self.edp_engine.EdpOozieEngine(cluster)
        if job_type in spark_type:
            return self.edp_engine.EdpSparkEngine(cluster)
        return None

    def get_edp_job_types(self):
        return (edp_engine.EdpOozieEngine.get_supported_job_types() +
                edp_engine.EdpSparkEngine.get_supported_job_types())
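
The get_edp_engine dispatch above picks an engine by testing membership of the job type in each engine's supported set, falling back to None. The same pattern in isolation; the engine classes and job-type strings here are hypothetical placeholders, not sahara's real EDP classes:

class OozieEngine(object):
    @staticmethod
    def get_supported_job_types():
        return ['Hive', 'Pig', 'MapReduce', 'Java']


class SparkEngine(object):
    @staticmethod
    def get_supported_job_types():
        return ['Spark']


def get_edp_engine(job_type):
    # First engine whose supported set contains the job type wins.
    for engine in (OozieEngine, SparkEngine):
        if job_type in engine.get_supported_job_types():
            return engine()
    return None


assert isinstance(get_edp_engine('Spark'), SparkEngine)
assert get_edp_engine('Unknown') is None
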
@@ -13,459 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh.v5_7_0 import config_helper
from sahara.plugins.cdh.v5_7_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_7_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import xmlutils


HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'

c_helper = config_helper.ConfigHelperV570()


class ClouderaUtilsV570(cu.ClouderaUtils):
    FLUME_SERVICE_NAME = 'flume01'
    SOLR_SERVICE_NAME = 'solr01'
    SQOOP_SERVICE_NAME = 'sqoop01'
    KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
    IMPALA_SERVICE_NAME = 'impala01'
    SENTRY_SERVICE_NAME = 'sentry01'
    KMS_SERVICE_NAME = 'kms01'
    KAFKA_SERVICE_NAME = 'kafka01'
    CM_API_VERSION = 8
    NAME_SERVICE = 'nameservice01'

    def __init__(self):
        cu.ClouderaUtils.__init__(self)
        self.pu = pu.PluginUtilsV570()
        self.validator = validation.ValidatorV570

    def get_service_by_role(self, role, cluster=None, instance=None):
        cm_cluster = None
        if cluster:
            cm_cluster = self.get_cloudera_cluster(cluster)
        elif instance:
            cm_cluster = self.get_cloudera_cluster(instance.cluster)
        else:
            raise ValueError(_("'cluster' or 'instance' argument missed"))

        if role in ['AGENT']:
            return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
        elif role in ['SENTRY_SERVER']:
            return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
        elif role in ['SQOOP_SERVER']:
            return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
        elif role in ['SOLR_SERVER']:
            return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
        elif role in ['HBASE_INDEXER']:
            return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
        elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
            return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
        elif role in ['KMS']:
            return cm_cluster.get_service(self.KMS_SERVICE_NAME)
        elif role in ['JOURNALNODE']:
            return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        elif role in ['YARN_STANDBYRM']:
            return cm_cluster.get_service(self.YARN_SERVICE_NAME)
        elif role in ['KAFKA_BROKER']:
            return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
        else:
            return super(ClouderaUtilsV570, self).get_service_by_role(
                role, cluster, instance)

    @cpo.event_wrapper(
        True, step=_("First run cluster"), param=('cluster', 1))
    @cu.cloudera_cmd
    def first_run(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)
        yield cm_cluster.first_run()

    @cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
    def create_services(self, cluster):
        api = self.get_api_client(cluster)
        cm_cluster = api.create_cluster(cluster.name,
                                        fullVersion=cluster.hadoop_version)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
                                      ZOOKEEPER_SERVICE_TYPE)
        cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
        cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
        cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
        if self.pu.get_hive_metastore(cluster):
            cm_cluster.create_service(self.HIVE_SERVICE_NAME,
                                      HIVE_SERVICE_TYPE)
        if self.pu.get_hue(cluster):
            cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
        if self.pu.get_spark_historyserver(cluster):
            cm_cluster.create_service(self.SPARK_SERVICE_NAME,
                                      SPARK_SERVICE_TYPE)
        if self.pu.get_hbase_master(cluster):
            cm_cluster.create_service(self.HBASE_SERVICE_NAME,
                                      HBASE_SERVICE_TYPE)
        if len(self.pu.get_flumes(cluster)) > 0:
            cm_cluster.create_service(self.FLUME_SERVICE_NAME,
                                      FLUME_SERVICE_TYPE)
        if self.pu.get_sentry(cluster):
            cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
                                      SENTRY_SERVICE_TYPE)
        if len(self.pu.get_solrs(cluster)) > 0:
            cm_cluster.create_service(self.SOLR_SERVICE_NAME,
                                      SOLR_SERVICE_TYPE)
        if self.pu.get_sqoop(cluster):
            cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
                                      SQOOP_SERVICE_TYPE)
        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
                                      KS_INDEXER_SERVICE_TYPE)
        if self.pu.get_catalogserver(cluster):
            cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
                                      IMPALA_SERVICE_TYPE)
        if self.pu.get_kms(cluster):
            cm_cluster.create_service(self.KMS_SERVICE_NAME,
                                      KMS_SERVICE_TYPE)
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
                                      KAFKA_SERVICE_TYPE)

    def await_agents(self, cluster, instances):
        self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)

    @cpo.event_wrapper(
        True, step=_("Configure services"), param=('cluster', 1))
    def configure_services(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
            zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
                                                      cluster=cluster))

        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
                                             cluster=cluster))

        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
                                             cluster=cluster))

        oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
        oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
                                              cluster=cluster))

        if self.pu.get_hive_metastore(cluster):
            hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
            hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_hue(cluster):
            hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
            hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
                                                cluster=cluster))

        if self.pu.get_spark_historyserver(cluster):
            spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
            spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_hbase_master(cluster):
            hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
            hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_flumes(cluster)) > 0:
            flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
            flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_sentry(cluster):
            sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
            sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
                                                   cluster=cluster))

        if len(self.pu.get_solrs(cluster)) > 0:
            solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
            solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_sqoop(cluster):
            sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
            sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
            ks_indexer.update_config(
                self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))

        if self.pu.get_catalogserver(cluster):
            impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
            impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                                   cluster=cluster))

        if self.pu.get_kms(cluster):
            kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
            kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                                cluster=cluster))
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
            kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
                                                  cluster=cluster))

    def _get_configs(self, service, cluster=None, instance=None):
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

        all_confs = {}
        if cluster:
            zk_count = self.validator._get_inst_count(cluster,
                                                      'ZOOKEEPER_SERVER')
            hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
            snt_count = self.validator._get_inst_count(cluster,
                                                       'SENTRY_SERVER')
            ks_count =\
                self.validator._get_inst_count(cluster,
                                               'KEY_VALUE_STORE_INDEXER')
            kms_count = self.validator._get_inst_count(cluster, 'KMS')
            imp_count =\
                self.validator._get_inst_count(cluster,
                                               'IMPALA_CATALOGSERVER')
            hive_count = self.validator._get_inst_count(cluster,
                                                        'HIVE_METASTORE')
            slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
            sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
            core_site_safety_valve = ''
            if self.pu.c_helper.is_swift_enabled(cluster):
                configs = swift_helper.get_swift_configs()
                confs = {c['name']: c['value'] for c in configs}
                core_site_safety_valve = xmlutils.create_elements_xml(confs)
            all_confs = {
                'HDFS': {
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'dfs_block_local_path_access_user':
                        'impala' if imp_count else '',
                    'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
                    'core_site_safety_valve': core_site_safety_valve
                },
                'HIVE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'OOZIE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'hive_service':
                        self.HIVE_SERVICE_NAME if hive_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'YARN': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'HUE': {
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'oozie_service': self.OOZIE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'impala_service':
                        self.IMPALA_SERVICE_NAME if imp_count else '',
                    'sqoop_service':
                        self.SQOOP_SERVICE_NAME if sqp_count else ''
                },
                'SPARK_ON_YARN': {
                    'yarn_service': self.YARN_SERVICE_NAME
                },
                'HBASE': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
                    'hbase_enable_indexing': 'true' if ks_count else 'false',
                    'hbase_enable_replication':
                        'true' if ks_count else 'false'
                },
                'FLUME': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else ''
                },
                'SENTRY': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'sentry_server_config_safety_valve': (
                        c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
                        if imp_count else '')
                },
                'SOLR': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                },
                'SQOOP': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME
                },
                'KS_INDEXER': {
                    'hbase_service': self.HBASE_SERVICE_NAME,
                    'solr_service': self.SOLR_SERVICE_NAME
                },
                'IMPALA': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                }
            }
            hive_confs = {
                'HIVE': {
                    'hive_metastore_database_type': 'postgresql',
                    'hive_metastore_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'hive_metastore_database_port': '7432',
                    'hive_metastore_database_password':
                        dh.get_hive_db_password(cluster)
                }
            }
            hue_confs = {
                'HUE': {
                    'hue_webhdfs': self.pu.get_role_name(
                        self.pu.get_namenode(cluster), 'NAMENODE')
                }
            }
            sentry_confs = {
                'SENTRY': {
                    'sentry_server_database_type': 'postgresql',
                    'sentry_server_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'sentry_server_database_port': '7432',
                    'sentry_server_database_password':
                        dh.get_sentry_db_password(cluster)
                }
            }
            kafka_confs = {
                'KAFKA': {
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                }
            }
            all_confs = s_cfg.merge_configs(all_confs, hue_confs)
            all_confs = s_cfg.merge_configs(all_confs, hive_confs)
            all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
            all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

        if instance:
            snt_count = self.validator._get_inst_count(instance.cluster,
                                                       'SENTRY_SERVER')
            paths = instance.storage_paths()

            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },
                'SECONDARYNAMENODE': {
                    'fs_checkpoint_dir_list':
                        get_hadoop_dirs(paths, '/fs/snn')
                },
                'DATANODE': {
                    'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
                    'dfs_datanode_data_dir_perm': 755,
                    'dfs_datanode_handler_count': 30
                },
                'NODEMANAGER': {
                    'yarn_nodemanager_local_dirs':
                        get_hadoop_dirs(paths, '/yarn/local'),
                    'container_executor_allowed_system_users':
                        "nobody,impala,hive,llama,hdfs,yarn,mapred,"
                        "spark,oozie",
                    "container_executor_banned_users": "bin"
                },
                'SERVER': {
                    'maxSessionTimeout': 60000
                },
                'HIVESERVER2': {
                    'hiveserver2_enable_impersonation':
                        'false' if snt_count else 'true',
                    'hive_hs2_config_safety_valve': (
                        c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                },
                'HIVEMETASTORE': {
                    'hive_metastore_config_safety_valve': (
                        c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                }
            }

            ng_user_confs = self.pu.convert_process_configs(
                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})

    @cpo.event_wrapper(
        True, step=_("Enable NameNode HA"), param=('cluster', 1))
    @cu.cloudera_cmd
    def enable_namenode_ha(self, cluster):
        standby_nn = self.pu.get_secondarynamenode(cluster)
        standby_nn_host_name = standby_nn.fqdn()
        jns = self.pu.get_jns(cluster)
        jn_list = []
        for index, jn in enumerate(jns):
            jn_host_name = jn.fqdn()
            jn_list.append({'jnHostId': jn_host_name,
                            'jnName': 'JN%i' % index,
                            'jnEditsDir': '/dfs/jn'
                            })
        cm_cluster = self.get_cloudera_cluster(cluster)
        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        nn = hdfs.get_roles_by_type('NAMENODE')[0]

        yield hdfs.enable_nn_ha(active_name=nn.name,
                                standby_host_id=standby_nn_host_name,
                                nameservice=self.NAME_SERVICE, jns=jn_list
                                )

    @cpo.event_wrapper(
        True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
    @cu.cloudera_cmd
    def enable_resourcemanager_ha(self, cluster):
        new_rm = self.pu.get_stdb_rm(cluster)
        new_rm_host_name = new_rm.fqdn()
        cm_cluster = self.get_cloudera_cluster(cluster)
        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)

        self.c_helper = config_helper.ConfigHelperV570()

@ -65,10 +65,9 @@ class ConfigHelperV570(c_h.ConfigHelper):
|
|||
'keytrustee/cloudera.list')
|
||||
|
||||
DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL = (
|
||||
'http://archive.cloudera.com/'
|
||||
'navigator-keytrustee5/ubuntu/'
|
||||
'trusty/amd64/navigator-keytrustee'
|
||||
'/archive.key')
|
||||
'http://archive.cloudera.com/navigator-'
|
||||
'keytrustee5/ubuntu/trusty/amd64/navigator-'
|
||||
'keytrustee/archive.key')
|
||||
|
||||
KEY_TRUSTEE_CENTOS_REPO_URL = (
|
||||
'http://archive.cloudera.com/navigator-'
|
||||
|
@ -80,6 +79,12 @@ class ConfigHelperV570(c_h.ConfigHelper):
|
|||
'/apache/hadoop/hadoop-openstack/2.6.0-cdh5.7.0'
|
||||
'/hadoop-openstack-2.6.0-cdh5.7.0.jar')
|
||||
|
||||
SWIFT_LIB_URL = p.Config(
|
||||
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
|
||||
default_value=DEFAULT_SWIFT_LIB_URL,
|
||||
description=("Library that adds Swift support to CDH. The file"
|
||||
" will be downloaded by VMs."))
|
||||
|
||||
HIVE_SERVER2_SENTRY_SAFETY_VALVE = f.get_file_text(
|
||||
path_to_config + 'hive-server2-sentry-safety.xml')
|
||||
|
||||
|
@ -89,162 +94,8 @@ class ConfigHelperV570(c_h.ConfigHelper):
|
|||
SENTRY_IMPALA_CLIENT_SAFETY_VALVE = f.get_file_text(
|
||||
path_to_config + 'sentry-impala-client-safety.xml')
|
||||
|
||||
_default_executor_classpath = ":".join(
|
||||
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
|
||||
'/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
|
||||
|
||||
EXECUTOR_EXTRA_CLASSPATH = p.Config(
|
||||
'Executor extra classpath', 'Spark', 'cluster', priority=2,
|
||||
default_value=_default_executor_classpath,
|
||||
description='Value for spark.executor.extraClassPath in '
|
||||
'spark-defaults.conf (default: %s)'
|
||||
                    % _default_executor_classpath)

    SWIFT_LIB_URL = p.Config(
        'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
        default_value=DEFAULT_SWIFT_LIB_URL,
        description=("Library that adds Swift support to CDH. The file"
                     " will be downloaded by VMs."))

    KMS_REPO_URL = p.Config(
        'KMS repo list URL', 'general', 'cluster', priority=1,
        default_value="")

    KMS_REPO_KEY_URL = p.Config(
        'KMS repo key URL (for debian-based only)', 'general',
        'cluster',
        priority=1, default_value="")

    REQUIRE_ANTI_AFFINITY = p.Config('Require Anti Affinity',
                                     'general', 'cluster',
                                     config_type='bool',
                                     priority=2,
                                     default_value=True)

    def __init__(self):
        super(ConfigHelperV570, self).__init__()
        self.priority_one_confs = self._load_json(
            self.path_to_config + 'priority-one-confs.json')
        self._init_all_ng_plugin_configs()

    def _get_cluster_plugin_configs(self):
        confs = super(ConfigHelperV570, self)._get_ng_plugin_configs()
        confs += [self.EXECUTOR_EXTRA_CLASSPATH,
                  self.KMS_REPO_URL,
                  self.KMS_REPO_KEY_URL,
                  self.REQUIRE_ANTI_AFFINITY]

        return confs

    def _init_all_ng_plugin_configs(self):
        self.hdfs_confs = self._load_and_init_configs(
            'hdfs-service.json', 'HDFS', 'cluster')
        self.namenode_confs = self._load_and_init_configs(
            'hdfs-namenode.json', 'NAMENODE', 'node')
        self.datanode_confs = self._load_and_init_configs(
            'hdfs-datanode.json', 'DATANODE', 'node')
        self.secnamenode_confs = self._load_and_init_configs(
            'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
        self.hdfs_gateway_confs = self._load_and_init_configs(
            'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
        self.journalnode_confs = self._load_and_init_configs(
            'hdfs-journalnode.json', 'JOURNALNODE', 'node')

        self.yarn_confs = self._load_and_init_configs(
            'yarn-service.json', 'YARN', 'cluster')
        self.resourcemanager_confs = self._load_and_init_configs(
            'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
        self.nodemanager_confs = self._load_and_init_configs(
            'yarn-nodemanager.json', 'NODEMANAGER', 'node')
        self.jobhistory_confs = self._load_and_init_configs(
            'yarn-jobhistory.json', 'JOBHISTORY', 'node')
        self.yarn_gateway_conf = self._load_and_init_configs(
            'yarn-gateway.json', 'YARN_GATEWAY', 'node')

        self.oozie_service_confs = self._load_and_init_configs(
            'oozie-service.json', 'OOZIE', 'cluster')
        self.oozie_role_confs = self._load_and_init_configs(
            'oozie-oozie_server.json', 'OOZIE', 'node')

        self.hive_service_confs = self._load_and_init_configs(
            'hive-service.json', 'HIVE', 'cluster')
        self.hive_metastore_confs = self._load_and_init_configs(
            'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
        self.hive_hiveserver_confs = self._load_and_init_configs(
            'hive-hiveserver2.json', 'HIVESERVER', 'node')
        self.hive_webhcat_confs = self._load_and_init_configs(
            'hive-webhcat.json', 'WEBHCAT', 'node')

        self.hue_service_confs = self._load_and_init_configs(
            'hue-service.json', 'HUE', 'cluster')
        self.hue_role_confs = self._load_and_init_configs(
            'hue-hue_server.json', 'HUE', 'node')

        self.spark_service_confs = self._load_and_init_configs(
            'spark-service.json', 'SPARK_ON_YARN', 'cluster')
        self.spark_role_confs = self._load_and_init_configs(
            'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')

        self.zookeeper_server_confs = self._load_and_init_configs(
            'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
        self.zookeeper_service_confs = self._load_and_init_configs(
            'zookeeper-server.json', 'ZOOKEEPER', 'node')

        self.hbase_confs = self._load_and_init_configs(
            'hbase-service.json', 'HBASE', 'cluster')
        self.master_confs = self._load_and_init_configs(
            'hbase-master.json', 'MASTER', 'node')
        self.regionserver_confs = self._load_and_init_configs(
            'hbase-regionserver.json', 'REGIONSERVER', 'node')

        self.flume_service_confs = self._load_and_init_configs(
            'flume-service.json', 'FLUME', 'cluster')
        self.flume_agent_confs = self._load_and_init_configs(
            'flume-agent.json', 'FLUME', 'node')

        self.sentry_service_confs = self._load_and_init_configs(
            'sentry-service.json', 'SENTRY', 'cluster')
        self.sentry_server_confs = self._load_and_init_configs(
            'sentry-sentry_server.json', 'SENTRY', 'node')

        self.solr_service_confs = self._load_and_init_configs(
            'solr-service.json', 'SOLR', 'cluster')
        self.solr_server_confs = self._load_and_init_configs(
            'solr-solr_server.json', 'SOLR', 'node')

        self.sqoop_service_confs = self._load_and_init_configs(
            'sqoop-service.json', 'SQOOP', 'cluster')
        self.sqoop_server_confs = self._load_and_init_configs(
            'sqoop-sqoop_server.json', 'SQOOP', 'node')

        self.ks_indexer_service_confs = self._load_and_init_configs(
            'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
        self.ks_indexer_role_confs = self._load_and_init_configs(
            'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')

        self.impala_service_confs = self._load_and_init_configs(
            'impala-service.json', 'IMPALA', 'cluster')
        self.impala_catalogserver_confs = self._load_and_init_configs(
            'impala-catalogserver.json', 'CATALOGSERVER', 'node')
        self.impala_impalad_confs = self._load_and_init_configs(
            'impala-impalad.json', 'IMPALAD', 'node')
        self.impala_statestore_confs = self._load_and_init_configs(
            'impala-statestore.json', 'STATESTORE', 'node')

        self.kms_service_confs = self._load_and_init_configs(
            'kms-service.json', 'KMS', 'cluster')
        self.kms_kms_confs = self._load_and_init_configs(
            'kms-kms.json', 'KMS', 'node')

        self.kafka_service = self._load_and_init_configs(
            'kafka-service.json', 'KAFKA', 'cluster')
        self.kafka_kafka_broker = self._load_and_init_configs(
            'kafka-kafka_broker.json', 'KAFKA', 'node')
        self.kafka_kafka_mirror_maker = self._load_and_init_configs(
            'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')

    def get_required_anti_affinity(self, cluster):
        return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)

    def get_kms_key_url(self, cluster):
        return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)
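Aside (illustrative, not part of the change set): the cluster-wide options
defined above are read back through the accessor methods at the bottom of the
class. A minimal usage sketch; 'cluster' is a placeholder for a Sahara cluster
object carrying user-supplied cluster_configs.

    helper = ConfigHelperV570()
    require_aa = helper.get_required_anti_affinity(cluster)  # bool, True by default
    kms_key_url = helper.get_kms_key_url(cluster)            # '' unless overridden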
@@ -21,49 +21,10 @@ from sahara.plugins import utils as gu
from sahara.service.edp import hdfs_helper as h
from sahara.utils import cluster_progress_ops as cpo

PACKAGES = [
    'cloudera-manager-agent',
    'cloudera-manager-daemons',
    'cloudera-manager-server',
    'cloudera-manager-server-db-2',
    'flume-ng',
    'hadoop-hdfs-datanode',
    'hadoop-hdfs-namenode',
    'hadoop-hdfs-secondarynamenode',
    'hadoop-kms'
    'hadoop-mapreduce',
    'hadoop-mapreduce-historyserver',
    'hadoop-yarn-nodemanager',
    'hadoop-yarn-resourcemanager',
    'hbase',
    'hbase-solr',
    'hive-hcatalog',
    'hive-metastore',
    'hive-server2',
    'hive-webhcat-server',
    'hue',
    'impala',
    'impala-server',
    'impala-state-store',
    'impala-catalog',
    'impala-shell',
    'kafka',
    'kafka-server'
    'keytrustee-keyprovider',
    'oozie',
    'oracle-j2sdk1.7',
    'sentry',
    'solr-server',
    'solr-doc',
    'search',
    'spark-history-server',
    'sqoop2',
    'unzip',
    'zookeeper'
]

CU = cu.ClouderaUtilsV570()

PACKAGES = common_deploy.PACKAGES


def configure_cluster(cluster):
    instances = gu.get_instances(cluster)
@@ -110,8 +71,8 @@ def scale_cluster(cluster, instances):
    CU.configure_rack_awareness(cluster)
    CU.configure_instances(instances, cluster)
    CU.update_configs(instances)
    common_deploy.prepare_scaling_kerberized_cluster(cluster, CU,
                                                     instances)
    common_deploy.prepare_scaling_kerberized_cluster(
        cluster, CU, instances)

    CU.pu.configure_swift(cluster, instances)
    _start_roles(cluster, instances)
@@ -203,40 +164,5 @@ def start_cluster(cluster):


def get_open_ports(node_group):
    ports = [9000]  # for CM agent

    ports_map = {
        'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
                             9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
        'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
        'HDFS_SECONDARYNAMENODE': [50090, 50495],
        'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
        'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
        'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
        'YARN_NODEMANAGER': [8040, 8041, 8042],
        'YARN_JOBHISTORY': [10020, 19888],
        'HIVE_METASTORE': [9083],
        'HIVE_SERVER2': [10000],
        'HUE_SERVER': [8888],
        'OOZIE_SERVER': [11000, 11001],
        'SPARK_YARN_HISTORY_SERVER': [18088],
        'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
        'HBASE_MASTER': [60000],
        'HBASE_REGIONSERVER': [60020],
        'FLUME_AGENT': [41414],
        'SENTRY_SERVER': [8038],
        'SOLR_SERVER': [8983, 8984],
        'SQOOP_SERVER': [8005, 12000],
        'KEY_VALUE_STORE_INDEXER': [],
        'IMPALA_CATALOGSERVER': [25020, 26000],
        'IMPALA_STATESTORE': [25010, 24000],
        'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
        'KMS': [16000, 16001],
        'JOURNALNODE': [8480, 8481, 8485]
    }

    for process in node_group.node_processes:
        if process in ports_map:
            ports.extend(ports_map[process])

    ports = common_deploy.get_open_ports(node_group)
    return ports
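Aside (illustrative, not part of the change set): the per-version
get_open_ports above mapped each CDH role to its listening ports and collected
them per node group; the shared common_deploy.get_open_ports now does the same
job. A standalone sketch of the lookup pattern, with hypothetical names:

    def open_ports(node_processes, ports_map, base=(9000,)):
        # Start from the CM agent port, then add the ports of each known role.
        ports = list(base)
        for process in node_processes:
            ports.extend(ports_map.get(process, []))
        return ports

    # open_ports(['HDFS_NAMENODE'], {'HDFS_NAMENODE': [8020, 50070]})
    # -> [9000, 8020, 50070]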
@@ -26,13 +26,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
        super(EdpOozieEngine, self).__init__(cluster)
        self.cloudera_utils = cu.ClouderaUtilsV570()

    def get_name_node_uri(self, cluster):
        if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
            return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
        else:
            namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
            return 'hdfs://%s:8020' % namenode_ip

    @staticmethod
    def get_possible_job_config(job_type):
        if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):
@@ -13,154 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.conductor import resource as res
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils as pu
from sahara.plugins.cdh.v5_7_0 import config_helper
from sahara.plugins import utils as u


class PluginUtilsV570(pu.AbstractPluginUtils):

    def __init__(self):
        self.c_helper = config_helper.ConfigHelperV570()

    def get_role_name(self, instance, service):
        # NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
        shortcuts = {
            'AGENT': 'A',
            'ALERTPUBLISHER': 'AP',
            'CATALOGSERVER': 'ICS',
            'DATANODE': 'DN',
            'EVENTSERVER': 'ES',
            'HBASE_INDEXER': 'LHBI',
            'HIVEMETASTORE': 'HVM',
            'HIVESERVER2': 'HVS',
            'HOSTMONITOR': 'HM',
            'IMPALAD': 'ID',
            'JOBHISTORY': 'JS',
            'JOURNALNODE': 'JN',
            'KAFKA_BROKER': 'KB',
            'KMS': 'KMS',
            'MASTER': 'M',
            'NAMENODE': 'NN',
            'NODEMANAGER': 'NM',
            'OOZIE_SERVER': 'OS',
            'REGIONSERVER': 'RS',
            'RESOURCEMANAGER': 'RM',
            'SECONDARYNAMENODE': 'SNN',
            'SENTRY_SERVER': 'SNT',
            'SERVER': 'S',
            'SERVICEMONITOR': 'SM',
            'SOLR_SERVER': 'SLR',
            'SPARK_YARN_HISTORY_SERVER': 'SHS',
            'SQOOP_SERVER': 'S2S',
            'STATESTORE': 'ISS',
            'WEBHCAT': 'WHC',
            'HDFS_GATEWAY': 'HG',
            'YARN_GATEWAY': 'YG'
        }
        return '%s_%s' % (shortcuts.get(service, service),
                          instance.hostname().replace('-', '_'))

    def get_sentry(self, cluster):
        return u.get_instance(cluster, 'SENTRY_SERVER')

    def get_flumes(self, cluster):
        return u.get_instances(cluster, 'FLUME_AGENT')

    def get_solrs(self, cluster):
        return u.get_instances(cluster, 'SOLR_SERVER')

    def get_sqoop(self, cluster):
        return u.get_instance(cluster, 'SQOOP_SERVER')

    def get_hbase_indexers(self, cluster):
        return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')

    def get_catalogserver(self, cluster):
        return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')

    def get_statestore(self, cluster):
        return u.get_instance(cluster, 'IMPALA_STATESTORE')

    def get_impalads(self, cluster):
        return u.get_instances(cluster, 'IMPALAD')

    def get_kms(self, cluster):
        return u.get_instances(cluster, 'KMS')

    def get_jns(self, cluster):
        return u.get_instances(cluster, 'HDFS_JOURNALNODE')

    def get_stdb_rm(self, cluster):
        return u.get_instance(cluster, 'YARN_STANDBYRM')

    def get_kafka_brokers(self, cluster):
        return u.get_instances(cluster, 'KAFKA_BROKER')

    def convert_process_configs(self, configs):
        p_dict = {
            "CLOUDERA": ['MANAGER'],
            "NAMENODE": ['NAMENODE'],
            "DATANODE": ['DATANODE'],
            "SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
            "RESOURCEMANAGER": ['RESOURCEMANAGER'],
            "NODEMANAGER": ['NODEMANAGER'],
            "JOBHISTORY": ['JOBHISTORY'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVESERVER": ['HIVESERVER2'],
            "HIVEMETASTORE": ['HIVEMETASTORE'],
            "WEBHCAT": ['WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['SERVER'],
            "MASTER": ['MASTER'],
            "REGIONSERVER": ['REGIONSERVER'],
            "FLUME": ['AGENT'],
            "CATALOGSERVER": ['CATALOGSERVER'],
            "STATESTORE": ['STATESTORE'],
            "IMPALAD": ['IMPALAD'],
            "KS_INDEXER": ['HBASE_INDEXER'],
            "SENTRY": ['SENTRY_SERVER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "KMS": ['KMS'],
            "YARN_GATEWAY": ['YARN_GATEWAY'],
            "HDFS_GATEWAY": ['HDFS_GATEWAY'],
            "JOURNALNODE": ['JOURNALNODE'],
            "KAFKA": ['KAFKA_BROKER']
        }
        if isinstance(configs, res.Resource):
            configs = configs.to_dict()
        for k in configs.keys():
            if k in p_dict.keys():
                item = configs[k]
                del configs[k]
                newkey = p_dict[k][0]
                configs[newkey] = item
        return res.Resource(configs)

    def configure_sentry(self, cluster):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            dh.create_sentry_database(cluster, r)

    def _configure_repo_from_inst(self, instance):
        super(PluginUtilsV570, self)._configure_repo_from_inst(instance)

        cluster = instance.cluster
        with instance.remote() as r:
            if cmd.is_ubuntu_os(r):
                kms_key = (
                    self.c_helper.get_kms_key_url(cluster) or
                    self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
                kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
                cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
                cmd.add_apt_key(r, kms_key)
                cmd.update_repository(r)
            if cmd.is_centos_os(r):
                kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
                cmd.add_centos_repository(r, kms_repo_url, 'kms')
                cmd.update_repository(r)
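Aside (illustrative, not part of the change set): convert_process_configs
above renames user-facing process groups to Cloudera Manager role types before
the configs are merged. A standalone sketch of the same rename-keys pattern,
with a hypothetical helper name:

    def rename_keys(configs, mapping):
        # Move each recognized key to its mapped role-type name; keep the rest.
        out = {}
        for key, value in configs.items():
            out[mapping.get(key, [key])[0]] = value
        return out

    # rename_keys({'ZOOKEEPER': {'maxSessionTimeout': 60000}},
    #             {'ZOOKEEPER': ['SERVER']})
    # -> {'SERVER': {'maxSessionTimeout': 60000}}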
@@ -13,220 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.i18n import _
from sahara.plugins.cdh.v5_7_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u


class ValidatorV570(validation.Validator):
    PU = pu.PluginUtilsV570()

    @classmethod
    def validate_cluster_creating(cls, cluster):
        super(ValidatorV570, cls).validate_cluster_creating(cluster)
        cls._hdfs_ha_validation(cluster)
        cls._yarn_ha_validation(cluster)
        cls._flume_validation(cluster)
        cls._sentry_validation(cluster)
        cls._solr_validation(cluster)
        cls._sqoop_validation(cluster)
        cls._hbase_indexer_validation(cluster)
        cls._impala_validation(cluster)
        cls._kms_validation(cluster)

    @classmethod
    def _hdfs_ha_validation(cls, cluster):
        jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if jn_count > 0:
            if jn_count < 3:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('not less than 3'),
                                                        jn_count)
            if not jn_count % 2:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('be odd'), jn_count)
            if zk_count < 1:
                raise ex.RequiredServiceMissingException('ZOOKEEPER',
                                                         required_by='HDFS HA')
            if require_anti_affinity:
                if 'HDFS_SECONDARYNAMENODE' not in\
                        cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_SECONDARYNAMENODE should be enabled '
                          'in anti_affinity.'))
                if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_NAMENODE should be enabled in anti_affinity.'))

    @classmethod
    def _yarn_ha_validation(cls, cluster):
        rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if stdb_rm_count > 1:
            raise ex.InvalidComponentCountException(
                'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
        if stdb_rm_count > 0:
            if rm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_RESOURCEMANAGER', required_by='RM HA')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='RM HA')
            if require_anti_affinity:
                if 'YARN_RESOURCEMANAGER' not in\
                        cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_RESOURCEMANAGER should be enabled in '
                          'anti_affinity.'))
                if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_STANDBYRM should be'
                          ' enabled in anti_affinity.'))

    @classmethod
    def _flume_validation(cls, cluster):
        a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')

        if a_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='FLUME_AGENT')

    @classmethod
    def _sentry_validation(cls, cluster):

        snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        if snt_count > 1:
            raise ex.InvalidComponentCountException(
                'SENTRY_SERVER', _('0 or 1'), snt_count)
        if snt_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SENTRY_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SENTRY_SERVER')

    @classmethod
    def _solr_validation(cls, cluster):
        slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')

        if slr_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SOLR_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SOLR_SERVER')

    @classmethod
    def _sqoop_validation(cls, cluster):

        s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
        nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')

        if s2s_count > 1:
            raise ex.InvalidComponentCountException(
                'SQOOP_SERVER', _('0 or 1'), s2s_count)
        if s2s_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SQOOP_SERVER')
            if nm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
            if hs_count != 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_JOBHISTORY', required_by='SQOOP_SERVER')

    @classmethod
    def _hbase_indexer_validation(cls, cluster):

        lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
        hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')

        if lhbi_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='HBASE_INDEXER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='HBASE_INDEXER')
            if slr_count < 1:
                raise ex.RequiredServiceMissingException(
                    'SOLR_SERVER', required_by='HBASE_INDEXER')
            if hbm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HBASE_MASTER', required_by='HBASE_INDEXER')

    @classmethod
    def _impala_validation(cls, cluster):
        ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
        iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
        id_count = cls._get_inst_count(cluster, 'IMPALAD')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')

        if ics_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
                                                    _('0 or 1'), ics_count)
        if iss_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
                                                    _('0 or 1'), iss_count)
        if ics_count == 1:
            datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
            impalad_ng = u.get_node_groups(cluster, "IMPALAD")
            datanodes = set(ng.id for ng in datanode_ng)
            impalads = set(ng.id for ng in impalad_ng)

            if datanodes != impalads:
                raise ex.InvalidClusterTopology(
                    _("IMPALAD must be installed on every HDFS_DATANODE"))

            if iss_count != 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALA_STATESTORE', required_by='IMPALA')
            if id_count < 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALAD', required_by='IMPALA')
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='IMPALA')
            if hms_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HIVE_METASTORE', required_by='IMPALA')

    @classmethod
    def _kms_validation(cls, cluster):

        kms_count = cls._get_inst_count(cluster, 'KMS')
        if kms_count > 1:
            raise ex.InvalidComponentCountException('KMS',
                                                    _('0 or 1'), kms_count)

    @classmethod
    def _get_anti_affinity(cls, cluster):
        return cluster.anti_affinity
@@ -21,7 +21,6 @@ from sahara.plugins.cdh.v5_7_0 import deploy
from sahara.plugins.cdh.v5_7_0 import edp_engine
from sahara.plugins.cdh.v5_7_0 import plugin_utils
from sahara.plugins.cdh.v5_7_0 import validation
from sahara.plugins import kerberos


class VersionHandler(avm.BaseVersionHandler):
@@ -34,67 +33,3 @@ class VersionHandler(avm.BaseVersionHandler):
        self.deploy = deploy
        self.edp_engine = edp_engine
        self.validation = validation.ValidatorV570()

    def get_plugin_configs(self):
        result = super(VersionHandler, self).get_plugin_configs()
        result.extend(kerberos.get_config_list())
        return result

    def get_node_processes(self):
        return {
            "CLOUDERA": ['CLOUDERA_MANAGER'],
            "HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
                     'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
            "YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
                     'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['ZOOKEEPER_SERVER'],
            "HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
            "FLUME": ['FLUME_AGENT'],
            "IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
            "KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "SENTRY": ['SENTRY_SERVER'],
            "KMS": ['KMS'],
            "KAFKA": ['KAFKA_BROKER'],

            "YARN_GATEWAY": [],
            "RESOURCEMANAGER": [],
            "NODEMANAGER": [],
            "JOBHISTORY": [],

            "HDFS_GATEWAY": [],
            'DATANODE': [],
            'NAMENODE': [],
            'SECONDARYNAMENODE': [],
            'JOURNALNODE': [],

            'REGIONSERVER': [],
            'MASTER': [],

            'HIVEMETASTORE': [],
            'HIVESERVER': [],
            'WEBCAT': [],

            'CATALOGSERVER': [],
            'STATESTORE': [],
            'IMPALAD': [],
            'Kerberos': [],
        }

    def get_edp_engine(self, cluster, job_type):
        oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
        spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
        if job_type in oozie_type:
            return self.edp_engine.EdpOozieEngine(cluster)
        if job_type in spark_type:
            return self.edp_engine.EdpSparkEngine(cluster)
        return None

    def get_edp_job_types(self):
        return (edp_engine.EdpOozieEngine.get_supported_job_types() +
                edp_engine.EdpSparkEngine.get_supported_job_types())
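Aside (illustrative, not part of the change set): the EDP dispatch above
simply returns whichever engine advertises the requested job type. A
standalone sketch of that dispatch shape, with hypothetical names:

    def pick_engine(job_type, engines):
        # engines: iterable of (supported_job_types, engine_factory) pairs.
        for supported, factory in engines:
            if job_type in supported:
                return factory()
        return None

    # pick_engine('Hive', [({'Hive', 'Pig'}, OozieEngine),
    #                      ({'Spark'}, SparkEngine)])  -> OozieEngine()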
@@ -13,459 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.i18n import _
from sahara.plugins.cdh import cloudera_utils as cu
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh.v5_9_0 import config_helper
from sahara.plugins.cdh.v5_9_0 import plugin_utils as pu
from sahara.plugins.cdh.v5_9_0 import validation
from sahara.swift import swift_helper
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import configs as s_cfg
from sahara.utils import xmlutils


HDFS_SERVICE_TYPE = 'HDFS'
YARN_SERVICE_TYPE = 'YARN'
OOZIE_SERVICE_TYPE = 'OOZIE'
HIVE_SERVICE_TYPE = 'HIVE'
HUE_SERVICE_TYPE = 'HUE'
SPARK_SERVICE_TYPE = 'SPARK_ON_YARN'
ZOOKEEPER_SERVICE_TYPE = 'ZOOKEEPER'
HBASE_SERVICE_TYPE = 'HBASE'
FLUME_SERVICE_TYPE = 'FLUME'
SENTRY_SERVICE_TYPE = 'SENTRY'
SOLR_SERVICE_TYPE = 'SOLR'
SQOOP_SERVICE_TYPE = 'SQOOP'
KS_INDEXER_SERVICE_TYPE = 'KS_INDEXER'
IMPALA_SERVICE_TYPE = 'IMPALA'
KMS_SERVICE_TYPE = 'KMS'
KAFKA_SERVICE_TYPE = 'KAFKA'

c_helper = config_helper.ConfigHelperV590()


class ClouderaUtilsV590(cu.ClouderaUtils):
    FLUME_SERVICE_NAME = 'flume01'
    SOLR_SERVICE_NAME = 'solr01'
    SQOOP_SERVICE_NAME = 'sqoop01'
    KS_INDEXER_SERVICE_NAME = 'ks_indexer01'
    IMPALA_SERVICE_NAME = 'impala01'
    SENTRY_SERVICE_NAME = 'sentry01'
    KMS_SERVICE_NAME = 'kms01'
    KAFKA_SERVICE_NAME = 'kafka01'
    CM_API_VERSION = 8
    NAME_SERVICE = 'nameservice01'

    def __init__(self):
        cu.ClouderaUtils.__init__(self)
        self.pu = pu.PluginUtilsV590()
        self.validator = validation.ValidatorV590

    def get_service_by_role(self, role, cluster=None, instance=None):
        cm_cluster = None
        if cluster:
            cm_cluster = self.get_cloudera_cluster(cluster)
        elif instance:
            cm_cluster = self.get_cloudera_cluster(instance.cluster)
        else:
            raise ValueError(_("'cluster' or 'instance' argument missed"))

        if role in ['AGENT']:
            return cm_cluster.get_service(self.FLUME_SERVICE_NAME)
        elif role in ['SENTRY_SERVER']:
            return cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
        elif role in ['SQOOP_SERVER']:
            return cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
        elif role in ['SOLR_SERVER']:
            return cm_cluster.get_service(self.SOLR_SERVICE_NAME)
        elif role in ['HBASE_INDEXER']:
            return cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
        elif role in ['CATALOGSERVER', 'STATESTORE', 'IMPALAD', 'LLAMA']:
            return cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
        elif role in ['KMS']:
            return cm_cluster.get_service(self.KMS_SERVICE_NAME)
        elif role in ['JOURNALNODE']:
            return cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        elif role in ['YARN_STANDBYRM']:
            return cm_cluster.get_service(self.YARN_SERVICE_NAME)
        elif role in ['KAFKA_BROKER']:
            return cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
        else:
            return super(ClouderaUtilsV590, self).get_service_by_role(
                role, cluster, instance)

    @cpo.event_wrapper(
        True, step=_("First run cluster"), param=('cluster', 1))
    @cu.cloudera_cmd
    def first_run(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)
        yield cm_cluster.first_run()

    @cpo.event_wrapper(True, step=_("Create services"), param=('cluster', 1))
    def create_services(self, cluster):
        api = self.get_api_client(cluster)
        cm_cluster = api.create_cluster(cluster.name,
                                        fullVersion=cluster.hadoop_version)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            cm_cluster.create_service(self.ZOOKEEPER_SERVICE_NAME,
                                      ZOOKEEPER_SERVICE_TYPE)
        cm_cluster.create_service(self.HDFS_SERVICE_NAME, HDFS_SERVICE_TYPE)
        cm_cluster.create_service(self.YARN_SERVICE_NAME, YARN_SERVICE_TYPE)
        cm_cluster.create_service(self.OOZIE_SERVICE_NAME, OOZIE_SERVICE_TYPE)
        if self.pu.get_hive_metastore(cluster):
            cm_cluster.create_service(self.HIVE_SERVICE_NAME,
                                      HIVE_SERVICE_TYPE)
        if self.pu.get_hue(cluster):
            cm_cluster.create_service(self.HUE_SERVICE_NAME, HUE_SERVICE_TYPE)
        if self.pu.get_spark_historyserver(cluster):
            cm_cluster.create_service(self.SPARK_SERVICE_NAME,
                                      SPARK_SERVICE_TYPE)
        if self.pu.get_hbase_master(cluster):
            cm_cluster.create_service(self.HBASE_SERVICE_NAME,
                                      HBASE_SERVICE_TYPE)
        if len(self.pu.get_flumes(cluster)) > 0:
            cm_cluster.create_service(self.FLUME_SERVICE_NAME,
                                      FLUME_SERVICE_TYPE)
        if self.pu.get_sentry(cluster):
            cm_cluster.create_service(self.SENTRY_SERVICE_NAME,
                                      SENTRY_SERVICE_TYPE)
        if len(self.pu.get_solrs(cluster)) > 0:
            cm_cluster.create_service(self.SOLR_SERVICE_NAME,
                                      SOLR_SERVICE_TYPE)
        if self.pu.get_sqoop(cluster):
            cm_cluster.create_service(self.SQOOP_SERVICE_NAME,
                                      SQOOP_SERVICE_TYPE)
        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            cm_cluster.create_service(self.KS_INDEXER_SERVICE_NAME,
                                      KS_INDEXER_SERVICE_TYPE)
        if self.pu.get_catalogserver(cluster):
            cm_cluster.create_service(self.IMPALA_SERVICE_NAME,
                                      IMPALA_SERVICE_TYPE)
        if self.pu.get_kms(cluster):
            cm_cluster.create_service(self.KMS_SERVICE_NAME,
                                      KMS_SERVICE_TYPE)
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            cm_cluster.create_service(self.KAFKA_SERVICE_NAME,
                                      KAFKA_SERVICE_TYPE)

    def await_agents(self, cluster, instances):
        self._await_agents(cluster, instances, c_helper.AWAIT_AGENTS_TIMEOUT)

    @cpo.event_wrapper(
        True, step=_("Configure services"), param=('cluster', 1))
    def configure_services(self, cluster):
        cm_cluster = self.get_cloudera_cluster(cluster)

        if len(self.pu.get_zookeepers(cluster)) > 0:
            zookeeper = cm_cluster.get_service(self.ZOOKEEPER_SERVICE_NAME)
            zookeeper.update_config(self._get_configs(ZOOKEEPER_SERVICE_TYPE,
                                                      cluster=cluster))

        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        hdfs.update_config(self._get_configs(HDFS_SERVICE_TYPE,
                                             cluster=cluster))

        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yarn.update_config(self._get_configs(YARN_SERVICE_TYPE,
                                             cluster=cluster))

        oozie = cm_cluster.get_service(self.OOZIE_SERVICE_NAME)
        oozie.update_config(self._get_configs(OOZIE_SERVICE_TYPE,
                                              cluster=cluster))

        if self.pu.get_hive_metastore(cluster):
            hive = cm_cluster.get_service(self.HIVE_SERVICE_NAME)
            hive.update_config(self._get_configs(HIVE_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_hue(cluster):
            hue = cm_cluster.get_service(self.HUE_SERVICE_NAME)
            hue.update_config(self._get_configs(HUE_SERVICE_TYPE,
                                                cluster=cluster))

        if self.pu.get_spark_historyserver(cluster):
            spark = cm_cluster.get_service(self.SPARK_SERVICE_NAME)
            spark.update_config(self._get_configs(SPARK_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_hbase_master(cluster):
            hbase = cm_cluster.get_service(self.HBASE_SERVICE_NAME)
            hbase.update_config(self._get_configs(HBASE_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_flumes(cluster)) > 0:
            flume = cm_cluster.get_service(self.FLUME_SERVICE_NAME)
            flume.update_config(self._get_configs(FLUME_SERVICE_TYPE,
                                                  cluster=cluster))

        if self.pu.get_sentry(cluster):
            sentry = cm_cluster.get_service(self.SENTRY_SERVICE_NAME)
            sentry.update_config(self._get_configs(SENTRY_SERVICE_TYPE,
                                                   cluster=cluster))

        if len(self.pu.get_solrs(cluster)) > 0:
            solr = cm_cluster.get_service(self.SOLR_SERVICE_NAME)
            solr.update_config(self._get_configs(SOLR_SERVICE_TYPE,
                                                 cluster=cluster))

        if self.pu.get_sqoop(cluster):
            sqoop = cm_cluster.get_service(self.SQOOP_SERVICE_NAME)
            sqoop.update_config(self._get_configs(SQOOP_SERVICE_TYPE,
                                                  cluster=cluster))

        if len(self.pu.get_hbase_indexers(cluster)) > 0:
            ks_indexer = cm_cluster.get_service(self.KS_INDEXER_SERVICE_NAME)
            ks_indexer.update_config(
                self._get_configs(KS_INDEXER_SERVICE_TYPE, cluster=cluster))

        if self.pu.get_catalogserver(cluster):
            impala = cm_cluster.get_service(self.IMPALA_SERVICE_NAME)
            impala.update_config(self._get_configs(IMPALA_SERVICE_TYPE,
                                                   cluster=cluster))

        if self.pu.get_kms(cluster):
            kms = cm_cluster.get_service(self.KMS_SERVICE_NAME)
            kms.update_config(self._get_configs(KMS_SERVICE_TYPE,
                                                cluster=cluster))
        if len(self.pu.get_kafka_brokers(cluster)) > 0:
            kafka = cm_cluster.get_service(self.KAFKA_SERVICE_NAME)
            kafka.update_config(self._get_configs(KAFKA_SERVICE_TYPE,
                                                  cluster=cluster))

    def _get_configs(self, service, cluster=None, instance=None):
        def get_hadoop_dirs(mount_points, suffix):
            return ','.join([x + suffix for x in mount_points])

        all_confs = {}
        if cluster:
            zk_count = self.validator._get_inst_count(cluster,
                                                      'ZOOKEEPER_SERVER')
            hbm_count = self.validator._get_inst_count(cluster, 'HBASE_MASTER')
            snt_count = self.validator._get_inst_count(cluster,
                                                       'SENTRY_SERVER')
            ks_count =\
                self.validator._get_inst_count(cluster,
                                               'KEY_VALUE_STORE_INDEXER')
            kms_count = self.validator._get_inst_count(cluster, 'KMS')
            imp_count =\
                self.validator._get_inst_count(cluster,
                                               'IMPALA_CATALOGSERVER')
            hive_count = self.validator._get_inst_count(cluster,
                                                        'HIVE_METASTORE')
            slr_count = self.validator._get_inst_count(cluster, 'SOLR_SERVER')
            sqp_count = self.validator._get_inst_count(cluster, 'SQOOP_SERVER')
            core_site_safety_valve = ''
            if self.pu.c_helper.is_swift_enabled(cluster):
                configs = swift_helper.get_swift_configs()
                confs = {c['name']: c['value'] for c in configs}
                core_site_safety_valve = xmlutils.create_elements_xml(confs)
            all_confs = {
                'HDFS': {
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'dfs_block_local_path_access_user':
                        'impala' if imp_count else '',
                    'kms_service': self.KMS_SERVICE_NAME if kms_count else '',
                    'core_site_safety_valve': core_site_safety_valve
                },
                'HIVE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'OOZIE': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME,
                    'hive_service':
                        self.HIVE_SERVICE_NAME if hive_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'YARN': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                },
                'HUE': {
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'oozie_service': self.OOZIE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'impala_service':
                        self.IMPALA_SERVICE_NAME if imp_count else '',
                    'sqoop_service':
                        self.SQOOP_SERVICE_NAME if sqp_count else ''
                },
                'SPARK_ON_YARN': {
                    'yarn_service': self.YARN_SERVICE_NAME
                },
                'HBASE': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME,
                    'hbase_enable_indexing': 'true' if ks_count else 'false',
                    'hbase_enable_replication':
                        'true' if ks_count else 'false'
                },
                'FLUME': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'solr_service':
                        self.SOLR_SERVICE_NAME if slr_count else '',
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else ''
                },
                'SENTRY': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'sentry_server_config_safety_valve': (
                        c_helper.SENTRY_IMPALA_CLIENT_SAFETY_VALVE
                        if imp_count else '')
                },
                'SOLR': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                },
                'SQOOP': {
                    'mapreduce_yarn_service': self.YARN_SERVICE_NAME
                },
                'KS_INDEXER': {
                    'hbase_service': self.HBASE_SERVICE_NAME,
                    'solr_service': self.SOLR_SERVICE_NAME
                },
                'IMPALA': {
                    'hdfs_service': self.HDFS_SERVICE_NAME,
                    'hbase_service':
                        self.HBASE_SERVICE_NAME if hbm_count else '',
                    'hive_service': self.HIVE_SERVICE_NAME,
                    'sentry_service':
                        self.SENTRY_SERVICE_NAME if snt_count else '',
                    'zookeeper_service':
                        self.ZOOKEEPER_SERVICE_NAME if zk_count else ''
                }
            }
            hive_confs = {
                'HIVE': {
                    'hive_metastore_database_type': 'postgresql',
                    'hive_metastore_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'hive_metastore_database_port': '7432',
                    'hive_metastore_database_password':
                        dh.get_hive_db_password(cluster)
                }
            }
            hue_confs = {
                'HUE': {
                    'hue_webhdfs': self.pu.get_role_name(
                        self.pu.get_namenode(cluster), 'NAMENODE')
                }
            }
            sentry_confs = {
                'SENTRY': {
                    'sentry_server_database_type': 'postgresql',
                    'sentry_server_database_host':
                        self.pu.get_manager(cluster).internal_ip,
                    'sentry_server_database_port': '7432',
                    'sentry_server_database_password':
                        dh.get_sentry_db_password(cluster)
                }
            }
            kafka_confs = {
                'KAFKA': {
                    'zookeeper_service': self.ZOOKEEPER_SERVICE_NAME
                }
            }
            all_confs = s_cfg.merge_configs(all_confs, hue_confs)
            all_confs = s_cfg.merge_configs(all_confs, hive_confs)
            all_confs = s_cfg.merge_configs(all_confs, sentry_confs)
            all_confs = s_cfg.merge_configs(all_confs, kafka_confs)
            all_confs = s_cfg.merge_configs(all_confs, cluster.cluster_configs)

        if instance:
            snt_count = self.validator._get_inst_count(instance.cluster,
                                                       'SENTRY_SERVER')
            paths = instance.storage_paths()

            instance_default_confs = {
                'NAMENODE': {
                    'dfs_name_dir_list': get_hadoop_dirs(paths, '/fs/nn')
                },
                'SECONDARYNAMENODE': {
                    'fs_checkpoint_dir_list':
                        get_hadoop_dirs(paths, '/fs/snn')
                },
                'DATANODE': {
                    'dfs_data_dir_list': get_hadoop_dirs(paths, '/fs/dn'),
                    'dfs_datanode_data_dir_perm': 755,
                    'dfs_datanode_handler_count': 30
                },
                'NODEMANAGER': {
                    'yarn_nodemanager_local_dirs':
                        get_hadoop_dirs(paths, '/yarn/local'),
                    'container_executor_allowed_system_users':
                        "nobody,impala,hive,llama,hdfs,yarn,mapred,"
                        "spark,oozie",
                    "container_executor_banned_users": "bin"
                },
                'SERVER': {
                    'maxSessionTimeout': 60000
                },
                'HIVESERVER2': {
                    'hiveserver2_enable_impersonation':
                        'false' if snt_count else 'true',
                    'hive_hs2_config_safety_valve': (
                        c_helper.HIVE_SERVER2_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                },
                'HIVEMETASTORE': {
                    'hive_metastore_config_safety_valve': (
                        c_helper.HIVE_METASTORE_SENTRY_SAFETY_VALVE
                        if snt_count else '')
                }
            }

            ng_user_confs = self.pu.convert_process_configs(
                instance.node_group.node_configs)
            all_confs = s_cfg.merge_configs(all_confs, ng_user_confs)
            all_confs = s_cfg.merge_configs(all_confs, instance_default_confs)

        return all_confs.get(service, {})

    @cpo.event_wrapper(
        True, step=_("Enable NameNode HA"), param=('cluster', 1))
    @cu.cloudera_cmd
    def enable_namenode_ha(self, cluster):
        standby_nn = self.pu.get_secondarynamenode(cluster)
        standby_nn_host_name = standby_nn.fqdn()
        jns = self.pu.get_jns(cluster)
        jn_list = []
        for index, jn in enumerate(jns):
            jn_host_name = jn.fqdn()
            jn_list.append({'jnHostId': jn_host_name,
                            'jnName': 'JN%i' % index,
                            'jnEditsDir': '/dfs/jn'
                            })
        cm_cluster = self.get_cloudera_cluster(cluster)
        hdfs = cm_cluster.get_service(self.HDFS_SERVICE_NAME)
        nn = hdfs.get_roles_by_type('NAMENODE')[0]

        yield hdfs.enable_nn_ha(active_name=nn.name,
                                standby_host_id=standby_nn_host_name,
                                nameservice=self.NAME_SERVICE, jns=jn_list
                                )

    @cpo.event_wrapper(
        True, step=_("Enable ResourceManager HA"), param=('cluster', 1))
    @cu.cloudera_cmd
    def enable_resourcemanager_ha(self, cluster):
        new_rm = self.pu.get_stdb_rm(cluster)
        new_rm_host_name = new_rm.fqdn()
        cm_cluster = self.get_cloudera_cluster(cluster)
        yarn = cm_cluster.get_service(self.YARN_SERVICE_NAME)
        yield yarn.enable_rm_ha(new_rm_host_id=new_rm_host_name)
        self.c_helper = config_helper.ConfigHelperV590()
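Aside (illustrative, not part of the change set): in _get_configs above, each
merge_configs call overlays its second argument on the accumulated dict, so
user-supplied cluster.cluster_configs are applied after the generated service
defaults, and per-instance defaults are applied after the node-group user
configs. A standalone sketch of that deep-merge shape, under the assumed
semantics of sahara.utils.configs.merge_configs (second argument wins):

    def merge(base, override):
        # Recursively overlay 'override' on 'base'; 'override' wins on conflict.
        out = dict(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(out.get(key), dict):
                out[key] = merge(out[key], value)
            else:
                out[key] = value
        return out

    # merge({'HDFS': {'a': 1}}, {'HDFS': {'a': 2, 'b': 3}})
    # -> {'HDFS': {'a': 2, 'b': 3}}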
@@ -65,10 +65,9 @@ class ConfigHelperV590(c_h.ConfigHelper):
        'keytrustee/cloudera.list')

    DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL = (
        'http://archive.cloudera.com/'
        'navigator-keytrustee5/ubuntu/'
        'trusty/amd64/navigator-keytrustee'
        '/archive.key')
        'http://archive.cloudera.com/navigator-'
        'keytrustee5/ubuntu/trusty/amd64/navigator-'
        'keytrustee/archive.key')

    KEY_TRUSTEE_CENTOS_REPO_URL = (
        'http://archive.cloudera.com/navigator-'
@@ -80,6 +79,12 @@ class ConfigHelperV590(c_h.ConfigHelper):
        '/apache/hadoop/hadoop-openstack/2.6.0-cdh5.9.0'
        '/hadoop-openstack-2.6.0-cdh5.9.0.jar')

    SWIFT_LIB_URL = p.Config(
        'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
        default_value=DEFAULT_SWIFT_LIB_URL,
        description=("Library that adds Swift support to CDH. The file"
                     " will be downloaded by VMs."))

    HIVE_SERVER2_SENTRY_SAFETY_VALVE = f.get_file_text(
        path_to_config + 'hive-server2-sentry-safety.xml')
@@ -89,162 +94,8 @@ class ConfigHelperV590(c_h.ConfigHelper):
    SENTRY_IMPALA_CLIENT_SAFETY_VALVE = f.get_file_text(
        path_to_config + 'sentry-impala-client-safety.xml')

    _default_executor_classpath = ":".join(
        ['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
         '/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])

    EXECUTOR_EXTRA_CLASSPATH = p.Config(
        'Executor extra classpath', 'Spark', 'cluster', priority=2,
        default_value=_default_executor_classpath,
        description='Value for spark.executor.extraClassPath in '
                    'spark-defaults.conf (default: %s)'
                    % _default_executor_classpath)

    SWIFT_LIB_URL = p.Config(
        'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
        default_value=DEFAULT_SWIFT_LIB_URL,
        description=("Library that adds Swift support to CDH. The file"
                     " will be downloaded by VMs."))

    KMS_REPO_URL = p.Config(
        'KMS repo list URL', 'general', 'cluster', priority=1,
        default_value="")

    KMS_REPO_KEY_URL = p.Config(
        'KMS repo key URL (for debian-based only)', 'general',
        'cluster',
        priority=1, default_value="")

    REQUIRE_ANTI_AFFINITY = p.Config('Require Anti Affinity',
                                     'general', 'cluster',
                                     config_type='bool',
                                     priority=2,
                                     default_value=True)

    def __init__(self):
        super(ConfigHelperV590, self).__init__()
        self.priority_one_confs = self._load_json(
            self.path_to_config + 'priority-one-confs.json')
        self._init_all_ng_plugin_configs()

    def _get_cluster_plugin_configs(self):
        confs = super(ConfigHelperV590, self)._get_ng_plugin_configs()
        confs += [self.EXECUTOR_EXTRA_CLASSPATH,
                  self.KMS_REPO_URL,
                  self.KMS_REPO_KEY_URL,
                  self.REQUIRE_ANTI_AFFINITY]

        return confs

    def _init_all_ng_plugin_configs(self):
        self.hdfs_confs = self._load_and_init_configs(
            'hdfs-service.json', 'HDFS', 'cluster')
        self.namenode_confs = self._load_and_init_configs(
            'hdfs-namenode.json', 'NAMENODE', 'node')
        self.datanode_confs = self._load_and_init_configs(
            'hdfs-datanode.json', 'DATANODE', 'node')
        self.secnamenode_confs = self._load_and_init_configs(
            'hdfs-secondarynamenode.json', 'SECONDARYNAMENODE', 'node')
        self.hdfs_gateway_confs = self._load_and_init_configs(
            'hdfs-gateway.json', 'HDFS_GATEWAY', 'node')
        self.journalnode_confs = self._load_and_init_configs(
            'hdfs-journalnode.json', 'JOURNALNODE', 'node')

        self.yarn_confs = self._load_and_init_configs(
            'yarn-service.json', 'YARN', 'cluster')
        self.resourcemanager_confs = self._load_and_init_configs(
            'yarn-resourcemanager.json', 'RESOURCEMANAGER', 'node')
        self.nodemanager_confs = self._load_and_init_configs(
            'yarn-nodemanager.json', 'NODEMANAGER', 'node')
        self.jobhistory_confs = self._load_and_init_configs(
            'yarn-jobhistory.json', 'JOBHISTORY', 'node')
        self.yarn_gateway_conf = self._load_and_init_configs(
            'yarn-gateway.json', 'YARN_GATEWAY', 'node')

        self.oozie_service_confs = self._load_and_init_configs(
            'oozie-service.json', 'OOZIE', 'cluster')
        self.oozie_role_confs = self._load_and_init_configs(
            'oozie-oozie_server.json', 'OOZIE', 'node')

        self.hive_service_confs = self._load_and_init_configs(
            'hive-service.json', 'HIVE', 'cluster')
        self.hive_metastore_confs = self._load_and_init_configs(
            'hive-hivemetastore.json', 'HIVEMETASTORE', 'node')
        self.hive_hiveserver_confs = self._load_and_init_configs(
            'hive-hiveserver2.json', 'HIVESERVER', 'node')
        self.hive_webhcat_confs = self._load_and_init_configs(
            'hive-webhcat.json', 'WEBHCAT', 'node')

        self.hue_service_confs = self._load_and_init_configs(
            'hue-service.json', 'HUE', 'cluster')
        self.hue_role_confs = self._load_and_init_configs(
            'hue-hue_server.json', 'HUE', 'node')

        self.spark_service_confs = self._load_and_init_configs(
            'spark-service.json', 'SPARK_ON_YARN', 'cluster')
        self.spark_role_confs = self._load_and_init_configs(
            'spark-spark_yarn_history_server.json', 'SPARK_ON_YARN', 'node')

        self.zookeeper_server_confs = self._load_and_init_configs(
            'zookeeper-service.json', 'ZOOKEEPER', 'cluster')
        self.zookeeper_service_confs = self._load_and_init_configs(
            'zookeeper-server.json', 'ZOOKEEPER', 'node')

        self.hbase_confs = self._load_and_init_configs(
            'hbase-service.json', 'HBASE', 'cluster')
        self.master_confs = self._load_and_init_configs(
            'hbase-master.json', 'MASTER', 'node')
        self.regionserver_confs = self._load_and_init_configs(
            'hbase-regionserver.json', 'REGIONSERVER', 'node')

        self.flume_service_confs = self._load_and_init_configs(
            'flume-service.json', 'FLUME', 'cluster')
        self.flume_agent_confs = self._load_and_init_configs(
            'flume-agent.json', 'FLUME', 'node')

        self.sentry_service_confs = self._load_and_init_configs(
            'sentry-service.json', 'SENTRY', 'cluster')
        self.sentry_server_confs = self._load_and_init_configs(
            'sentry-sentry_server.json', 'SENTRY', 'node')

        self.solr_service_confs = self._load_and_init_configs(
            'solr-service.json', 'SOLR', 'cluster')
        self.solr_server_confs = self._load_and_init_configs(
            'solr-solr_server.json', 'SOLR', 'node')

        self.sqoop_service_confs = self._load_and_init_configs(
            'sqoop-service.json', 'SQOOP', 'cluster')
        self.sqoop_server_confs = self._load_and_init_configs(
            'sqoop-sqoop_server.json', 'SQOOP', 'node')

        self.ks_indexer_service_confs = self._load_and_init_configs(
            'ks_indexer-service.json', 'KS_INDEXER', 'cluster')
        self.ks_indexer_role_confs = self._load_and_init_configs(
            'ks_indexer-hbase_indexer.json', 'KS_INDEXER', 'node')

        self.impala_service_confs = self._load_and_init_configs(
            'impala-service.json', 'IMPALA', 'cluster')
        self.impala_catalogserver_confs = self._load_and_init_configs(
            'impala-catalogserver.json', 'CATALOGSERVER', 'node')
        self.impala_impalad_confs = self._load_and_init_configs(
            'impala-impalad.json', 'IMPALAD', 'node')
        self.impala_statestore_confs = self._load_and_init_configs(
            'impala-statestore.json', 'STATESTORE', 'node')

        self.kms_service_confs = self._load_and_init_configs(
            'kms-service.json', 'KMS', 'cluster')
        self.kms_kms_confs = self._load_and_init_configs(
            'kms-kms.json', 'KMS', 'node')

        self.kafka_service = self._load_and_init_configs(
            'kafka-service.json', 'KAFKA', 'cluster')
        self.kafka_kafka_broker = self._load_and_init_configs(
            'kafka-kafka_broker.json', 'KAFKA', 'node')
        self.kafka_kafka_mirror_maker = self._load_and_init_configs(
            'kafka-kafka_mirror_maker.json', 'KAFKA', 'node')

    def get_required_anti_affinity(self, cluster):
        return self._get_config_value(cluster, self.REQUIRE_ANTI_AFFINITY)

    def get_kms_key_url(self, cluster):
        return self._get_config_value(cluster, self.KMS_REPO_KEY_URL)
@@ -21,49 +21,10 @@ from sahara.plugins import utils as gu
from sahara.service.edp import hdfs_helper as h
from sahara.utils import cluster_progress_ops as cpo

PACKAGES = [
    'cloudera-manager-agent',
    'cloudera-manager-daemons',
    'cloudera-manager-server',
    'cloudera-manager-server-db-2',
    'flume-ng',
    'hadoop-hdfs-datanode',
    'hadoop-hdfs-namenode',
    'hadoop-hdfs-secondarynamenode',
    'hadoop-kms'
    'hadoop-mapreduce',
    'hadoop-mapreduce-historyserver',
    'hadoop-yarn-nodemanager',
    'hadoop-yarn-resourcemanager',
    'hbase',
    'hbase-solr',
    'hive-hcatalog',
    'hive-metastore',
    'hive-server2',
    'hive-webhcat-server',
    'hue',
    'impala',
    'impala-server',
    'impala-state-store',
    'impala-catalog',
    'impala-shell',
    'kafka',
    'kafka-server'
    'keytrustee-keyprovider',
    'oozie',
    'oracle-j2sdk1.7',
    'sentry',
    'solr-server',
    'solr-doc',
    'search',
    'spark-history-server',
    'sqoop2',
    'unzip',
    'zookeeper'
]

CU = cu.ClouderaUtilsV590()

PACKAGES = common_deploy.PACKAGES


def configure_cluster(cluster):
    instances = gu.get_instances(cluster)
@@ -110,8 +71,8 @@ def scale_cluster(cluster, instances):
    CU.configure_rack_awareness(cluster)
    CU.configure_instances(instances, cluster)
    CU.update_configs(instances)
    common_deploy.prepare_scaling_kerberized_cluster(cluster, CU,
                                                     instances)
    common_deploy.prepare_scaling_kerberized_cluster(
        cluster, CU, instances)

    CU.pu.configure_swift(cluster, instances)
    _start_roles(cluster, instances)
@@ -203,40 +164,5 @@ def start_cluster(cluster):


def get_open_ports(node_group):
    ports = [9000]  # for CM agent

    ports_map = {
        'CLOUDERA_MANAGER': [7180, 7182, 7183, 7432, 7184, 8084, 8086, 10101,
                             9997, 9996, 8087, 9998, 9999, 8085, 9995, 9994],
        'HDFS_NAMENODE': [8020, 8022, 50070, 50470],
        'HDFS_SECONDARYNAMENODE': [50090, 50495],
        'HDFS_DATANODE': [50010, 1004, 50075, 1006, 50020],
        'YARN_RESOURCEMANAGER': [8030, 8031, 8032, 8033, 8088],
        'YARN_STANDBYRM': [8030, 8031, 8032, 8033, 8088],
        'YARN_NODEMANAGER': [8040, 8041, 8042],
        'YARN_JOBHISTORY': [10020, 19888],
        'HIVE_METASTORE': [9083],
        'HIVE_SERVER2': [10000],
        'HUE_SERVER': [8888],
        'OOZIE_SERVER': [11000, 11001],
        'SPARK_YARN_HISTORY_SERVER': [18088],
        'ZOOKEEPER_SERVER': [2181, 3181, 4181, 9010],
        'HBASE_MASTER': [60000],
        'HBASE_REGIONSERVER': [60020],
        'FLUME_AGENT': [41414],
        'SENTRY_SERVER': [8038],
        'SOLR_SERVER': [8983, 8984],
        'SQOOP_SERVER': [8005, 12000],
        'KEY_VALUE_STORE_INDEXER': [],
        'IMPALA_CATALOGSERVER': [25020, 26000],
        'IMPALA_STATESTORE': [25010, 24000],
        'IMPALAD': [21050, 21000, 23000, 25000, 28000, 22000],
        'KMS': [16000, 16001],
        'JOURNALNODE': [8480, 8481, 8485]
    }

    for process in node_group.node_processes:
        if process in ports_map:
            ports.extend(ports_map[process])

    ports = common_deploy.get_open_ports(node_group)
    return ports
@@ -26,13 +26,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
        super(EdpOozieEngine, self).__init__(cluster)
        self.cloudera_utils = cu.ClouderaUtilsV590()

    def get_name_node_uri(self, cluster):
        if len(self.cloudera_utils.pu.get_jns(cluster)) > 0:
            return 'hdfs://%s' % self.cloudera_utils.NAME_SERVICE
        else:
            namenode_ip = self.cloudera_utils.pu.get_namenode(cluster).fqdn()
            return 'hdfs://%s:8020' % namenode_ip

    @staticmethod
    def get_possible_job_config(job_type):
        if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):
@ -13,154 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.conductor import resource as res
from sahara.plugins.cdh import commands as cmd
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import plugin_utils as pu
from sahara.plugins.cdh.v5_9_0 import config_helper
from sahara.plugins import utils as u


class PluginUtilsV590(pu.AbstractPluginUtils):

    def __init__(self):
        self.c_helper = config_helper.ConfigHelperV590()

    def get_role_name(self, instance, service):
        # NOTE: role name must match regexp "[_A-Za-z][-_A-Za-z0-9]{0,63}"
        shortcuts = {
            'AGENT': 'A',
            'ALERTPUBLISHER': 'AP',
            'CATALOGSERVER': 'ICS',
            'DATANODE': 'DN',
            'EVENTSERVER': 'ES',
            'HBASE_INDEXER': 'LHBI',
            'HIVEMETASTORE': 'HVM',
            'HIVESERVER2': 'HVS',
            'HOSTMONITOR': 'HM',
            'IMPALAD': 'ID',
            'JOBHISTORY': 'JS',
            'JOURNALNODE': 'JN',
            'KAFKA_BROKER': 'KB',
            'KMS': 'KMS',
            'MASTER': 'M',
            'NAMENODE': 'NN',
            'NODEMANAGER': 'NM',
            'OOZIE_SERVER': 'OS',
            'REGIONSERVER': 'RS',
            'RESOURCEMANAGER': 'RM',
            'SECONDARYNAMENODE': 'SNN',
            'SENTRY_SERVER': 'SNT',
            'SERVER': 'S',
            'SERVICEMONITOR': 'SM',
            'SOLR_SERVER': 'SLR',
            'SPARK_YARN_HISTORY_SERVER': 'SHS',
            'SQOOP_SERVER': 'S2S',
            'STATESTORE': 'ISS',
            'WEBHCAT': 'WHC',
            'HDFS_GATEWAY': 'HG',
            'YARN_GATEWAY': 'YG'
        }
        return '%s_%s' % (shortcuts.get(service, service),
                          instance.hostname().replace('-', '_'))

    def get_sentry(self, cluster):
        return u.get_instance(cluster, 'SENTRY_SERVER')

    def get_flumes(self, cluster):
        return u.get_instances(cluster, 'FLUME_AGENT')

    def get_solrs(self, cluster):
        return u.get_instances(cluster, 'SOLR_SERVER')

    def get_sqoop(self, cluster):
        return u.get_instance(cluster, 'SQOOP_SERVER')

    def get_hbase_indexers(self, cluster):
        return u.get_instances(cluster, 'KEY_VALUE_STORE_INDEXER')

    def get_catalogserver(self, cluster):
        return u.get_instance(cluster, 'IMPALA_CATALOGSERVER')

    def get_statestore(self, cluster):
        return u.get_instance(cluster, 'IMPALA_STATESTORE')

    def get_impalads(self, cluster):
        return u.get_instances(cluster, 'IMPALAD')

    def get_kms(self, cluster):
        return u.get_instances(cluster, 'KMS')

    def get_jns(self, cluster):
        return u.get_instances(cluster, 'HDFS_JOURNALNODE')

    def get_stdb_rm(self, cluster):
        return u.get_instance(cluster, 'YARN_STANDBYRM')

    def get_kafka_brokers(self, cluster):
        return u.get_instances(cluster, 'KAFKA_BROKER')

    def convert_process_configs(self, configs):
        p_dict = {
            "CLOUDERA": ['MANAGER'],
            "NAMENODE": ['NAMENODE'],
            "DATANODE": ['DATANODE'],
            "SECONDARYNAMENODE": ['SECONDARYNAMENODE'],
            "RESOURCEMANAGER": ['RESOURCEMANAGER'],
            "NODEMANAGER": ['NODEMANAGER'],
            "JOBHISTORY": ['JOBHISTORY'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVESERVER": ['HIVESERVER2'],
            "HIVEMETASTORE": ['HIVEMETASTORE'],
            "WEBHCAT": ['WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['SERVER'],
            "MASTER": ['MASTER'],
            "REGIONSERVER": ['REGIONSERVER'],
            "FLUME": ['AGENT'],
            "CATALOGSERVER": ['CATALOGSERVER'],
            "STATESTORE": ['STATESTORE'],
            "IMPALAD": ['IMPALAD'],
            "KS_INDEXER": ['HBASE_INDEXER'],
            "SENTRY": ['SENTRY_SERVER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "KMS": ['KMS'],
            "YARN_GATEWAY": ['YARN_GATEWAY'],
            "HDFS_GATEWAY": ['HDFS_GATEWAY'],
            "JOURNALNODE": ['JOURNALNODE'],
            "KAFKA": ['KAFKA_BROKER']
        }
        if isinstance(configs, res.Resource):
            configs = configs.to_dict()
        for k in configs.keys():
            if k in p_dict.keys():
                item = configs[k]
                del configs[k]
                newkey = p_dict[k][0]
                configs[newkey] = item
        return res.Resource(configs)

    def configure_sentry(self, cluster):
        manager = self.get_manager(cluster)
        with manager.remote() as r:
            dh.create_sentry_database(cluster, r)

    def _configure_repo_from_inst(self, instance):
        super(PluginUtilsV590, self)._configure_repo_from_inst(instance)

        cluster = instance.cluster
        with instance.remote() as r:
            if cmd.is_ubuntu_os(r):
                kms_key = (
                    self.c_helper.get_kms_key_url(cluster) or
                    self.c_helper.DEFAULT_KEY_TRUSTEE_UBUNTU_REPO_KEY_URL)
                kms_repo_url = self.c_helper.KEY_TRUSTEE_UBUNTU_REPO_URL
                cmd.add_ubuntu_repository(r, kms_repo_url, 'kms')
                cmd.add_apt_key(r, kms_key)
                cmd.update_repository(r)
            if cmd.is_centos_os(r):
                kms_repo_url = self.c_helper.KEY_TRUSTEE_CENTOS_REPO_URL
                cmd.add_centos_repository(r, kms_repo_url, 'kms')
                cmd.update_repository(r)
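convert_process_configs above rewrites user-facing service keys into Cloudera Manager role names by renaming dictionary keys in place. A minimal sketch of the same key-renaming pass on a plain dict, using a two-entry excerpt of the mapping:

# Sketch only: a two-entry excerpt of p_dict, applied to a plain dict.
P_DICT = {
    'KS_INDEXER': ['HBASE_INDEXER'],
    'ZOOKEEPER': ['SERVER'],
}

def rename_keys(configs):
    for key in list(configs.keys()):  # list() lets us mutate while iterating
        if key in P_DICT:
            configs[P_DICT[key][0]] = configs.pop(key)
    return configs

print(rename_keys({'KS_INDEXER': {'enabled': True}, 'HDFS': {}}))
# {'HDFS': {}, 'HBASE_INDEXER': {'enabled': True}}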
@ -13,220 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.i18n import _
from sahara.plugins.cdh.v5_9_0 import plugin_utils as pu
from sahara.plugins.cdh import validation
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u


class ValidatorV590(validation.Validator):
    PU = pu.PluginUtilsV590()

    @classmethod
    def validate_cluster_creating(cls, cluster):
        super(ValidatorV590, cls).validate_cluster_creating(cluster)
        cls._hdfs_ha_validation(cluster)
        cls._yarn_ha_validation(cluster)
        cls._flume_validation(cluster)
        cls._sentry_validation(cluster)
        cls._solr_validation(cluster)
        cls._sqoop_validation(cluster)
        cls._hbase_indexer_validation(cluster)
        cls._impala_validation(cluster)
        cls._kms_validation(cluster)

    @classmethod
    def _hdfs_ha_validation(cls, cluster):
        jn_count = cls._get_inst_count(cluster, 'HDFS_JOURNALNODE')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if jn_count > 0:
            if jn_count < 3:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('not less than 3'),
                                                        jn_count)
            if not jn_count % 2:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('be odd'), jn_count)
            if zk_count < 1:
                raise ex.RequiredServiceMissingException('ZOOKEEPER',
                                                         required_by='HDFS HA')
            if require_anti_affinity:
                if 'HDFS_SECONDARYNAMENODE' not in\
                        cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_SECONDARYNAMENODE should be enabled '
                          'in anti_affinity.'))
                if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_NAMENODE should be enabled in anti_affinity.'))

    @classmethod
    def _yarn_ha_validation(cls, cluster):
        rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        stdb_rm_count = cls._get_inst_count(cluster, 'YARN_STANDBYRM')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if stdb_rm_count > 1:
            raise ex.InvalidComponentCountException(
                'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
        if stdb_rm_count > 0:
            if rm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_RESOURCEMANAGER', required_by='RM HA')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='RM HA')
            if require_anti_affinity:
                if 'YARN_RESOURCEMANAGER' not in\
                        cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_RESOURCEMANAGER should be enabled in '
                          'anti_affinity.'))
                if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_STANDBYRM should be'
                          ' enabled in anti_affinity.'))

    @classmethod
    def _flume_validation(cls, cluster):
        a_count = cls._get_inst_count(cluster, 'FLUME_AGENT')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')

        if a_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='FLUME_AGENT')

    @classmethod
    def _sentry_validation(cls, cluster):

        snt_count = cls._get_inst_count(cluster, 'SENTRY_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        if snt_count > 1:
            raise ex.InvalidComponentCountException(
                'SENTRY_SERVER', _('0 or 1'), snt_count)
        if snt_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SENTRY_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SENTRY_SERVER')

    @classmethod
    def _solr_validation(cls, cluster):
        slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')

        if slr_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SOLR_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SOLR_SERVER')

    @classmethod
    def _sqoop_validation(cls, cluster):

        s2s_count = cls._get_inst_count(cluster, 'SQOOP_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
        nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')

        if s2s_count > 1:
            raise ex.InvalidComponentCountException(
                'SQOOP_SERVER', _('0 or 1'), s2s_count)
        if s2s_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SQOOP_SERVER')
            if nm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
            if hs_count != 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_JOBHISTORY', required_by='SQOOP_SERVER')

    @classmethod
    def _hbase_indexer_validation(cls, cluster):

        lhbi_count = cls._get_inst_count(cluster, 'HBASE_INDEXER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        slr_count = cls._get_inst_count(cluster, 'SOLR_SERVER')
        hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')

        if lhbi_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='HBASE_INDEXER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='HBASE_INDEXER')
            if slr_count < 1:
                raise ex.RequiredServiceMissingException(
                    'SOLR_SERVER', required_by='HBASE_INDEXER')
            if hbm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HBASE_MASTER', required_by='HBASE_INDEXER')

    @classmethod
    def _impala_validation(cls, cluster):
        ics_count = cls._get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
        iss_count = cls._get_inst_count(cluster, 'IMPALA_STATESTORE')
        id_count = cls._get_inst_count(cluster, 'IMPALAD')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')

        if ics_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
                                                    _('0 or 1'), ics_count)
        if iss_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
                                                    _('0 or 1'), iss_count)
        if ics_count == 1:
            datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
            impalad_ng = u.get_node_groups(cluster, "IMPALAD")
            datanodes = set(ng.id for ng in datanode_ng)
            impalads = set(ng.id for ng in impalad_ng)

            if datanodes != impalads:
                raise ex.InvalidClusterTopology(
                    _("IMPALAD must be installed on every HDFS_DATANODE"))

            if iss_count != 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALA_STATESTORE', required_by='IMPALA')
            if id_count < 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALAD', required_by='IMPALA')
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='IMPALA')
            if hms_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HIVE_METASTORE', required_by='IMPALA')

    @classmethod
    def _kms_validation(cls, cluster):

        kms_count = cls._get_inst_count(cluster, 'KMS')
        if kms_count > 1:
            raise ex.InvalidComponentCountException('KMS',
                                                    _('0 or 1'), kms_count)

    @classmethod
    def _get_anti_affinity(cls, cluster):
        return cluster.anti_affinity
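_hdfs_ha_validation above encodes the JournalNode quorum rule: HA needs at least three JournalNodes, the count must be odd, and ZooKeeper must be present. The counting rule in isolation, as a sketch over plain integers:

# Sketch only: the quorum rule reduced to plain integers.
def check_journal_nodes(jn_count):
    if jn_count == 0:
        return 'non-HA'  # no JournalNodes, so HA checks do not apply
    if jn_count < 3:
        raise ValueError('HDFS_JOURNALNODE: not less than 3')
    if jn_count % 2 == 0:
        raise ValueError('HDFS_JOURNALNODE: be odd')
    return 'HA'

print(check_journal_nodes(0))  # non-HA
print(check_journal_nodes(5))  # HA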
@ -21,7 +21,6 @@ from sahara.plugins.cdh.v5_9_0 import deploy
from sahara.plugins.cdh.v5_9_0 import edp_engine
from sahara.plugins.cdh.v5_9_0 import plugin_utils
from sahara.plugins.cdh.v5_9_0 import validation
from sahara.plugins import kerberos


class VersionHandler(avm.BaseVersionHandler):
@ -34,67 +33,3 @@ class VersionHandler(avm.BaseVersionHandler):
        self.deploy = deploy
        self.edp_engine = edp_engine
        self.validation = validation.ValidatorV590()

    def get_plugin_configs(self):
        result = super(VersionHandler, self).get_plugin_configs()
        result.extend(kerberos.get_config_list())
        return result

    def get_node_processes(self):
        return {
            "CLOUDERA": ['CLOUDERA_MANAGER'],
            "HDFS": ['HDFS_NAMENODE', 'HDFS_DATANODE',
                     'HDFS_SECONDARYNAMENODE', 'HDFS_JOURNALNODE'],
            "YARN": ['YARN_RESOURCEMANAGER', 'YARN_NODEMANAGER',
                     'YARN_JOBHISTORY', 'YARN_STANDBYRM'],
            "OOZIE": ['OOZIE_SERVER'],
            "HIVE": ['HIVE_SERVER2', 'HIVE_METASTORE', 'HIVE_WEBHCAT'],
            "HUE": ['HUE_SERVER'],
            "SPARK_ON_YARN": ['SPARK_YARN_HISTORY_SERVER'],
            "ZOOKEEPER": ['ZOOKEEPER_SERVER'],
            "HBASE": ['HBASE_MASTER', 'HBASE_REGIONSERVER'],
            "FLUME": ['FLUME_AGENT'],
            "IMPALA": ['IMPALA_CATALOGSERVER', 'IMPALA_STATESTORE', 'IMPALAD'],
            "KS_INDEXER": ['KEY_VALUE_STORE_INDEXER'],
            "SOLR": ['SOLR_SERVER'],
            "SQOOP": ['SQOOP_SERVER'],
            "SENTRY": ['SENTRY_SERVER'],
            "KMS": ['KMS'],
            "KAFKA": ['KAFKA_BROKER'],

            "YARN_GATEWAY": [],
            "RESOURCEMANAGER": [],
            "NODEMANAGER": [],
            "JOBHISTORY": [],

            "HDFS_GATEWAY": [],
            'DATANODE': [],
            'NAMENODE': [],
            'SECONDARYNAMENODE': [],
            'JOURNALNODE': [],

            'REGIONSERVER': [],
            'MASTER': [],

            'HIVEMETASTORE': [],
            'HIVESERVER': [],
            'WEBCAT': [],

            'CATALOGSERVER': [],
            'STATESTORE': [],
            'IMPALAD': [],
            'Kerberos': [],
        }

    def get_edp_engine(self, cluster, job_type):
        oozie_type = self.edp_engine.EdpOozieEngine.get_supported_job_types()
        spark_type = self.edp_engine.EdpSparkEngine.get_supported_job_types()
        if job_type in oozie_type:
            return self.edp_engine.EdpOozieEngine(cluster)
        if job_type in spark_type:
            return self.edp_engine.EdpSparkEngine(cluster)
        return None

    def get_edp_job_types(self):
        return (edp_engine.EdpOozieEngine.get_supported_job_types() +
                edp_engine.EdpSparkEngine.get_supported_job_types())
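The removed get_edp_engine/get_edp_job_types pair (now provided by BaseVersionHandler, as the first hunks show) dispatches on the job type: Oozie-style jobs get the Oozie engine, Spark jobs the Spark engine, anything else None. A sketch of the dispatch; the engine names and type lists below are illustrative stand-ins, not the exact sahara EDP constants:

# Sketch only: engine names and type lists are illustrative stand-ins.
OOZIE_TYPES = ['Hive', 'Java', 'MapReduce', 'Pig']
SPARK_TYPES = ['Spark']

def pick_engine(job_type):
    if job_type in OOZIE_TYPES:
        return 'EdpOozieEngine'
    if job_type in SPARK_TYPES:
        return 'EdpSparkEngine'
    return None  # unsupported job type

print(pick_engine('Spark'))      # EdpSparkEngine
print(pick_engine('Streaming'))  # None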
@ -30,24 +30,35 @@ class Validator(object):
        cls._hue_validation(cluster)
        cls._hbase_validation(cluster)

        cls._flume_validation(cluster)
        cls._sentry_validation(cluster)
        cls._solr_validation(cluster)
        cls._sqoop_validation(cluster)
        cls._hbase_indexer_validation(cluster)
        cls._impala_validation(cluster)
        cls._kms_validation(cluster)

        cls._hdfs_ha_validation(cluster)
        cls._yarn_ha_validation(cluster)

    @classmethod
    def _basic_validation(cls, cluster):

        mng_count = cls._get_inst_count(cluster, 'CLOUDERA_MANAGER')
        mng_count = cls.get_inst_count(cluster, 'CLOUDERA_MANAGER')
        if mng_count != 1:
            raise ex.InvalidComponentCountException('CLOUDERA_MANAGER',
                                                    1, mng_count)

        nn_count = cls._get_inst_count(cluster, 'HDFS_NAMENODE')
        nn_count = cls.get_inst_count(cluster, 'HDFS_NAMENODE')
        if nn_count != 1:
            raise ex.InvalidComponentCountException(
                'HDFS_NAMENODE', 1, nn_count)

        snn_count = cls._get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
        snn_count = cls.get_inst_count(cluster, 'HDFS_SECONDARYNAMENODE')
        if snn_count != 1:
            raise ex.InvalidComponentCountException('HDFS_SECONDARYNAMENODE',
                                                    1, snn_count)
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
        replicas = cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)
        if dn_count < replicas:
            raise ex.InvalidComponentCountException(
@ -55,12 +66,12 @@ class Validator(object):
                _('Number of datanodes must be not'
                  ' less than dfs_replication.'))

        rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        if rm_count > 1:
            raise ex.InvalidComponentCountException('YARN_RESOURCEMANAGER',
                                                    _('0 or 1'), rm_count)

        hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
        hs_count = cls.get_inst_count(cluster, 'YARN_JOBHISTORY')
        if hs_count > 1:
            raise ex.InvalidComponentCountException('YARN_JOBHISTORY',
                                                    _('0 or 1'),
@ -70,7 +81,7 @@ class Validator(object):
            raise ex.RequiredServiceMissingException(
                'YARN_JOBHISTORY', required_by='YARN_RESOURCEMANAGER')

        nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
        nm_count = cls.get_inst_count(cluster, 'YARN_NODEMANAGER')
        if rm_count == 0:
            if nm_count > 0:
                raise ex.RequiredServiceMissingException(
@ -79,10 +90,10 @@ class Validator(object):
    @classmethod
    def _oozie_validation(cls, cluster):

        oo_count = cls._get_inst_count(cluster, 'OOZIE_SERVER')
        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE')
        nm_count = cls._get_inst_count(cluster, 'YARN_NODEMANAGER')
        hs_count = cls._get_inst_count(cluster, 'YARN_JOBHISTORY')
        oo_count = cls.get_inst_count(cluster, 'OOZIE_SERVER')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
        nm_count = cls.get_inst_count(cluster, 'YARN_NODEMANAGER')
        hs_count = cls.get_inst_count(cluster, 'YARN_JOBHISTORY')

        if oo_count > 1:
            raise ex.InvalidComponentCountException(
@ -103,10 +114,10 @@ class Validator(object):

    @classmethod
    def _hive_validation(cls, cluster):
        hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
        hvs_count = cls._get_inst_count(cluster, 'HIVE_SERVER2')
        whc_count = cls._get_inst_count(cluster, 'HIVE_WEBHCAT')
        rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        hms_count = cls.get_inst_count(cluster, 'HIVE_METASTORE')
        hvs_count = cls.get_inst_count(cluster, 'HIVE_SERVER2')
        whc_count = cls.get_inst_count(cluster, 'HIVE_WEBHCAT')
        rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')

        if hms_count and rm_count < 1:
            raise ex.RequiredServiceMissingException(
@ -126,15 +137,15 @@ class Validator(object):

    @classmethod
    def _hue_validation(cls, cluster):
        hue_count = cls._get_inst_count(cluster, 'HUE_SERVER')
        hue_count = cls.get_inst_count(cluster, 'HUE_SERVER')
        if hue_count > 1:
            raise ex.InvalidComponentCountException(
                'HUE_SERVER', _('0 or 1'), hue_count)

        shs_count = cls._get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
        hms_count = cls._get_inst_count(cluster, 'HIVE_METASTORE')
        oo_count = cls._get_inst_count(cluster, 'OOZIE_SERVER')
        rm_count = cls._get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        shs_count = cls.get_inst_count(cluster, 'SPARK_YARN_HISTORY_SERVER')
        hms_count = cls.get_inst_count(cluster, 'HIVE_METASTORE')
        oo_count = cls.get_inst_count(cluster, 'OOZIE_SERVER')
        rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')

        if shs_count > 1:
            raise ex.InvalidComponentCountException(
@ -155,9 +166,9 @@ class Validator(object):

    @classmethod
    def _hbase_validation(cls, cluster):
        hbm_count = cls._get_inst_count(cluster, 'HBASE_MASTER')
        hbr_count = cls._get_inst_count(cluster, 'HBASE_REGIONSERVER')
        zk_count = cls._get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        hbm_count = cls.get_inst_count(cluster, 'HBASE_MASTER')
        hbr_count = cls.get_inst_count(cluster, 'HBASE_REGIONSERVER')
        zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        if hbm_count == 1:
            if zk_count < 1:
@ -208,7 +219,7 @@ class Validator(object):
                ng.name,
                msg % {'processes': ' '.join(ng.node_processes)})

        dn_count = cls._get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE') - dn_to_delete
        replicas = cls.PU.get_config_value('HDFS', 'dfs_replication', cluster)
        if dn_count < replicas:
            raise ex.ClusterCannotBeScaled(
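The scaling check above guards against decommissioning DataNodes below the HDFS replication factor. The arithmetic in isolation, as a sketch over plain integers:

# Sketch only: the scale-down guard reduced to plain integers.
def can_scale_down(dn_count, dn_to_delete, replicas):
    return (dn_count - dn_to_delete) >= replicas

print(can_scale_down(5, 2, 3))  # True: 3 DataNodes remain
print(can_scale_down(5, 3, 3))  # False: only 2 remain, < dfs_replication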
@ -220,5 +231,200 @@ class Validator(object):
        return ['HDFS_DATANODE', 'YARN_NODEMANAGER']

    @classmethod
    def _get_inst_count(cls, cluster, process):
    def _flume_validation(cls, cluster):
        a_count = cls.get_inst_count(cluster, 'FLUME_AGENT')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')

        if a_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='FLUME_AGENT')

    @classmethod
    def _sentry_validation(cls, cluster):

        snt_count = cls.get_inst_count(cluster, 'SENTRY_SERVER')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
        zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        if snt_count > 1:
            raise ex.InvalidComponentCountException(
                'SENTRY_SERVER', _('0 or 1'), snt_count)
        if snt_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SENTRY_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SENTRY_SERVER')

    @classmethod
    def _solr_validation(cls, cluster):
        slr_count = cls.get_inst_count(cluster, 'SOLR_SERVER')
        zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')

        if slr_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SOLR_SERVER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='SOLR_SERVER')

    @classmethod
    def _sqoop_validation(cls, cluster):

        s2s_count = cls.get_inst_count(cluster, 'SQOOP_SERVER')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
        hs_count = cls.get_inst_count(cluster, 'YARN_JOBHISTORY')
        nm_count = cls.get_inst_count(cluster, 'YARN_NODEMANAGER')

        if s2s_count > 1:
            raise ex.InvalidComponentCountException(
                'SQOOP_SERVER', _('0 or 1'), s2s_count)
        if s2s_count == 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='SQOOP_SERVER')
            if nm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_NODEMANAGER', required_by='SQOOP_SERVER')
            if hs_count != 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_JOBHISTORY', required_by='SQOOP_SERVER')

    @classmethod
    def _hbase_indexer_validation(cls, cluster):

        lhbi_count = cls.get_inst_count(cluster, 'HBASE_INDEXER')
        zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
        slr_count = cls.get_inst_count(cluster, 'SOLR_SERVER')
        hbm_count = cls.get_inst_count(cluster, 'HBASE_MASTER')

        if lhbi_count >= 1:
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='HBASE_INDEXER')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='HBASE_INDEXER')
            if slr_count < 1:
                raise ex.RequiredServiceMissingException(
                    'SOLR_SERVER', required_by='HBASE_INDEXER')
            if hbm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HBASE_MASTER', required_by='HBASE_INDEXER')

    @classmethod
    def _impala_validation(cls, cluster):
        ics_count = cls.get_inst_count(cluster, 'IMPALA_CATALOGSERVER')
        iss_count = cls.get_inst_count(cluster, 'IMPALA_STATESTORE')
        id_count = cls.get_inst_count(cluster, 'IMPALAD')
        dn_count = cls.get_inst_count(cluster, 'HDFS_DATANODE')
        hms_count = cls.get_inst_count(cluster, 'HIVE_METASTORE')

        if ics_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_CATALOGSERVER',
                                                    _('0 or 1'), ics_count)
        if iss_count > 1:
            raise ex.InvalidComponentCountException('IMPALA_STATESTORE',
                                                    _('0 or 1'), iss_count)
        if ics_count == 1:
            datanode_ng = u.get_node_groups(cluster, "HDFS_DATANODE")
            impalad_ng = u.get_node_groups(cluster, "IMPALAD")
            datanodes = set(ng.id for ng in datanode_ng)
            impalads = set(ng.id for ng in impalad_ng)

            if datanodes != impalads:
                raise ex.InvalidClusterTopology(
                    _("IMPALAD must be installed on every HDFS_DATANODE"))

            if iss_count != 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALA_STATESTORE', required_by='IMPALA')
            if id_count < 1:
                raise ex.RequiredServiceMissingException(
                    'IMPALAD', required_by='IMPALA')
            if dn_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HDFS_DATANODE', required_by='IMPALA')
            if hms_count < 1:
                raise ex.RequiredServiceMissingException(
                    'HIVE_METASTORE', required_by='IMPALA')

    @classmethod
    def _kms_validation(cls, cluster):

        kms_count = cls.get_inst_count(cluster, 'KMS')
        if kms_count > 1:
            raise ex.InvalidComponentCountException('KMS',
                                                    _('0 or 1'), kms_count)

    @classmethod
    def _hdfs_ha_validation(cls, cluster):
        jn_count = cls.get_inst_count(cluster, 'HDFS_JOURNALNODE')
        zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if jn_count > 0:
            if jn_count < 3:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('not less than 3'),
                                                        jn_count)
            if not jn_count % 2:
                raise ex.InvalidComponentCountException('HDFS_JOURNALNODE',
                                                        _('be odd'), jn_count)
            if zk_count < 1:
                raise ex.RequiredServiceMissingException('ZOOKEEPER',
                                                         required_by='HDFS HA')
            if require_anti_affinity:
                if 'HDFS_SECONDARYNAMENODE' not in \
                        cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_SECONDARYNAMENODE should be enabled '
                          'in anti_affinity.'))
                if 'HDFS_NAMENODE' not in cls._get_anti_affinity(cluster):
                    raise ex.NameNodeHAConfigurationError(
                        _('HDFS_NAMENODE should be enabled in anti_affinity.'))

    @classmethod
    def _yarn_ha_validation(cls, cluster):
        rm_count = cls.get_inst_count(cluster, 'YARN_RESOURCEMANAGER')
        zk_count = cls.get_inst_count(cluster, 'ZOOKEEPER_SERVER')
        stdb_rm_count = cls.get_inst_count(cluster, 'YARN_STANDBYRM')

        require_anti_affinity = cls.PU.c_helper.get_required_anti_affinity(
            cluster)

        if stdb_rm_count > 1:
            raise ex.InvalidComponentCountException(
                'YARN_STANDBYRM', _('0 or 1'), stdb_rm_count)
        if stdb_rm_count > 0:
            if rm_count < 1:
                raise ex.RequiredServiceMissingException(
                    'YARN_RESOURCEMANAGER', required_by='RM HA')
            if zk_count < 1:
                raise ex.RequiredServiceMissingException(
                    'ZOOKEEPER', required_by='RM HA')
            if require_anti_affinity:
                if 'YARN_RESOURCEMANAGER' not in \
                        cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_RESOURCEMANAGER should be enabled in '
                          'anti_affinity.'))
                if 'YARN_STANDBYRM' not in cls._get_anti_affinity(cluster):
                    raise ex.ResourceManagerHAConfigurationError(
                        _('YARN_STANDBYRM should be'
                          ' enabled in anti_affinity.'))

    @classmethod
    def _get_anti_affinity(cls, cluster):
        return cluster.anti_affinity

    @classmethod
    def get_inst_count(cls, cluster, process):
        return sum([ng.count for ng in u.get_node_groups(cluster, process)])
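get_inst_count, made public here so the version-specific validators can reuse it, sums instance counts over every node group that runs a given process. A sketch of the same sum with hypothetical node groups; FakeNG is an illustrative stand-in, not a Sahara object:

# Sketch only: FakeNG stands in for a Sahara node group.
class FakeNG(object):
    def __init__(self, processes, count):
        self.node_processes = processes
        self.count = count

def get_inst_count(node_groups, process):
    return sum(ng.count for ng in node_groups
               if process in ng.node_processes)

groups = [FakeNG(['HDFS_DATANODE', 'IMPALAD'], 3),
          FakeNG(['HDFS_DATANODE'], 2)]
print(get_inst_count(groups, 'HDFS_DATANODE'))  # 5
print(get_inst_count(groups, 'IMPALAD'))        # 3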