Changes from 4 commits
CommandUtils.scala
@@ -69,7 +69,7 @@ object CommandUtils extends Logging {
}
}
val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
- paths, sessionState.newHadoopConf(), pathFilter, spark)
+ paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true)
fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
} else {
partitions.map { p =>
InMemoryFileIndex.scala
@@ -124,7 +124,7 @@ class InMemoryFileIndex(
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
- pathsToFetch, hadoopConf, filter, sparkSession)
+ pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -166,12 +166,22 @@ object InMemoryFileIndex extends Logging {
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
- sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+ sparkSession: SparkSession,
+ areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {

val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
return paths.map { path =>
- (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+ val leafFiles = listLeafFiles(
+   path,
+   hadoopConf,
+   filter,
+   Some(sparkSession),
+   ignoreMissingFiles = ignoreMissingFiles,
+   isRootPath = areRootPaths)
+ (path, leafFiles)
}
}

@@ -204,7 +214,14 @@
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
- (path, listLeafFiles(path, hadoopConf, filter, None))
+ val leafFiles = listLeafFiles(
+   path,
+   hadoopConf,
+   filter,
+   None,
+   ignoreMissingFiles = ignoreMissingFiles,
+   isRootPath = areRootPaths)
+ (path, leafFiles)
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
@@ -267,15 +284,24 @@ object InMemoryFileIndex extends Logging {
path: Path,
hadoopConf: Configuration,
filter: PathFilter,
- sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
+ sessionOpt: Option[SparkSession],
+ ignoreMissingFiles: Boolean,
+ isRootPath: Boolean): Seq[FileStatus] = {
logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)

// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
- case _: FileNotFoundException =>
+ // If we are listing a root path (e.g. the top level directory of a table), ignore
+ // missing files. This is necessary in order to be able to drop SessionCatalog tables
+ // when the table's root directory has been deleted (see discussion at SPARK-27676).
+ //
+ // If we are NOT listing a root path, then a FileNotFoundException here means that the
+ // directory was present in a previous round of file listing but is absent in this
+ // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3
+ // list inconsistency).
+ case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}
@@ -286,9 +312,23 @@
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
case Some(session) =>
- bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+ bulkListLeafFiles(
+   dirs.map(_.getPath),
+   hadoopConf,
+   filter,
+   session,
+   areRootPaths = false
+ ).flatMap(_._2)
case _ =>
- dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+ dirs.flatMap { dir =>
+   listLeafFiles(
+     dir.getPath,
+     hadoopConf,
+     filter,
+     sessionOpt,
+     ignoreMissingFiles = ignoreMissingFiles,
+     isRootPath = false)
+ }
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
@@ -330,7 +370,7 @@ object InMemoryFileIndex extends Logging {
}
Some(lfs)
} catch {
- case _: FileNotFoundException =>
+ case _: FileNotFoundException if ignoreMissingFiles =>
missingFiles += f.getPath.toString
None
}
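Taken together, the changes above thread ignoreMissingFiles and the root-path flag through every listing entry point. A minimal sketch of the user-facing effect (an illustration, not code from this diff: the flag behind sessionState.conf.ignoreMissingFiles is spark.sql.files.ignoreMissingFiles, and /data/some_table is a hypothetical path):

// Sketch: with the flag off (the default), a file or subdirectory that
// disappears between listing rounds fails the scan with a
// FileNotFoundException; with it on, the missing entry is logged and
// skipped. Root table paths are tolerated either way, so dropping a
// table whose directory is already gone keeps working (SPARK-27676).
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
spark.read.parquet("/data/some_table").count()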
FileIndexSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources

- import java.io.File
+ import java.io.{File, FileNotFoundException}
import java.net.URI

import scala.collection.mutable
@@ -167,7 +167,7 @@ class FileIndexSuite extends SharedSQLContext {
}
}

test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
test("InMemoryFileIndex: root folders that don't exist don't throw exceptions") {
withTempDir { dir =>
val deletedFolder = new File(dir, "deleted")
assert(!deletedFolder.exists())
@@ -178,6 +178,41 @@
}
}

test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for non-root paths") {
Review comment (Member):
There is a config, parallelPartitionDiscoveryThreshold, that controls which code path partition discovery takes. With the default value, does this only test serial listing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. In the case of a parallel listing, this would cause the listing Spark job to fail with a FileNotFoundException (after maxTaskRetries attempts to list the missing file).

In the interests of complete test coverage, I'll update the test case to exercise the parallel listing path, too.

(Combinatorial test coverage is hard!)
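A sketch of what that follow-up could look like (hypothetical, not part of this 4-commit revision; it reuses the doTest() helper defined below, and SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD is the config the reviewer mentions):

// Hypothetical sketch: run the same assertions under both listing
// strategies. bulkListLeafFiles stays serial while
// paths.size <= threshold, so a threshold of 0 forces the parallel
// (Spark job) branch. On that branch the FileNotFoundException can
// surface wrapped in a SparkException after task retries, so doTest()
// would need to unwrap it.
for (threshold <- Seq("9999", "0")) {
  withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> threshold) {
    doTest()
  }
}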

def doTest(): Unit = {
def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
spark, Seq(DeletionRaceFileSystem.rootFolderPath), Map.empty, None)

withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
intercept[FileNotFoundException] {
makeCatalog()
}
}

withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
val catalog = makeCatalog()
// doesn't throw an exception
assert(catalog.listLeafFiles(catalog.rootPaths).isEmpty)
}
}

withClue("test missing subdirectories") {
withSQLConf(
"fs.mockFs.impl" -> classOf[SubdirectoryDeletionRaceFileSystem].getName,
"fs.mockFs.impl.disable.cache" -> "true") {
doTest()
}
}

withClue("test missing leaf files") {
withSQLConf(
"fs.mockFs.impl" -> classOf[FileDeletionRaceFileSystem].getName,
"fs.mockFs.impl.disable.cache" -> "true") {
doTest()
}
}
}

test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
withTempDir { dir =>
@@ -356,6 +391,59 @@ class FileIndexSuite extends SharedSQLContext {

}

object DeletionRaceFileSystem {
val rootFolderPath: Path = new Path("mockFs:///rootFolder/")
val subFolderPath: Path = new Path(rootFolderPath, "subFolder")
val leafFilePath: Path = new Path(subFolderPath, "leafFile")
val rootListing: Array[FileStatus] =
Array(new FileStatus(0, true, 0, 0, 0, subFolderPath))
val subFolderListing: Array[FileStatus] =
Array(new FileStatus(0, false, 0, 100, 0, leafFilePath))
}

// Used in SPARK-27676 test to simulate a race where a subdirectory is deleted
// between back-to-back listing calls.
class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem {
import DeletionRaceFileSystem._

override def getScheme: String = "mockFs"

override def listStatus(path: Path): Array[FileStatus] = {
if (path == rootFolderPath) {
rootListing
} else if (path == subFolderPath) {
throw new FileNotFoundException()
} else {
throw new IllegalArgumentException()
}
}
}

// Used in SPARK-27676 test to simulate a race where a file is deleted between
// being listed and having its size / file status checked.
class FileDeletionRaceFileSystem extends RawLocalFileSystem {
import DeletionRaceFileSystem._

override def getScheme: String = "mockFs"

override def listStatus(path: Path): Array[FileStatus] = {
if (path == rootFolderPath) {
rootListing
} else if (path == subFolderPath) {
subFolderListing
} else {
throw new IllegalArgumentException()
}
}

override def getFileBlockLocations(
file: FileStatus,
start: Long,
len: Long): Array[BlockLocation] = {
throw new FileNotFoundException()
}
}
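For context on how these mocks get picked up (a sketch of standard Hadoop behavior, not code from this diff): Hadoop resolves a URI scheme to a FileSystem implementation via the fs.<scheme>.impl configuration key, which is what the "fs.mockFs.impl" settings in the test above rely on.

// Sketch: scheme-to-implementation resolution for mockFs:// URIs.
// Disabling the FileSystem cache ensures each test sees a fresh instance.
val conf = new org.apache.hadoop.conf.Configuration()
conf.set("fs.mockFs.impl", classOf[SubdirectoryDeletionRaceFileSystem].getName)
conf.set("fs.mockFs.impl.disable.cache", "true")
val fs = DeletionRaceFileSystem.rootFolderPath.getFileSystem(conf)
assert(fs.isInstanceOf[SubdirectoryDeletionRaceFileSystem])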

class FakeParentPathFileSystem extends RawLocalFileSystem {
override def getScheme: String = "mockFs"
