Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,6 @@ class SparkContext(
/** Shut down the SparkContext. */
def stop() {
ui.stop()
eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
Expand All @@ -884,6 +883,7 @@ class SparkContext(
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
listenerBus.stop()
eventLogger.foreach(_.stop())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listenerBus.stop() should be moved all the way to the end of sc.stop(), in case there are listeners that block for a long time and prevent other components of Spark to stop. Perhaps eventLogger.foreach(_.stop()) should go right after the listener bus.

taskScheduler = null
// TODO: Cache.stop()?
env.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
private var sparkListenerBus: Option[Thread] = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't have to be an option, you can move the declaration of the thread up here, and do thread.start() in start(). Also, a better name for this would be something like listenerThread


/**
* Start sending events to attached listeners.
Expand All @@ -49,7 +50,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
throw new IllegalStateException("Listener bus already started!")
}
started = true
new Thread("SparkListenerBus") {
sparkListenerBus = Some(new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
Expand All @@ -61,7 +62,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
postToAll(event)
}
}
}.start()
})
sparkListenerBus.foreach(_.start())
}

def post(event: SparkListenerEvent) {
Expand Down Expand Up @@ -97,5 +99,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
post(SparkListenerShutdown)
sparkListenerBus.foreach(_.join())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ object SparkHdfsLR {
}

println("Final w: " + w)
sc.stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, the lack of this is a cause for bugs in other places of Spark.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, is the System.exit(0) that follows necessary? Can we just remove it?

System.exit(0)
}
}