Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix tests
  • Loading branch information
cloud-fan committed Mar 4, 2019
commit f94809df47668600c2044bce3ecc2e1650b6e192
Original file line number Diff line number Diff line change
Expand Up @@ -201,30 +201,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Even if one of the task sets has not-serializable tasks, the other task set should
// still be processed without error
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
taskScheduler.submitTasks(taskSet)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't have 2 active task set managers at the same time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we shall just give it another stageId ?

taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
}

test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part of code is reverted in this PR, so remove the test as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine, but do we also want to add a test case to ensure the new behavior will not break ?

val taskScheduler = setupScheduler()
val attempt1 = FakeTask.createTaskSet(1, 0)
val attempt2 = FakeTask.createTaskSet(1, 1)
taskScheduler.submitTasks(attempt1)
intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }

// OK to submit multiple if previous attempts are all zombie
taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
.get.isZombie = true
taskScheduler.submitTasks(attempt2)
val attempt3 = FakeTask.createTaskSet(1, 2)
intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
.get.isZombie = true
taskScheduler.submitTasks(attempt3)
assert(!failedTaskSet)
}

test("don't schedule more tasks after a taskset is zombie") {
val taskScheduler = setupScheduler()

Expand Down