From 46433252cd9154b7c1a967a487e7d51ef65f3814 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 24 Feb 2017 16:35:00 -0800 Subject: [PATCH 01/16] [SPARK-19753][CORE] All shuffle files on a host should be removed in case of fetch failure or slave lost --- .../apache/spark/scheduler/DAGScheduler.scala | 6 +++-- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../spark/scheduler/DAGSchedulerSuite.scala | 25 +++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 932e6c138e1c..c65553f8e84f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -149,6 +149,7 @@ class DAGScheduler( */ private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage] private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] + private[scheduler] val execIdToHost = new HashMap[String, String] // Stages we need to run whose parents aren't done private[scheduler] val waitingStages = new HashSet[Stage] @@ -1336,7 +1337,7 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) + handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch)) } } @@ -1370,7 +1371,7 @@ class DAGScheduler( */ private[scheduler] def handleExecutorLost( execId: String, - filesLost: Boolean, + slaveLost: Boolean, maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { @@ -1395,6 +1396,7 @@ class DAGScheduler( logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } + execIdToHost.put(execId, host) } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 629cfc7c7a8c..d17f9d0bf87e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -426,6 +426,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } case None => logError( + ("Ignoring update with state %s for TID %s because its task set is gone (this is " + "likely the result of receiving duplicate task finished status updates) or its " + "executor has been marked as failed.") diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 67145e744506..9da81c2cb4f5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -396,6 +396,31 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("All shuffle files should on the slave should be cleaned up when slave lost") { + // reset the test context with the right shuffle service config + afterEach() + val conf = new SparkConf() + conf.set("spark.shuffle.service.enabled", "true") + init(conf) + runEvent(ExecutorAdded("exec-hostA1", "hostA")) + 
runEvent(ExecutorAdded("exec-hostA2", "hostA")) + runEvent(ExecutorAdded("exec-hostB", "hostB")) + val shuffleMapRdd = new MyRDD(sc, 3, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0)) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)))) + runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true))) + val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_!= null) + assert(mapStatus.size === 1) + assert(mapStatus(0).location.executorId === "exec-hostB") + assert(mapStatus(0).location.host === "hostB") + } + test("zero split job") { var numResults = 0 var failureReason: Option[Exception] = None From 6145d2ae72f070ecb25ae78dfc1ad5fa15de86cf Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 28 Feb 2017 18:03:55 -0800 Subject: [PATCH 02/16] Do not un-register shuffle files in case of executor lost --- .../org/apache/spark/scheduler/DAGScheduler.scala | 11 +++++------ .../apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c65553f8e84f..f3a2afb67fb0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -149,7 +149,6 @@ class DAGScheduler( */ private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage] private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] - private[scheduler] val execIdToHost = new HashMap[String, String] // Stages we need to run whose parents aren't done private[scheduler] val waitingStages = new HashSet[Stage] @@ -549,10 +548,8 @@ class DAGScheduler( * @param callSite where in the user program this job was called * @param resultHandler callback to pass each result to * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name - * * @return a JobWaiter object that can be used to block until the job finishes executing * or can be used to cancel the job. 
- * * @throws IllegalArgumentException when partitions ids are illegal */ def submitJob[T, U]( @@ -1337,7 +1334,8 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch)) + handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true, + Some(bmAddress.host), Some(task.epoch)) } } @@ -1371,7 +1369,9 @@ class DAGScheduler( */ private[scheduler] def handleExecutorLost( execId: String, - slaveLost: Boolean, + fileLost: Boolean, + hostLost: Boolean = false, + maybeHost: Option[String] = None, maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { @@ -1396,7 +1396,6 @@ class DAGScheduler( logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } - execIdToHost.put(execId, host) } private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9da81c2cb4f5..2808cf9e59f9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -414,6 +414,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) + scheduler.handleExecutorLost("exec-hostA1", fileLost = false, hostLost = true, Some("hostA")) runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true))) val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_!= null) assert(mapStatus.size === 1) From ef86d12577fac7f63e16db6f695872197ef6fd0a Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 28 Feb 2017 18:13:07 -0800 Subject: [PATCH 03/16] no-op when external shuffle service is disabled --- .../org/apache/spark/scheduler/DAGScheduler.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f3a2afb67fb0..53853e272800 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1334,8 +1334,14 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true, - Some(bmAddress.host), Some(task.epoch)) + if (!env.blockManager.externalShuffleServiceEnabled) { + handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true, + Some(bmAddress.host), Some(task.epoch)) + } + else { + handleExecutorLost(bmAddress.executorId, fileLost = true, hostLost = false, + Some(bmAddress.host),Some(task.epoch)) + } } } From 263d55b74872775419c772a789749e60f5b10005 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 28 Feb 2017 18:23:59 -0800 Subject: [PATCH 04/16] fix check style --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 53853e272800..c0e04a5ffa57 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1340,7 +1340,7 @@ class DAGScheduler( } else { handleExecutorLost(bmAddress.executorId, fileLost = true, hostLost = false, - Some(bmAddress.host),Some(task.epoch)) + Some(bmAddress.host), Some(task.epoch)) } } } From a548dc3ffcddb69017f112a290a5eeafa41f397c Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 16 Mar 2017 15:24:43 -0700 Subject: [PATCH 05/16] Addressed review comments and fixed a bug --- .../apache/spark/scheduler/DAGScheduler.scala | 50 ++++++++++++--- .../spark/scheduler/DAGSchedulerSuite.scala | 62 +++++++++++++++---- 2 files changed, 91 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c0e04a5ffa57..e1ac69bf5765 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1334,13 +1334,13 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - if (!env.blockManager.externalShuffleServiceEnabled) { - handleExecutorLost(bmAddress.executorId, fileLost = false, hostLost = true, - Some(bmAddress.host), Some(task.epoch)) + if (env.blockManager.externalShuffleServiceEnabled) { + removeExecutorAndUnregisterOutputOnHost(bmAddress.executorId, + bmAddress.host, Some(task.epoch)) } else { - handleExecutorLost(bmAddress.executorId, fileLost = true, hostLost = false, - Some(bmAddress.host), Some(task.epoch)) + removeExecutorAndUnregisterOutputOnExecutor(bmAddress.executorId, + true, Some(task.epoch)) } } } @@ -1374,10 +1374,17 @@ class DAGScheduler( * stray fetch failures from possibly retriggering the detection of a node as lost. 
*/ private[scheduler] def handleExecutorLost( + execId: String, + fileLost: Boolean) { + removeExecutorAndUnregisterOutputOnExecutor(execId, + // There will not be any file loss if external shuffle service is enabled + fileLost && !env.blockManager.externalShuffleServiceEnabled, None) + } + + + private[scheduler] def removeExecutorAndUnregisterOutputOnExecutor( execId: String, fileLost: Boolean, - hostLost: Boolean = false, - maybeHost: Option[String] = None, maybeEpoch: Option[Long] = None) { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { @@ -1392,7 +1399,34 @@ class DAGScheduler( } } else { logDebug("Additional executor lost message for " + execId + - "(epoch " + currentEpoch + ")") + "(epoch " + currentEpoch + ")") + } + } + + private[scheduler] def removeExecutorAndUnregisterOutputOnHost( + execId: String, + host: String, + maybeEpoch: Option[Long] = None) { + val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) + if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { + failedEpoch(execId) = currentEpoch + logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) + blockManagerMaster.removeExecutor(execId) + for ((shuffleId, stage) <- shuffleIdToMapStage) { + logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + stage.removeOutputsOnHost(host) + mapOutputTracker.registerMapOutputs( + shuffleId, + stage.outputLocInMapOutputTrackerFormat(), + changeEpoch = true) + } + if (shuffleIdToMapStage.isEmpty) { + mapOutputTracker.incrementEpoch() + } + clearCacheLocs() + } else { + logDebug("Additional executor lost message for " + execId + + "(epoch " + currentEpoch + ")") } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2808cf9e59f9..312af96fe637 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -405,21 +405,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou runEvent(ExecutorAdded("exec-hostA1", "hostA")) runEvent(ExecutorAdded("exec-hostA2", "hostA")) runEvent(ExecutorAdded("exec-hostB", "hostB")) - val shuffleMapRdd = new MyRDD(sc, 3, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) - val shuffleId = shuffleDep.shuffleId - val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) + val firstRDD = new MyRDD(sc, 3, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) + // map stage1 completes successfully, with one task on each executor complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) - scheduler.handleExecutorLost("exec-hostA1", fileLost = false, hostLost = true, Some("hostA")) - runEvent(ExecutorLost("exec-hostA1", SlaveLost("", true))) - val mapStatus = mapOutputTracker.mapStatuses.get(0).get.filter(_!= null) - assert(mapStatus.size === 1) - assert(mapStatus(0).location.executorId === "exec-hostB") - 
assert(mapStatus(0).location.host === "hostB") + (Success, + MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, + MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, makeMapStatus("hostB", 1)) + )) + // map stage2 completes successfully, with one task on each executor + complete(taskSets(1), Seq( + (Success, + MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, + MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))), + (Success, makeMapStatus("hostB", 1)) + )) + // make sure our test setup is correct + val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get + assert(initialMapStatus1.count(_ != null) === 3) + assert(initialMapStatus1.map{_.location.executorId}.toSet === + Set("exec-hostA1", "exec-hostA2", "exec-hostB")) + + val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get + assert(initialMapStatus2.count(_ != null) === 3) + assert(initialMapStatus2.map{_.location.executorId}.toSet === + Set("exec-hostA1", "exec-hostA2", "exec-hostB")) + + // reduce stage fails with a fetch failure from one host + complete(taskSets(2), Seq( + (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), firstShuffleId, 0, 0, "ignored"), + null) + )) + + // Here is the main assertion -- make sure that we de-register the map outputs for both map stage + // from both executors on hostA + val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get + assert(mapStatus1.count(_ != null) === 1) + assert(mapStatus1(2).location.executorId === "exec-hostB") + assert(mapStatus1(2).location.host === "hostB") + + val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get + assert(mapStatus2.count(_ != null) === 1) + assert(mapStatus2(2).location.executorId === "exec-hostB") + assert(mapStatus2(2).location.host === "hostB") } test("zero split job") { From b5f19495936cd4470b1254cfff9a991e2d5db281 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 16 Mar 2017 16:00:17 -0700 Subject: [PATCH 06/16] Fix build --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 312af96fe637..bfd1611e0e17 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -445,8 +445,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou null) )) - // Here is the main assertion -- make sure that we de-register the map outputs for both map stage - // from both executors on hostA + // Here is the main assertion -- make sure that we de-register + // the map outputs for both map stage from both executors on hostA val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(mapStatus1.count(_ != null) === 1) assert(mapStatus1(2).location.executorId === "exec-hostB") From 705a5aaa48d109ced7dd889940f4e70a8796a39a Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Thu, 16 Mar 2017 19:37:44 -0700 Subject: [PATCH 07/16] Fix test failures --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 
e1ac69bf5765..8d0354bc5c7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1377,8 +1377,7 @@ class DAGScheduler( execId: String, fileLost: Boolean) { removeExecutorAndUnregisterOutputOnExecutor(execId, - // There will not be any file loss if external shuffle service is enabled - fileLost && !env.blockManager.externalShuffleServiceEnabled, None) + fileLost || !env.blockManager.externalShuffleServiceEnabled, None) } From 996ad206057a095bb41e38b39553a73cf589d449 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 17 Mar 2017 10:53:25 -0500 Subject: [PATCH 08/16] refactoring & comments --- .../apache/spark/scheduler/DAGScheduler.scala | 73 ++++++++----------- 1 file changed, 30 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8d0354bc5c7a..38444f23795c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1334,14 +1334,20 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - if (env.blockManager.externalShuffleServiceEnabled) { - removeExecutorAndUnregisterOutputOnHost(bmAddress.executorId, - bmAddress.host, Some(task.epoch)) - } - else { - removeExecutorAndUnregisterOutputOnExecutor(bmAddress.executorId, - true, Some(task.epoch)) + val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled) { + // We had a fetch failure with the external shuffle service, so we + // assume all shuffle data on the node is bad. + Some(bmAddress.host) + } else { + // Deregister shuffle data just for one executor (we don't have any + // reason to believe shuffle data has been lost for the entire host). + None } + removeExecutorAndUnregisterOutputs( + execId = bmAddress.executorId, + fileLost = true, + hostToUnregisterOutputs = hostToUnregisterOutputs, + maybeEpoch = Some(task.epoch)) } } @@ -1375,16 +1381,23 @@ class DAGScheduler( */ private[scheduler] def handleExecutorLost( execId: String, - fileLost: Boolean) { - removeExecutorAndUnregisterOutputOnExecutor(execId, - fileLost || !env.blockManager.externalShuffleServiceEnabled, None) + workerLost: Boolean): Unit = { + // if the cluster manager explicitly tells us that the entire worker was lost, then + // we know to unregister shuffle output. (Note that "worker" specifically refers to the process + // from a Standalone cluster, where the shuffle service lives in the Worker.) 
+ val filesLost = workerLost || !env.blockManager.externalShuffleServiceEnabled + removeExecutorAndUnregisterOutputs( + execId = execId, + fileLost = filesLost, + hostToUnregisterOutputs = None, + maybeEpoch = None) } - - private[scheduler] def removeExecutorAndUnregisterOutputOnExecutor( + private def removeExecutorAndUnregisterOutputs( execId: String, fileLost: Boolean, - maybeEpoch: Option[Long] = None) { + hostToUnregisterOutputs: Option[String], + maybeEpoch: Option[Long] = None): Unit = { val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { failedEpoch(execId) = currentEpoch @@ -1402,33 +1415,6 @@ class DAGScheduler( } } - private[scheduler] def removeExecutorAndUnregisterOutputOnHost( - execId: String, - host: String, - maybeEpoch: Option[Long] = None) { - val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) - if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { - failedEpoch(execId) = currentEpoch - logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) - blockManagerMaster.removeExecutor(execId) - for ((shuffleId, stage) <- shuffleIdToMapStage) { - logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) - stage.removeOutputsOnHost(host) - mapOutputTracker.registerMapOutputs( - shuffleId, - stage.outputLocInMapOutputTrackerFormat(), - changeEpoch = true) - } - if (shuffleIdToMapStage.isEmpty) { - mapOutputTracker.incrementEpoch() - } - clearCacheLocs() - } else { - logDebug("Additional executor lost message for " + execId + - "(epoch " + currentEpoch + ")") - } - } - private[scheduler] def handleExecutorAdded(execId: String, host: String) { // remove from failedEpoch(execId) ? if (failedEpoch.contains(execId)) { @@ -1718,11 +1704,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId, reason) => - val filesLost = reason match { - case SlaveLost(_, true) => true + val workerLost = reason match { + case SlaveLost(_, true) => + true case _ => false } - dagScheduler.handleExecutorLost(execId, filesLost) + dagScheduler.handleExecutorLost(execId, workerLost) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) From 78f9094b8705fa4e4e2e5424160ab952ebfe6d24 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 17 Mar 2017 21:03:37 -0700 Subject: [PATCH 09/16] Review comments --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +-- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 38444f23795c..a8a80340fbd0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1410,8 +1410,7 @@ class DAGScheduler( clearCacheLocs() } } else { - logDebug("Additional executor lost message for " + execId + - "(epoch " + currentEpoch + ")") + logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index bfd1611e0e17..294c799e3bc0 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -406,10 +406,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou runEvent(ExecutorAdded("exec-hostA2", "hostA")) runEvent(ExecutorAdded("exec-hostB", "hostB")) val firstRDD = new MyRDD(sc, 3, Nil) - val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor From 3b53f7e3e1e172e45abd1d40d691601a8284a812 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 24 Mar 2017 15:39:05 -0700 Subject: [PATCH 10/16] Minor changes as per review comments --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a8a80340fbd0..530fb6c10bd5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1339,7 +1339,7 @@ class DAGScheduler( // assume all shuffle data on the node is bad. Some(bmAddress.host) } else { - // Deregister shuffle data just for one executor (we don't have any + // Unregister shuffle data just for one executor (we don't have any // reason to believe shuffle data has been lost for the entire host). None } @@ -1385,10 +1385,10 @@ class DAGScheduler( // if the cluster manager explicitly tells us that the entire worker was lost, then // we know to unregister shuffle output. (Note that "worker" specifically refers to the process // from a Standalone cluster, where the shuffle service lives in the Worker.) 
- val filesLost = workerLost || !env.blockManager.externalShuffleServiceEnabled + val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled removeExecutorAndUnregisterOutputs( execId = execId, - fileLost = filesLost, + fileLost = fileLost, hostToUnregisterOutputs = None, maybeEpoch = None) } @@ -1704,8 +1704,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ExecutorLost(execId, reason) => val workerLost = reason match { - case SlaveLost(_, true) => - true + case SlaveLost(_, true) => true case _ => false } dagScheduler.handleExecutorLost(execId, workerLost) From e5f532e579082fe5d6418401546b8a80e4fe5d9d Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 9 Jun 2017 12:19:27 -0700 Subject: [PATCH 11/16] Make the behaviour configurable and minor review comments --- .../org/apache/spark/scheduler/DAGScheduler.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 530fb6c10bd5..a2f247d68119 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -190,6 +190,12 @@ class DAGScheduler( /** * Number of consecutive stage attempts allowed before a stage is aborted. */ + private[scheduler] val unRegisterOutputOnHostOnFetchFailure = + sc.getConf.getBoolean("spark.fetch.failure.unRegister.output.on.host", true) + + /** + * + */ private[scheduler] val maxConsecutiveStageAttempts = sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS) @@ -548,6 +554,7 @@ class DAGScheduler( * @param callSite where in the user program this job was called * @param resultHandler callback to pass each result to * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name + * * @return a JobWaiter object that can be used to block until the job finishes executing * or can be used to cancel the job. * @throws IllegalArgumentException when partitions ids are illegal @@ -1334,7 +1341,8 @@ class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled) { + val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && + unRegisterOutputOnHostOnFetchFailure) { // We had a fetch failure with the external shuffle service, so we // assume all shuffle data on the node is bad. Some(bmAddress.host) From 1189e409ddec9faf38de2c1b4d0c00e68c162e46 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 9 Jun 2017 13:38:17 -0700 Subject: [PATCH 12/16] Minor changes --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a2f247d68119..07d5aa19c6d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -188,13 +188,13 @@ class DAGScheduler( private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) /** - * Number of consecutive stage attempts allowed before a stage is aborted. 
+ * If enabled, fetch failure will cause all the output on that host to be unregistered. */ private[scheduler] val unRegisterOutputOnHostOnFetchFailure = - sc.getConf.getBoolean("spark.fetch.failure.unRegister.output.on.host", true) + sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", true) /** - * + * Number of consecutive stage attempts allowed before a stage is aborted. */ private[scheduler] val maxConsecutiveStageAttempts = sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", From 9d700e278c3e24f6fb38bf3323bc8e9807fb866b Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 9 Jun 2017 15:35:44 -0700 Subject: [PATCH 13/16] set the behavior to false by default --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 07d5aa19c6d4..4149c95a7c6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -188,10 +188,12 @@ class DAGScheduler( private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) /** - * If enabled, fetch failure will cause all the output on that host to be unregistered. + * Whether to unregister all the outputs on the host in condition that we receive a FetchFailure, + * this is set default to false, which means, we only unregister the outputs related to the exact + * executor(instead of the host) on a FetchFailure. */ private[scheduler] val unRegisterOutputOnHostOnFetchFailure = - sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", true) + sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false) /** * Number of consecutive stage attempts allowed before a stage is aborted. @@ -557,6 +559,7 @@ class DAGScheduler( * * @return a JobWaiter object that can be used to block until the job finishes executing * or can be used to cancel the job. 
+ * * @throws IllegalArgumentException when partitions ids are illegal */ def submitJob[T, U]( From b26e99d3cffea08f1a289eaa0721bfb53fb63218 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Fri, 9 Jun 2017 17:55:08 -0700 Subject: [PATCH 14/16] Fix test --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 294c799e3bc0..e580917ff61f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -401,6 +401,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou afterEach() val conf = new SparkConf() conf.set("spark.shuffle.service.enabled", "true") + conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true") init(conf) runEvent(ExecutorAdded("exec-hostA1", "hostA")) runEvent(ExecutorAdded("exec-hostA2", "hostA")) From 74f285ac1faa7a67d9943a14c49df06097755ff0 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 13 Jun 2017 17:10:23 -0700 Subject: [PATCH 15/16] Rebase with master --- .../org/apache/spark/MapOutputTracker.scala | 33 +++++++++++++++++-- .../spark/internal/config/package.scala | 8 +++++ .../apache/spark/scheduler/DAGScheduler.scala | 21 ++++++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 14 +++++--- 4 files changed, 61 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 3e10b9eee4e2..5d48bc7c9655 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -55,7 +55,8 @@ private class ShuffleStatus(numPartitions: Int) { * locations is so small that we choose to ignore that case and store only a single location * for each output. */ - private[this] val mapStatuses = new Array[MapStatus](numPartitions) + // Exposed for testing + val mapStatuses = new Array[MapStatus](numPartitions) /** * The cached result of serializing the map statuses array. This cache is lazily populated when @@ -105,14 +106,30 @@ private class ShuffleStatus(numPartitions: Int) { } } + /** + * Removes all shuffle outputs associated with this host. Note that this will also remove + * outputs which are served by an external shuffle server (if one exists). + */ + def removeOutputsOnHost(host: String): Unit = { + removeOutputsByFilter(x => x.host == host) + } + /** * Removes all map outputs associated with the specified executor. Note that this will also * remove outputs which are served by an external shuffle server (if one exists), as they are * still registered with that execId. */ def removeOutputsOnExecutor(execId: String): Unit = synchronized { + removeOutputsByFilter(x => x.executorId == execId) + } + + /** + * Removes all shuffle outputs which satisfies the filter. Note that this will also + * remove outputs which are served by an external shuffle server (if one exists). 
+ */ + def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized { for (mapId <- 0 until mapStatuses.length) { - if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId == execId) { + if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) { _numAvailableOutputs -= 1 mapStatuses(mapId) = null invalidateSerializedMapOutputStatusCache() @@ -317,7 +334,8 @@ private[spark] class MapOutputTrackerMaster( // HashMap for storing shuffleStatuses in the driver. // Statuses are dropped only by explicit de-registering. - private val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala + // Exposed for testing + val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) @@ -415,6 +433,15 @@ private[spark] class MapOutputTrackerMaster( } } + /** + * Removes all shuffle outputs associated with this host. Note that this will also remove + * outputs which are served by an external shuffle server (if one exists). + */ + def removeOutputsOnHost(host: String): Unit = { + shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnHost(host) } + incrementEpoch() + } + /** * Removes all shuffle outputs associated with this executor. Note that this will also remove * outputs which are served by an external shuffle server (if one exists), as they are still diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7827e6760f35..84ef57f2d271 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -151,6 +151,14 @@ package object config { .createOptional // End blacklist confs + private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE = + ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost") + .doc("Whether to un-register all the outputs on the host in condition that we receive " + + " a FetchFailure. This is set default to false, which means, we only un-register the " + + " outputs related to the exact executor(instead of the host) on a FetchFailure.") + .booleanConf + .createWithDefault(false) + private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY = ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity") .withAlternative("spark.scheduler.listenerbus.eventqueue.size") diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4149c95a7c6e..fafe9cafdc18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -35,6 +35,7 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.config import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} @@ -193,7 +194,7 @@ class DAGScheduler( * executor(instead of the host) on a FetchFailure. 
*/ private[scheduler] val unRegisterOutputOnHostOnFetchFailure = - sc.getConf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false) + sc.getConf.get(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE) /** * Number of consecutive stage attempts allowed before a stage is aborted. @@ -1414,14 +1415,20 @@ class DAGScheduler( failedEpoch(execId) = currentEpoch logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) blockManagerMaster.removeExecutor(execId) - - if (filesLost || !env.blockManager.externalShuffleServiceEnabled) { - logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) - mapOutputTracker.removeOutputsOnExecutor(execId) + if (fileLost) { + hostToUnregisterOutputs match { + case Some(host) => + logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + mapOutputTracker.removeOutputsOnHost(host) + case None => + logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch)) + mapOutputTracker.removeOutputsOnExecutor(execId) + } clearCacheLocs() + + } else { + logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) } - } else { - logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e580917ff61f..ddd328110674 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -396,7 +396,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - test("All shuffle files should on the slave should be cleaned up when slave lost") { + test("All shuffle files on the slave should be cleaned up when slave lost") { // reset the test context with the right shuffle service config afterEach() val conf = new SparkConf() @@ -411,6 +411,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val secondShuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor @@ -430,12 +431,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou (Success, makeMapStatus("hostB", 1)) )) // make sure our test setup is correct - val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get + val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses + // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus1.count(_ != null) === 3) assert(initialMapStatus1.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) - val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get + val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses + // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus2.count(_ != null) === 3) assert(initialMapStatus2.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) @@ -448,12 +451,13 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou // Here is the main assertion -- make sure that we de-register // the map outputs for both map stage from both executors on hostA - val mapStatus1 = mapOutputTracker.mapStatuses.get(0).get + + val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses assert(mapStatus1.count(_ != null) === 1) assert(mapStatus1(2).location.executorId === "exec-hostB") assert(mapStatus1(2).location.host === "hostB") - val mapStatus2 = mapOutputTracker.mapStatuses.get(1).get + val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses assert(mapStatus2.count(_ != null) === 1) assert(mapStatus2(2).location.executorId === "exec-hostB") assert(mapStatus2(2).location.host === "hostB") From ba2ca2adfe15977fe48ccc97b282e97381135f67 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Tue, 13 Jun 2017 17:17:47 -0700 Subject: [PATCH 16/16] remove extra whitespace --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d17f9d0bf87e..629cfc7c7a8c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -426,7 +426,6 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } case None => logError( - ("Ignoring update with state %s for TID %s because its task set is gone (this is " + "likely the result of receiving duplicate task finished status updates) or its " + "executor has been marked as failed.")
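
For reference, a minimal sketch (not taken from the patches above) of how the behavior added by this series could be turned on from user code, assuming the config key stays "spark.files.fetchFailure.unRegisterOutputOnHost" with its default of false, and that the external shuffle service is enabled as in the new DAGSchedulerSuite test:

    import org.apache.spark.{SparkConf, SparkContext}

    // Opt in to unregistering all shuffle output on a host after a fetch failure.
    // Both keys below are the ones exercised by the test added in this series;
    // the application name is only a placeholder for this example.
    val conf = new SparkConf()
      .setAppName("host-level-shuffle-cleanup-example")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
    val sc = new SparkContext(conf)

With these settings, a FetchFailure from any executor on a host causes the DAGScheduler to unregister the map outputs of every executor on that host (rather than only those of the failing executor), so the retried map stage regenerates all of the shuffle files that may have been lost with the external shuffle service on that node.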