Changes from 1 commit
SPARK-24755 Executor loss can cause task to not be resubmitted
Hieu Huynh authored and Hieu Huynh committed Jul 8, 2018
commit 093e39cf76378821284ef7d771e819afb69930ae
@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
Member:
typo I made before, coresponding -> corresponding.

Contributor:
The comment needs to be changed, since this is no longer an array of Booleans.

Author:
I'll update it. Thanks.

Contributor:
Please update the comment, now that the HashSet approach is settled.

// this happened while we set the `spark.speculation` to true. The task killed by others
// should not resubmit while executor lost.
- private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+ private val killedByOtherAttempt = new HashSet[Long]
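Since the reviewers above and below ask for the surrounding code comment to be updated for the HashSet, here is a hedged sketch of wording the comment could take; this is an assumption about the eventual fix, not part of this commit, and the wrapper class below is purely illustrative:

```scala
import scala.collection.mutable.HashSet

// Illustrative wrapper only; in Spark this field lives inside TaskSetManager.
class KilledByOtherAttemptCommentSketch {
  // Task IDs of all attempts that were killed because another attempt of the
  // same task succeeded (only populated when `spark.speculation` is enabled).
  // A task recorded here must not be resubmitted when its executor is lost,
  // since another copy of it has already finished successfully.
  private val killedByOtherAttempt = new HashSet[Long]
}
```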
jiangxb1987 (Contributor), Jul 10, 2018:
super nit: I'd prefer an Array[Long], so you know the index corresponding to the taskId; that can provide more information while debugging.

hthuynh2 (Author), Jul 10, 2018:
Hi @jiangxb1987, thanks for the comment, but I'm not sure if I understand your suggestion correctly. Do you mean: private val killedByOtherAttempt = new Array[Long] ?

Author:
Also, the comment "Set the corresponding index of Boolean var when the task killed ..." is not correct anymore. I'm sorry I forgot to update it.

Contributor:
Yea, please also update the comment.

hthuynh2 (Author), Jul 10, 2018:
I think we should use ArrayBuffer[Long] instead of Array[Long], because the number of elements can grow as more tasks are killed.
Also, I think there is a downside to using an array-like data structure for this variable: lookup takes linear time, and that operation is used many times when we check whether a task needs to be resubmitted (inside the executorLost method of TaskSetManager). This won't matter much if the array is small, but I still think it's something we might want to consider.
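To make the lookup-cost point concrete, here is a minimal, self-contained sketch of the access pattern under discussion; the field name mirrors the PR, while the object, task IDs, and helper values are hypothetical:

```scala
import scala.collection.mutable

object KilledByOtherAttemptLookupSketch {
  def main(args: Array[String]): Unit = {
    // Task IDs whose attempts were killed because another attempt succeeded.
    val killedByOtherAttempt = mutable.HashSet[Long]()

    // Recording a kill: expected O(1) insert.
    killedByOtherAttempt += 4L

    // executorLost-style check: for each task that ran on the lost executor,
    // skip resubmission only if that exact task ID was killed by another attempt.
    val tidsOnLostExecutor = Seq(2L, 4L)
    tidsOnLostExecutor.foreach { tid =>
      val needsResubmit = !killedByOtherAttempt.contains(tid) // expected O(1) lookup
      println(s"tid=$tid needsResubmit=$needsResubmit")
    }
    // An ArrayBuffer[Long] would turn the same `contains` call into a linear scan,
    // which is the cost concern raised above.
  }
}
```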

Contributor:
@mridulm's approach also sounds good to me.

Contributor:
@jiangxb1987 please clarify: is it fine as is, or do you want to use a HashMap and track the index? Can you give an example of when this is used for debugging? For instance, if you are getting a heap dump and looking at the data structures, that might make sense; otherwise it's not accessible without adding further log statements anyway, and it's just extra memory usage.

Contributor:
For instance, when you have corrupted shuffle data you may want to ensure it's not caused by killing tasks, and that requires tracking all killed taskIds corresponding to a partition. With a HashMap as @mridulm proposed, it would be easy to add extra logging to debug. But actually I just looked at the code again and found that expanding the logInfo at L735 can also resolve my case. So it seems fine to use a HashSet to save some memory.
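To make the map-based alternative concrete, here is a hedged sketch of what it might look like; @mridulm's exact proposal is not quoted in this thread, so the field shape, helper, and log line below are assumptions:

```scala
import scala.collection.mutable

object KilledByOtherAttemptMapSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical alternative: partition index -> task IDs killed by another attempt.
    // Keeping the index-to-taskId association makes extra debug logging easy.
    val killedByOtherAttempt = mutable.HashMap[Int, mutable.HashSet[Long]]()

    def recordKill(index: Int, tid: Long): Unit =
      killedByOtherAttempt.getOrElseUpdate(index, mutable.HashSet[Long]()) += tid

    recordKill(index = 2, tid = 4L)

    // Debug log made possible by retaining the partition-to-taskIds mapping.
    killedByOtherAttempt.foreach { case (index, tids) =>
      println(s"partition $index had attempts killed by another attempt: ${tids.mkString(", ")}")
    }

    // Membership check equivalent to the HashSet[Long] the PR settles on.
    val wasKilled = killedByOtherAttempt.values.exists(_.contains(4L))
    println(s"tid 4 killed by another attempt: $wasKilled")
  }
}
```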

Contributor:
I'm not proposing to expand the logInfo at L735 in this PR; I'm just concerned about whether it's convenient enough for me to add extra logs to debug a potential issue. Since there is another way to achieve the same effect, I'm okay with using a HashSet here.

Contributor:
Great, thanks. Seems like we can go with the code as is then.


val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
private[scheduler] var tasksSuccessful = 0
@@ -735,7 +735,7 @@ private[spark] class TaskSetManager(
logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
- killedByOtherAttempt(index) = true
+ killedByOtherAttempt += attemptInfo.taskId
sched.backend.killTask(
attemptInfo.taskId,
attemptInfo.executorId,
@@ -944,7 +944,7 @@ private[spark] class TaskSetManager(
&& !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
- if (successful(index) && !killedByOtherAttempt(index)) {
+ if (successful(index) && !killedByOtherAttempt.contains(tid)) {
successful(index) = false
copiesRunning(index) -= 1
tasksSuccessful -= 1
@@ -1365,6 +1365,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
}

test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
val conf = new SparkConf().set("spark.speculation", "true")
sc = new SparkContext("local", "test", conf)
// Set the speculation multiplier to be 0 so speculative tasks are launched immediately
sc.conf.set("spark.speculation.multiplier", "0.0")
sc.conf.set("spark.speculation.quantile", "0.5")
sc.conf.set("spark.speculation", "true")

var killTaskCalled = false
sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
("exec2", "host2"), ("exec3", "host3"))
sched.initialize(new FakeSchedulerBackend() {
override def killTask(taskId: Long,
executorId: String,
Member:
nit: indent

interruptThread: Boolean,
reason: String): Unit = {
// Check that the only killTask event in this case is the one triggered by
// task 2.1 completing.
assert(taskId === 2)
assert(executorId === "exec3")
assert(interruptThread)
assert(reason === "another attempt succeeded")
killTaskCalled = true
}
})

// Keep track of the index of tasks that are resubmitted,
// so that the test can check that task is resubmitted correctly
var resubmittedTasks = new mutable.HashSet[Int]
val dagScheduler = new FakeDAGScheduler(sc, sched) {
override def taskEnded(task: Task[_],
reason: TaskEndReason,
Member:
ditto

result: Any,
accumUpdates: Seq[AccumulatorV2[_, _]],
taskInfo: TaskInfo): Unit = {
super.taskEnded(task, reason, result, accumUpdates, taskInfo)
reason match {
case Resubmitted => resubmittedTasks += taskInfo.index
case _ =>
}
}
}
sched.dagScheduler.stop()
sched.setDAGScheduler(dagScheduler)

val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host3", "exec3")),
Seq(TaskLocation("host2", "exec2")))

val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
task.metrics.internalAccums
}
// Offer resources for 4 tasks to start
for ((exec, host) <- Seq(
"exec1" -> "host1",
"exec1" -> "host1",
"exec3" -> "host3",
"exec2" -> "host2")) {
val taskOption = manager.resourceOffer(exec, host, NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === exec)
// Add an extra assert to make sure task 2.0 is running on exec3
if (task.index == 2) {
assert(task.attemptNumber === 0)
assert(task.executorId === "exec3")
}
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
clock.advance(1)
// Complete 2 of the tasks and leave the other 2 running
for (id <- Set(0, 1)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
assert(sched.endedTasks(id) === Success)
}

// checkSpeculatableTasks checks that the task runtime is greater than the threshold for
// speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
// > 0ms, so advance the clock by 1ms here.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(2, 3))

// Offer resource to start the speculative attempt for the running task 2.0
val taskOption = manager.resourceOffer("exec2", "host2", ANY)
assert(taskOption.isDefined)
val task4 = taskOption.get
assert(task4.index === 2)
assert(task4.taskId === 4)
assert(task4.executorId === "exec2")
assert(task4.attemptNumber === 1)
// Complete the speculative attempt for the running task
manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
// Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
assert(killTaskCalled)

assert(resubmittedTasks.isEmpty)
// Host 2 is lost, meaning we lost the map output from task4
manager.executorLost("exec2", "host2", SlaveLost())
// Make sure that task with index 2 is re-submitted
assert(resubmittedTasks.contains(2))
}

private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {