Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Properly handle signal kill in ApplicationMaster
  • Loading branch information
jerryshao committed Mar 23, 2016
commit 6c0ef0c75698c74da794a8bb27e92987f8d2907b
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@

package org.apache.spark.deploy.yarn

import scala.util.control.NonFatal

import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
import java.net.{Socket, URL}
import java.util.concurrent.atomic.AtomicReference

import scala.util.control.NonFatal

import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import sun.misc.{Signal, SignalHandler}

import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
Expand Down Expand Up @@ -117,6 +119,25 @@ private[spark] class ApplicationMaster(

private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None

if (SystemUtils.IS_OS_UNIX) {
// Register a signal handler for "TERM", "INT" and "HUP". When the AM receives one of these
// signals and stops, the RM should treat the application attempt as failed so that it can be
// reattempted, rather than marking it as successful.
// This signal handler replaces SignalLogger on the AM side.
val signalHandler = new SignalHandler() {
override def handle(sig: Signal): Unit = {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the extra blank line; you can add one above `handle` if that makes it easier to read.

logInfo(s"received signal: ${sig.getName}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SIGNAL)
}
}
Seq("TERM", "INT", "HUP").foreach { sig => Signal.handle(new Signal(sig), signalHandler) }
}

def getAttemptId(): ApplicationAttemptId = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is a merge issue? Remove this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my bad, I will remove this.

client.getAttemptId()
}

final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
Expand Down Expand Up @@ -642,11 +663,11 @@ object ApplicationMaster extends Logging {
private val EXIT_SC_NOT_INITED = 13
private val EXIT_SECURITY = 14
private val EXIT_EXCEPTION_USER_CLASS = 15
private val EXIT_SIGNAL = 16

private var master: ApplicationMaster = _

def main(args: Array[String]): Unit = {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
Expand Down