@@ -59,6 +59,10 @@ private[spark] class ApplicationMaster(
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

// Disable the maximum executor failure check
private val disableMaxExecutorFailureCheck =
sparkConf.getBoolean("spark.yarn.max.executor.failures.disable", false)
Member
There's a cost and weight to making a flag for everything, and I think this doesn't add value. Just set max to a high value to "disable" it.

Contributor
I agree. We could make a special value mean disabled, like 0 or -1.

Author
Looks good, will treat maxNumExecutorFailures = -1 as the disabled state for this check.
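A minimal sketch of how the agreed sentinel might read in the launch loop, based on the diff above (the exact wiring is an assumption, not the final patch):

// Hypothetical: -1 disables the max-failure check entirely.
private val maxNumExecutorFailures = sparkConf.getInt(
  "spark.yarn.max.executor.failures", math.max(args.numExecutors * 2, 3))

// Any non-negative value is enforced as before; -1 skips the check.
if (maxNumExecutorFailures >= 0 &&
    allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
  finish(FinalApplicationStatus.FAILED,
    ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
    "Max number of executor failures reached")
}

This keeps the default behavior while letting users opt out with the existing property, with no extra boolean flag.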


@volatile private var exitCode = 0
@volatile private var unregistered = false
@volatile private var finished = false
@@ -308,7 +312,8 @@ private[spark] class ApplicationMaster(
var failureCount = 0
while (!finished) {
try {
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
if (!disableMaxExecutorFailureCheck &&
allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Max number of executor failures reached")
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
import java.util.Stack
Contributor
Put this up with the import java.util.Collections


import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -83,6 +84,9 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0

@volatile private var executorFailureTimeStamps = new Stack[Long]()
Contributor
Do we not want to use a queue here, because the first executor failures to leave the list should be the first ones that were added?
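For comparison, a FIFO queue matches the access pattern described here: failures are appended as they occur and evicted oldest-first once they fall outside the window. A minimal self-contained sketch (the class and method names are assumptions, not the patch):

import scala.collection.mutable

// Hypothetical queue-based sliding window of failure timestamps, in seconds.
class ExecutorFailureWindow(windowSeconds: Long) {
  private val failureTimes = new mutable.Queue[Long]() // oldest at the head

  def recordFailure(nowSeconds: Long): Unit = failureTimes.enqueue(nowSeconds)

  def failuresInWindow(nowSeconds: Long): Int = {
    val cutoff = nowSeconds - windowSeconds
    // The oldest failures leave the collection first, in FIFO order.
    while (failureTimes.nonEmpty && failureTimes.head < cutoff) {
      failureTimes.dequeue()
    }
    failureTimes.size
  }
}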

@volatile private var oldestRelativeExecutorFailure = -1L

@volatile private var targetNumExecutors = args.numExecutors

// Keep track of which container is running which executor to remove the executors later
@@ -94,6 +98,14 @@ private[yarn] class YarnAllocator(
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))

// Make the maximum executor failure check relative to a window of time
private val relativeMaxExecutorFailureCheck =
Member
Isn't this more easily solved by considering max failures to be per batch interval or something? Why define another, different window of time in another property?

Author
Hi,
I am not sure the batch window alone will do, as the DStream window needs to be some multiple of it. Also, in our use case of a long-running Spark application, there is no concept of a batch window as such.

Thanks,

Contributor
@srowen if you're suggesting tying it to the DStream window, I think that could be confusing. It doesn't seem obvious to me that these should be proportional.

Another thing is that it seems confusing that, if I want to set a max failures per time interval, I need to set three different properties. It's also worth considering just adding a spark.yarn.max.executor.failuresPerMinute property.

Author
Hi @sryza,

spark.yarn.max.executor.failuresPerMinute would make the failure count per-minute only, which means hardcoding the window size.

Member
Ah right, I had in my head this was streaming-specific for some reason. For an app that may run forever, it seems like any maximum number of failures is insufficient, and you'd want to disable this entirely. How about that?

Contributor
My point was that the size of the window might not need to vary. Are there examples that come to mind of scenarios where the ideal window size is widely different?

Author
Sounds reasonable.
Added the property as spark.yarn.max.executor.failuresPerMinute
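For illustration, the renamed property might be wired like this; only the property name comes from the thread, while the default and the helper are assumptions:

// Hypothetical: per-minute failure threshold; a value <= 0 disables the check.
private val maxExecutorFailuresPerMinute =
  sparkConf.getInt("spark.yarn.max.executor.failuresPerMinute", 0)

// With a fixed one-minute window, no separate window-size property is needed.
private def tooManyRecentFailures(failuresInLastMinute: Int): Boolean =
  maxExecutorFailuresPerMinute > 0 &&
    failuresInLastMinute >= maxExecutorFailuresPerMinute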

sparkConf.getBoolean("spark.yarn.max.executor.failures.relative", false)
// window duration in seconds (default = 600) for checking the maximum number of executor failures
private val relativeMaxExecutorFailureCheckWindow =
sparkConf.getInt("spark.yarn.max.executor.failures.relative.window", 600)

// Number of cores per executor.
protected val executorCores = args.executorCores
// Resource capability requested for each executors
@@ -119,7 +131,27 @@

def getNumExecutorsRunning: Int = numExecutorsRunning

def getNumExecutorsFailed: Int = numExecutorsFailed
def getNumExecutorsFailed: Int = {
Contributor
This name is a little bit unclear. Maybe merge this method with getRelevantNumExecutorsFailed?

if(relativeMaxExecutorFailureCheck){
getRelevantNumExecutorsFailed
} else {
numExecutorsFailed.intValue
}
}

/**
* Returns the relative number of executor failures within the specified window duration.
*/

Contributor
Extra newline

def getRelevantNumExecutorsFailed : Int = {
Contributor
Can we add a test for this logic?

var currentTime = System.currentTimeMillis / 1000
Member
Many style problems around here, like the disconnected javadoc, missing spaces around conditions, var vs val

Author
Hi @srowen,

I will fix it up. Out of curiosity: the build process does flag some of these styling issues; is there a more extensive list somewhere as well?

Thanks,

var relevantWindowStartTime = currentTime - relativeMaxExecutorFailureCheckWindow
while(relevantWindowStartTime > oldestRelativeExecutorFailure &&
Contributor
Need spaces before and after these parentheses.

executorFailureTimeStamps.size > 0){
oldestRelativeExecutorFailure = executorFailureTimeStamps.pop
}
executorFailureTimeStamps.size + 1
}
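Picking up the test request above: with the windowed bookkeeping pulled into its own small class (such as the queue-based ExecutorFailureWindow sketched earlier, with the clock passed in explicitly), the logic becomes testable without a YARN cluster. A hypothetical ScalaTest sketch:

import org.scalatest.FunSuite

class ExecutorFailureWindowSuite extends FunSuite {
  test("failures older than the window are not counted") {
    val window = new ExecutorFailureWindow(windowSeconds = 600)
    window.recordFailure(nowSeconds = 0)    // ages out by t = 700
    window.recordFailure(nowSeconds = 300)  // still inside the window at t = 700
    assert(window.failuresInWindow(nowSeconds = 700) === 1)
  }
}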

/**
* Number of container requests that have not yet been fulfilled.
@@ -386,6 +418,9 @@ private[yarn] class YarnAllocator(
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
numExecutorsFailed += 1
if(relativeMaxExecutorFailureCheck) {
executorFailureTimeStamps.push(System.currentTimeMillis / 1000)
}
}
}
