[SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files

This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is used.

Spark hits [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255), which appears to be a long-standing Hadoop bug.

The root cause is that there are two `renameInternal` methods:

```
public void renameInternal(Path src, Path dst)
public void renameInternal(final Path src, final Path dst, boolean overwrite)
```

Both should be overridden to handle all cases, but ChecksumFs only overrides the two-parameter method. When the three-parameter method is called, FilterFs.renameInternal(...) runs instead, performing the rename directly on the underlying RawLocalFs and skipping ChecksumFs's crc handling.
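To illustrate the dispatch problem in isolation, here is a tiny, hypothetical Scala sketch (the class names are illustrative, not Hadoop's actual hierarchy): a subclass that overrides only one of two overloads never sees calls made through the other one.

```
// BaseFs stands in for FilterFs, ChecksumLikeFs for ChecksumFs;
// both names are illustrative, not the real Hadoop hierarchy.
class BaseFs {
  def renameInternal(src: String, dst: String): Unit =
    println(s"BaseFs: rename $src -> $dst")

  def renameInternal(src: String, dst: String, overwrite: Boolean): Unit =
    println(s"BaseFs: rename $src -> $dst (overwrite=$overwrite)")
}

class ChecksumLikeFs extends BaseFs {
  // only the two-parameter overload is overridden, as in ChecksumFs
  override def renameInternal(src: String, dst: String): Unit = {
    super.renameInternal(src, dst)
    println(s"ChecksumLikeFs: also renaming .$src.crc")
  }
}

object OverloadDemo extends App {
  val fs = new ChecksumLikeFs
  fs.renameInternal("a", "b")                    // crc handling runs
  fs.renameInternal("a", "b", overwrite = true)  // BaseFs version runs, crc leaked
}
```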

The bug is related to FileContext, so FileSystemBasedCheckpointFileManager is not affected.

[SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) added a workaround for this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) appears to have reintroduced the regression.

This PR deletes the crc file on a best-effort basis when renaming, since failing to delete a crc file is not critical enough to fail the task.

This PR prevents crc files from piling up even after batches are purged. Too many files in the same directory often hurts performance, and each crc file occupies more space on disk than its own size, so the leaked files can consume a nontrivial amount of space once batch counts reach 100000+.
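As a rough, illustrative calculation (these numbers are not from the PR): on a filesystem with a 4 KB allocation unit, 100,000 leaked crc files of only a few bytes each would still occupy about 400 MB.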

No user-facing change.

Some unit tests are modified to check for leakage of crc files.

Closes #25488 from HeartSaVioR/SPARK-28025.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
HeartSaVioR committed Aug 23, 2019
commit c40f8a1a6ba12dd51e270e71594450c8e0df8dce
`CheckpointFileManager.scala`:

```
@@ -329,6 +329,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
   }
 
 
@@ -345,5 +347,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+    try {
+      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+      if (exists(checksumFile)) {
+        // checksum file exists, deleting it
+        delete(checksumFile)
+      }
+    } catch {
+      case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
+    }
+  }
 }
```

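For reference, the failure mode the workaround above targets can be reproduced outside Spark. Below is a minimal, hypothetical sketch (the object name, temp-dir handling, and file names are illustrative, not from this PR): it renames a file through `FileContext` on the local filesystem, and on Hadoop versions affected by HADOOP-16255 the companion crc file survives the rename.

```
import java.nio.file.Files
import java.util.EnumSet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}

object CrcLeakRepro extends App {
  // the default FileContext over file:// routes through LocalFs
  // (= RawLocalFs + ChecksumFs), which writes a companion ".<name>.crc"
  val dir = Files.createTempDirectory("crc-repro").toFile
  val fc = FileContext.getFileContext(new Configuration())

  val src = new Path(dir.getAbsolutePath, "file.tmp")
  val out = fc.create(src, EnumSet.of(CreateFlag.CREATE))
  out.writeBytes("data")
  out.close()

  // the rename goes through the overwrite-aware renameInternal
  // that HADOOP-16255 affects
  fc.rename(src, new Path(dir.getAbsolutePath, "file"), Options.Rename.OVERWRITE)

  // on affected Hadoop versions ".file.tmp.crc" is still listed here
  dir.listFiles().foreach(f => println(f.getName))
}
```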
`CheckpointFileManagerSuite.scala`:

```
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
     assert(fm.exists(path))
     fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception
 
+    // crc file should not be leaked when origin file doesn't exist.
+    // The implementation of Hadoop filesystem may filter out checksum file, so
+    // listing files from local filesystem.
+    val fileNames = new File(path.getParent.toString).listFiles().toSeq
+      .filter(p => p.isFile).map(p => p.getName)
+    val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+    val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+      // remove first "." and last ".crc"
+      name.substring(1, name.length - 4)
+    }
+
+    // Check all origin files exist for all crc files.
+    assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+      s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
+      s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+
     // Open and delete
     fm.open(path).close()
     fm.delete(path)
```

`HDFSMetadataLogSuite.scala`:

```
@@ -26,6 +26,7 @@ import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.UninterruptibleThread
 
@@ -59,6 +60,21 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("HDFSMetadataLog: purge") {
+    testPurge()
+  }
+
+  Seq(
+    classOf[FileSystemBasedCheckpointFileManager],
+    classOf[FileContextBasedCheckpointFileManager]
+  ).map(_.getCanonicalName).foreach { cls =>
+    test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
+        testPurge()
+      }
+    }
+  }
+
+  private def testPurge(): Unit = {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -75,12 +91,16 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
 
-      // There should be exactly one file, called "2", in the metadata directory.
+      // There should be at most two files, called "2", and optionally crc file,
+      // in the metadata directory.
       // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
-        .filter(!_.getName.startsWith(".")).toSeq
-      assert(allFiles.size == 1)
-      assert(allFiles(0).getName() == "2")
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size <= 2)
+      assert(allFiles.exists(_.getName == "2"))
+      if (allFiles.size == 2) {
+        // there's possibly crc file being left as well
+        assert(allFiles.exists(_.getName == ".2.crc"))
+      }
     }
   }
```