12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1461,6 +1461,18 @@ package object config {
.doubleConf
.createWithDefault(0.75)

private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
ConfigBuilder("spark.speculation.task.duration.threshold")
.doc("Task duration after which the scheduler will attempt to speculatively run the task. " +
"If provided, tasks will be speculatively run if the current stage contains no more " +
"tasks than the number of slots on a single executor and a task is taking longer than " +
"the threshold. This config helps speculate stages with very few tasks. Regular " +
"speculation configs may also apply if the executor slots are large enough, e.g. " +
"tasks might be re-launched if there are enough successful runs even though the " +
"threshold hasn't been reached.")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
Contributor: can we add a simple .doc to explain what this config does?

Contributor: yes please

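For reference, a minimal sketch of how a user might turn the new config on; the master, app name, and the 45min value are illustrative, not taken from the patch:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: speculation itself must be enabled for the threshold to matter.
val conf = new SparkConf()
  .setMaster("local[2]") // illustrative master
  .setAppName("speculation-demo")
  .set("spark.speculation", "true")
  .set("spark.speculation.task.duration.threshold", "45min")
val sc = new SparkContext(conf)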

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.stringConf
60 changes: 44 additions & 16 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -81,6 +81,13 @@ private[spark] class TaskSetManager(
val speculationQuantile = conf.get(SPECULATION_QUANTILE)
val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1)
// User-provided threshold for speculation, regardless of whether the quantile has been reached
val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD)
// SPARK-29976: Only when the total number of tasks in the stage is less than or equal to the
// number of slots on a single executor will the task manager speculatively run the tasks if
// their duration is longer than the given threshold. In this way, we don't speculate too
// aggressively but still handle basic cases.
val speculationTasksLessEqToSlots = numTasks <= (conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK)
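// Illustration (hypothetical numbers, not from the patch): with spark.executor.cores = 4
// and spark.task.cpus = 2, an executor has 4 / 2 = 2 slots, so a stage qualifies for the
// duration-threshold path only if it has at most 2 tasks (e.g. numTasks = 2 gives 2 <= 2,
// while numTasks = 3 does not).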

// For each task, tracks whether a copy of the task has succeeded. A task will also be
// marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -957,15 +964,41 @@ private[spark] class TaskSetManager(
recomputeLocality()
}

/**
* Check whether the task associated with the given tid has passed the time threshold and
* should be speculatively run.
*/
private def checkAndSubmitSpeculatableTask(
tid: Long,
currentTimeMillis: Long,
threshold: Double): Boolean = {
val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 &&
info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) {
addPendingTask(index, speculatable = true)
logInfo(
("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
" than %.0f ms (%d speculatable tasks in this taskset now)")
.format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
true
} else {
false
}
}

/**
* Check for tasks to be speculated and return true if there are any. This is called periodically
* by the TaskScheduler.
*/
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
// Can't speculate if we only have one task, and no need to speculate if the task set is a
// zombie or is from a barrier stage.
if (isZombie || isBarrier || numTasks == 1) {
// No need to speculate if the task set is a zombie or is from a barrier stage. If there is
// only one task we don't speculate, since we don't have metrics to decide whether it's taking
// too long or not, unless a task duration threshold is explicitly provided.
if (isZombie || isBarrier || (numTasks == 1 && !speculationTaskDurationThresOpt.isDefined)) {
return false
}
var foundTasks = false
@@ -983,19 +1016,14 @@
// bound based on that.
logDebug("Task length threshold for speculation: " + threshold)
for (tid <- runningTasksSet) {
val info = taskInfos(tid)
val index = info.index
if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
addPendingTask(index, speculatable = true)
logInfo(
("Marking task %d in stage %s (on %s) as speculatable because it ran more" +
" than %.0f ms(%d speculatable tasks in this taskset now)")
.format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1))
speculatableTasks += index
sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
foundTasks = true
}
foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
}
} else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
val time = clock.getTimeMillis()
val threshold = speculationTaskDurationThresOpt.get
logDebug(s"Tasks taking longer than the provided speculation threshold: $threshold")
for (tid <- runningTasksSet) {
foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
}
}
foundTasks
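Taken together, checkSpeculatableTasks now has two triggers for marking a task speculatable. The following is a rough sketch of that decision, simplified from the code above; the function name and signature are ours for illustration, not Spark's:

// Path 1 (existing): enough tasks have finished (the quantile), and a task has been running
// longer than speculationMultiplier * median (bounded below by minTimeToSpeculation).
// Path 2 (new): a duration threshold is configured, the stage fits into one executor's
// slots, and a task has been running longer than that threshold.
def shouldSpeculate(
    finished: Int, minFinished: Int, runtimeMs: Long, medianMs: Double,
    multiplier: Double, minTimeToSpeculation: Long,
    thresholdMs: Option[Long], tasksFitInSlots: Boolean): Boolean = {
  if (finished >= minFinished) {
    runtimeMs > math.max(multiplier * medianMs, minTimeToSpeculation)
  } else {
    thresholdMs.exists(t => tasksFitInSlots && runtimeMs > t)
  }
}

(The real code also skips tasks that already succeeded, already have a second copy running, or are already marked speculatable; that filtering lives in checkAndSubmitSpeculatableTask.)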
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1778,6 +1778,104 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
}

private def testSpeculationDurationSetup(
speculationThresholdOpt: Option[String],
speculationQuantile: Double,
numTasks: Int,
numSlots: Int): (TaskSetManager, ManualClock) = {
sc = new SparkContext("local", "test")
sc.conf.set(config.SPECULATION_ENABLED, true)
sc.conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
// Set the number of slots per executor
sc.conf.set(config.EXECUTOR_CORES.key, numSlots.toString)
sc.conf.set(config.CPUS_PER_TASK.key, "1")
if (speculationThresholdOpt.isDefined) {
sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
}
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
// Create a task set with the given number of tasks
val taskSet = FakeTask.createTaskSet(numTasks)
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
manager.isZombie = false

// Offer resources for the task to start
for (i <- 1 to numTasks) {
manager.resourceOffer(s"exec$i", s"host$i", NO_PREF)
}
(manager, clock)
}

private def testSpeculationDurationThreshold(
speculationThresholdProvided: Boolean,
numTasks: Int,
numSlots: Int): Unit = {
val (manager, clock) = testSpeculationDurationSetup(
// Set the threshold to be 60 minutes
if (speculationThresholdProvided) Some("60min") else None,
// Set the quantile to be 1.0 so that regular speculation would not be triggered
1.0,
numTasks,
numSlots
)

// If the time threshold has not been exceeded, no speculative run should be triggered
clock.advance(1000 * 60 * 60)
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == 0)

// Now the task should have been running for 60 minutes and 1 millisecond
clock.advance(1)
if (speculationThresholdProvided && numSlots >= numTasks) {
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == numTasks)
// Should not submit duplicate tasks
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == numTasks)
} else {
// If the feature flag is turned off, or the stage contains too many tasks
assert(!manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == 0)
}
}

Seq(1, 2).foreach { numTasks =>
test("SPARK-29976: when a speculation time threshold is provided, should speculatively " +
s"run the task even if there are not enough successful runs, total tasks: $numTasks") {
testSpeculationDurationThreshold(true, numTasks, numTasks)
}

test("SPARK-29976: when the speculation time threshold is not provided, " +
s"don't speculatively run if there are not enough successful runs, total tasks: $numTasks") {
testSpeculationDurationThreshold(false, numTasks, numTasks)
}
}

test("SPARK-29976: when a speculation time threshold is provided, should not speculate " +
"if the stage contains more tasks than the number of slots on a single executor") {
testSpeculationDurationThreshold(true, 2, 1)
}
Contributor: it would be nice to add another test here that tests the interaction of the speculative configs. Meaning if I have both the threshold set and the speculation quantile is smaller, the threshold can still apply, and vice versa: the quantile can still apply.

test("SPARK-29976: Regular speculation configs should still take effect even when a " +
"threshold is provided") {
val (manager, clock) = testSpeculationDurationSetup(
Some("60min"),
speculationQuantile = 0.5,
numTasks = 2,
numSlots = 2
)

// Task duration can't be 0, advance 1 sec
clock.advance(1000)
// Mark one of the tasks as succeeded, which should satisfy the quantile
manager.handleSuccessfulTask(0, createTaskResult(0))
// Advance 1 more second so the remaining task runs longer than the median but doesn't satisfy
// the duration threshold yet
clock.advance(1000)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.size == 1)
}
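To spell out the arithmetic this test relies on (constants copied from the setup above; the snippet is only an illustration):

// speculationQuantile = 0.5 and numTasks = 2, so
// minFinishedForSpeculation = max(floor(0.5 * 2), 1) = 1: a single successful task is
// enough to open the quantile path for the one still running.
val minFinishedForSpeculation = math.max((0.5 * 2).floor.toInt, 1)  // == 1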

test("TaskOutputFileAlreadyExistException lead to task set abortion") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
13 changes: 13 additions & 0 deletions docs/configuration.md
@@ -2031,6 +2031,19 @@ Apart from these, the following properties are also available, and may be useful in some situations:
Fraction of tasks which must be complete before speculation is enabled for a particular stage.
</td>
</tr>
<tr>
<td><code>spark.speculation.task.duration.threshold</code></td>
<td>None</td>
<td>
Task duration after which the scheduler will attempt to speculatively run the task. If provided,
tasks will be speculatively run if the current stage contains no more tasks than the number of
slots on a single executor and a task is taking longer than the threshold. This config helps
speculate stages with very few tasks. Regular speculation configs may also apply if the executor
slots are large enough, e.g. tasks might be re-launched if there are enough successful runs even
though the threshold hasn't been reached.
Default unit is milliseconds, unless otherwise specified.
</td>
</tr>
<tr>
<td><code>spark.task.cpus</code></td>
<td>1</td>