Skip to content

Commit b78c294

Browse files
wangyum authored and committed (via GitHub Enterprise)
[CARMEL-7492][CARMEL-6086] DAGScheduler exit because of jobId not exist (apache#162)
1 parent 6072e7c commit b78c294

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,7 @@ private[spark] class DAGScheduler(
10051005
* @param job The job whose state to cleanup.
10061006
*/
10071007
private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = {
1008+
logInfo(s"Clean up job:${job.jobId} state")
10081009
val registeredStages = jobIdToStageIds.get(job.jobId)
10091010
if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
10101011
logError("No stages registered for job " + job.jobId)
@@ -1883,6 +1884,8 @@ private[spark] class DAGScheduler(
18831884
eventProcessLoop.post(SubmitMissingTask(
18841885
stage,
18851886
jobId,
1887+
properties,
1888+
partitionsToCompute,
18861889
taskIdToLocations,
18871890
taskBinary,
18881891
partitions))
@@ -1891,6 +1894,8 @@ private[spark] class DAGScheduler(
18911894
SubmitMissingTask(
18921895
stage,
18931896
jobId,
1897+
properties,
1898+
partitionsToCompute,
18941899
taskIdToLocations,
18951900
taskBinary,
18961901
partitions))
@@ -1930,18 +1935,18 @@ private[spark] class DAGScheduler(
19301935
// If we reach here, it is very possible the job was already cancelled.
19311936
return
19321937
}
1933-
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
1938+
19341939
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
19351940
// with this Stage
1936-
val properties = jobIdToActiveJob(missingTask.jobId).properties
1941+
val properties = missingTask.properties
19371942
val artifacts = jobIdToActiveJob(missingTask.jobId).artifacts
19381943

19391944
val tasks: Seq[Task[_]] = try {
19401945
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
19411946
stage match {
19421947
case stage: ShuffleMapStage =>
19431948
stage.pendingPartitions.clear()
1944-
partitionsToCompute.map { id =>
1949+
missingTask.partitionsToCompute.map { id =>
19451950
val locs = missingTask.taskIdToLocations(id)
19461951
val part = missingTask.partitions(id)
19471952
stage.pendingPartitions += id
@@ -1952,7 +1957,7 @@ private[spark] class DAGScheduler(
19521957
}
19531958

19541959
case stage: ResultStage =>
1955-
partitionsToCompute.map { id =>
1960+
missingTask.partitionsToCompute.map { id =>
19561961
val p: Int = stage.partitions(id)
19571962
val part = missingTask.partitions(p)
19581963
val locs = missingTask.taskIdToLocations(id)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ case class SpeculativeTaskSubmitted(task: Task[_], taskIndex: Int = -1) extends
113113
private[scheduler] case class SubmitMissingTask(
114114
stage: Stage,
115115
jobId: Int,
116+
properties: Properties,
117+
partitionsToCompute: Seq[Int],
116118
taskIdToLocations: scala.collection.Map[Int, Seq[TaskLocation]],
117119
taskBinary: Broadcast[Array[Byte]],
118120
partitions: Array[Partition]) extends DAGSchedulerEvent

0 commit comments

Comments (0)