Commits (29)
5ad6efd  [SPARK-25250] : On successful completion of a task attempt on a parti… (Oct 23, 2018)
8667c28  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Oct 23, 2018)
a73f619  [SPARK-25250] : Calling maybeFinishTaskSet() from method and adding c… (Dec 28, 2018)
5509165  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Dec 28, 2018)
ee5bc68  [SPARK-25250] : Fixing scalastyle tests (Dec 28, 2018)
7677aec  [SPARK-25250] : Addressing Reviews January 2, 2019 (Jan 2, 2019)
67e1644  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 8, 2019)
f395b65  [SPARK-25250] : Addressing Reviews January 8, 2019 (Jan 8, 2019)
f7102ca  [SPARK-25250] : Addressing Reviews January 9, 2019 (Jan 9, 2019)
6709fe1  [SPARK-25250] : Addressing Reviews January 10, 2019 (Jan 10, 2019)
5234e87  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 10, 2019)
9efbc58  [SPARK-25250] : Addressing Reviews January 15, 2019 (Jan 15, 2019)
6abd52c  [SPARK-25250] : Addressing Reviews January 15, 2019 - 2 (Jan 15, 2019)
89373af  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 16, 2019)
231c51b  [SPARK-25250] : Addressing Reviews January 16, 2019 (Jan 16, 2019)
fcfe9f5  [SPARK-25250] : Addressing Reviews January 18, 2019 (Jan 18, 2019)
7ce6f10  [SPARK-25250] : Adding unit test (Jan 22, 2019)
0610939  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 22, 2019)
929fbf9  [SPARK-25250] : Addressing Reviews January 24, 2019 (Jan 24, 2019)
393f901  [SPARK-25250] : Fixing Unit Tests (Jan 25, 2019)
52e832a  [SPARK-25250] : Addressing Reviews January 30, 2019 (Jan 30, 2019)
afbac96  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Jan 30, 2019)
024ec53  [SPARK-25250] : Fixing Scalastyle Checks (Jan 30, 2019)
d2b7044  [SPARK-25250] : Addressing Minor Reviews January 30, 2019 (Jan 30, 2019)
d6ac4a9  [SPARK-25250] : Addressing Reviews January 31, 2019 (Jan 31, 2019)
e9b363b  Merge branch 'master' of https://github.com/pgandhi999/spark into SPA… (Feb 15, 2019)
b55dbb0  [SPARK-25250] : Restructuring PR and trying out a different solution (Feb 18, 2019)
551f412  [SPARK-25250] : Fixing indentation (Feb 18, 2019)
28017ed  [SPARK-25250] : Addressing Reviews February 19, 2019 (Feb 19, 2019)
[SPARK-25250] : Addressing Reviews January 2, 2019
pgandhi committed Jan 2, 2019
commit 7677aece85f8c82818034228c3b2a0c86febc67c
@@ -295,16 +295,17 @@ private[spark] class TaskSchedulerImpl(
override def markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(
Member:
Doesn't this logic overlap with killAllTaskAttempts? Should it reuse that logic? I understand it does something a little different, and I don't know this code well, but it seems like there are related but separate implementations of something similar here.

Author:
As far as I understand the code, killAllTaskAttempts kills all the running tasks for a particular stage, whereas markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts kills all running tasks, across all stages and attempts, that are working on a particular partition which has already been marked as completed by a previously running task for that partition. So the logic differs in the two cases, but we could modify the code to have a single method that performs both of these tasks. Let me know what you think!
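
To make that distinction concrete, here is a minimal sketch of the two operations as described above. It assumes TaskSchedulerImpl's internal state (taskSetsByStageIdAndAttempt, killTaskAttempt, and the TaskSetManager fields used below), and the helper names are made up for the sketch; this is illustrative only, not the actual Spark implementation:

// Illustrative sketch only; simplified, not the real Spark code.
// killAllTaskAttempts is scoped to one stage: it kills every running task in
// every task set (attempt) registered for that stage.
def killAllRunningAttemptsForStage(stageId: Int): Unit = {
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    tsm.runningTasksSet.foreach { tid =>
      killTaskAttempt(tid, interruptThread = false, reason = s"Stage $stageId cancelled")
    }
  }
}

// The new method is scoped to one partition: it marks the partition completed in
// every attempt of the stage and kills only the task attempts still computing it.
def killRunningAttemptsForPartition(partitionId: Int, stageId: Int): Unit = {
  taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    tsm.partitionToIndex.get(partitionId).foreach { index =>
      tsm.markPartitionIdAsCompletedForTaskAttempt(index)
      tsm.taskAttempts(index).filter(_.running).foreach { info =>
        killTaskAttempt(info.taskId, interruptThread = false,
          reason = s"Partition $partitionId already completed")
      }
    }
  }
}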

partitionId: Int, stageId: Int): Unit = {
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
val index: Option[Int] = tsm.partitionToIndex.get(partitionId)
if (!index.isEmpty) {
tsm.markPartitionIdAsCompletedForTaskAttempt(index.get)
val taskInfoList = tsm.taskAttempts(index.get)
taskInfoList.foreach { taskInfo =>
if (taskInfo.running) {
killTaskAttempt(taskInfo.taskId, false, "Corresponding Partition Id " + partitionId +
" has been marked as Completed")
tsm.partitionToIndex.get(partitionId) match {
case Some(index) =>
tsm.markPartitionIdAsCompletedForTaskAttempt(index)
Contributor:
Shall we call markPartitionCompleted instead of creating this new method?

Author:
I had this in mind when I was writing the code, but a question popped into my head: should we really update the tasksSuccessful counter if we are killing those tasks? Wouldn't that be incorrect? Let me know your thoughts @cloud-fan @Ngone51.

Member:
I think that if tsm is the newest TaskSet, then updating tasksSuccessful is necessary so that the TaskSet has a chance to finish itself at this point. If not, I think it is not necessary, and even setting successful(index) = true is not needed: since we have already sent kill signals to those tasks, the zombie TaskSets will handle the failed task attempts at the end and finish normally.

(But you mentioned above that tsm may not necessarily be the newest TaskSet, so I'm not sure about it.)
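
For context, here is a rough sketch of what TaskSetManager.markPartitionCompleted does, simplified and reconstructed from memory rather than copied verbatim from the Spark source; it shows why bumping tasksSuccessful is what lets a TaskSet finish:

// Simplified reconstruction of TaskSetManager.markPartitionCompleted; see the
// actual Spark source for the authoritative version.
private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
  partitionToIndex.get(partitionId).foreach { index =>
    if (!successful(index)) {
      tasksSuccessful += 1
      successful(index) = true      // this attempt no longer needs to compute the partition
      if (tasksSuccessful == numTasks) {
        isZombie = true             // every partition is done, so the attempt can retire
      }
      maybeFinishTaskSet()          // finish the TaskSet if nothing is still running
    }
  }
}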

Contributor:
Sounds reasonable to me, but we should pick a better name to highlight this difference.

Author:
I have renamed the method; please let me know if it still sounds wrong. Thank you.

val taskInfoList = tsm.taskAttempts(index)
taskInfoList.filter(_.running).foreach { taskInfo =>
killTaskAttempt(taskInfo.taskId, false,
s"Corresponding Partition ID $partitionId has been marked as Completed")
}
}

case None =>
logError(s"No corresponding index found for partition ID $partitionId")
Contributor:
Do we need to log an error here? What's wrong if we go into this branch?

Author:
This only happens if the partition ID has no mapping to a corresponding task index, so ideally this should never be printed. It is just a safety-check log.

Contributor:
Then maybe it's better to throw an exception here and ask users to open a bug report?

Author:
Yep, have updated the code.
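
For illustration, the fail-fast variant suggested above could look roughly like the fragment below; this is a hypothetical sketch, and the exception type and wording actually used in the PR may differ:

case None =>
  // Hypothetical sketch: fail fast instead of logging, since a missing
  // partition-to-index mapping would indicate a scheduler bug.
  throw new IllegalStateException(
    s"No corresponding index found for partition ID $partitionId in stage $stageId. " +
      "This should not happen; please report it as a Spark bug.")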

}
}
}
@@ -18,9 +18,9 @@
package org.apache.spark.scheduler

import java.nio.ByteBuffer
import java.util.HashSet

import scala.collection.mutable.HashMap
import scala.collection.mutable.Set
import scala.concurrent.duration._

import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
@@ -40,7 +40,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
def reviveOffers() {}
def defaultParallelism(): Int = 1
def maxNumConcurrentTasks(): Int = 0
val killedTaskIds: HashSet[Long] = new HashSet[Long]()
val killedTaskIds: Set[Long] = Set[Long]()
override def killTask(
taskId: Long,
Contributor:
nit: 4 space indentation

Author:
Done

executorId: String,
@@ -1328,22 +1328,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, TaskKilled("test"))
assert(tsm.isZombie)
}

test("SPARK-25250 On successful completion of a task attempt on a partition id, kill other" +
Member:
Nit: add a blank line above, and please use blank lines in the body of the test to break it up into more readable chunks.

Author:
Done

Contributor:
This test isn't testing the important case that was missed before, and I think for now we don't want to kill tasks as part of this change. OTOH, we do need to take the "Completions in zombie tasksets update status of non-zombie taskset" test and move it to a test in DAGSchedulerSuite.

" running task attempts on that same partition") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()

val firstAttempt = FakeTask.createTaskSet(10, stageAttemptId = 0)
taskScheduler.submitTasks(firstAttempt)

val offersFirstAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offersFirstAttempt)

val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get
val matchingTaskInfoFirstAttempt = tsm0.taskAttempts(0).head
tsm0.handleFailedTask(matchingTaskInfoFirstAttempt.taskId, TaskState.FAILED,
FetchFailed(null, 0, 0, 0, "fetch failed"))

val secondAttempt = FakeTask.createTaskSet(10, stageAttemptId = 1)
taskScheduler.submitTasks(secondAttempt)

val offersSecondAttempt = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offersSecondAttempt)

taskScheduler.markPartitionIdAsCompletedAndKillCorrespondingTaskAttempts(2, 0)

val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get
val indexInTsm = tsm1.partitionToIndex(2)
val matchingTaskInfoSecondAttempt = tsm1.taskAttempts.flatten.filter(_.index == indexInTsm).head
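
The diff is truncated here. As a general note, a single suite like this can usually be run from the Spark source tree with sbt, for example build/sbt "core/testOnly org.apache.spark.scheduler.TaskSchedulerImplSuite" (adjust to your local build setup).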