Reuse existing spark sql context
Prevent creating a new spark sql context object with every batch. Profiling of java heap for the driver indicated that there is a steady increase (~12MB over 5 days) of org.apache.spark.sql.execution.metric.LongSQLMetricValue and org.apache.spark.sql.execution.ui.SQLTaskMetrics with each batch execution. These are used by the spark streaming ui and were not being garbage collected. See https://issues.apache.org/jira/browse/SPARK-17381 for a similar issue. This change, along with setting spark.sql.ui.retainedExecutions to a low number in spark-defaults.conf, will reduce the gradual increase in heap size. Also made a change to catch the unhandled MemberNotJoined exception because of which the transform service thread went into an unresponsive state. Change-Id: Ibf244cbfc00a90ada66f492b473719c25fa17fd2
This commit is contained in:
parent
372c1a2327
commit
1579d8b9e5
|
@ -3,9 +3,25 @@ spark.executor.extraClassPath /opt/spark/current/lib/drizzle-jdbc-1.3.jar
|
|||
|
||||
spark.blockManager.port 7100
|
||||
spark.broadcast.port 7105
|
||||
spark.cores.max 1
|
||||
spark.driver.memory 512m
|
||||
spark.driver.port 7110
|
||||
spark.eventLog.dir /var/log/spark/events
|
||||
spark.executor.cores 1
|
||||
spark.executor.memory 512m
|
||||
spark.executor.port 7115
|
||||
spark.fileserver.port 7120
|
||||
spark.python.worker.memory 16m
|
||||
spark.speculation true
|
||||
spark.speculation.interval 200
|
||||
spark.eventLog.dir /var/log/spark/events
|
||||
spark.sql.shuffle.partitions 32
|
||||
spark.worker.cleanup.enabled True
|
||||
spark.cleaner.ttl 900
|
||||
spark.sql.ui.retainedExecutions 10
|
||||
spark.streaming.ui.retainedBatches 10
|
||||
spark.worker.ui.retainedExecutors 10
|
||||
spark.worker.ui.retainedDrivers 10
|
||||
spark.ui.retainedJobs 10
|
||||
spark.ui.retainedStages 10
|
||||
spark.driver.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/gc_driver.log
|
||||
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/gc_executor.log
|
||||
|
|
|
@ -296,7 +296,8 @@ class MonMetricsKafkaProcessor(object):
|
|||
"rdd_to_recordstore: nothing to process...")
|
||||
else:
|
||||
|
||||
sql_context = SQLContext(rdd_transform_context_rdd.context)
|
||||
sql_context = SQLContext.getOrCreate(
|
||||
rdd_transform_context_rdd.context)
|
||||
data_driven_specs_repo = DataDrivenSpecsRepoFactory.\
|
||||
get_data_driven_specs_repo()
|
||||
pre_transform_specs_df = data_driven_specs_repo.\
|
||||
|
|
|
@ -165,10 +165,13 @@ class TransformService(threading.Thread):
|
|||
coordinator.heartbeat()
|
||||
coordinator.run_watchers()
|
||||
if self.previously_running is True:
|
||||
# Leave/exit the coordination/election group
|
||||
request = coordinator.leave_group(group)
|
||||
request.get()
|
||||
|
||||
try:
|
||||
# Leave/exit the coordination/election group
|
||||
request = coordinator.leave_group(group)
|
||||
request.get()
|
||||
except coordination.MemberNotJoined:
|
||||
LOG.info('Host has not yet joined group %s as %s' %
|
||||
(group, self.my_host_name))
|
||||
time.sleep(float(CONF.service.election_polling_frequency))
|
||||
|
||||
coordinator.stop()
|
||||
|
|
|
@ -32,7 +32,7 @@ class TransformUtils(object):
|
|||
def _rdd_to_df(rdd, schema):
|
||||
"""convert rdd to dataframe using schema."""
|
||||
spark_context = rdd.context
|
||||
sql_context = SQLContext(spark_context)
|
||||
sql_context = SQLContext.getOrCreate(spark_context)
|
||||
if schema is None:
|
||||
df = sql_context.createDataFrame(rdd)
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue