From 2910cb9c3c14c97ba4b26d1fab09ab35284aaa3e Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Mon, 10 Dec 2018 22:30:29 +0900
Subject: [PATCH 01/10] Throws better exception when reading files that start
 with underscore

---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 795a6d0b6b04..34876cc48e38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -554,7 +554,8 @@ case class DataSource(
 
       // Sufficient to check head of the globPath seq for non-glob scenario
       // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist && !fs.exists(globPath.head)) {
+      if (checkFilesExist &&
+        (!fs.exists(globPath.head) || InMemoryFileIndex.shouldFilterOut(globPath.head.getName))) {
         throw new AnalysisException(s"Path does not exist: ${globPath.head}")
       }
       globPath

From 08850ae2f64449bae5c449e53c00fa5051479380 Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Fri, 14 Dec 2018 10:59:19 +0900
Subject: [PATCH 02/10] Clarify the message further with a different exception
 for file which is ignored

---
 .../spark/sql/execution/datasources/DataSource.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 34876cc48e38..f1edc76f9b5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -554,9 +554,13 @@ case class DataSource(
 
       // Sufficient to check head of the globPath seq for non-glob scenario
       // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist &&
-        (!fs.exists(globPath.head) || InMemoryFileIndex.shouldFilterOut(globPath.head.getName))) {
-        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+      if (checkFilesExist) {
+        val firstPath = globPath.head
+        if (!fs.exists(firstPath)) {
+          throw new AnalysisException(s"Path does not exist: ${firstPath}")
+        } else if (InMemoryFileIndex.shouldFilterOut(firstPath.getName)) {
+          throw new AnalysisException(s"Path exists but is ignored: ${firstPath}")
+        }
       }
       globPath
     }.toSeq

From 777b4db54748aa417d412f66767f1076ec512c40 Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Tue, 25 Dec 2018 20:55:10 +0900
Subject: [PATCH 03/10] Revert "Clarify the message further with a different
 exception for file which is ignored"

This reverts commit 08850ae2f64449bae5c449e53c00fa5051479380.
---
 .../spark/sql/execution/datasources/DataSource.scala | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f1edc76f9b5d..34876cc48e38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -554,13 +554,9 @@ case class DataSource(
 
       // Sufficient to check head of the globPath seq for non-glob scenario
       // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist) {
-        val firstPath = globPath.head
-        if (!fs.exists(firstPath)) {
-          throw new AnalysisException(s"Path does not exist: ${firstPath}")
-        } else if (InMemoryFileIndex.shouldFilterOut(firstPath.getName)) {
-          throw new AnalysisException(s"Path exists but is ignored: ${firstPath}")
-        }
+      if (checkFilesExist &&
+        (!fs.exists(globPath.head) || InMemoryFileIndex.shouldFilterOut(globPath.head.getName))) {
+        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
       }
       globPath
     }.toSeq

From c0a57d983e370b65ee3bafc4a018c1180fddf471 Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Tue, 25 Dec 2018 20:55:40 +0900
Subject: [PATCH 04/10] Revert "Throws better exception when reading files
 that start with underscore"

This reverts commit 2910cb9c3c14c97ba4b26d1fab09ab35284aaa3e.
---
 .../apache/spark/sql/execution/datasources/DataSource.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 34876cc48e38..795a6d0b6b04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -554,8 +554,7 @@ case class DataSource(
 
       // Sufficient to check head of the globPath seq for non-glob scenario
       // Don't need to check once again if files exist in streaming mode
-      if (checkFilesExist &&
-        (!fs.exists(globPath.head) || InMemoryFileIndex.shouldFilterOut(globPath.head.getName))) {
+      if (checkFilesExist && !fs.exists(globPath.head)) {
         throw new AnalysisException(s"Path does not exist: ${globPath.head}")
       }
       globPath

From 1b64ffb44991c2713621c0ca4360e6727666b27a Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Tue, 25 Dec 2018 23:46:50 +0900
Subject: [PATCH 05/10] Log a debug statement about files/directories that are
 ignored, and throw an exception only if all of the files are filtered out

---
 .../execution/datasources/DataSource.scala    | 28 +++++++++-
 .../datasources/InMemoryFileIndex.scala       | 54 +++++++++++--------
 .../src/test/resources/test-data/_cars.csv    |  7 +++
 .../execution/datasources/csv/CSVSuite.scala  | 45 +++++++++++++++-
 4 files changed, 111 insertions(+), 23 deletions(-)
 create mode 100644 sql/core/src/test/resources/test-data/_cars.csv

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 795a6d0b6b04..fdad36c04457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -542,7 +543,7 @@ case class DataSource(
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    allPaths.flatMap { path =>
+    val allGlobPath = allPaths.flatMap { path =>
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -559,6 +560,31 @@ case class DataSource(
       }
       globPath
     }.toSeq
+
+    if (checkFilesExist) {
+      val (allLeafPath, ignoredPath) = {
+        val pathFilter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+        val discovered = InMemoryFileIndex.bulkListLeafFiles(
+          allGlobPath, hadoopConf, pathFilter, sparkSession)
+        val paths = discovered.map { case (_, leaf, ignored) =>
+          (leaf.map(_.getPath), ignored.map(_.getPath))
+        }
+        (paths.flatMap(_._1), paths.flatMap(_._2))
+      }
+      if (ignoredPath.nonEmpty) {
+        if (allLeafPath.isEmpty) {
+          throw new AnalysisException(
+            "All files were ignored. The following files were ignored during file scan:\n" +
+              s"${ignoredPath.mkString("\n  ")}")
+        } else {
+          logDebug(
+            "The following files were ignored during file scan:\n" +
+              s"${ignoredPath.mkString("\n  ")}")
+        }
+      }
+    }
+
+    allGlobPath
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index fe418e610da8..9870b35c25fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
       pathsToFetch, hadoopConf, filter, sparkSession)
-    discovered.foreach { case (path, leafFiles) =>
+    discovered.foreach { case (path, leafFiles, _) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
       output ++= leafFiles
@@ -160,18 +160,20 @@ object InMemoryFileIndex extends Logging {
    *
    * This may only be called on the driver.
    *
-   * @return for each input path, the set of discovered files for the path
+   * @return for each input path, the set of discovered files and ignored
+   *         files/directories for the path
    */
   private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus], Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+        val listed = listLeafFiles(path, hadoopConf, filter, Some(sparkSession))
+        (path, listed._1, listed._2)
       }
     }
 
@@ -204,9 +206,10 @@ object InMemoryFileIndex extends Logging {
       .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
         pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
+          val listed = listLeafFiles(path, hadoopConf, filter, None)
+          (path, listed._1, listed._2)
         }.iterator
-      }.map { case (path, statuses) =>
+      }.map { case (path, statuses, ignoredStatuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
           val blockLocations = status match {
@@ -233,14 +236,14 @@ object InMemoryFileIndex extends Logging {
             status.getAccessTime,
             blockLocations)
         }
-        (path.toString, serializableStatuses)
+        (path.toString, serializableStatuses, ignoredStatuses)
       }.collect()
     } finally {
       sparkContext.setJobDescription(previousJobDescription)
     }
 
     // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
+    statusMap.map { case (path, serializableStatuses, ignoredStatuses) =>
       val statuses = serializableStatuses.map { f =>
         val blockLocations = f.blockLocations.map { loc =>
           new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
@@ -251,7 +254,7 @@ object InMemoryFileIndex extends Logging {
             new Path(f.path)),
           blockLocations)
       }
-      (new Path(path), statuses)
+      (new Path(path), statuses, ignoredStatuses)
     }
   }
 
@@ -267,7 +270,7 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
+      sessionOpt: Option[SparkSession]): (Seq[FileStatus], Seq[FileStatus]) = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
 
@@ -280,23 +283,32 @@ object InMemoryFileIndex extends Logging {
         Array.empty[FileStatus]
     }
 
-    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+    val (filteredStatuses, ignoredStatuses) = statuses.partition(
+      status => !shouldFilterOut(status.getPath.getName))
 
-    val allLeafStatuses = {
+    val (allLeafStatuses, allIgnoredStatuses) = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
-      val nestedFiles: Seq[FileStatus] = sessionOpt match {
+      val nested: (Seq[FileStatus], Seq[FileStatus]) = sessionOpt match {
         case Some(session) =>
-          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+          val listed = bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session)
+          (listed.flatMap(_._2), listed.flatMap(_._3))
         case _ =>
-          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+          val listed = dirs.map(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+          (listed.flatMap(_._1), listed.flatMap(_._2))
+      }
+      val allFiles = topLevelFiles ++ nested._1
+      val ignoredFiles = ignoredStatuses ++ nested._2
+      if (filter != null) {
+        val (filtered, ignored) = allFiles.partition(f => filter.accept(f.getPath))
+        (filtered, ignoredFiles ++ ignored)
+      } else {
+        (allFiles, ignoredFiles)
       }
-      val allFiles = topLevelFiles ++ nestedFiles
-      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }
 
-    val missingFiles = mutable.ArrayBuffer.empty[String]
-    val filteredLeafStatuses = allLeafStatuses.filterNot(
-      status => shouldFilterOut(status.getPath.getName))
+    val missingFiles = mutable.ArrayBuffer.empty[String]
+    val (filteredLeafStatuses, ignoredLeafStatuses) = allLeafStatuses.partition(
+      status => !shouldFilterOut(status.getPath.getName))
     val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
@@ -341,7 +353,7 @@ object InMemoryFileIndex extends Logging {
         s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
     }
 
-    resolvedLeafStatuses
+    (resolvedLeafStatuses, allIgnoredStatuses ++ ignoredLeafStatuses)
   }
 
   /** Checks if we should filter out this path name. */
diff --git a/sql/core/src/test/resources/test-data/_cars.csv b/sql/core/src/test/resources/test-data/_cars.csv
new file mode 100644
index 000000000000..40ded573ade5
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/_cars.csv
@@ -0,0 +1,7 @@
+
+year,make,model,comment,blank
+"2012","Tesla","S","No comment",
+
+1997,Ford,E350,"Go get one now they are going fast",
+2015,Chevy,Volt
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 3b977d74053e..bf820feb5305 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -30,7 +30,7 @@ import scala.util.Properties
 import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.log4j.{AppenderSkeleton, LogManager}
+import org.apache.log4j.{AppenderSkeleton, Level, LogManager}
 import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.{SparkException, TestUtils}
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
   private val carsBlankColName = "test-data/cars-blank-column-name.csv"
   private val carsCrlf = "test-data/cars-crlf.csv"
+  private val carsFilteredOutFile = "test-data/_cars.csv"
   private val emptyFile = "test-data/empty.csv"
   private val commentsFile = "test-data/comments.csv"
   private val disableCommentsFile = "test-data/disable_comments.csv"
@@ -345,6 +346,48 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(result.schema.fieldNames.size === 1)
   }
 
