Add same fix to submitMapStage() and runApproximateJob()

JoshRosen committed Oct 12, 2021
commit 3bcc554dfa3dcea8e53ee17c19c09890ec6dcabd
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (11 additions, 0 deletions)
@@ -961,6 +961,12 @@ private[spark] class DAGScheduler(
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
+
+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
@@ -993,6 +999,11 @@ private[spark] class DAGScheduler(
       throw SparkCoreErrors.cannotRunSubmitMapStageOnZeroPartitionRDDError()
     }

+    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
+    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
+    // is evaluated outside of the DAGScheduler's single-threaded event loop:
+    eagerlyComputePartitionsForRddAndAncestors(rdd)
+
     // We create a JobWaiter with only one "task", which will be marked as complete when the whole
     // map stage has completed, and will be passed the MapOutputStatistics for that stage.
     // This makes it easier to avoid race conditions between the user code and the map output

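For context, here is a minimal sketch of what a helper like eagerlyComputePartitionsForRddAndAncestors can look like. This is an assumed shape for illustration, not necessarily the exact code added earlier in this PR: it walks the dependency DAG iteratively and touches .partitions on every RDD it reaches.

import scala.collection.mutable

import org.apache.spark.rdd.RDD

// Sketch (assumed shape, not the verbatim Spark implementation): force
// getPartitions() for `rdd` and all of its ancestors on the calling
// thread, so the DAGScheduler event loop never has to evaluate it.
private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
  val visited = mutable.HashSet[RDD[_]]()
  // Iterative traversal avoids stack overflows on very long lineages.
  val waitingForVisit = mutable.ListBuffer[RDD[_]](rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (visited.add(toVisit)) {
      // Accessing .partitions evaluates and caches getPartitions() if it
      // has not run yet; afterwards it is a cheap field read.
      toVisit.partitions
      toVisit.dependencies.foreach(dep => waitingForVisit += dep.rdd)
    }
  }
}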
Member commented:
It looks like each of the three call sites of eagerlyComputePartitionsForRddAndAncestors is preceded by a check that already accesses rdd.partitions, so strictly speaking we only need to compute the partitions of the RDD's ancestors?

JoshRosen (Contributor Author) replied:

That's true, but I chose not to make that optimization because (a) it doesn't actually matter from a performance perspective (accessing already-computed .partitions is very cheap) and (b) I think the optimization would make the code more complex.

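To make the cheapness claim concrete: RDD.partitions memoizes the result of getPartitions(), so after the eager pass (or after a check like rdd.partitions.length == 0) every later access is a cached read. The following is a simplified paraphrase of the pattern in org.apache.spark.rdd.RDD; the real method also handles checkpointed RDDs and synchronization, which are elided here.

// Simplified paraphrase of RDD.partitions (checkpoint/locking elided):
@transient private var partitions_ : Array[Partition] = _

final def partitions: Array[Partition] = {
  if (partitions_ == null) {
    // First access: run the potentially slow, RDD-specific computation.
    partitions_ = getPartitions
  }
  // Every subsequent access is just this cached field read, which is why
  // re-touching an already-computed RDD in the helper costs almost nothing.
  partitions_
}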