Renamed newResultStage to createResultStage and added commenting
kayousterhout committed Jun 15, 2016
commit 1b7a3386fb631f1a41ea73d6bfdc9961ed8e4234
@@ -296,6 +296,11 @@ class DAGScheduler(
case None =>
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
+ // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
+ // that were not already in shuffleIdToMapStage, it's possible that by the time we
+ // get to a particular dependency in the foreach loop, it's been added to
+ // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
+ // SPARK-13902 for more information.
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
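
As an aside, here is a minimal, self-contained sketch of the situation the new comment describes, using hypothetical names (Dep, DedupSketch, createStage) rather than Spark's real DAGScheduler types: creating a stage for one missing ancestor can itself register stages for other ancestors, so the loop must re-check membership before each creation.

```scala
import scala.collection.mutable

// Hypothetical, simplified dependency type -- not the real Spark ShuffleDependency.
case class Dep(shuffleId: Int, parents: Seq[Dep] = Nil)

object DedupSketch {
  private val shuffleIdToStage = mutable.Map.empty[Int, String]

  // Creating a stage first creates (and registers) stages for any missing parents,
  // which is how a dependency can "appear" in the map before the loop below reaches it.
  private def createStage(dep: Dep): String = {
    dep.parents.foreach { p =>
      if (!shuffleIdToStage.contains(p.shuffleId)) createStage(p)
    }
    val stage = s"stage-for-shuffle-${dep.shuffleId}"
    shuffleIdToStage(dep.shuffleId) = stage
    stage
  }

  def main(args: Array[String]): Unit = {
    val parent = Dep(shuffleId = 1)
    val child = Dep(shuffleId = 2, parents = Seq(parent))
    // Both look "missing" when the scan runs, but creating `child` also registers
    // `parent`, so membership must be re-checked before creating each one.
    Seq(child, parent).foreach { dep =>
      if (!shuffleIdToStage.contains(dep.shuffleId)) createStage(dep)
    }
    println(shuffleIdToStage) // one stage per shuffle id, no duplicates
  }
}
```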
@@ -315,8 +320,8 @@ class DAGScheduler(
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
val id = nextStageId.getAndIncrement()
- val stage = new ShuffleMapStage(id, rdd, numTasks,
-   getOrCreateParentStages(rdd, jobId), jobId, rdd.creationSite, shuffleDep)
+ val stage = new ShuffleMapStage(
+   id, rdd, numTasks, getOrCreateParentStages(rdd, jobId), jobId, rdd.creationSite, shuffleDep)

stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
@@ -346,7 +351,7 @@ class DAGScheduler(
/**
* Create a ResultStage associated with the provided jobId.
*/
- private def newResultStage(
+ private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
@@ -830,7 +835,7 @@ class DAGScheduler(
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
- finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
+ finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
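
As an aside, a minimal sketch (hypothetical ResultStage and submitJob, not the real handleJobSubmitted) of the failure-handling pattern in this hunk: stage creation can throw when, for example, a HadoopRDD's underlying files have been deleted, so the call is wrapped in try/catch and the failure stays confined to the submitted job rather than crashing the scheduler.

```scala
object StageCreationFailureSketch {
  final case class ResultStage(id: Int)

  // Stand-in for real stage creation, which may throw while resolving its inputs.
  def createResultStage(jobId: Int): ResultStage = {
    if (jobId < 0) throw new IllegalStateException("underlying input files are missing")
    ResultStage(jobId)
  }

  // Mirrors the try/catch above: log a warning and abandon this job only.
  def submitJob(jobId: Int): Option[ResultStage] =
    try {
      Some(createResultStage(jobId))
    } catch {
      case e: Exception =>
        println(s"Creating new stage failed due to exception - job: $jobId ($e)")
        None
    }

  def main(args: Array[String]): Unit = {
    println(submitJob(1))  // Some(ResultStage(1))
    println(submitJob(-1)) // None, after the warning is printed
  }
}
```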