Implement fix
JoshRosen committed Oct 12, 2021
commit 39111e103a4030c80ef613776a21cc0de26e361a
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -732,6 +732,32 @@ private[spark] class DAGScheduler(
    missing.toList
  }

  /** Invoke `.partitions` on the given RDD and all of its ancestors */
  private def eagerlyComputePartitionsForRddAndAncestors(rdd: RDD[_]): Unit = {
    val visitedRdds = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd

    def visit(rdd: RDD[_]): Unit = {
      if (!visitedRdds(rdd)) {
        visitedRdds += rdd

        // Eagerly compute:
        rdd.partitions

        for (dep <- rdd.dependencies) {
          waitingForVisit.prepend(dep.rdd)
        }
      }
    }

    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.remove(0))
    }
  }
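For context on why a single eager walk is enough: RDD.partitions memoizes the result of getPartitions, so the cost is paid once on the thread that calls this method, and later accesses from the event loop are cache hits. A simplified, illustrative sketch of that caching pattern (not the actual Spark source):

class MemoizedPartitions(computeOnce: () => Array[Int]) {
  // Cache for the computed partitions; stays null until the first access.
  @volatile private var cached: Array[Int] = _

  def partitions: Array[Int] = {
    if (cached == null) {
      cached = computeOnce() // potentially slow, e.g. listing files to derive splits
    }
    cached
  }
}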

  /**
   * Registers the given jobId among the jobs that need the given stage and
   * all of that stage's ancestors.
@@ -841,6 +867,11 @@ private[spark] class DAGScheduler(
"Total number of partitions: " + maxPartitions)
}

    // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
    // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
    // is evaluated outside of the DAGScheduler's single-threaded event loop:
    eagerlyComputePartitionsForRddAndAncestors(rdd)
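To illustrate the effect of the call above, here is a hypothetical RDD with deliberately slow partition discovery (the class name and sleep duration are made up for illustration). With the eager call in place, the delay is paid on the thread that submits the job, before the JobSubmitted event ever reaches the single-threaded "dag-scheduler-event-loop", so scheduling of other jobs is not stalled.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical RDD standing in for real cases (e.g. RDDs that list many files
// in getPartitions); the sleep simulates expensive partition discovery.
class SlowPartitionsRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    Thread.sleep(10000) // simulated slow metadata/listing work
    Array[Partition](new Partition { override def index: Int = 0 })
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.empty
}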
Contributor:
Would it be a good idea to add an assertion in the DebugFilesystem we have, to check that it's not accessed within the event loop thread? It might help catch other cases where the event loop might be doing heavy blocking operations.

Contributor Author:
I think that's a good idea, but I'd like to defer it to a separate followup PR. I've filed https://issues.apache.org/jira/browse/SPARK-37009 to track that.
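For illustration of the reviewer's suggestion only (the actual follow-up is tracked in SPARK-37009 and may look quite different): a rough sketch of a guard that a test-only filesystem wrapper such as DebugFilesystem could call from its I/O entry points, assuming the event loop thread keeps the default name that DAGSchedulerEventProcessLoop gives it.

// Sketch only; the helper name and placement are assumptions, not part of this PR.
object EventLoopGuard {
  // "dag-scheduler-event-loop" is the thread name used by DAGSchedulerEventProcessLoop.
  private val EventLoopThreadName = "dag-scheduler-event-loop"

  def assertNotOnDagSchedulerEventLoop(operation: String): Unit = {
    assert(Thread.currentThread().getName != EventLoopThreadName,
      s"Blocking filesystem operation '$operation' should not run on the " +
        s"'$EventLoopThreadName' thread")
  }
}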


    val jobId = nextJobId.getAndIncrement()
    if (partitions.isEmpty) {
      val clonedProperties = Utils.cloneProperties(properties)