Reuse existing spark sql context

Prevent creating a new Spark SQL context object with every batch.
Profiling of the Java heap for the driver showed a steady
increase (~12MB over 5 days) in
org.apache.spark.sql.execution.metric.LongSQLMetricValue
and org.apache.spark.sql.execution.ui.SQLTaskMetrics objects
with each batch execution. These are used by the Spark
streaming UI and were not being garbage collected.
See https://issues.apache.org/jira/browse/SPARK-17381
for a similar issue.
This change, along with setting
spark.sql.ui.retainedExecutions to a low number in
spark-defaults.conf, will reduce the gradual increase in
heap size.
Also made a change to catch the unhandled MemberNotJoined
exception, because of which the transform service thread
went into an unresponsive state.

Change-Id: Ibf244cbfc00a90ada66f492b473719c25fa17fd2
agatea 2017-02-27 12:24:00 -08:00
parent 372c1a2327
commit 1579d8b9e5
4 changed files with 27 additions and 7 deletions
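
For context, a minimal sketch of the per-batch pattern this change
adopts (the stream source, metric names, and handler below are
illustrative, not taken from the original code):

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='sqlcontext-reuse-sketch')
ssc = StreamingContext(sc, batchDuration=30)

def process_batch(rdd):
    if rdd.isEmpty():
        return
    # getOrCreate returns the cached SQLContext for this SparkContext
    # instead of constructing a fresh one per batch, so per-context
    # SQL UI bookkeeping objects are not re-allocated on every batch.
    sql_context = SQLContext.getOrCreate(rdd.context)
    df = sql_context.createDataFrame(rdd, ['metric', 'value'])
    df.count()

lines = ssc.socketTextStream('localhost', 9999)
lines.map(lambda m: (m, 1.0)).foreachRDD(process_batch)
ssc.start()
ssc.awaitTermination()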

View File

@@ -3,9 +3,25 @@ spark.executor.extraClassPath /opt/spark/current/lib/drizzle-jdbc-1.3.jar
 spark.blockManager.port 7100
 spark.broadcast.port 7105
 spark.cores.max 1
+spark.driver.memory 512m
 spark.driver.port 7110
 spark.eventLog.dir /var/log/spark/events
+spark.executor.cores 1
+spark.executor.memory 512m
 spark.executor.port 7115
 spark.fileserver.port 7120
 spark.python.worker.memory 16m
 spark.speculation true
+spark.speculation.interval 200
+spark.eventLog.dir /var/log/spark/events
+spark.sql.shuffle.partitions 32
+spark.worker.cleanup.enabled True
+spark.cleaner.ttl 900
+spark.sql.ui.retainedExecutions 10
+spark.streaming.ui.retainedBatches 10
+spark.worker.ui.retainedExecutors 10
+spark.worker.ui.retainedDrivers 10
+spark.ui.retainedJobs 10
+spark.ui.retainedStages 10
+spark.driver.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/gc_driver.log
+spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/gc_executor.log
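
The UI retention settings above can also be applied programmatically
when building the context, e.g. via SparkConf; a sketch for reference
(spark-defaults.conf remains the mechanism this change uses):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set('spark.sql.ui.retainedExecutions', '10')
        .set('spark.streaming.ui.retainedBatches', '10')
        .set('spark.ui.retainedJobs', '10')
        .set('spark.ui.retainedStages', '10'))
sc = SparkContext(conf=conf)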

View File

@@ -296,7 +296,8 @@ class MonMetricsKafkaProcessor(object):
                 "rdd_to_recordstore: nothing to process...")
         else:
-            sql_context = SQLContext(rdd_transform_context_rdd.context)
+            sql_context = SQLContext.getOrCreate(
+                rdd_transform_context_rdd.context)
             data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
                 get_data_driven_specs_repo()
             pre_transform_specs_df = data_driven_specs_repo.\

View File

@@ -165,10 +165,13 @@ class TransformService(threading.Thread):
             coordinator.heartbeat()
             coordinator.run_watchers()
             if self.previously_running is True:
-                # Leave/exit the coordination/election group
-                request = coordinator.leave_group(group)
-                request.get()
+                try:
+                    # Leave/exit the coordination/election group
+                    request = coordinator.leave_group(group)
+                    request.get()
+                except coordination.MemberNotJoined:
+                    LOG.info('Host has not yet joined group %s as %s' %
+                             (group, self.my_host_name))
             time.sleep(float(CONF.service.election_polling_frequency))
         coordinator.stop()
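
Standalone, the guarded leave looks roughly like this (a sketch
assuming the tooz library with a reachable backend; the backend URL,
member id, and group name are illustrative):

from tooz import coordination

coordinator = coordination.get_coordinator(
    'memcached://127.0.0.1:11211', b'transform-host-1')
coordinator.start()
try:
    # leave_group() on a group this member never joined raises
    # MemberNotJoined; treating it as benign keeps the thread alive.
    coordinator.leave_group(b'transform-group').get()
except coordination.MemberNotJoined:
    pass
coordinator.stop()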

View File

@@ -32,7 +32,7 @@ class TransformUtils(object):
     def _rdd_to_df(rdd, schema):
         """convert rdd to dataframe using schema."""
         spark_context = rdd.context
-        sql_context = SQLContext(spark_context)
+        sql_context = SQLContext.getOrCreate(spark_context)
         if schema is None:
             df = sql_context.createDataFrame(rdd)
         else:
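
A hypothetical usage of this helper pattern (metric names and data
are illustrative, not from the original code):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName='rdd-to-df-sketch')
rdd = sc.parallelize([('mem.usable_mb', 512.0), ('cpu.idle_perc', 97.0)])
# Reuses the cached SQLContext rather than instantiating a new one.
sql_context = SQLContext.getOrCreate(rdd.context)
df = sql_context.createDataFrame(rdd, ['metric', 'value'])
df.show()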