Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated listLeafFiles() handile SPARK-19059
  • Loading branch information
jayadevanmurali authored Jan 18, 2017
commit 74e4a1a2740262fcf84e8c0704ddf2df57e614a0
Original file line number Diff line number Diff line change
Expand Up @@ -385,55 +385,54 @@ object PartitioningAwareFileIndex extends Logging {
logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)
val name = path.getName.toLowerCase
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
} else {
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]

// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}

val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))

val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
case Some(session) =>
bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
case _ =>
dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}

val allLeafStatuses = {
val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = sessionOpt match {
case Some(session) =>
bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
case _ =>
dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus =>
f

// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `listLeafFilesInParallel` when the number of
// paths exceeds threshold.
case f =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file system (RawLocalFileSystem, which is launch a
// subprocess and parse the stdout).
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}

allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
case f: LocatedFileStatus =>
f

// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use to `listLeafFilesInParallel` when the number of
// paths exceeds threshold.
case f =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file system (RawLocalFileSystem, which is launch a
// subprocess and parse the stdout).
val locations = fs.getFileBlockLocations(f, 0, f.getLen)
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
lfs
}
lfs
}
}

Expand Down