Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ case class DataSource(
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
allPaths.flatMap { path =>
val allGlobPath = allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Expand All @@ -559,6 +559,23 @@ case class DataSource(
}
globPath
}.toSeq

if (checkFilesExist) {
val (filteredOut, filteredIn) = allGlobPath.partition { path =>
InMemoryFileIndex.shouldFilterOut(path.getName)
}
if (filteredOut.nonEmpty) {
if (filteredIn.isEmpty) {
throw new AnalysisException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid this could break the existing applications. Currently, when users specify the schema, no exception is thrown, right?

cc @HyukjinKwon @srowen @MaxGekk @cloud-fan

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it was discussed:

#23288 (comment)
#23288 (review)

I don't have a strong opinion on this. If you think it should be considered a behaviour change, then no objection from me — we can turn it into a warning.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please submit a follow-up PR to change it to a warning? Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me make a followup by the end of today (singapore time)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a fair point. It might not have thrown an exception later if it didn't try to infer schema.

s"All paths were ignored:\n${filteredOut.mkString("\n ")}")
} else {
logDebug(
s"Some paths were ignored:\n${filteredOut.mkString("\n ")}")
}
}
}

allGlobPath
}
}

Expand Down
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/test-data/_cars.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

year,make,model,comment,blank
"2012","Tesla","S","No comment",

1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt

Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
private val carsBlankColName = "test-data/cars-blank-column-name.csv"
private val carsCrlf = "test-data/cars-crlf.csv"
private val carsFilteredOutFile = "test-data/_cars.csv"
private val emptyFile = "test-data/empty.csv"
private val commentsFile = "test-data/comments.csv"
private val disableCommentsFile = "test-data/disable_comments.csv"
Expand Down Expand Up @@ -345,6 +346,25 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(result.schema.fieldNames.size === 1)
}

// SPARK-26339: when only some of the specified paths are filtered out
// (here, the underscore-prefixed file), the read must still succeed using
// the remaining path, and the data must match the normal cars fixture.
test("SPARK-26339 Not throw an exception if some of specified paths are filtered in") {
  val loaded = spark.read
    .option("header", "false")
    .csv(testFile(carsFile), testFile(carsFilteredOutFile))

  verifyCars(loaded, withHeader = false, checkTypes = false)
}

// SPARK-26339: when every specified path is filtered out (e.g. file names
// starting with "_"), there is nothing left to read, so the read must fail
// with an AnalysisException that lists the ignored paths.
test("SPARK-26339 Throw an exception only if all of the specified paths are filtered out") {
  val e = intercept[AnalysisException] {
    // The returned DataFrame is irrelevant — the read is expected to fail
    // eagerly during schema inference, so we do not bind it to a local.
    spark.read
      .option("header", "false")
      .csv(testFile(carsFilteredOutFile))
  }.getMessage
  assert(e.contains("All paths were ignored:"))
}

test("DDL test with empty file") {
withView("carsTable") {
spark.sql(
Expand Down