Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-21541]: Spark Logs show incorrect job status for a job that do…
…es not create SparkContext

Added a flag to check whether user has initialized Spark Context. If it is true, then we let Application Master unregister with Resource Manager else we do not.
  • Loading branch information
pgandhi committed Jul 26, 2017
commit f454c8933e07967548095e068063bd313ae4845c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ private[spark] class ApplicationMaster(

@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _

// A flag to check whether user has initialized spark context
@volatile private var registered = false

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
Expand Down Expand Up @@ -319,7 +322,7 @@ private[spark] class ApplicationMaster(
*/
final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
synchronized {
if (!unregistered) {
if (registered && !unregistered) {
logInfo(s"Unregistering ApplicationMaster with $status" +
Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
unregistered = true
Expand All @@ -332,10 +335,15 @@ private[spark] class ApplicationMaster(
synchronized {
if (!finished) {
val inShutdown = ShutdownHookManager.inShutdown()
logInfo(s"Final app status: $status, exitCode: $code" +
if (registered) {
exitCode = code
finalStatus = status
} else {
finalStatus = FinalApplicationStatus.FAILED
exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
}
logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
exitCode = code
finalStatus = status
finalMsg = msg
finished = true
if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
Expand Down Expand Up @@ -439,12 +447,11 @@ private[spark] class ApplicationMaster(
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
registered = true
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
if (!finished) {
throw new IllegalStateException("SparkContext is null but app is still running!")
}
throw new IllegalStateException("User did not initialize spark context!")
}
userClassThread.join()
} catch {
Expand Down