Switch to using Spark version 2.2.0

The following changes were required:

1.)
By default the pre-built distribution
for Spark 2.2.0 is compiled with Scala 2.11.
monasca-transform requires Spark compiled with
Scala 2.10, since we use Spark Streaming to
pull data from Kafka and the version of Kafka
we use is compatible with Scala 2.10.
The recommended way is to compile Spark
with Scala 2.10, but for the purposes of the
devstack plugin this change instead pulls the
required jars directly from mvn.
(see the SPARK_JARS and SPARK_JAVA_LIBS
variables in settings)
All jars get moved to
<SPARK_HOME>/assembly/target/scala-2.10/jars/.
Note: <SPARK_HOME>/jars gets renamed
to <SPARK_HOME>/jars_original, since
spark-submit falls back to the assembly
location when the <SPARK_HOME>/jars
directory is missing.

2.) Updated the start up scripts for the
spark worker and spark master with a new env
variable SPARK_SCALA_VERSION=2.10. Also updated
the PYTHONPATH variable to add the new
py4j-0.10.4-src.zip file.

3.) Replaced calls to deprecated pyspark
functions which were removed in Spark 2.0.

Change-Id: I8f8393bb91307d55f156b2ebf45225a16ae9d8f4
Ashwin Agate 2017-07-06 00:05:34 -07:00
parent 255dc84a27
commit 022bd11a4d
16 changed files with 67 additions and 30 deletions

View File

@@ -63,7 +63,7 @@ service_log_filename=monasca-transform.log
 spark_event_logging_enabled = true
 # A list of jars which Spark should use
-spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka_2.10-1.6.3.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/opt/spark/current/lib/drizzle-jdbc-1.3.jar
+spark_jars_list = /opt/spark/current/assembly/target/scala-2.10/jars/spark-streaming-kafka-0-8_2.10-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/scala-library-2.10.6.jar,/opt/spark/current/assembly/target/scala-2.10/jars/kafka_2.10-0.8.1.1.jar,/opt/spark/current/assembly/target/scala-2.10/jars/metrics-core-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
 # A list of where the Spark master(s) should run
 spark_master_list = spark://localhost:7077
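
For context, spark-submit takes extra jars as a single comma-separated --jars value, which is exactly the shape spark_jars_list already has. A hypothetical launcher sketch (not the project's actual service code; my_driver.py is illustrative):

# Hypothetical sketch: feed a spark_jars_list-style value to spark-submit.
jars = ",".join([
    "/opt/spark/current/assembly/target/scala-2.10/jars/spark-streaming-kafka-0-8_2.10-2.2.0.jar",
    "/opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar",
])
cmd = ["spark-submit", "--master", "spark://localhost:7077",
       "--jars", jars, "my_driver.py"]  # my_driver.py is illustrative
print(" ".join(cmd))  # subprocess.check_call(cmd) on a real deployment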

View File

@@ -1,5 +1,5 @@
-spark.driver.extraClassPath /opt/spark/current/lib/drizzle-jdbc-1.3.jar
-spark.executor.extraClassPath /opt/spark/current/lib/drizzle-jdbc-1.3.jar
+spark.driver.extraClassPath /opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
+spark.executor.extraClassPath /opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
 spark.blockManager.port 7100
 spark.broadcast.port 7105

View File

@@ -11,7 +11,7 @@ export SPARK_WORKER_WEBUI_PORT=18081
 export SPARK_WORKER_DIR=/var/run/spark/work
 export SPARK_WORKER_MEMORY=2g
-export SPARK_WORKER_CORES=2
+export SPARK_WORKER_CORES=1
 export SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=900 -Dspark.worker.cleanup.appDataTtl=1*24*3600"
 export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=file://var/log/spark/events -Dspark.history.ui.port=18082"
 export SPARK_LOG_DIR=/var/log/spark

View File

@@ -2,8 +2,13 @@
 . /opt/spark/current/conf/spark-env.sh
 export EXEC_CLASS=org.apache.spark.deploy.master.Master
 export INSTANCE_ID=1
-export SPARK_CLASSPATH=/opt/spark/current/conf/:/opt/spark/current/lib/spark-assembly-1.6.3-hadoop2.6.0.jar:/opt/spark/current/lib/datanucleus-core-3.2.10.jar:/opt/spark/current/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/current/lib/datanucleus-api-jdo-3.2.6.jar
+export SPARK_CLASSPATH=/etc/spark/conf/:/opt/spark/current/assembly/target/scala-2.10/jars/*
 export log="$SPARK_LOG_DIR/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID"-127.0.0.1.out"
 export SPARK_HOME=/opt/spark/current
-/usr/bin/java -cp $SPARK_CLASSPATH $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g -XX:MaxPermSize=256m "$EXEC_CLASS" --ip "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT"
+# added for spark 2
+export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}"
+export SPARK_SCALA_VERSION="2.10"
+/usr/bin/java -cp "$SPARK_CLASSPATH" $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g "$EXEC_CLASS" --ip "$SPARK_MASTER_IP" --port "$SPARK_MASTER_PORT" --webui-port "$SPARK_MASTER_WEBUI_PORT" --properties-file "/etc/spark/conf/spark-defaults.conf"
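
This is where point 2 of the commit message lands for the master: SPARK_SCALA_VERSION pins the Scala 2.10 layout and PYTHONPATH picks up the py4j bundled with Spark 2.2.0. A minimal, illustrative Python check that those path entries actually resolve (same paths as exported above):

import os
import sys

spark_home = os.environ.get("SPARK_HOME", "/opt/spark/current")
sys.path.append(os.path.join(spark_home, "python"))
sys.path.append(os.path.join(spark_home, "python", "lib",
                             "py4j-0.10.4-src.zip"))

import py4j  # raises ImportError if the zip is missing or misnamed
print(py4j.__file__)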

View File

@@ -1,9 +1,17 @@
 #!/usr/bin/env bash
-. /opt/spark/current/conf/spark-env.sh
+. /opt/spark/current/conf/spark-worker-env.sh
 export EXEC_CLASS=org.apache.spark.deploy.worker.Worker
 export INSTANCE_ID=1
-export SPARK_CLASSPATH=/opt/spark/current/conf/:/opt/spark/current/lib/spark-assembly-1.6.3-hadoop2.6.0.jar:/opt/spark/current/lib/datanucleus-core-3.2.10.jar:/opt/spark/current/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark/current/lib/datanucleus-api-jdo-3.2.6.jar
+export SPARK_CLASSPATH=/etc/spark/conf/:/opt/spark/current/assembly/target/scala-2.10/jars/*
 export log="$SPARK_LOG_DIR/spark-spark-"$EXEC_CLASS"-"$INSTANCE_ID"-127.0.0.1.out"
 export SPARK_HOME=/opt/spark/current
-/usr/bin/java -cp $SPARK_CLASSPATH $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g -XX:MaxPermSize=256m "$EXEC_CLASS" --webui-port "$SPARK_WORKER_WEBUI_PORT" --port $SPARK_WORKER_PORT $SPARK_MASTERS
+# added for spark 2
+export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}"
+export SPARK_SCALA_VERSION="2.10"
+/usr/bin/java -cp "$SPARK_CLASSPATH" $SPARK_DAEMON_JAVA_OPTS -Xms1g -Xmx1g "$EXEC_CLASS" --host $SPARK_LOCAL_IP --cores $SPARK_WORKER_CORES --memory $SPARK_WORKER_MEMORY --port "$SPARK_WORKER_PORT" -d "$SPARK_WORKER_DIR" --webui-port "$SPARK_WORKER_WEBUI_PORT" --properties-file "/etc/spark/conf/spark-defaults.conf" spark://$SPARK_MASTERS

View File

@@ -1,4 +1,3 @@
 # (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
 # Copyright 2016 FUJITSU LIMITED
 #
@@ -60,6 +59,13 @@ function pre_install_spark {
         SPARK_LIB_NAME=`echo ${SPARK_JAVA_LIB} | sed 's/.*\///'`
         download_through_cache ${MAVEN_REPO}/${SPARK_JAVA_LIB} ${SPARK_LIB_NAME}
     done
+    for SPARK_JAR in "${SPARK_JARS[@]}"
+    do
+        SPARK_JAR_NAME=`echo ${SPARK_JAR} | sed 's/.*\///'`
+        download_through_cache ${MAVEN_REPO}/${SPARK_JAR} ${SPARK_JAR_NAME}
+    done
     download_through_cache ${APACHE_MIRROR}/spark/spark-${SPARK_VERSION}/${SPARK_TARBALL_NAME} ${SPARK_TARBALL_NAME} 1000
@@ -67,7 +73,7 @@ function pre_install_spark {
 function install_java_libs {
-    pushd /opt/spark/current/lib
+    pushd /opt/spark/current/assembly/target/scala-2.10/jars/
     for SPARK_JAVA_LIB in "${SPARK_JAVA_LIBS[@]}"
     do
         SPARK_LIB_NAME=`echo ${SPARK_JAVA_LIB} | sed 's/.*\///'`
@@ -76,15 +82,27 @@ function install_java_libs {
     popd
 }
-function link_spark_streaming_lib {
+function install_spark_jars {
-    pushd /opt/spark/current/lib
-    ln -sf spark-streaming-kafka.jar spark-streaming-kafka_2.10-1.6.3.jar
+    # create a directory for jars
+    mkdir -p /opt/spark/current/assembly/target/scala-2.10/jars
+    # copy jars to new location
+    pushd /opt/spark/current/assembly/target/scala-2.10/jars
+    for SPARK_JAR in "${SPARK_JARS[@]}"
+    do
+        SPARK_JAR_NAME=`echo ${SPARK_JAR} | sed 's/.*\///'`
+        copy_from_cache ${SPARK_JAR_NAME}
+    done
+    # copy all jars except spark and scala to assembly/target/scala-2.10/jars
+    find /opt/spark/current/jars/ -type f ! \( -iname 'spark*' -o -iname 'scala*' -o -iname 'jackson-module-scala*' -o -iname 'json4s-*' -o -iname 'breeze*' -o -iname 'spire*' -o -iname 'macro-compat*' -o -iname 'shapeless*' -o -iname 'machinist*' -o -iname 'chill*' \) -exec cp {} . \;
+    # rename jars directory
+    mv /opt/spark/current/jars/ /opt/spark/current/jars_original
     popd
 }
 function copy_from_cache {
     resource_name=$1
     target_directory=${2:-"./."}
@@ -341,6 +359,8 @@ function install_spark {
     ln -sf /opt/spark/${SPARK_HADOOP_VERSION} /opt/spark/current
+    install_spark_jars
     install_java_libs
     create_spark_directories
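
For readers who find the find(1) expression above dense: it copies every stock Spark 2.2.0 jar except the Scala-2.11-linked ones, which the Maven downloads replace. A rough Python 3 equivalent of that filtering, same paths, purely illustrative:

import fnmatch
import os
import shutil

# Patterns excluded by the find command above (Scala-2.11-linked jars).
EXCLUDE = ("spark*", "scala*", "jackson-module-scala*", "json4s-*",
           "breeze*", "spire*", "macro-compat*", "shapeless*",
           "machinist*", "chill*")

src = "/opt/spark/current/jars"
dst = "/opt/spark/current/assembly/target/scala-2.10/jars"
os.makedirs(dst, exist_ok=True)
for name in os.listdir(src):
    if not any(fnmatch.fnmatch(name.lower(), pat) for pat in EXCLUDE):
        shutil.copy(os.path.join(src, name), dst)
# The plugin then renames jars/ to jars_original so spark-submit falls
# back to the assembly directory.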

View File

@@ -37,8 +37,8 @@ enable_service spark-worker
 # spark vars
 SPARK_DIRECTORIES=("/var/spark" "/var/log/spark" "/var/log/spark/events" "/var/run/spark" "/var/run/spark/work" "/etc/spark/conf" "/etc/spark/init" )
-SPARK_VERSION=${SPARK_VERSION:-1.6.3}
-HADOOP_VERSION=${HADOOP_VERSION:-2.6}
+SPARK_VERSION=${SPARK_VERSION:-2.2.0}
+HADOOP_VERSION=${HADOOP_VERSION:-2.7}
 SPARK_HADOOP_VERSION=spark-$SPARK_VERSION-bin-hadoop$HADOOP_VERSION
 SPARK_TARBALL_NAME=${SPARK_HADOOP_VERSION}.tgz
 MAVEN_REPO=${MAVEN_REPO:-https://repo1.maven.org/maven2}
@@ -48,7 +48,10 @@ APACHE_MIRROR=${APACHE_MIRROR:-http://archive.apache.org/dist/}
 BASE_KAFKA_VERSION=${BASE_KAFKA_VERSION:-0.8.1.1}
 SCALA_VERSION=${SCALA_VERSION:-2.10}
 KAFKA_VERSION=${KAFKA_VERSION:-${SCALA_VERSION}-${BASE_KAFKA_VERSION}}
-SPARK_JAVA_LIBS=("org/apache/kafka/kafka_2.10/0.8.1.1/kafka_2.10-0.8.1.1.jar" "org/scala-lang/scala-library/2.10.1/scala-library-2.10.1.jar" "com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar" "org/apache/spark/spark-streaming-kafka_2.10/${SPARK_VERSION}/spark-streaming-kafka_2.10-${SPARK_VERSION}.jar" "org/drizzle/jdbc/drizzle-jdbc/1.3/drizzle-jdbc-1.3.jar")
+SPARK_JAVA_LIBS=("org/apache/kafka/kafka_2.10/0.8.1.1/kafka_2.10-0.8.1.1.jar" "com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar" "org/scala-lang/scala-library/2.10.6/scala-library-2.10.6.jar" "org/scala-lang/scala-compiler/2.10.6/scala-compiler-2.10.6.jar" "org/scala-lang/scala-reflect/2.10.6/scala-reflect-2.10.6.jar" "org/scala-lang/scalap/2.10.6/scalap-2.10.6.jar" "org/apache/spark/spark-streaming-kafka-0-8_2.10/${SPARK_VERSION}/spark-streaming-kafka-0-8_2.10-${SPARK_VERSION}.jar" "org/drizzle/jdbc/drizzle-jdbc/1.3/drizzle-jdbc-1.3.jar" "com/fasterxml/jackson/module/jackson-module-scala_2.10/2.6.5/jackson-module-scala_2.10-2.6.5.jar" "org/json4s/json4s-jackson_2.10/3.2.11/json4s-jackson_2.10-3.2.11.jar" "org/json4s/json4s-core_2.10/3.2.11/json4s-core_2.10-3.2.11.jar" "org/json4s/json4s-ast_2.10/3.2.11/json4s-ast_2.10-3.2.11.jar" "org/scalanlp/breeze-macros_2.10/0.13.1/breeze-macros_2.10-0.13.1.jar" "org/spire-math/spire_2.10/0.13.0/spire_2.10-0.13.0.jar" "org/typelevel/macro-compat_2.10/1.1.1/macro-compat_2.10-1.1.1.jar" "com/chuusai/shapeless_2.10/2.3.2/shapeless_2.10-2.3.2.jar" "org/spire-math/spire-macros_2.10/0.13.0/spire-macros_2.10-0.13.0.jar" "org/typelevel/machinist_2.10/0.6.1/machinist_2.10-0.6.1.jar" "org/scalanlp/breeze_2.10/0.13.1/breeze_2.10-0.13.1.jar" "com/twitter/chill_2.10/0.8.0/chill_2.10-0.8.0.jar" "com/twitter/chill-java/0.8.0/chill-java-0.8.0.jar")
+# Get Spark 2.2 jars compiled with Scala 2.10 from mvn
+SPARK_JARS=("org/apache/spark/spark-catalyst_2.10/${SPARK_VERSION}/spark-catalyst_2.10-2.2.0.jar" "org/apache/spark/spark-core_2.10/${SPARK_VERSION}/spark-core_2.10-2.2.0.jar" "org/apache/spark/spark-graphx_2.10/${SPARK_VERSION}/spark-graphx_2.10-2.2.0.jar" "org/apache/spark/spark-launcher_2.10/${SPARK_VERSION}/spark-launcher_2.10-2.2.0.jar" "org/apache/spark/spark-mllib_2.10/${SPARK_VERSION}/spark-mllib_2.10-2.2.0.jar" "org/apache/spark/spark-mllib-local_2.10/${SPARK_VERSION}/spark-mllib-local_2.10-2.2.0.jar" "org/apache/spark/spark-network-common_2.10/${SPARK_VERSION}/spark-network-common_2.10-2.2.0.jar" "org/apache/spark/spark-network-shuffle_2.10/${SPARK_VERSION}/spark-network-shuffle_2.10-2.2.0.jar" "org/apache/spark/spark-repl_2.10/${SPARK_VERSION}/spark-repl_2.10-2.2.0.jar" "org/apache/spark/spark-sketch_2.10/${SPARK_VERSION}/spark-sketch_2.10-2.2.0.jar" "org/apache/spark/spark-sql_2.10/${SPARK_VERSION}/spark-sql_2.10-2.2.0.jar" "org/apache/spark/spark-streaming_2.10/${SPARK_VERSION}/spark-streaming_2.10-2.2.0.jar" "org/apache/spark/spark-tags_2.10/${SPARK_VERSION}/spark-tags_2.10-2.2.0.jar" "org/apache/spark/spark-unsafe_2.10/${SPARK_VERSION}/spark-unsafe_2.10-2.2.0.jar" "org/apache/spark/spark-yarn_2.10/${SPARK_VERSION}/spark-yarn_2.10-2.2.0.jar")
 # monasca-api stuff
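
Each entry in these arrays is a path below MAVEN_REPO; pre_install_spark derives the cached file name by stripping everything up to the last slash (the sed 's/.*\///' above). The same derivation in Python, for illustration only:

MAVEN_REPO = "https://repo1.maven.org/maven2"
SPARK_VERSION = "2.2.0"

jar_path = ("org/apache/spark/spark-core_2.10/%s/spark-core_2.10-%s.jar"
            % (SPARK_VERSION, SPARK_VERSION))
jar_name = jar_path.rsplit("/", 1)[-1]  # spark-core_2.10-2.2.0.jar
url = "%s/%s" % (MAVEN_REPO, jar_path)
print(jar_name)
print(url)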

View File

@@ -67,10 +67,10 @@ service_log_filename=monasca-transform.log
 spark_event_logging_enabled = true
 # A list of jars which Spark should use
-spark_jars_list = /opt/spark/current/lib/spark-streaming-kafka.jar,/opt/spark/current/lib/scala-library-2.10.1.jar,/opt/spark/current/lib/kafka_2.10-0.8.1.1.jar,/opt/spark/current/lib/metrics-core-2.2.0.jar,/usr/share/java/mysql.jar
+spark_jars_list = /opt/spark/current/assembly/target/scala-2.10/jars/spark-streaming-kafka-0-8_2.10-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/scala-library-2.10.6.jar,/opt/spark/current/assembly/target/scala-2.10/jars/kafka_2.10-0.8.1.1.jar,/opt/spark/current/assembly/target/scala-2.10/jars/metrics-core-2.2.0.jar,/opt/spark/current/assembly/target/scala-2.10/jars/drizzle-jdbc-1.3.jar
 # A list of where the Spark master(s) should run
-spark_master_list = spark://192.168.10.4:7077,192.168.10.5:7077
+spark_master_list = spark://localhost:7077
 # spark_home for the environment
 spark_home = /opt/spark/current

View File

@@ -486,7 +486,7 @@ class FetchQuantity(UsageComponent):
         grouped_data = record_store_df_int.groupBy(*group_by_columns_list)
         grouped_record_store_df = grouped_data.agg(agg_operations_map)
-        grouped_data_rdd_with_operation = grouped_record_store_df.map(
+        grouped_data_rdd_with_operation = grouped_record_store_df.rdd.map(
             lambda x:
             GroupedDataWithOperation(x,
                                      str(usage_fetch_operation)))
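
In PySpark 2.x, DataFrame no longer exposes map() directly (the 1.x shortcut that implicitly dropped to the underlying RDD was removed); the conversion has to be explicit via .rdd, which is the one-token fix above. A self-contained toy illustration of the before/after:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# Spark 1.6:  df.map(lambda row: row.id)  # implicitly used the RDD
# Spark 2.x:  raises AttributeError; DataFrame has no map()
ids = df.rdd.map(lambda row: row.id).collect()
print(ids)  # [1, 2]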

View File

@@ -56,7 +56,7 @@ class MySQLDataDrivenSpecsRepo(DataDrivenSpecsRepo):
             spec = json.loads(item['transform_spec'])
             data.append(json.dumps(spec))
-        data_frame = sql_context.jsonRDD(spark_context.parallelize(data))
+        data_frame = sql_context.read.json(spark_context.parallelize(data))
         self.transform_specs_data_frame = data_frame
     def generate_pre_transform_specs_data_frame(self, spark_context=None,
@@ -72,5 +72,5 @@ class MySQLDataDrivenSpecsRepo(DataDrivenSpecsRepo):
             spec = json.loads(item['pre_transform_spec'])
             data.append(json.dumps(spec))
-        data_frame = sql_context.jsonRDD(spark_context.parallelize(data))
+        data_frame = sql_context.read.json(spark_context.parallelize(data))
         self.pre_transform_specs_data_frame = data_frame
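
SQLContext.jsonRDD was deprecated in Spark 1.4 and removed in 2.0; its replacement, DataFrameReader.json, also accepts an RDD of JSON strings, so these call sites change mechanically. A toy version of the pattern used in this class:

import json

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
sc = spark.sparkContext

data = [json.dumps({"event_type": "cpu.total_perc"})]
# Spark 1.6:  sql_context.jsonRDD(sc.parallelize(data))
df = spark.read.json(sc.parallelize(data))  # Spark 2.x spelling
df.show()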

View File

@@ -324,7 +324,7 @@ class MonMetricsKafkaProcessor(object):
         # filter out unwanted metrics and keep metrics we are interested in
         #
         cond = [
-            raw_mon_metrics_df.metric.name ==
+            raw_mon_metrics_df.metric["name"] ==
             pre_transform_specs_df.event_type]
         filtered_metrics_df = raw_mon_metrics_df.join(
             pre_transform_specs_df, cond)
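
The dotted form stopped working because PySpark's Column gained a name() method in 2.0 (an alias for alias()), so raw_mon_metrics_df.metric.name resolves to that method rather than to the struct field called "name"; bracket indexing is always unambiguous field access. A toy reproduction:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([Row(metric=Row(name="cpu.total", value=1.0))])

# df.metric.name now resolves to Column.name(), not the nested field;
# bracket access always means "get the struct field":
df.select(df.metric["name"]).show()  # shows cpu.total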

View File

@@ -78,7 +78,7 @@ class InstanceUsageUtils(TransformUtils):
     def create_df_from_json_rdd(sql_context, jsonrdd):
         """create instance usage df from json rdd."""
         schema = InstanceUsageUtils._get_instance_usage_schema()
-        instance_usage_schema_df = sql_context.jsonRDD(jsonrdd, schema)
+        instance_usage_schema_df = sql_context.read.json(jsonrdd, schema)
         return instance_usage_schema_df
@@ -269,7 +269,7 @@ class MonMetricUtils(TransformUtils):
     def create_mon_metrics_df_from_json_rdd(sql_context, jsonrdd):
         """create mon metrics df from json rdd."""
         schema = MonMetricUtils._get_mon_metric_json_schema()
-        mon_metrics_df = sql_context.jsonRDD(jsonrdd, schema)
+        mon_metrics_df = sql_context.read.json(jsonrdd, schema)
         return mon_metrics_df
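
Same removal as in the specs repo above, but these helpers also pass an explicit schema; read.json accepts one as its second argument, so the schema-driven variant ports directly. A compact sketch with a toy schema (not the repo's actual instance-usage schema):

import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.master("local[1]").getOrCreate()
sc = spark.sparkContext

schema = StructType([StructField("tenant_id", StringType(), True)])
rdd = sc.parallelize([json.dumps({"tenant_id": "abc123"})])
# Spark 1.6:  sql_context.jsonRDD(rdd, schema)
df = spark.read.json(rdd, schema)  # schema as second argument in 2.x
df.printSchema()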

View File

@@ -20,7 +20,7 @@ import sys
 try:
     sys.path.append(os.path.join("/opt/spark/current", "python"))
     sys.path.append(os.path.join("/opt/spark/current",
-                                 "python", "lib", "py4j-0.9-src.zip"))
+                                 "python", "lib", "py4j-0.10.4-src.zip"))
 except KeyError:
     print("Error adding Spark location to the path")
     # TODO(someone) not sure what action is appropriate
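
Pinning py4j-0.10.4-src.zip in three files means the next Spark bump repeats these edits. A version-agnostic alternative (a suggestion, not what this commit does) would glob for whatever py4j zip the Spark distribution ships:

import glob
import os
import sys

spark_home = os.environ.get("SPARK_HOME", "/opt/spark/current")
sys.path.append(os.path.join(spark_home, "python"))
# Pick up whichever py4j version this Spark distribution bundles.
for zip_path in glob.glob(os.path.join(spark_home, "python", "lib",
                                       "py4j-*-src.zip")):
    sys.path.append(zip_path)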

View File

@@ -18,7 +18,7 @@ import sys
 try:
     sys.path.append(os.path.join("/opt/spark/current", "python"))
     sys.path.append(os.path.join("/opt/spark/current",
-                                 "python", "lib", "py4j-0.9-src.zip"))
+                                 "python", "lib", "py4j-0.10.4-src.zip"))
 except KeyError:
     print("Error adding Spark location to the path")
     sys.exit(1)

View File

@@ -18,7 +18,7 @@ import sys
 try:
     sys.path.append(os.path.join("/opt/spark/current", "python"))
     sys.path.append(os.path.join("/opt/spark/current",
-                                 "python", "lib", "py4j-0.9-src.zip"))
+                                 "python", "lib", "py4j-0.10.4-src.zip"))
 except KeyError:
     print("Error adding Spark location to the path")
     sys.exit(1)

View File

@@ -46,6 +46,7 @@ basepython = python2.7
 install_command = {[testenv]install_command}
 setenv = {[testenv]setenv}
     SPARK_HOME=/opt/spark/current
+    SPARK_SCALA_VERSION=2.10
     OS_TEST_PATH=tests/functional
 whitelist_externals =
     {[testenv]whitelist_externals}