core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -872,7 +872,7 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    stage.makeNewStageAttempt(partitionsToCompute.size)
     outputCommitCoordinator.stageStart(stage.id)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

@@ -937,8 +937,8 @@ class DAGScheduler(
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
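
Taken together, the two hunks move attempt-ID assignment into a single place: makeNewStageAttempt mints the attempt and its StageInfo once, before SparkListenerStageSubmitted is posted, and the TaskSet then reads the same ID back through stage.latestInfo.attemptId instead of incrementing the counter a second time via newAttemptId(). Below is a minimal runnable sketch of that flow; the types here are simplified stand-ins, not Spark's real scheduler classes.

// Sketch of the submission order the new code enforces. StageInfo, Stage,
// TaskSet, and submitMissingTasks below are hypothetical simplified stand-ins.
object SubmitFlowSketch extends App {
  final case class StageInfo(stageId: Int, attemptId: Int, numTasks: Int)

  final class Stage(val id: Int) {
    private var nextAttemptId = 0
    // Initialized before any real attempt, mirroring the comment in Stage.scala.
    private var _latestInfo = StageInfo(id, nextAttemptId, 0)
    def latestInfo: StageInfo = _latestInfo
    def makeNewStageAttempt(numTasks: Int): Unit = {
      _latestInfo = StageInfo(id, nextAttemptId, numTasks)
      nextAttemptId += 1
    }
  }

  final case class TaskSet(stageId: Int, attemptId: Int)

  def submitMissingTasks(stage: Stage, numPartitions: Int): TaskSet = {
    stage.makeNewStageAttempt(numPartitions)          // 1. mint attempt + StageInfo once
    println(s"StageSubmitted: ${stage.latestInfo}")   // 2. listeners already see the new ID
    TaskSet(stage.id, stage.latestInfo.attemptId)     // 3. TaskSet reads the same ID back
  }

  val stage = new Stage(id = 7)
  assert(submitMissingTasks(stage, numPartitions = 4).attemptId == 0)
  assert(submitMissingTasks(stage, numPartitions = 4).attemptId == 1)  // retry gets a fresh ID
}
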
core/src/main/scala/org/apache/spark/scheduler/Stage.scala (13 additions, 7 deletions)
@@ -62,22 +62,28 @@ private[spark] abstract class Stage(

   var pendingTasks = new HashSet[Task[_]]

+  /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0

   val name = callSite.shortForm
   val details = callSite.longForm

-  /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
-  var latestInfo: StageInfo = StageInfo.fromStage(this)
+  /**
+   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+   * here, before any attempts have actually been created, because the DAGScheduler uses this
+   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
+   * have been created).
+   */
+  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

-  /** Return a new attempt id, starting with 0. */
-  def newAttemptId(): Int = {
-    val id = nextAttemptId
+  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
+  def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
+    _latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute))
     nextAttemptId += 1
-    id
   }

-  def attemptId: Int = nextAttemptId
+  /** Returns the StageInfo for the most recent attempt for this stage. */
+  def latestInfo: StageInfo = _latestInfo

   override final def hashCode(): Int = id
   override final def equals(other: Any): Boolean = other match {
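
The removed accessors made it possible for the counter and the running attempt to disagree: newAttemptId() returned the current value and then incremented, while attemptId read the already-incremented counter. A small sketch, reconstructed from the deleted lines (OldStage is a hypothetical stand-in, not Spark's class), shows the off-by-one:

// Reconstructs the removed accessors to show the mismatch they allowed.
object OldAttemptIdDemo extends App {
  final class OldStage {
    private var nextAttemptId: Int = 0

    // Return a new attempt id, starting with 0 (removed above).
    def newAttemptId(): Int = {
      val id = nextAttemptId
      nextAttemptId += 1
      id
    }

    def attemptId: Int = nextAttemptId  // (removed above)
  }

  val stage = new OldStage
  val runningAttempt = stage.newAttemptId()  // the TaskSet runs as attempt 0...
  assert(runningAttempt == 0)
  assert(stage.attemptId == 1)               // ...but attemptId already reports 1
}

With makeNewStageAttempt, the increment and the StageInfo creation happen together in one method, and the read-only latestInfo accessor is the single way to observe the current attempt, so this kind of drift is no longer expressible from outside the class.
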
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -70,12 +70,12 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
+  def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
     new StageInfo(
       stage.id,
-      stage.attemptId,
+      attemptId,
       stage.name,
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
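
With the attempt ID passed in explicitly rather than read from the mutable stage.attemptId, fromStage becomes a function of its arguments, and the caller (makeNewStageAttempt) decides which attempt the snapshot describes. A toy sketch of the same signature shape follows, including the numTasks.getOrElse fallback; the types are simplified stand-ins, and the real method also derives RDDInfos from the stage's RDD lineage.

object FromStageSketch extends App {
  final case class Info(stageId: Int, attemptId: Int, numTasks: Int)

  // Mirrors the new signature: attemptId is explicit, numTasks optional.
  def fromStage(stageId: Int, defaultNumTasks: Int,
                attemptId: Int, numTasks: Option[Int] = None): Info =
    Info(stageId, attemptId, numTasks.getOrElse(defaultNumTasks))

  // A full-stage attempt falls back to the stage's own task count...
  assert(fromStage(1, defaultNumTasks = 8, attemptId = 0) == Info(1, 0, 8))
  // ...while a partial re-submission passes the partitions actually computed.
  assert(fromStage(1, defaultNumTasks = 8, attemptId = 1, numTasks = Some(3)) == Info(1, 1, 3))
}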