Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
clone the spark session in StreamExecution constructor
  • Loading branch information
mukulmurthy committed Jan 10, 2019
commit b35eef0125964f29a5b029cbcf1609381e3c4f5a
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ abstract class StreamExecution(
lazy val streamMetrics = new MetricsReporter(
this, s"spark.streaming.${Option(name).getOrElse(id)}")

/** Isolated spark session to run the batches with. */
private val sparkSessionForStream = sparkSession.cloneSession()

/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
Expand Down Expand Up @@ -270,8 +273,6 @@ abstract class StreamExecution(
// force initialization of the logical plan so that the sources can be created
logicalPlan

// Isolated spark session to run the batches with.
val sparkSessionForStream = sparkSession.cloneSession()
// Adaptive execution can change num shuffle partitions, disallow
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
Expand Down