[SPARK-34541][CORE] Fixed an issue where data could not be cleaned up when unregisterShuffle.
yikf committed Mar 1, 2021
commit 7319255632c64e339c94616fe44a35b72c67c1d9
@@ -139,29 +139,40 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers with LocalSparkContext
     )))
   }

Member:
nit: revert the unrelated change.

Contributor Author:

Line 133 is an empty line. We originally added the test to SortShuffleManagerSuite; after #31664 (comment), the test was moved to ShuffleSuite, so I changed the empty line along with it.

Member:

I know the reason, but it's still not a necessary change, especially when there are no other changes in the file.

Contributor Author:

OK, I will revert it.

test("Data could not be cleaned up when unregisterShuffle") {
test("Shuffle data can be cleaned up whether spark.shuffle.useOldFetchProtocol=true/false") {
Member @Ngone51 commented on Mar 2, 2021:

We usually test different config values like this:

Seq(true, false).foreach { value =>
  test(s"SPARK-34541: shuffle data can be cleaned up whether spark.shuffle.useOldFetchProtocol=$value") {
    ...
    conf.set(spark.shuffle.useOldFetchProtocol, value)
    ...
  }
}

Could you follow this approach?
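
For reference, a self-contained sketch of the suggested pattern; the suite name, master/app settings, and test body below are illustrative placeholders, not code from this PR:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.funsuite.AnyFunSuite

// One generated test per config value: the foreach runs at suite
// construction time, registering a separate test for each value.
class ShuffleCleanupPatternSuite extends AnyFunSuite {
  Seq(true, false).foreach { value =>
    test(s"SPARK-34541: shuffle data can be cleaned up whether " +
        s"spark.shuffle.useOldFetchProtocol=$value") {
      val conf = new SparkConf(loadDefaults = false)
        .setMaster("local")
        .setAppName("test")
        .set("spark.shuffle.useOldFetchProtocol", value.toString)
      val sc = new SparkContext(conf)
      try {
        // Placeholder body: run a shuffle job here, then assert its
        // files are removed after blockManager.master.removeShuffle(...).
        assert(sc.parallelize(1 to 10, 2).map(x => (x, x)).groupByKey().count() === 10)
      } finally {
        sc.stop()
      }
    }
  }
}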

Contributor Author @yikf commented on Mar 2, 2021:

OK, thanks very much for the guidance on code style. Thank you!

     withTempDir { tmpDir =>
-      val conf = new SparkConf(loadDefaults = false)
-      conf.set("spark.local.dir", tmpDir.getAbsolutePath)
-      sc = new SparkContext("local", "SPARK-34541", conf)
-      val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x))
-      // Create a shuffleRdd
-      val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4))
-        .setSerializer(new JavaSerializer(conf))
       def getAllFiles: Set[File] =
         FileUtils.listFiles(tmpDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet
-      val filesBeforeShuffle = getAllFiles
-      // Force the shuffle to be performed
-      shuffledRdd.count()
-      // Ensure that the shuffle actually created files that will need to be cleaned up
-      val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
-      filesCreatedByShuffle.map(_.getName) should be
-        Set("shuffle_0_0_0.data", "shuffle_0_0_0.index")
-      // Check that the cleanup actually removes the files
-      sc.env.blockManager.master.removeShuffle(0, blocking = true)
-      for (file <- filesCreatedByShuffle) {
-        assert(!file.exists(), s"Shuffle file $file was not cleaned up")
-      }
+      def runJobAndRemoveShuffle(conf: SparkConf, mapId: Long): Unit = {
+        sc = new SparkContext("local", "test", conf)
+        // Make the taskAttemptId start from 1.
+        sc.parallelize(1 to 10).count()
+        val rdd = sc.parallelize(1 to 10, 1).map(x => (x, x))
+        // Create a shuffleRdd
+        val shuffledRdd = new ShuffledRDD[Int, Int, Int](rdd, new HashPartitioner(4))
+          .setSerializer(new JavaSerializer(conf))
+        val filesBeforeShuffle = getAllFiles
+        // Force the shuffle to be performed
+        shuffledRdd.count()
+        // Ensure that the shuffle actually created files that will need to be cleaned up
+        val filesCreatedByShuffle = getAllFiles -- filesBeforeShuffle
+        filesCreatedByShuffle.map(_.getName) should be
+          Set("shuffle_0_" + mapId + "_0.data", "shuffle_0_" + mapId + "_0.index")
+        // Check that the cleanup actually removes the files
+        sc.env.blockManager.master.removeShuffle(0, blocking = true)
+        for (file <- filesCreatedByShuffle) {
+          assert(!file.exists(), s"Shuffle file $file was not cleaned up")
+        }
+      }
+      val conf = new SparkConf(loadDefaults = false)
+      conf.set("spark.local.dir", tmpDir.getAbsolutePath)
+      conf.set("spark.shuffle.useOldFetchProtocol", "true")
+      runJobAndRemoveShuffle(conf, 0)
+      if (sc != null) {
+        sc.stop()
+      }
+      conf.set("spark.shuffle.useOldFetchProtocol", "false")
+      runJobAndRemoveShuffle(conf, 1)
     }
   }
 }
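
For context on why the test passes different mapId values: local shuffle files are named shuffle_<shuffleId>_<mapId>_<reduceId>.data/.index, and in Spark 3.x the map id in the block name is, as far as I can tell, the map partition index under the old fetch protocol but the task attempt id under the new one; the warm-up count() job exists so that the shuffle map task's taskAttemptId becomes 1. A hypothetical helper, not part of the PR, capturing that expectation under those assumptions:

object ShuffleFileNames {
  // Hypothetical helper: expected shuffle file names for shuffleId = 0 and
  // reduceId = 0, given the fetch protocol in use (assumption, not PR code).
  def expectedShuffleFiles(useOldFetchProtocol: Boolean): Set[String] = {
    // Old protocol: map id is the map partition index (0 for a one-partition
    // RDD). New protocol: map id is the task attempt id, which the warm-up
    // job in the test pushes to 1.
    val mapId = if (useOldFetchProtocol) 0L else 1L
    Set(s"shuffle_0_${mapId}_0.data", s"shuffle_0_${mapId}_0.index")
  }
}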