Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ private[spark] class ApplicationMaster(
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

// Setting the maximum number of executor failures to -1 disables the failure check entirely.
private val disableMaxExecutorFailureCheck = maxNumExecutorFailures == -1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an extra space here. Also, the right side of this equation can just be maxNumExecutorFailures == -1.


@volatile private var exitCode = 0
@volatile private var unregistered = false
@volatile private var finished = false
Expand Down Expand Up @@ -308,7 +311,8 @@ private[spark] class ApplicationMaster(
var failureCount = 0
while (!finished) {
try {
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
if (!disableMaxExecutorFailureCheck &&
allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Max number of executor failures reached")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
import java.util.Stack
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this up with the import java.util.Collections


import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
Expand Down Expand Up @@ -83,6 +84,9 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0

@volatile private var executorFailureTimeStamps = new Stack[Long]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not want to use a queue here, because the first executor failures to leave the list should be the first ones that were added?

@volatile private var oldestRelativeExecutorFailure = -1L

@volatile private var targetNumExecutors = args.numExecutors

// Keep track of which container is running which executor to remove the executors later
Expand All @@ -94,6 +98,13 @@ private[yarn] class YarnAllocator(
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))

// Maximum number of executor failures per minute
private val relativeMaxExecutorFailurePerMinute =
sparkConf.getInt("spark.yarn.max.executor.failuresPerMinute", -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be indented two spaces. This goes for a couple other places as well.

// The per-minute failure window is active only when a limit was explicitly configured;
// -1 is the default used when the setting is absent and means "not enabled".
private val relativeMaxExecutorFailureEnabled =
  relativeMaxExecutorFailurePerMinute != -1

// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executor
Expand All @@ -119,7 +130,27 @@ private[yarn] class YarnAllocator(

def getNumExecutorsRunning: Int = numExecutorsRunning

def getNumExecutorsFailed: Int = numExecutorsFailed
def getNumExecutorsFailed: Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is a little bit unclear. Maybe merge this method with getRelevantNumExecutorsFailed?

if (relativeMaxExecutorFailureEnabled) {
getRelevantNumExecutorsFailed
} else {
numExecutorsFailed.intValue
}
}

/**
* Returns the relative number of executor failures within the specified window duration.
*/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra newline

def getRelevantNumExecutorsFailed : Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for this logic?

val currentTime = System.currentTimeMillis / 1000
val relevantWindowStartTime = currentTime - 60
while(relevantWindowStartTime > oldestRelativeExecutorFailure &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need spaces before and after these parentheses.

executorFailureTimeStamps.size > 0){
oldestRelativeExecutorFailure = executorFailureTimeStamps.pop
}
executorFailureTimeStamps.size + 1
}

/**
* Number of container requests that have not yet been fulfilled.
Expand Down Expand Up @@ -386,6 +417,9 @@ private[yarn] class YarnAllocator(
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
numExecutorsFailed += 1
if (relativeMaxExecutorFailureEnabled) {
executorFailureTimeStamps.push(System.currentTimeMillis / 1000)
}
}
}

Expand Down