Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
undo effect on totalResultSize and calculatedTasks
  • Loading branch information
Hieu Huynh authored and Hieu Huynh committed Jul 19, 2018
commit 2c7d33d88a9023604ab6343988f3263ceea333ca
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.scheduler.SchedulingMode._
import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
import org.apache.spark.util.collection.MedianHeap

/**
Expand Down Expand Up @@ -724,7 +724,15 @@ private[spark] class TaskSetManager(
val info = taskInfos(tid)
val index = info.index
// Check if any other attempt succeeded before this and this attempt has not been handled
if (successful(index) && killedByOtherAttempt(index)) {
if (successful(index) && killedByOtherAttempt.contains(tid)) {
calculatedTasks -= 1
Copy link
Contributor

@tgravescs tgravescs Jul 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment here about cleaning up things from incremented earlier while handling it as successful


val resultSizeAcc = result.accumUpdates.find(a =>
a.name == Some(InternalAccumulator.RESULT_SIZE))
if (resultSizeAcc.isDefined) {
totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the downside here is we already incremented and other tasks could have checked and failed before we decrement, but unless someone else has a better idea this is better then it is now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I dont see a better option.

}

handleFailedTask(tid, TaskState.KILLED,
TaskKilled("Finish but did not commit due to another attempt succeeded"))
return
Expand Down