+  test("SPARK-26339 Debug statement if some of the files are filtered out") {
+    class TestAppender extends AppenderSkeleton {
+      var events = new java.util.ArrayList[LoggingEvent]
+      override def close(): Unit = {}
+      override def requiresLayout: Boolean = false
+      protected def append(event: LoggingEvent): Unit = events.add(event)
+    }
+
+    val testAppender1 = new TestAppender
+    val rootLogger = LogManager.getRootLogger
+    val origLevel = rootLogger.getLevel
+    rootLogger.setLevel(Level.DEBUG)
+    rootLogger.addAppender(testAppender1)
+    try {
+      val cars = spark
+        .read
+        .format("csv")
+        .option("header", "false")
+        .load(testFile(carsFile), testFile(carsFilteredOutFile))
+
+      verifyCars(cars, withHeader = false, checkTypes = false)
+    } finally {
+      rootLogger.setLevel(origLevel)
+      rootLogger.removeAppender(testAppender1)
+    }
+    assert(testAppender1.events.asScala
+      .exists(msg => msg.getRenderedMessage.contains(
+        "The following files were ignored during file scan:")))
+  }
+
+  test("SPARK-26339 Throw an exception only if all of the files are filtered out") {
+    val e = intercept[AnalysisException] {
+      val cars = spark
+        .read
+        .format("csv")
+        .option("header", "false")
+        .load(testFile(carsFilteredOutFile))
+    }.getMessage
+    assert(e.contains("All files were ignored. " +
+      "The following files were ignored during file scan:"))
+  }
+
   test("DDL test with empty file") {
     withView("carsTable") {
       spark.sql(

From a95637ef14508dcd154e73ee48e2c0a2e51a8abb Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Sat, 29 Dec 2018 15:14:21 +0900
Subject: [PATCH 06/10] Change to check only filename match, not check dir
 recursively

---
 .../execution/datasources/DataSource.scala    | 23 +++-----
 .../datasources/InMemoryFileIndex.scala       | 54 ++++++----------
 .../execution/datasources/csv/CSVSuite.scala  |  9 ++--
 3 files changed, 33 insertions(+), 53 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index fdad36c04457..b8b0c5ba0ff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -24,7 +24,6 @@ import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -562,24 +561,18 @@ case class DataSource(
     }.toSeq
 
     if (checkFilesExist) {
-      val (allLeafPath, ignoredPath) = {
-        val pathFilter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
-        val discovered = InMemoryFileIndex.bulkListLeafFiles(
-          allGlobPath, hadoopConf, pathFilter, sparkSession)
-        val paths = discovered.map { case (_, leaf, ignored) =>
-          (leaf.map(_.getPath), ignored.map(_.getPath))
-        }
-        (paths.flatMap(_._1), paths.flatMap(_._2))
+      val (filtered, filteredOut) = allGlobPath.partition { path =>
+        !InMemoryFileIndex.shouldFilterOut(path.getName)
       }
-      if (ignoredPath.nonEmpty) {
-        if (allLeafPath.isEmpty) {
+      if (filteredOut.nonEmpty) {
+        if (filtered.isEmpty) {
           throw new AnalysisException(
-            "All files were ignored. The following files were ignored during file scan:\n" +
-              s"${ignoredPath.mkString("\n  ")}")
+            "All path were ignored. The following path were ignored:\n" +
+              s"${filteredOut.mkString("\n  ")}")
         } else {
           logDebug(
-            "The following files were ignored during file scan:\n" +
-              s"${ignoredPath.mkString("\n  ")}")
+            "The following path were ignored:\n" +
+              s"${filteredOut.mkString("\n  ")}")
         }
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 9870b35c25fe..fe418e610da8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
       pathsToFetch, hadoopConf, filter, sparkSession)
-    discovered.foreach { case (path, leafFiles, _) =>
+    discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
       output ++= leafFiles
@@ -160,20 +160,18 @@ object InMemoryFileIndex extends Logging {
    *
    * This may only be called on the driver.
    *
-   * @return for each input path, the set of discovered files and ignored
-   *         files/directories for the path
+   * @return for each input path, the set of discovered files for the path
    */
   private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus], Seq[FileStatus])] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        val listed = listLeafFiles(path, hadoopConf, filter, Some(sparkSession))
-        (path, listed._1, listed._2)
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
       }
     }
 
@@ -206,10 +204,9 @@ object InMemoryFileIndex extends Logging {
       .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
         pathStrings.map(new Path(_)).toSeq.map { path =>
-          val listed = listLeafFiles(path, hadoopConf, filter, None)
-          (path, listed._1, listed._2)
+          (path, listLeafFiles(path, hadoopConf, filter, None))
        }.iterator
-      }.map { case (path, statuses, ignoredStatuses) =>
+      }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
           val blockLocations = status match {
@@ -236,14 +233,14 @@ object InMemoryFileIndex extends Logging {
             status.getAccessTime,
             blockLocations)
         }
-        (path.toString, serializableStatuses, ignoredStatuses)
+        (path.toString, serializableStatuses)
       }.collect()
     } finally {
       sparkContext.setJobDescription(previousJobDescription)
     }
 
     // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses, ignoredStatuses) =>
