```diff
@@ -177,7 +177,10 @@ object SQLExecution extends Logging {
       shuffleIds.foreach { shuffleId =>
         queryExecution.shuffleCleanupMode match {
           case RemoveShuffleFiles =>
-            SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
+            // Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister
+            // the shuffle on MapOutputTracker, so that stage retries would be triggered.
+            // Set blocking to Utils.isTesting to deflake unit tests.
+            sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting)
```
**Contributor:** This cleanup is not critical; I think the second parameter, `blocking`, can always be `false`.

**Contributor (Author):** The unit tests would be flaky if we used `false` here.

**Contributor:** Oh, I see. Can we add some comments to explain it?

**Contributor (Author):** Sure. Done.

```diff
           case SkipMigration =>
             SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId)
           case _ => // this should not happen
```
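The `blocking` flag discussed above matters because shuffle-file removal is asynchronous: if a test asserts on the cleanup's effects before the background work finishes, it flakes. The following is a hypothetical, self-contained sketch (not Spark's actual implementation; `ShuffleCleanupSketch` and its members are invented for illustration) of why passing `blocking = true` in tests makes the outcome deterministic:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ShuffleCleanupSketch {
  // Stand-in for the shuffle outputs tracked on the driver.
  private val registered = ConcurrentHashMap.newKeySet[Int]()

  def register(shuffleId: Int): Unit = registered.add(shuffleId)
  def isRegistered(shuffleId: Int): Boolean = registered.contains(shuffleId)

  // Mirrors the removeShuffle(shuffleId, blocking) shape from the diff:
  // the deletion runs asynchronously; blocking = true waits for it to
  // complete, so a test that checks isRegistered afterwards cannot race it.
  def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    val cleanup = Future { registered.remove(shuffleId) }
    if (blocking) Await.ready(cleanup, 10.seconds)
  }
}
```

In production, `blocking = false` keeps query teardown from stalling on file deletion; only the tests (via `Utils.isTesting`) pay the synchronization cost.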
```diff
@@ -319,7 +319,7 @@ class QueryExecutionSuite extends SharedSparkSession {
     val blockManager = spark.sparkContext.env.blockManager
     blockManager.diskBlockManager.getAllBlocks().foreach {
       case ShuffleIndexBlockId(shuffleId, _, _) =>
-        spark.sparkContext.env.shuffleManager.unregisterShuffle(shuffleId)
+        spark.sparkContext.shuffleDriverComponents.removeShuffle(shuffleId, true)
       case _ =>
     }
```
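The test hunk above relies on pattern matching over block IDs to pick out shuffle blocks and extract their IDs. A minimal standalone sketch of that idiom, using simplified stand-in case classes rather than Spark's real `BlockId` hierarchy:

```scala
// Hypothetical, simplified stand-ins (not Spark's actual classes) that show
// the collect-by-pattern-match idiom used in the test to find shuffle ids.
sealed trait BlockId
final case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId

object BlockScanSketch {
  // Keep only shuffle index blocks, extract their shuffle ids, and
  // deduplicate (one shuffle produces many index blocks).
  def shuffleIdsOf(blocks: Seq[BlockId]): Seq[Int] =
    blocks.collect { case ShuffleIndexBlockId(id, _, _) => id }.distinct
}
```

`collect` combines the filter (only `ShuffleIndexBlockId`) and the extraction (`shuffleId`) in one partial function, which is why the real test can ignore every other block type with a bare `case _ =>`.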