Merge remote-tracking branch 'origin/master' into SPARK-9266
JoshRosen committed Jul 23, 2015
commit c268cb5f6426eb087db63a618f387d5bd4d179d0
71 changes: 0 additions & 71 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -646,77 +646,6 @@ class DAGScheduler(
}
}

  /**
   * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
   * We run the operation in a separate thread just in case it takes a bunch of time, so that we
   * don't block the DAGScheduler event loop or other concurrent jobs.
   */
  protected def runLocally(job: ActiveJob) {
    logInfo("Computing the requested partition locally")
    new Thread("Local computation of job " + job.jobId) {
      override def run() {
        runLocallyWithinThread(job)
      }
    }.start()
  }

  // Broken out for easier testing in DAGSchedulerSuite.
  protected def runLocallyWithinThread(job: ActiveJob) {
    var jobResult: JobResult = JobSucceeded
    try {
      val rdd = job.finalStage.rdd
      val split = rdd.partitions(job.partitions(0))
      val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
      val taskContext =
        new TaskContextImpl(
          job.finalStage.id,
          job.partitions(0),
          taskAttemptId = 0,
          attemptNumber = 0,
          taskMemoryManager = taskMemoryManager,
          metricsSystem = env.metricsSystem,
          runningLocally = true)
      TaskContext.setTaskContext(taskContext)
      var threwException = true
      try {
        val result = job.func(taskContext, rdd.iterator(split, taskContext))
        job.listener.taskSucceeded(0, result)
        threwException = false
      } finally {
        taskContext.markTaskCompleted()
        TaskContext.unset()
        // Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
        // make sure to update both copies.
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
        if (freedMemory > 0) {
          if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)
              && !threwException) {
            throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
          } else {
            logError(s"Managed memory leak detected; size = $freedMemory bytes")
          }
        }
      }
    } catch {
      case e: Exception =>
        val exception = new SparkDriverExecutionException(e)
        jobResult = JobFailed(exception)
        job.listener.jobFailed(exception)
      case oom: OutOfMemoryError =>
        val exception = new SparkException("Local job aborted due to out of memory error", oom)
        jobResult = JobFailed(exception)
        job.listener.jobFailed(exception)
    } finally {
      val s = job.finalStage
      // clean up data structures that were populated for a local job,
      // but that won't get cleaned up via the normal paths through
      // completion events or stage abort
      stageIdToStage -= s.id
      jobIdToStageIds -= job.jobId
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
    }
  }

  /** Finds the earliest-created active job that needs the stage */
  // TODO: Probably should actually find among the active jobs that need this
  // stage the one with the highest priority (highest-priority pool, earliest created).
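The removed runLocally helper, per its own doc comment, offloads a single-partition, dependency-free computation to a dedicated thread so the DAGScheduler event loop is never blocked, and reports the outcome back through a listener. The following is a minimal, self-contained sketch of that pattern only, not Spark code: the names LocalRunSketch, Listener, and runSinglePartitionLocally are hypothetical, and the sketch deliberately omits the TaskContext, memory-manager, and bookkeeping logic visible in the diff above.

// Sketch of the thread-offloading pattern used by the removed runLocally:
// the caller returns immediately, the computation runs on its own thread,
// and the result is delivered through a callback rather than a return value.
object LocalRunSketch {

  // Hypothetical stand-in for the job.listener used in the real code.
  trait Listener {
    def taskSucceeded(result: Any): Unit
    def jobFailed(e: Exception): Unit
  }

  def runSinglePartitionLocally(jobId: Int, compute: () => Any, listener: Listener): Unit = {
    new Thread("Local computation of job " + jobId) {
      override def run(): Unit = {
        try {
          // The real code also created a TaskContextImpl and a TaskMemoryManager here,
          // and checked for leaked managed memory in a finally block.
          listener.taskSucceeded(compute())
        } catch {
          case e: Exception => listener.jobFailed(e)
        }
      }
    }.start()  // start() returns at once, so the caller's event loop is not blocked
  }

  def main(args: Array[String]): Unit = {
    runSinglePartitionLocally(
      jobId = 0,
      compute = () => (1 to 10).sum,
      listener = new Listener {
        def taskSucceeded(result: Any): Unit = println(s"job 0 result: $result")
        def jobFailed(e: Exception): Unit = println(s"job 0 failed: $e")
      })
  }
}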