54 changes: 32 additions & 22 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.collection.Map
+import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
@@ -140,8 +141,8 @@ class DAGScheduler(
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

-private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
-private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
+private[scheduler] val jobIdToStageIds = new TrieMap[Int, HashSet[Int]]
+private[scheduler] val stageIdToStage = new TrieMap[Int, Stage]
Review comment (Member):

Do we really need TrieMap for concurrency here, since job scheduling is FIFO (even after moving createResultStage())?

BTW, out of curiosity, what are the advantages of TrieMap compared with ConcurrentHashMap?
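
On the concurrency question: once createResultStage() runs on the submitting thread rather than on the single event-loop thread, jobIdToStageIds and stageIdToStage can be written from more than one thread at a time, which a plain mutable.HashMap does not tolerate. As for TrieMap versus java.util.concurrent.ConcurrentHashMap: both give thread-safe updates, but TrieMap additionally offers an O(1), lock-free snapshot() that yields a consistent view for iteration while writers proceed. A minimal, self-contained sketch of both points (illustrative only, not code from this PR):

import scala.collection.concurrent.TrieMap

object TrieMapSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors stageIdToStage: with eager stage creation, several submitting
    // threads may insert into the map concurrently.
    val stageIdToStage = new TrieMap[Int, String]()

    val writers = (0 until 4).map { t =>
      new Thread(new Runnable {
        override def run(): Unit = {
          for (i <- 0 until 1000) {
            stageIdToStage.put(t * 1000 + i, s"stage-$t-$i")
          }
        }
      })
    }
    writers.foreach(_.start())

    // snapshot() is O(1) and lock-free; the view stays consistent even while
    // the writers above keep inserting. ConcurrentHashMap only offers weakly
    // consistent iterators here.
    val snap = stageIdToStage.snapshot()
    val sizeAtSnapshot = snap.size

    writers.foreach(_.join())
    assert(stageIdToStage.size == 4000)  // no lost updates under contention
    assert(snap.size == sizeAtSnapshot)  // snapshot unaffected by later puts
    println(s"snapshot saw $sizeAtSnapshot entries; final map holds 4000")
  }
}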

/**
* Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
* that dependency. Only includes stages that are part of currently running job (when the job(s)
@@ -370,7 +371,7 @@ class DAGScheduler(
/**
* Create a ResultStage associated with the provided jobId.
*/
-private def createResultStage(
+private[scheduler] def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
@@ -600,9 +601,20 @@ class DAGScheduler(

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
-val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
+val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
+var finalStage: ResultStage = null
+try {
+  // New stage creation may throw an exception if, for example, jobs are run on a
+  // HadoopRDD whose underlying HDFS files have been deleted.
+  finalStage = createResultStage(rdd, func2, partitions.toArray, jobId, callSite)
+} catch {
+  case e: Exception =>
+    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+    waiter.jobFailed(e)
+    return waiter
+}
eventProcessLoop.post(JobSubmitted(
-  jobId, rdd, func2, partitions.toArray, callSite, waiter,
+  jobId, finalStage, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
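
This hunk changes where a stage-creation failure surfaces: it is now reported through the JobWaiter on the submitting thread, before any JobSubmitted event is posted, so the event loop never receives a job whose final stage could not be built. A self-contained sketch of that fail-fast pattern (Waiter, submit, createStage, and post are illustrative stand-ins, not Spark's API):

import scala.concurrent.{Future, Promise}

// Stand-in for Spark's JobWaiter: completes a promise on success or failure.
class Waiter {
  private val p = Promise[Unit]()
  def jobFailed(e: Exception): Unit = { p.tryFailure(e); () }
  def jobSucceeded(): Unit = { p.trySuccess(()); () }
  def completionFuture: Future[Unit] = p.future
}

object SubmitSketch {
  // Build the stage eagerly on the caller's thread; only post to the event
  // loop once the stage exists.
  def submit(createStage: () => Int, post: Int => Unit): Waiter = {
    val waiter = new Waiter
    val stage =
      try createStage()
      catch {
        case e: Exception =>
          waiter.jobFailed(e) // fail fast, on the submitting thread
          return waiter       // the event loop never sees a half-built job
      }
    post(stage)
    waiter
  }

  def main(args: Array[String]): Unit = {
    val bad = submit(() => throw new IllegalStateException("HDFS file deleted"), _ => ())
    println(bad.completionFuture.value) // Some(Failure(...)), already completed
    val ok = submit(() => 42, stageId => println(s"posted stage $stageId"))
    ok.jobSucceeded()
    println(ok.completionFuture.value)  // Some(Success(()))
  }
}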
@@ -667,8 +679,19 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.length).toArray
val jobId = nextJobId.getAndIncrement()
+var finalStage: ResultStage = null
+try {
+  // New stage creation may throw an exception if, for example, jobs are run on a
+  // HadoopRDD whose underlying HDFS files have been deleted.
+  finalStage = createResultStage(rdd, func2, partitions, jobId, callSite)
+} catch {
+  case e: Exception =>
+    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+    listener.jobFailed(e)
+    return listener.awaitResult()
+}
eventProcessLoop.post(JobSubmitted(
-  jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
+  jobId, finalStage, partitions, callSite, listener, SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
}

@@ -854,24 +877,11 @@ class DAGScheduler(
}

private[scheduler] def handleJobSubmitted(jobId: Int,
-    finalRDD: RDD[_],
-    func: (TaskContext, Iterator[_]) => _,
+    finalStage: ResultStage,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
-var finalStage: ResultStage = null
-try {
-  // New stage creation may throw an exception if, for example, jobs are run on a
-  // HadoopRDD whose underlying HDFS files have been deleted.
-  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
-} catch {
-  case e: Exception =>
-    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
-    listener.jobFailed(e)
-    return
-}
-
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
@@ -1773,8 +1783,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
}

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
-case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
-  dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
+case JobSubmitted(jobId, finalStage, partitions, callSite, listener, properties) =>
+  dagScheduler.handleJobSubmitted(jobId, finalStage, partitions, callSite, listener, properties)

case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -36,8 +36,7 @@ private[scheduler] sealed trait DAGSchedulerEvent
/** A result-yielding job was submitted on a target RDD */
private[scheduler] case class JobSubmitted(
jobId: Int,
-  finalRDD: RDD[_],
-  func: (TaskContext, Iterator[_]) => _,
+  finalStage: ResultStage,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -313,9 +313,20 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
listener: JobListener = jobListener,
-    properties: Properties = null): Int = {
+    properties: Properties = null,
+    scheduler: DAGScheduler = scheduler): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
-runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
+var finalStage: ResultStage = null
+val callSite = CallSite("", "")
+try {
+  finalStage = scheduler.createResultStage(rdd, func, partitions, jobId, callSite)
+} catch {
+  case e: Exception =>
+    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+    listener.jobFailed(e)
+    return jobId
+}
+runEvent(JobSubmitted(jobId, finalStage, partitions, callSite, listener, properties))
jobId
}

@@ -647,7 +658,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
blockManagerMaster,
sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
-val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
+val jobId = submit(new MyRDD(sc, 1, Nil), Array(0), scheduler = noKillScheduler)
cancel(jobId)
// Because the job wasn't actually cancelled, we shouldn't have received a failure message.
assert(failure === null)