fix style
scwf committed Sep 23, 2016
commit 7056cd6efc301e8730546c6dd8b24c7f3b56c55a
@@ -1235,7 +1235,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleIdToMapStage(shuffleId)
-var abortStage = false
+var abortedStage = false
Member:
Could you move this line to just above `if (disallowStageRetryForTest) {`, since it's only used in that scope.
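For illustration, a minimal self-contained sketch of the suggested ordering; the names and branch bodies are simplified stand-ins for the real DAGScheduler logic, not the actual code:

object DeclareNearUse {
  // Hypothetical simplification: both abort conditions reduced to booleans.
  def handleFetchFailure(
      disallowStageRetryForTest: Boolean,
      hitMaxConsecutiveFetchFailures: Boolean): Boolean = {
    // Declared immediately above the only scope that reads or writes it.
    var abortedStage = false
    if (disallowStageRetryForTest) {
      abortedStage = true // test-only config: never retry the stage
    } else if (hitMaxConsecutiveFetchFailures) {
      abortedStage = true // exceeded Stage.MAX_CONSECUTIVE_FETCH_FAILURES
    }
    abortedStage
  }
}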


if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
@@ -1262,7 +1262,7 @@ class DAGScheduler(
s"has failed the maximum allowable number of " +
s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
s"Most recent failure reason: ${failureMessage}", None)
-abortStage = true
+abortedStage = true
Member:
There is another `abortStage` in the `if (disallowStageRetryForTest) {` branch.

} else if (failedStages.isEmpty) {
Contributor:
Instead of having the `abortedStage` variable, how about rewriting the "else if" statement to be:

else {
  if (failedStages.isEmpty) {
    ... stuff currently in else-if ...
  }
  failedStages += failedStage
  failedStages += mapStage
}

That eliminates the confusion of multiple `abortStage` variables, as @zsxwing pointed out, and also makes the relationship between (i) adding the stage to failed stages and (ii) scheduling the Resubmit event clearer.
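Assembled for concreteness, a sketch of that shape with the elided body filled in from the resubmit-scheduling code visible further down in this diff (messageScheduler is assumed to be the DAGScheduler field backing that schedule call; this is a fragment of the handler, not a standalone or final patch):

} else {
  if (failedStages.isEmpty) {
    // No resubmit event is pending yet, so schedule one.
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
  }
  failedStages += failedStage
  failedStages += mapStage
}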

Contributor Author:
Makes sense to me, updated.

// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled.
@@ -1273,7 +1273,7 @@ class DAGScheduler(
override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
}
-if (!abortStage) {
+if (!abortedStage) {
failedStages += failedStage
failedStages += mapStage
}
Expand Up @@ -34,8 +34,8 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.shuffle.MetadataFetchFailedException
+import org.apache.spark.shuffle.FetchFailedException
Contributor:
nit: can you group this with the next import (so `import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}`)?
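That is, the two separate imports collapsed into one selector clause:

import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}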

+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util._

@@ -2121,7 +2121,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd1.map { x =>
Comment:
You may write

rdd1.map {
  case (x, _) =>
    if (x == 1) {
      throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
    }
    x
}.count()

instead. Pattern matching on tuples looks more readable than accessing the `_1` attribute.
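A standalone illustration of the readability point, using made-up data rather than the test's RDD:

val pairs = Seq((1, "a"), (2, "b"))
// Positional access: the reader must recall what _1 holds.
pairs.map(x => x._1 + 1)
// Pattern match: the tuple's shape is spelled out and each part gets a name.
pairs.map { case (key, _) => key + 1 }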

Comment:
Or even

rdd1.map {
  case (x, _) if x == 1 =>
    throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
  case (x, _) => x
}.count()

Contributor Author:
ok, thanks

if (x._1 == 1) {
-throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+throw new FetchFailedException(
+  BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
}
x._1
}.count()
@@ -2138,7 +2139,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
rdd2.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
rdd2.map { x =>
if (x._1 == 1) {
-throw new FetchFailedException(BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
+throw new FetchFailedException(
+  BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
}
x._1
}.count()