Skip to content
Closed
Prev Previous commit
Next Next commit
Do not unregister shuffle files when an executor is lost
  • Loading branch information
Sital Kedia committed Jun 13, 2017
commit 6145d2ae72f070ecb25ae78dfc1ad5fa15de86cf
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ class DAGScheduler(
*/
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val execIdToHost = new HashMap[String, String]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
Expand Down Expand Up @@ -549,10 +548,8 @@ class DAGScheduler(
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please keep the original format.

* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's keep this empty line too.

* @throws IllegalArgumentException when partition ids are illegal
*/
def submitJob[T, U](
Expand Down Expand Up @@ -1337,7 +1334,8 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true,
Some(bmAddress.host), Some(task.epoch))
}
}

Expand Down Expand Up @@ -1371,7 +1369,9 @@ class DAGScheduler(
*/
private[scheduler] def handleExecutorLost(
execId: String,
slaveLost: Boolean,
fileLost: Boolean,
hostLost: Boolean = false,
maybeHost: Option[String] = None,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
Expand All @@ -1396,7 +1396,6 @@ class DAGScheduler(
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
execIdToHost.put(execId, host)
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
scheduler.handleExecutorLost("exec-hostA1", fileLost = false, hostLost = true, Some("hostA"))
runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true)))
val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_!= null)
assert(mapStatus.size === 1)
Expand Down