# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo_config import cfg
from oslo_log import log as logging
import six

from sahara.plugins import provisioning as p
from sahara.plugins import swift_helper as swift
from sahara.plugins import topology_helper as topology
from sahara.plugins import utils

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

CORE_DEFAULT = utils.load_hadoop_xml_defaults(
    'plugins/spark/resources/core-default.xml', 'sahara_plugin_spark')

HDFS_DEFAULT = utils.load_hadoop_xml_defaults(
    'plugins/spark/resources/hdfs-default.xml', 'sahara_plugin_spark')

SWIFT_DEFAULTS = swift.read_default_swift_configs()

XML_CONFS = {
    "HDFS": [CORE_DEFAULT, HDFS_DEFAULT, SWIFT_DEFAULTS]
}
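
# Illustrative sketch only (not part of the plugin): each entry loaded by
# utils.load_hadoop_xml_defaults() above is a dict with 'name', 'value' and
# 'description' keys, which is the shape _initialise_configs() and
# extract_name_values() below rely on. The concrete values here are
# hypothetical.
_EXAMPLE_XML_DEFAULT_ENTRY = {
    'name': 'io.file.buffer.size',
    'value': '4096',
    'description': 'Buffer size used when reading and writing files.',
}
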
_default_executor_classpath = ":".join(
    ['/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])

SPARK_CONFS = {
    'Spark': {
        "OPTIONS": [
            {
                'name': 'Executor extra classpath',
                'description': 'Value for spark.executor.extraClassPath'
                               ' in spark-defaults.conf'
                               ' (default: %s)' % _default_executor_classpath,
                'default': '%s' % _default_executor_classpath,
                'priority': 2,
            },
            {
                'name': 'Master port',
                'description': 'Start the master on a different port'
                               ' (default: 7077)',
                'default': '7077',
                'priority': 2,
            },
            {
                'name': 'Worker port',
                'description': 'Start the Spark worker on a specific port'
                               ' (default: random)',
                'default': 'random',
                'priority': 2,
            },
            {
                'name': 'Master webui port',
                'description': 'Port for the master web UI (default: 8080)',
                'default': '8080',
                'priority': 1,
            },
            {
                'name': 'Worker webui port',
                'description': 'Port for the worker web UI (default: 8081)',
                'default': '8081',
                'priority': 1,
            },
            {
                'name': 'Worker cores',
                'description': 'Total number of cores to allow Spark'
                               ' applications to use on the machine'
                               ' (default: all available cores)',
                'default': 'all',
                'priority': 2,
            },
            {
                'name': 'Worker memory',
                'description': 'Total amount of memory to allow Spark'
                               ' applications to use on the machine,'
                               ' e.g. 1000m, 2g'
                               ' (default: total memory minus 1 GB)',
                'default': 'all',
                'priority': 1,
            },
            {
                'name': 'Worker instances',
                'description': 'Number of worker instances to run on each'
                               ' machine (default: 1)',
                'default': '1',
                'priority': 2,
            },
            {
                'name': 'Spark home',
                'description': 'The location of the spark installation'
                               ' (default: /opt/spark)',
                'default': '/opt/spark',
                'priority': 2,
            },
            {
                'name': 'Minimum cleanup seconds',
                'description': 'Job data will never be purged before this'
                               ' amount of time elapses'
                               ' (default: 86400 = 1 day)',
                'default': '86400',
                'priority': 2,
            },
            {
                'name': 'Maximum cleanup seconds',
                'description': 'Job data will always be purged after this'
                               ' amount of time elapses'
                               ' (default: 1209600 = 14 days)',
                'default': '1209600',
                'priority': 2,
            },
            {
                'name': 'Minimum cleanup megabytes',
                'description': 'No job data will be purged unless the total'
                               ' job data exceeds this size'
                               ' (default: 4096 = 4GB)',
                'default': '4096',
                'priority': 2,
            },
        ]
    }
}

HADOOP_CONF_DIR = "/etc/hadoop/conf"

ENV_CONFS = {
    "HDFS": {
        'Name Node Heap Size': 'HADOOP_NAMENODE_OPTS=\\"-Xmx%sm\\"',
        'Data Node Heap Size': 'HADOOP_DATANODE_OPTS=\\"-Xmx%sm\\"'
    }
}

ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster',
                                config_type="bool", priority=1,
                                default_value=True, is_optional=True)

ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
                        config_type="bool", priority=1,
                        default_value=True, is_optional=False)

DATANODES_STARTUP_TIMEOUT = p.Config(
    'DataNodes startup timeout', 'general', 'cluster', config_type='int',
    priority=1, default_value=10800, is_optional=True,
    description='Timeout for DataNodes startup, in seconds')
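
# Illustrative sketch only (not part of the plugin): the helper functions in
# this module expect user-supplied configs shaped as a mapping from the
# applicable target ("HDFS", "general", ...) to {config name: value}, using
# the names declared above. The concrete values below are hypothetical.
_EXAMPLE_USER_CONFIGS = {
    'HDFS': {
        'dfs.replication': 2,
        'Name Node Heap Size': 2048,
    },
    'general': {
        'Enable Swift': True,
    },
}
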
# Default set to 1 day, which is the default Keystone token
# expiration time. After the token is expired we can't continue
# scaling anyway.
DECOMMISSIONING_TIMEOUT = p.Config('Decommissioning Timeout', 'general',
                                   'cluster', config_type='int', priority=1,
                                   default_value=86400, is_optional=True,
                                   description='Timeout for datanode'
                                               ' decommissioning operation'
                                               ' during scaling, in seconds')

HIDDEN_CONFS = ['fs.defaultFS', 'dfs.namenode.name.dir',
                'dfs.datanode.data.dir']

CLUSTER_WIDE_CONFS = ['dfs.block.size', 'dfs.permissions', 'dfs.replication',
                      'dfs.replication.min', 'dfs.replication.max',
                      'io.file.buffer.size']

PRIORITY_1_CONFS = ['dfs.datanode.du.reserved',
                    'dfs.datanode.failed.volumes.tolerated',
                    'dfs.datanode.max.xcievers', 'dfs.datanode.handler.count',
                    'dfs.namenode.handler.count']

# For now there are not many cluster-wide configs,
# so let's consider all of them high priority.
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS


def _initialise_configs():
    configs = []
    for service, config_lists in six.iteritems(XML_CONFS):
        for config_list in config_lists:
            for config in config_list:
                if config['name'] not in HIDDEN_CONFS:
                    cfg = p.Config(config['name'], service, "node",
                                   is_optional=True, config_type="string",
                                   default_value=str(config['value']),
                                   description=config['description'])
                    if cfg.default_value in ["true", "false"]:
                        cfg.config_type = "bool"
                        cfg.default_value = (cfg.default_value == 'true')
                    elif utils.is_int(cfg.default_value):
                        cfg.config_type = "int"
                        cfg.default_value = int(cfg.default_value)
                    if config['name'] in CLUSTER_WIDE_CONFS:
                        cfg.scope = 'cluster'
                    if config['name'] in PRIORITY_1_CONFS:
                        cfg.priority = 1
                    configs.append(cfg)

    for service, config_items in six.iteritems(ENV_CONFS):
        for name, param_format_str in six.iteritems(config_items):
            configs.append(p.Config(name, service, "node",
                                    default_value=1024, priority=1,
                                    config_type="int"))

    for service, config_items in six.iteritems(SPARK_CONFS):
        for item in config_items['OPTIONS']:
            cfg = p.Config(name=item["name"],
                           description=item["description"],
                           default_value=item["default"],
                           applicable_target=service,
                           scope="cluster", is_optional=True,
                           priority=item["priority"])
            configs.append(cfg)

    configs.append(DECOMMISSIONING_TIMEOUT)
    configs.append(ENABLE_SWIFT)
    configs.append(DATANODES_STARTUP_TIMEOUT)
    if CONF.enable_data_locality:
        configs.append(ENABLE_DATA_LOCALITY)
    return configs


# Initialise plugin Hadoop configurations
PLUGIN_CONFIGS = _initialise_configs()


def get_plugin_configs():
    return PLUGIN_CONFIGS
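
# Illustrative sketch only (not part of the plugin): PLUGIN_CONFIGS can be
# filtered on the attributes set in _initialise_configs() (attribute names
# follow the p.Config keywords used above), for example to look up a single
# exposed option by target and name. The helper name is hypothetical.
def _example_find_plugin_config(target, name):
    for config in PLUGIN_CONFIGS:
        if config.applicable_target == target and config.name == name:
            return config
    return None
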
def generate_cfg_from_general(cfg, configs, general_config,
                              rest_excluded=False):
    if 'general' in configs:
        for nm in general_config:
            if nm not in configs['general'] and not rest_excluded:
                configs['general'][nm] = general_config[nm]['default_value']
        for name, value in configs['general'].items():
            if value:
                cfg = _set_config(cfg, general_config, name)
                LOG.debug("Applying config: {name}".format(name=name))
    else:
        cfg = _set_config(cfg, general_config)
    return cfg


def _get_hostname(service):
    return service.hostname() if service else None


def generate_xml_configs(configs, storage_path, nn_hostname, hadoop_port):
    if hadoop_port is None:
        hadoop_port = 8020

    cfg = {
        'fs.defaultFS': 'hdfs://%s:%s' % (nn_hostname, str(hadoop_port)),
        'dfs.namenode.name.dir': extract_hadoop_path(storage_path,
                                                     '/dfs/nn'),
        'dfs.datanode.data.dir': extract_hadoop_path(storage_path,
                                                     '/dfs/dn'),
        'dfs.hosts': '/etc/hadoop/dn.incl',
        'dfs.hosts.exclude': '/etc/hadoop/dn.excl'
    }

    # insert user-defined configs
    for key, value in extract_hadoop_xml_confs(configs):
        cfg[key] = value

    # Add the swift defaults if they have not been set by the user
    swft_def = []
    if is_swift_enabled(configs):
        swft_def = SWIFT_DEFAULTS
        swift_configs = extract_name_values(swift.get_swift_configs())
        for key, value in six.iteritems(swift_configs):
            if key not in cfg:
                cfg[key] = value

    # write the collected configs to the appropriate xml files
    core_all = CORE_DEFAULT + swft_def

    if CONF.enable_data_locality:
        cfg.update(topology.TOPOLOGY_CONFIG)
        # applying vm awareness configs
        core_all += topology.vm_awareness_core_config()

    xml_configs = {
        'core-site': utils.create_hadoop_xml(cfg, core_all),
        'hdfs-site': utils.create_hadoop_xml(cfg, HDFS_DEFAULT)
    }

    return xml_configs


def _get_spark_opt_default(opt_name):
    for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
        if opt_name == opt["name"]:
            return opt["default"]
    return None


def generate_spark_env_configs(cluster):
    configs = []

    # master configuration
    sp_master = utils.get_instance(cluster, "master")
    configs.append('SPARK_MASTER_IP=' + sp_master.hostname())

    # point to the hadoop conf dir so that Spark can read things
    # like the swift configuration without having to copy core-site
    # to /opt/spark/conf
    configs.append('HADOOP_CONF_DIR=' + HADOOP_CONF_DIR)

    masterport = utils.get_config_value_or_default("Spark",
                                                   "Master port", cluster)
    if masterport and masterport != _get_spark_opt_default("Master port"):
        configs.append('SPARK_MASTER_PORT=' + str(masterport))

    masterwebport = utils.get_config_value_or_default("Spark",
                                                      "Master webui port",
                                                      cluster)
    if (masterwebport and
            masterwebport != _get_spark_opt_default("Master webui port")):
        configs.append('SPARK_MASTER_WEBUI_PORT=' + str(masterwebport))

    # configuration for workers
    workercores = utils.get_config_value_or_default("Spark",
                                                    "Worker cores", cluster)
    if workercores and workercores != _get_spark_opt_default("Worker cores"):
        configs.append('SPARK_WORKER_CORES=' + str(workercores))

    workermemory = utils.get_config_value_or_default("Spark",
                                                     "Worker memory", cluster)
    if (workermemory and
            workermemory != _get_spark_opt_default("Worker memory")):
        configs.append('SPARK_WORKER_MEMORY=' + str(workermemory))

    workerport = utils.get_config_value_or_default("Spark",
                                                   "Worker port", cluster)
    if workerport and workerport != _get_spark_opt_default("Worker port"):
        configs.append('SPARK_WORKER_PORT=' + str(workerport))

    workerwebport = utils.get_config_value_or_default("Spark",
                                                      "Worker webui port",
                                                      cluster)
    if (workerwebport and
            workerwebport != _get_spark_opt_default("Worker webui port")):
        configs.append('SPARK_WORKER_WEBUI_PORT=' + str(workerwebport))

    workerinstances = utils.get_config_value_or_default("Spark",
                                                        "Worker instances",
                                                        cluster)
    if (workerinstances and
            workerinstances != _get_spark_opt_default("Worker instances")):
        configs.append('SPARK_WORKER_INSTANCES=' + str(workerinstances))
    return '\n'.join(configs)


# workernames needs to be a list of worker names
def generate_spark_slaves_configs(workernames):
    return '\n'.join(workernames)


def generate_spark_executor_classpath(cluster):
    cp = utils.get_config_value_or_default("Spark",
                                           "Executor extra classpath",
                                           cluster)
    if cp:
        return "spark.executor.extraClassPath " + cp
    return "\n"
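
# Illustrative sketch only (not part of the plugin): combines the three
# generators above the way the rendered Spark configuration files would be
# assembled. The file names used as dict keys here, and the helper name, are
# assumptions for illustration.
def _example_render_spark_conf(cluster, worker_hostnames):
    return {
        'spark-env.sh': generate_spark_env_configs(cluster),
        'slaves': generate_spark_slaves_configs(worker_hostnames),
        'spark-defaults.conf': generate_spark_executor_classpath(cluster),
    }
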
def extract_hadoop_environment_confs(configs):
    """Returns environment specific Hadoop configurations.

    :returns: list of Hadoop parameters which should be passed via environment
    """

    lst = []
    for service, srv_confs in configs.items():
        if ENV_CONFS.get(service):
            for param_name, param_value in srv_confs.items():
                for cfg_name, cfg_format_str in ENV_CONFS[service].items():
                    if param_name == cfg_name and param_value is not None:
                        lst.append(cfg_format_str % param_value)
    return lst


def extract_hadoop_xml_confs(configs):
    """Returns xml specific Hadoop configurations.

    :returns: list of Hadoop parameters which should be passed into general
    configs like core-site.xml
    """

    lst = []
    for service, srv_confs in configs.items():
        if XML_CONFS.get(service):
            for param_name, param_value in srv_confs.items():
                for cfg_list in XML_CONFS[service]:
                    names = [cfg['name'] for cfg in cfg_list]
                    if param_name in names and param_value is not None:
                        lst.append((param_name, param_value))
    return lst


def generate_hadoop_setup_script(storage_paths, env_configs):
    script_lines = ["#!/bin/bash -x"]
    script_lines.append("echo -n > /tmp/hadoop-env.sh")
    for line in env_configs:
        if 'HADOOP' in line:
            script_lines.append('echo "%s" >> /tmp/hadoop-env.sh' % line)
    script_lines.append("cat /etc/hadoop/hadoop-env.sh >> /tmp/hadoop-env.sh")
    script_lines.append("cp /tmp/hadoop-env.sh /etc/hadoop/hadoop-env.sh")

    hadoop_log = storage_paths[0] + "/log/hadoop/\$USER/"
    script_lines.append('sed -i "s,export HADOOP_LOG_DIR=.*,'
                        'export HADOOP_LOG_DIR=%s," /etc/hadoop/hadoop-env.sh'
                        % hadoop_log)

    hadoop_log = storage_paths[0] + "/log/hadoop/hdfs"
    script_lines.append('sed -i "s,export HADOOP_SECURE_DN_LOG_DIR=.*,'
                        'export HADOOP_SECURE_DN_LOG_DIR=%s," '
                        '/etc/hadoop/hadoop-env.sh' % hadoop_log)

    for path in storage_paths:
        script_lines.append("chown -R hadoop:hadoop %s" % path)
        script_lines.append("chmod -f -R 755 %s ||"
                            "echo 'Permissions unchanged'" % path)
    return "\n".join(script_lines)


def generate_job_cleanup_config(cluster):
    spark_config = {
        'minimum_cleanup_megabytes': utils.get_config_value_or_default(
            "Spark", "Minimum cleanup megabytes", cluster),
        'minimum_cleanup_seconds': utils.get_config_value_or_default(
            "Spark", "Minimum cleanup seconds", cluster),
        'maximum_cleanup_seconds': utils.get_config_value_or_default(
            "Spark", "Maximum cleanup seconds", cluster)
    }
    job_conf = {
        'valid': (
            _convert_config_to_int(
                spark_config['maximum_cleanup_seconds']) > 0 and
            _convert_config_to_int(
                spark_config['minimum_cleanup_megabytes']) > 0 and
            _convert_config_to_int(
                spark_config['minimum_cleanup_seconds']) > 0)
    }
    if job_conf['valid']:
        job_conf['cron'] = utils.get_file_text(
            'plugins/spark/resources/spark-cleanup.cron',
            'sahara_plugin_spark'),
        job_cleanup_script = utils.get_file_text(
            'plugins/spark/resources/tmp-cleanup.sh.template',
            'sahara_plugin_spark')
        job_conf['script'] = job_cleanup_script.format(**spark_config)
    return job_conf
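
# Illustrative sketch only (not part of the plugin): generate_job_cleanup_config()
# returns a dict with a 'valid' flag plus, when all three cleanup options are
# positive, the rendered crontab entry and cleanup script. A caller is assumed
# to guard on 'valid' before using the other keys; the helper name is
# hypothetical.
def _example_cleanup_artifacts(cluster):
    job_conf = generate_job_cleanup_config(cluster)
    if not job_conf['valid']:
        return None
    return job_conf['cron'], job_conf['script']
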
def _convert_config_to_int(config_value):
    try:
        return int(config_value)
    except ValueError:
        return -1


def extract_name_values(configs):
    return {cfg['name']: cfg['value'] for cfg in configs}


def make_hadoop_path(base_dirs, suffix):
    return [base_dir + suffix for base_dir in base_dirs]


def extract_hadoop_path(lst, hadoop_dir):
    if lst:
        return ",".join(make_hadoop_path(lst, hadoop_dir))


def _set_config(cfg, gen_cfg, name=None):
    if name in gen_cfg:
        cfg.update(gen_cfg[name]['conf'])
    if name is None:
        for name in gen_cfg:
            cfg.update(gen_cfg[name]['conf'])
    return cfg


def _get_general_config_value(conf, option):
    if 'general' in conf and option.name in conf['general']:
        return conf['general'][option.name]
    return option.default_value


def _get_general_cluster_config_value(cluster, option):
    return _get_general_config_value(cluster.cluster_configs, option)


def is_data_locality_enabled(cluster):
    if not CONF.enable_data_locality:
        return False
    return _get_general_cluster_config_value(cluster, ENABLE_DATA_LOCALITY)


def is_swift_enabled(configs):
    return _get_general_config_value(configs, ENABLE_SWIFT)


def get_decommissioning_timeout(cluster):
    return _get_general_cluster_config_value(cluster, DECOMMISSIONING_TIMEOUT)


def get_port_from_config(service, name, cluster=None):
    address = utils.get_config_value_or_default(service, name, cluster)
    return utils.get_port_from_address(address)
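
# Illustrative sketch only (not part of the plugin): resolves the ports a
# deployment might need to open, using the Spark options exposed above. The
# helper name and the dict keys are hypothetical.
def _example_spark_ports(cluster):
    return {
        'master': get_port_from_config("Spark", "Master port", cluster),
        'master_webui': get_port_from_config("Spark", "Master webui port",
                                             cluster),
        'worker_webui': get_port_from_config("Spark", "Worker webui port",
                                             cluster),
    }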