245 lines
9.7 KiB
Python
245 lines
9.7 KiB
Python
# Copyright (c) 2013 Mirantis Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from savanna.openstack.common import log as logging
|
|
from savanna.plugins import provisioning as p
|
|
from savanna.swift import swift_helper as swift
|
|
from savanna.utils import xmlutils as x
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
CORE_DEFAULT = x.load_hadoop_xml_defaults(
|
|
'plugins/vanilla/resources/core-default.xml')
|
|
|
|
HDFS_DEFAULT = x.load_hadoop_xml_defaults(
|
|
'plugins/vanilla/resources/hdfs-default.xml')
|
|
|
|
MAPRED_DEFAULT = x.load_hadoop_xml_defaults(
|
|
'plugins/vanilla/resources/mapred-default.xml')
|
|
|
|
XML_CONFS = {
|
|
"HDFS": [CORE_DEFAULT, HDFS_DEFAULT],
|
|
"MapReduce": [MAPRED_DEFAULT]
|
|
}
|
|
|
|
# TODO(aignatov): Environmental configs could be more complex
|
|
ENV_CONFS = {
|
|
"MapReduce": {
|
|
'Job Tracker Heap Size': 'HADOOP_JOBTRACKER_OPTS=\\"-Xmx%sm\\"',
|
|
'Task Tracker Heap Size': 'HADOOP_TASKTRACKER_OPTS=\\"-Xmx%sm\\"'
|
|
},
|
|
"HDFS": {
|
|
'Name Node Heap Size': 'HADOOP_NAMENODE_OPTS=\\"-Xmx%sm\\"',
|
|
'Data Node Heap Size': 'HADOOP_DATANODE_OPTS=\\"-Xmx%sm\\"'
|
|
}
|
|
}
|
|
|
|
ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
|
|
config_type="bool", priority=1,
|
|
default_value=True, is_optional=True)
|
|
|
|
HIDDEN_CONFS = ['fs.default.name', 'dfs.name.dir', 'dfs.data.dir',
|
|
'mapred.job.tracker', 'mapred.system.dir', 'mapred.local.dir']
|
|
|
|
CLUSTER_WIDE_CONFS = ['dfs.block.size', 'dfs.permissions', 'dfs.replication',
|
|
'dfs.replication.min', 'dfs.replication.max',
|
|
'io.file.buffer.size', 'mapreduce.job.counters.max',
|
|
'mapred.output.compress', 'io.compression.codecs',
|
|
'mapred.output.compression.codec',
|
|
'mapred.output.compression.type',
|
|
'mapred.compress.map.output',
|
|
'mapred.map.output.compression.codec']
|
|
|
|
PRIORITY_1_CONFS = ['dfs.datanode.du.reserved',
|
|
'dfs.datanode.failed.volumes.tolerated',
|
|
'dfs.datanode.max.xcievers', 'dfs.datanode.handler.count',
|
|
'dfs.namenode.handler.count', 'mapred.child.java.opts',
|
|
'mapred.jobtracker.maxtasks.per.job',
|
|
'mapred.job.tracker.handler.count',
|
|
'mapred.map.child.java.opts',
|
|
'mapred.reduce.child.java.opts',
|
|
'io.sort.mb', 'mapred.tasktracker.map.tasks.maximum',
|
|
'mapred.tasktracker.reduce.tasks.maximum']
|
|
|
|
# for now we have not so many cluster-wide configs
|
|
# lets consider all of them having high priority
|
|
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
|
|
|
|
|
|
def _initialise_configs():
|
|
configs = []
|
|
for service, config_lists in XML_CONFS.iteritems():
|
|
for config_list in config_lists:
|
|
for config in config_list:
|
|
if config['name'] not in HIDDEN_CONFS:
|
|
cfg = p.Config(config['name'], service, "node",
|
|
is_optional=True, config_type="string",
|
|
default_value=str(config['value']),
|
|
description=config['description'])
|
|
if cfg.default_value in ["true", "false"]:
|
|
cfg.config_type = "bool"
|
|
cfg.default_value = (cfg.default_value == 'true')
|
|
if str(cfg.default_value).isdigit():
|
|
cfg.config_type = "int"
|
|
cfg.default_value = int(cfg.default_value)
|
|
if config['name'] in CLUSTER_WIDE_CONFS:
|
|
cfg.scope = 'cluster'
|
|
if config['name'] in PRIORITY_1_CONFS:
|
|
cfg.priority = 1
|
|
configs.append(cfg)
|
|
|
|
for service, config_items in ENV_CONFS.iteritems():
|
|
for name, param_format_str in config_items.iteritems():
|
|
configs.append(p.Config(name, service, "node",
|
|
default_value=1024, priority=1,
|
|
config_type="int"))
|
|
|
|
configs.append(ENABLE_SWIFT)
|
|
|
|
return configs
|
|
|
|
# Initialise plugin Hadoop configurations
|
|
PLUGIN_CONFIGS = _initialise_configs()
|
|
|
|
|
|
def get_plugin_configs():
|
|
return PLUGIN_CONFIGS
|
|
|
|
|
|
def generate_xml_configs(configs, storage_path, nn_hostname, jt_hostname=None):
|
|
# inserting common configs depends on provisioned VMs and HDFS placement
|
|
# TODO(aignatov): should be moved to cluster context
|
|
cfg = {
|
|
'fs.default.name': 'hdfs://%s:8020' % nn_hostname,
|
|
'dfs.name.dir': extract_hadoop_path(storage_path,
|
|
'/lib/hadoop/hdfs/namenode'),
|
|
'dfs.data.dir': extract_hadoop_path(storage_path,
|
|
'/lib/hadoop/hdfs/datanode'),
|
|
'dfs.hosts': '/etc/hadoop/dn.incl',
|
|
'dfs.hosts.exclude': '/etc/hadoop/dn.excl',
|
|
}
|
|
|
|
if jt_hostname:
|
|
mr_cfg = {
|
|
'mapred.job.tracker': '%s:8021' % jt_hostname,
|
|
'mapred.system.dir': extract_hadoop_path(storage_path,
|
|
'/mapred/mapredsystem'),
|
|
'mapred.local.dir': extract_hadoop_path(storage_path,
|
|
'/lib/hadoop/mapred'),
|
|
'mapred.hosts': '/etc/hadoop/tt.incl',
|
|
'mapred.hosts.exclude': '/etc/hadoop/tt.excl',
|
|
}
|
|
cfg.update(mr_cfg)
|
|
|
|
# inserting user-defined configs
|
|
for key, value in extract_xml_confs(configs):
|
|
cfg[key] = value
|
|
|
|
# applying swift configs if user enabled it
|
|
swift_xml_confs = []
|
|
#TODO(aignatov): should be changed. General configs not only Swift
|
|
swift_in_config = False
|
|
if ('general' in configs and
|
|
ENABLE_SWIFT.name in configs['general']):
|
|
swift_in_config = True
|
|
if ((swift_in_config and configs['general'][ENABLE_SWIFT.name]) or
|
|
(not swift_in_config and ENABLE_SWIFT.default_value)):
|
|
swift_xml_confs = swift.get_swift_configs()
|
|
cfg.update(extract_name_values(swift_xml_confs))
|
|
LOG.info("Swift integration is enabled")
|
|
|
|
# invoking applied configs to appropriate xml files
|
|
xml_configs = {
|
|
'core-site': x.create_hadoop_xml(cfg, CORE_DEFAULT + swift_xml_confs),
|
|
'mapred-site': x.create_hadoop_xml(cfg, MAPRED_DEFAULT),
|
|
'hdfs-site': x.create_hadoop_xml(cfg, HDFS_DEFAULT)
|
|
}
|
|
|
|
return xml_configs
|
|
|
|
|
|
def extract_environment_confs(configs):
|
|
"""Returns list of Hadoop parameters which should be passed via environment
|
|
"""
|
|
lst = []
|
|
for service, srv_confs in configs.items():
|
|
if ENV_CONFS.get(service):
|
|
for param_name, param_value in srv_confs.items():
|
|
for cfg_name, cfg_format_str in ENV_CONFS[service].items():
|
|
if param_name == cfg_name and param_value is not None:
|
|
lst.append(cfg_format_str % param_value)
|
|
else:
|
|
LOG.warn("Plugin recieved wrong applicable target '%s' in "
|
|
"environmental configs" % service)
|
|
return lst
|
|
|
|
|
|
def extract_xml_confs(configs):
|
|
"""Returns list of Hadoop parameters which should be passed into general
|
|
configs like core-site.xml
|
|
"""
|
|
lst = []
|
|
for service, srv_confs in configs.items():
|
|
if XML_CONFS.get(service):
|
|
for param_name, param_value in srv_confs.items():
|
|
for cfg_list in XML_CONFS[service]:
|
|
names = [cfg['name'] for cfg in cfg_list]
|
|
if param_name in names and param_value is not None:
|
|
lst.append((param_name, param_value))
|
|
else:
|
|
LOG.warn("Plugin recieved wrong applicable target '%s' for "
|
|
"xml configs" % service)
|
|
return lst
|
|
|
|
|
|
def generate_setup_script(storage_paths, env_configs):
|
|
script_lines = ["#!/bin/bash -x"]
|
|
for line in env_configs:
|
|
script_lines.append('echo "%s" >> /tmp/hadoop-env.sh' % line)
|
|
script_lines.append("cat /etc/hadoop/hadoop-env.sh >> /tmp/hadoop-env.sh")
|
|
script_lines.append("mv /tmp/hadoop-env.sh /etc/hadoop/hadoop-env.sh")
|
|
|
|
hadoop_log = storage_paths[0] + "/log/hadoop/\$USER/"
|
|
script_lines.append('sed -i "s,export HADOOP_LOG_DIR=.*,'
|
|
'export HADOOP_LOG_DIR=%s," /etc/hadoop/hadoop-env.sh'
|
|
% hadoop_log)
|
|
|
|
hadoop_log = storage_paths[0] + "/log/hadoop/hdfs"
|
|
script_lines.append('sed -i "s,export HADOOP_SECURE_DN_LOG_DIR=.*,'
|
|
'export HADOOP_SECURE_DN_LOG_DIR=%s," '
|
|
'/etc/hadoop/hadoop-env.sh' % hadoop_log)
|
|
|
|
for path in storage_paths:
|
|
script_lines.append("chown -R hadoop:hadoop %s" % path)
|
|
script_lines.append("chmod -R 755 %s" % path)
|
|
return "\n".join(script_lines)
|
|
|
|
|
|
def extract_name_values(configs):
|
|
return dict((cfg['name'], cfg['value']) for cfg in configs)
|
|
|
|
|
|
def extract_hadoop_path(lst, hadoop_dir):
|
|
return ",".join([p + hadoop_dir for p in lst])
|
|
|
|
|
|
def determine_cluster_config(cluster, config_name):
|
|
if config_name in cluster.cluster_configs:
|
|
cluster.cluster_configs.get(config_name)
|
|
all_conf = get_plugin_configs()
|
|
for conf in all_conf:
|
|
if conf.name == config_name:
|
|
return conf.default_value
|