Skip to content
Prev Previous commit
Next Next commit
Ensure server.stop() is called when shutting down.
Also remove the cleanup code from the fs provider. It would be
better to clean up, but there's a race between that code's cleanup
and Hadoop's shutdown hook, which closes all file systems kept in
the cache. So if you try to clean up the fs provider in a shut
down hook, you may end up with ugly exceptions in the output.

But leave the stop() functionality around in case it's useful for
future provider implementations.
  • Loading branch information
Marcelo Vanzin committed May 28, 2014
commit 461fac8896f4e9ca31f55824c6ae00df27a3cf1d
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
*/
private val logCheckingThread = new Thread("LogCheckingThread") {
override def run() = Utils.logUncaughtExceptions {
while (!stopped) {
while (true) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
Thread.sleep(UPDATE_INTERVAL_MS)
Expand All @@ -68,8 +68,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
}
}

@volatile private var stopped = false

initialize()

private def initialize() {
Expand All @@ -87,13 +85,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
logCheckingThread.start()
}

override def stop() = {
stopped = true
logCheckingThread.interrupt()
logCheckingThread.join()
fs.close()
}

override def getListing(offset: Int, count: Int) = {
val list = appList.get()
val theOffset = if (offset < list.size) offset else 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,14 @@ object HistoryServer {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()

Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
override def run() = {
server.stop()
}
})

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
server.stop()
}

def initSecurity() {
Expand Down