Skip to content
Closed
Prev Previous commit
Next Next commit
Addressed review comments and fixed a bug
  • Loading branch information
Sital Kedia committed Jun 13, 2017
commit a548dc3ffcddb69017f112a290a5eeafa41f397c
50 changes: 42 additions & 8 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1334,13 +1334,13 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
if (!env.blockManager.externalShuffleServiceEnabled) {
handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true,
Some(bmAddress.host), Some(task.epoch))
if (env.blockManager.externalShuffleServiceEnabled) {
removeExecutorAndUnregisterOutputOnHost(bmAddress.executorId,
bmAddress.host, Some(task.epoch))
}
else {
handleExecutorLost(bmAddress.executorId, fileLost = true, hostLost = false,
Some(bmAddress.host), Some(task.epoch))
removeExecutorAndUnregisterOutputOnExecutor(bmAddress.executorId,
true, Some(task.epoch))
}
}
}
Expand Down Expand Up @@ -1374,10 +1374,17 @@ class DAGScheduler(
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
private[scheduler] def handleExecutorLost(
    execId: String,
    fileLost: Boolean) {
  // File loss is only reported when the caller asked for it AND the external shuffle
  // service is disabled; with the service enabled this is always false.
  // NOTE(review): because of the `&&`, a caller passing fileLost = false never reports file
  // loss, even when the external shuffle service is disabled -- confirm that is intended.
  val shuffleFilesLost = fileLost && !env.blockManager.externalShuffleServiceEnabled
  // No epoch is supplied here, so the callee decides which epoch to use.
  removeExecutorAndUnregisterOutputOnExecutor(execId, shuffleFilesLost, None)
}


private[scheduler] def removeExecutorAndUnregisterOutputOnExecutor(
execId: String,
fileLost: Boolean,
hostLost: Boolean = false,
maybeHost: Option[String] = None,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
Expand All @@ -1392,7 +1399,34 @@ class DAGScheduler(
}
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
"(epoch " + currentEpoch + ")")
}
}

/**
 * Records `execId` as failed at the given epoch, removes it from the block manager master
 * and drops the outputs located on `host` from every stage in `shuffleIdToMapStage`,
 * re-registering each stage's remaining outputs with the map output tracker.
 *
 * If a failure at the same or a later epoch is already recorded for `execId`, nothing is
 * changed and only a debug message is logged.
 *
 * @param execId     id of the executor that is reported as lost
 * @param host       host whose outputs are removed from every shuffle map stage
 * @param maybeEpoch epoch of the failure; when empty, the tracker's current epoch is used
 */
private[scheduler] def removeExecutorAndUnregisterOutputOnHost(
    execId: String,
    host: String,
    maybeEpoch: Option[Long] = None) {
  val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
  if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
    failedEpoch(execId) = currentEpoch
    logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
    blockManagerMaster.removeExecutor(execId)
    if (shuffleIdToMapStage.isEmpty) {
      // Nothing gets re-registered below, so the epoch is incremented directly
      // (presumably registerMapOutputs with changeEpoch = true does this otherwise).
      mapOutputTracker.incrementEpoch()
    } else {
      // The message does not depend on the shuffle, so log it once per lost host instead
      // of once per registered shuffle.
      logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
      for ((shuffleId, stage) <- shuffleIdToMapStage) {
        stage.removeOutputsOnHost(host)
        mapOutputTracker.registerMapOutputs(
          shuffleId,
          stage.outputLocInMapOutputTrackerFormat(),
          changeEpoch = true)
      }
    }
    clearCacheLocs()
  } else {
    logDebug("Additional executor lost message for " + execId +
      "(epoch " + currentEpoch + ")")
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,21 +405,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB"))
val shuffleMapRdd = new MyRDD(sc, 3, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
scheduler.handleExecutorLost("exec-hostA1", fileLost = false, hostLost = true, Some("hostA"))
runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true)))
val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_!= null)
assert(mapStatus.size === 1)
assert(mapStatus(0).location.executorId === "exec-hostB")
assert(mapStatus(0).location.host === "hostB")
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
(Success, makeMapStatus("hostB", 1))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

// Read the second shuffle's entry (index 1, the same one mapStatus2 checks after the fetch
// failure); index 0 is already covered by initialMapStatus1.
val initialMapStatus2 = mapOutputTracker.mapStatuses.get(1).get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mapOutputTracker.mapStatuses.get(1)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good eye, fixed.

assert(initialMapStatus2.count(_ != null) === 3)
assert(initialMapStatus2.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

// reduce stage fails with a fetch failure from one host
complete(taskSets(2), Seq(
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), firstShuffleId, 0, 0, "ignored"),
null)
))

// Here is the main assertion -- make sure that we de-register the map outputs for both map
// stages from both executors on hostA
val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(mapStatus1.count(_ != null) === 1)
assert(mapStatus1(2).location.executorId === "exec-hostB")
assert(mapStatus1(2).location.host === "hostB")

val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get
assert(mapStatus2.count(_ != null) === 1)
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
}

test("zero split job") {
Expand Down