Fix indentation and comments
Hieu Huynh authored and Hieu Huynh committed Jul 16, 2018
commit 9f0e0aeae4af79a59a6d2bf2b6f84c06fd57e0d5
```diff
@@ -84,8 +84,9 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
-  // this happened while we set the `spark.speculation` to true. The task killed by others
+
+  // Add the tid of task into this HashSet when the task is killed by other attempt tasks.
+  // This happened while we set the `spark.speculation` to true. The task killed by others
   // should not resubmit while executor lost.
   private val killedByOtherAttempt = new HashSet[Long]
```
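
For readers skimming the diff, here is a minimal, self-contained sketch (my own illustration, not the actual TaskSetManager source) of the behavior the new comment describes: attempts killed because a sibling speculative attempt succeeded are remembered by tid, and executor loss skips resubmitting them.

```scala
import scala.collection.mutable.HashSet

// Hypothetical, simplified stand-in for the real TaskSetManager logic.
class KilledAttemptTracker(numTasks: Int) {
  val successful = new Array[Boolean](numTasks)
  private val killedByOtherAttempt = new HashSet[Long]

  // Record a task attempt that was killed because another attempt of the
  // same task succeeded (only happens with spark.speculation = true).
  def recordKilledByOtherAttempt(tid: Long): Unit =
    killedByOtherAttempt += tid

  // On executor loss, resubmit only attempts that were neither successful
  // nor killed by a sibling attempt.
  def tasksToResubmit(running: Seq[(Long, Int)]): Seq[Int] =
    running.collect {
      case (tid, index)
          if !killedByOtherAttempt.contains(tid) && !successful(index) =>
        index
    }
}
```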
@jiangxb1987 (Contributor) commented on Jul 10, 2018:
super nit: I prefer an Array[Long], so you know the index corresponding to the taskId; that can provide more information while debugging.
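
One possible reading of this suggestion (my assumption; the author asks for clarification just below) is an array indexed by the task's position in the task set:

```scala
// Hypothetical sketch: the index is the task's index within the task set,
// and the value is the tid of the killed attempt (0L meaning "never killed").
val numTasks = 4
val killedByOtherAttempt = new Array[Long](numTasks)

// The task at index 2 had its attempt with tid 17 killed:
killedByOtherAttempt(2) = 17L

// While debugging, the index-to-tid mapping is directly visible:
assert(killedByOtherAttempt(2) == 17L)
```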

@hthuynh2 (Author) replied on Jul 10, 2018:

Hi @jiangxb1987, thanks for the comment, but I'm not sure I understand your suggestion correctly. Do you mean `private val killedByOtherAttempt = new Array[Long]`?

@hthuynh2 (Author) replied:

Also, the comment "Set the corresponding index of Boolean var when the task killed ..." is no longer correct. I'm sorry, I forgot to update it.

A contributor replied:

Yea, please also update the comment.

@hthuynh2 (Author) replied on Jul 10, 2018:

I think we should use ArrayBuffer[Long] instead of Array[Long], because the number of elements can grow as more tasks are killed.
Also, I think there is a downside to using an array-like data structure for this variable: a membership lookup on an array-like structure takes linear time, and that operation is performed many times when we check whether a task needs to be resubmitted (inside the executorLost method of TaskSetManager). This won't matter much if the array is small, but I still think it is something we might want to consider.
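
A small sketch of the complexity trade-off being described here (my illustration, not code from the PR):

```scala
import scala.collection.mutable

val killedBuffer = mutable.ArrayBuffer[Long](3L, 7L, 11L)
val killedSet = mutable.HashSet[Long](3L, 7L, 11L)

// ArrayBuffer membership is a linear scan: O(n) per check.
val hitBuffer = killedBuffer.contains(7L)

// HashSet membership hashes the key: expected O(1) per check.
val hitSet = killedSet.contains(7L)

assert(hitBuffer && hitSet)
```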

A contributor replied:

@mridulm's approach also sounds good to me.

A contributor replied:

@jiangxb1987, please clarify: is it fine as is, or do you want to use a HashMap and track the index? Can you give an example of when this would be used for debugging? If, for instance, you are taking a heap dump and looking at the data structures, that might make sense; otherwise it's not accessible without adding further log statements anyway, and it's just extra memory usage.

A contributor replied:

For instance, when you have corrupted shuffle data, you may want to make sure it wasn't caused by killed tasks, and that requires tracking all killed taskIds corresponding to a partition. With a HashMap as @mridulm proposed, it would be easy to add extra logging for debugging. But I just looked at the code again and found that expanding the logInfo at L735 can also resolve my case, so it seems fine to use a HashSet and save some memory.
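
To make this debugging scenario concrete, a hedged sketch of the HashMap shape being discussed (names are hypothetical; the thread attributes the idea to @mridulm):

```scala
import scala.collection.mutable

// Per partition index, every attempt tid killed because a sibling
// attempt of the same task succeeded.
val killedByPartition = mutable.HashMap.empty[Int, mutable.HashSet[Long]]

def recordKill(partition: Int, tid: Long): Unit =
  killedByPartition.getOrElseUpdate(partition, mutable.HashSet.empty[Long]) += tid

// Extra logging for the corrupted-shuffle investigation is then easy:
def logKilled(partition: Int): Unit =
  println(s"partition $partition: killed attempts = " +
    killedByPartition.getOrElse(partition, mutable.HashSet.empty[Long]).mkString(", "))
```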

A contributor replied:

I'm not proposing to expand the logInfo at L735 in this PR; I'm just concerned about whether it's convenient enough to add extra logs when debugging a potential issue. Since there is another way to achieve the same effect, I'm okay with using a HashSet here.

A contributor replied:

Great, thanks. Seems like we can go with the code as is, then.


```diff
@@ -1378,9 +1378,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
       ("exec2", "host2"), ("exec3", "host3"))
     sched.initialize(new FakeSchedulerBackend() {
       override def killTask(taskId: Long,
-        executorId: String,
-        interruptThread: Boolean,
-        reason: String): Unit = {
+                            executorId: String,
+                            interruptThread: Boolean,
+                            reason: String): Unit = {
         // Check the only one killTask event in this case, which triggered by
         // task 2.1 completed.
         assert(taskId === 2)
```
```diff
@@ -1396,10 +1396,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
     var resubmittedTasks = new mutable.HashSet[Int]
     val dagScheduler = new FakeDAGScheduler(sc, sched) {
       override def taskEnded(task: Task[_],
-        reason: TaskEndReason,
-        result: Any,
-        accumUpdates: Seq[AccumulatorV2[_, _]],
-        taskInfo: TaskInfo): Unit = {
+                             reason: TaskEndReason,
+                             result: Any,
+                             accumUpdates: Seq[AccumulatorV2[_, _]],
+                             taskInfo: TaskInfo): Unit = {
         super.taskEnded(task, reason, result, accumUpdates, taskInfo)
         reason match {
           case Resubmitted => resubmittedTasks += taskInfo.index
```
```diff
@@ -1423,10 +1423,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
     }
     // Offer resources for 4 tasks to start
     for ((exec, host) <- Seq(
-      "exec1" -> "host1",
-      "exec1" -> "host1",
-      "exec3" -> "host3",
-      "exec2" -> "host2")) {
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
       val taskOption = manager.resourceOffer(exec, host, NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get
```