+    statusMap.map { case (path, serializableStatuses) =>
       val statuses = serializableStatuses.map { f =>
         val blockLocations = f.blockLocations.map { loc =>
           new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
@@ -254,7 +251,7 @@ object InMemoryFileIndex extends Logging {
             new Path(f.path)),
           blockLocations)
       }
-      (new Path(path), statuses, ignoredStatuses)
+      (new Path(path), statuses)
     }
   }
 
@@ -270,7 +267,7 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession]): (Seq[FileStatus], Seq[FileStatus]) = {
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
 
@@ -283,32 +280,23 @@ object InMemoryFileIndex extends Logging {
         Array.empty[FileStatus]
     }
 
-    val (filteredStatuses, ignoredStatuses) = statuses.partition(
-      status => !shouldFilterOut(status.getPath.getName))
+    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
 
-    val (allLeafStatuses, allIgnoredStatuses) = {
+    val allLeafStatuses = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
-      val nested: (Seq[FileStatus], Seq[FileStatus]) = sessionOpt match {
+      val nestedFiles: Seq[FileStatus] = sessionOpt match {
         case Some(session) =>
-          val listed = bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session)
-          (listed.flatMap(_._2), listed.flatMap(_._3))
+          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
         case _ =>
-          val listed = dirs.map(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
-          (listed.flatMap(_._1), listed.flatMap(_._2))
-      }
-      val allFiles = topLevelFiles ++ nested._1
-      val ignoredFiles = ignoredStatuses ++ nested._2
-      if (filter != null) {
-        val (filtered, ignored) = allFiles.partition(f => filter.accept(f.getPath))
-        (filtered, ignoredFiles ++ ignored)
-      } else {
-        (allFiles, ignoredFiles)
+          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
       }
+      val allFiles = topLevelFiles ++ nestedFiles
+      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }
 
-    val missingFiles = mutable.ArrayBuffer.empty[String]
-    val (filteredLeafStatuses, ignoredLeafStatuses) = allLeafStatuses.partition(
-      status => !shouldFilterOut(status.getPath.getName))
+    val missingFiles = mutable.ArrayBuffer.empty[String]
+    val filteredLeafStatuses = allLeafStatuses.filterNot(
+      status => shouldFilterOut(status.getPath.getName))
     val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
@@ -353,7 +341,7 @@ object InMemoryFileIndex extends Logging {
         s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
     }
 
-    (resolvedLeafStatuses, allIgnoredStatuses ++ ignoredLeafStatuses)
+    resolvedLeafStatuses
   }
 
   /** Checks if we should filter out this path name. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bf820feb5305..0eaf7bfe1d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -346,7 +346,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(result.schema.fieldNames.size === 1)
   }
 
-  test("SPARK-26339 Debug statement if some of the files are filtered out") {
+  test("SPARK-26339 Debug statement if some of specified paths are filtered out") {
     class TestAppender extends AppenderSkeleton {
       var events = new java.util.ArrayList[LoggingEvent]
       override def close(): Unit = {}
@@ -373,10 +373,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
     assert(testAppender1.events.asScala
       .exists(msg => msg.getRenderedMessage.contains(
-        "The following files were ignored during file scan:")))
+        "The following path were ignored:")))
   }
 
-  test("SPARK-26339 Throw an exception only if all of the files are filtered out") {
+  test("SPARK-26339 Throw an exception only if all of the specified paths are filtered out") {
     val e = intercept[AnalysisException] {
       val cars = spark
         .read
@@ -384,8 +384,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
         .option("header", "false")
         .load(testFile(carsFilteredOutFile))
     }.getMessage
-    assert(e.contains("All files were ignored. " +
-      "The following files were ignored during file scan:"))
+    assert(e.contains("All path were ignored. The following path were ignored:"))
   }
 
   test("DDL test with empty file") {

From 239cfa4792b6bebd386e4a962cdf4b0e9b2471aa Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Sun, 30 Dec 2018 01:58:28 +0900
Subject: [PATCH 07/10] Avoid checkFilesExist check

---
 .../execution/datasources/DataSource.scala | 26 +++++++++----------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b8b0c5ba0ff1..95fcb0f89c9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -560,20 +560,18 @@ case class DataSource(
       globPath
     }.toSeq
 
-    if (checkFilesExist) {
-      val (filtered, filteredOut) = allGlobPath.partition { path =>
-        !InMemoryFileIndex.shouldFilterOut(path.getName)
-      }
-      if (filteredOut.nonEmpty) {
-        if (filtered.isEmpty) {
-          throw new AnalysisException(
-            "All path were ignored. The following path were ignored:\n" +
-              s"${filteredOut.mkString("\n  ")}")
-        } else {
-          logDebug(
-            "The following path were ignored:\n" +
-              s"${filteredOut.mkString("\n  ")}")
-        }
+    val (filtered, filteredOut) = allGlobPath.partition { path =>
+      !InMemoryFileIndex.shouldFilterOut(path.getName)
+    }
+    if (filteredOut.nonEmpty) {
+      if (filtered.isEmpty) {
+        throw new AnalysisException(
+          "All path were ignored. The following path were ignored:\n" +
+            s"${filteredOut.mkString("\n  ")}")
+      } else {
+        logDebug(
+          "The following path were ignored:\n" +
+            s"${filteredOut.mkString("\n  ")}")
       }
     }
 

From e72bf002a023a23858566c9442ca12deffaa3936 Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Sun, 30 Dec 2018 02:22:27 +0900
Subject: [PATCH 08/10] Minor modifications for clarity

---
 .../spark/sql/execution/datasources/DataSource.scala | 12 +++++-------
 .../sql/execution/datasources/csv/CSVSuite.scala     | 10 ++++------
 2 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 95fcb0f89c9b..f14b5a370612 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -560,18 +560,16 @@ case class DataSource(
       globPath
     }.toSeq
 
-    val (filtered, filteredOut) = allGlobPath.partition { path =>
-      !InMemoryFileIndex.shouldFilterOut(path.getName)
+    val (filteredOut, filteredIn) = allGlobPath.partition { path =>
+      InMemoryFileIndex.shouldFilterOut(path.getName)
     }
     if (filteredOut.nonEmpty) {
-      if (filtered.isEmpty) {
+      if (filteredIn.isEmpty) {
        throw new AnalysisException(
-          "All path were ignored. The following path were ignored:\n" +
-            s"${filteredOut.mkString("\n  ")}")
+          s"All paths were ignored:\n${filteredOut.mkString("\n  ")}")
       } else {
         logDebug(
-          "The following path were ignored:\n" +
-            s"${filteredOut.mkString("\n  ")}")
+          s"Some paths were ignored:\n${filteredOut.mkString("\n  ")}")
       }
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 0eaf7bfe1d85..e63a801ff067 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -362,9 +362,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     try {
       val cars = spark
         .read
-        .format("csv")
         .option("header", "false")
-        .load(testFile(carsFile), testFile(carsFilteredOutFile))
+        .csv(testFile(carsFile), testFile(carsFilteredOutFile))
 
       verifyCars(cars, withHeader = false, checkTypes = false)
     } finally {
@@ -373,18 +372,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
     assert(testAppender1.events.asScala
       .exists(msg => msg.getRenderedMessage.contains(
-        "The following path were ignored:")))
+        "Some paths were ignored:")))
   }
 
   test("SPARK-26339 Throw an exception only if all of the specified paths are filtered out") {
     val e = intercept[AnalysisException] {
       val cars = spark
         .read
-        .format("csv")
         .option("header", "false")
-        .load(testFile(carsFilteredOutFile))
+        .csv(testFile(carsFilteredOutFile))
     }.getMessage
-    assert(e.contains("All path were ignored. The following path were ignored:"))
+    assert(e.contains("All paths were ignored:"))
   }
 
   test("DDL test with empty file") {

From abfe2e6eefa7ee9fc514f21db52197e0e9ce0018 Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Sun, 30 Dec 2018 02:30:32 +0900
Subject: [PATCH 09/10] Remove debugStatement test

---
 .../execution/datasources/csv/CSVSuite.scala | 34 ++++---------------
 1 file changed, 7 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e63a801ff067..dfdc689d7cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -30,7 +30,7 @@ import scala.util.Properties
 import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.log4j.{AppenderSkeleton, Level, LogManager}
+import org.apache.log4j.{AppenderSkeleton, LogManager}
 import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.{SparkException, TestUtils}
@@ -346,33 +346,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(result.schema.fieldNames.size === 1)
   }
 
-  test("SPARK-26339 Debug statement if some of specified paths are filtered out") {
-    class TestAppender extends AppenderSkeleton {
-      var events = new java.util.ArrayList[LoggingEvent]
-      override def close(): Unit = {}
-      override def requiresLayout: Boolean = false
-      protected def append(event: LoggingEvent): Unit = events.add(event)
-    }
-
-    val testAppender1 = new TestAppender
-    val rootLogger = LogManager.getRootLogger
-    val origLevel = rootLogger.getLevel
-    rootLogger.setLevel(Level.DEBUG)
-    rootLogger.addAppender(testAppender1)
-    try {
-      val cars = spark
-        .read
-        .option("header", "false")
-        .csv(testFile(carsFile), testFile(carsFilteredOutFile))
+  test("SPARK-26339 Not throw an exception if some of specified paths are filtered out") {
+    val cars = spark
+      .read
+      .option("header", "false")
+      .csv(testFile(carsFile), testFile(carsFilteredOutFile))
 
-      verifyCars(cars, withHeader = false, checkTypes = false)
-    } finally {
-      rootLogger.setLevel(origLevel)
-      rootLogger.removeAppender(testAppender1)
-    }
-    assert(testAppender1.events.asScala
-      .exists(msg => msg.getRenderedMessage.contains(
-        "Some paths were ignored:")))
+    verifyCars(cars, withHeader = false, checkTypes = false)
   }
 
   test("SPARK-26339 Throw an exception only if all of the specified paths are filtered out") {

From 3708ef1532bdf5e7064536f875dc1c0d53805bb6 Mon Sep 17 00:00:00 2001
From: Hirobe Keiichi
Date: Sun, 30 Dec 2018 02:59:52 +0900
Subject: [PATCH 10/10] Fix title of test case

---
 .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index dfdc689d7cdf..f318f1996568 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -346,7 +346,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(result.schema.fieldNames.size === 1)
   }
 
-  test("SPARK-26339 Not throw an exception if some of specified paths are filtered out") {
+  test("SPARK-26339 Not throw an exception if some of specified paths are filtered in") {
     val cars = spark
       .read
       .option("header", "false")