diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 32bc4e566306..6a7f7331c6b9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -59,6 +59,9 @@ private[spark] class ApplicationMaster(
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
+  // A configured limit of -1 disables the maximum executor failure check
+  private val disableMaxExecutorFailureCheck = maxNumExecutorFailures == -1
+
   @volatile private var exitCode = 0
   @volatile private var unregistered = false
   @volatile private var finished = false
@@ -308,7 +311,8 @@ private[spark] class ApplicationMaster(
         var failureCount = 0
         while (!finished) {
           try {
-            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+            if (!disableMaxExecutorFailureCheck &&
+              allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
               finish(FinalApplicationStatus.FAILED,
                 ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                 "Max number of executor failures reached")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b8f42dadcb46..cad0f460043c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -83,6 +83,9 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter = 0
   @volatile private var numExecutorsFailed = 0
 
+  // Timestamps (in seconds) of executor failures, oldest first; used for the per-minute count.
+  private val executorFailureTimeStamps = new ConcurrentLinkedQueue[java.lang.Long]()
+
   @volatile private var targetNumExecutors = args.numExecutors
 
   // Keep track of which container is running which executor to remove the executors later
@@ -94,6 +97,15 @@ private[yarn] class YarnAllocator(
   // Additional memory overhead.
   protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+  // Maximum number of executor failures per minute; -1 (the default) turns the
+  // time-windowed failure count off.
+  // NOTE(review): the value only acts as an on/off switch here; confirm which threshold
+  // the windowed count is meant to be compared against.
+  private val relativeMaxExecutorFailurePerMinute =
+    sparkConf.getInt("spark.yarn.max.executor.failuresPerMinute", -1)
+  private val relativeMaxExecutorFailureEnabled = relativeMaxExecutorFailurePerMinute != -1
+
   // Number of cores per executor.
   protected val executorCores = args.executorCores
   // Resource capability requested for each executors
@@ -119,7 +131,25 @@ private[yarn] class YarnAllocator(
 
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed
+  /**
+   * Number of failed executors: those within the last minute when the per-minute limit is
+   * configured, otherwise the running total.
+   */
+  def getNumExecutorsFailed: Int =
+    if (relativeMaxExecutorFailureEnabled) getRelevantNumExecutorsFailed else numExecutorsFailed
+
+  /**
+   * Returns the number of executor failures recorded within the last 60 seconds, discarding
+   * older timestamps as a side effect.
+   */
+  def getRelevantNumExecutorsFailed: Int = {
+    val windowStart = System.currentTimeMillis / 1000 - 60
+    // Failures are appended in chronological order, so expired entries sit at the head.
+    while (Option(executorFailureTimeStamps.peek()).exists(_.longValue < windowStart)) {
+      executorFailureTimeStamps.poll()
+    }
+    executorFailureTimeStamps.size
+  }
 
   /**
    * Number of container requests that have not yet been fulfilled.
@@ -386,6 +416,9 @@ private[yarn] class YarnAllocator(
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
             numExecutorsFailed += 1
+            if (relativeMaxExecutorFailureEnabled) {
+              executorFailureTimeStamps.add(System.currentTimeMillis / 1000)
+            }
           }
         }
 