From 59a7f142ae9c83c76c2bfbff2962c071fc586122 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 25 Jun 2018 13:18:37 +0900
Subject: [PATCH 1/3] fix

---
 .../execution/datasources/csv/UnivocityParser.scala    | 10 +++++++++-
 .../spark/sql/execution/datasources/csv/CSVSuite.scala | 10 ++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 5f7d5696b71a..59be65b89748 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -183,11 +183,19 @@ class UnivocityParser(
     }
   }
 
+  private lazy val doParse = if (schema.nonEmpty) {
+    (input: String) => convert(tokenizer.parseLine(input))
+  } else {
+    // If `columnPruning` enabled and partition attributes scanned only,
+    // `schema` gets empty.
+    (_: String) => InternalRow.empty
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = doParse(input)
 
   private def convert(tokens: Array[String]): InternalRow = {
     if (tokens.length != schema.length) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d2f166c7d187..803c383f54f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1602,4 +1602,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(testAppender2.events.asScala
       .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
   }
+
+  test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") {
+    withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
+      withTempPath { path =>
+        val dir = path.getAbsolutePath
+        spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir)
+        spark.read.csv(dir).selectExpr("sum(p)").collect()
+      }
+    }
+  }
 }

From c7362dce248726598b63ea4fd1b45df19d5e9861 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 25 Jun 2018 14:14:21 +0900
Subject: [PATCH 2/3] Fix

---
 .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 803c383f54f1..d7a99a022185 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1608,7 +1608,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       withTempPath { path =>
         val dir = path.getAbsolutePath
         spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir)
-        spark.read.csv(dir).selectExpr("sum(p)").collect()
+        checkAnswer(spark.read.csv(dir).selectExpr("sum(p)"), Row(5))
       }
     }
   }
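
The essence of patches 1 and 2: the parse function is selected once per parser
instance, so when column pruning leaves an empty required schema (only
partition attributes are read), every line short-circuits to
`InternalRow.empty` and the Univocity tokenizer never runs. A minimal
standalone sketch of that dispatch, assuming nothing from Spark (the names
`PrunedParserSketch`, `Parser`, and `tokenize`, and the `Seq[String]` row
stand-in, are illustrative only):

    // Sketch only: models patch 1's "choose the parse function once" idea.
    // An empty `schema` stands in for the pruned-to-partition-columns case.
    object PrunedParserSketch {
      type Row = Seq[String]  // stand-in for Spark's InternalRow

      final class Parser(schema: Seq[String]) {
        private def tokenize(line: String): Array[String] = line.split(",")

        // Selected once at construction, so there is no per-line emptiness
        // check and, in the empty-schema case, no per-line tokenizing at all.
        private val doParse: String => Row =
          if (schema.nonEmpty) (line: String) => tokenize(line).toSeq
          else (_: String) => Seq.empty  // analogue of InternalRow.empty

        def parse(line: String): Row = doParse(line)
      }

      def main(args: Array[String]): Unit = {
        val pruned = new Parser(Nil)
        println(pruned.parse("0,foo"))  // List() -- tokenizer skipped
        val full = new Parser(Seq("p", "id"))
        println(full.parse("0,foo"))    // List(0, foo)
      }
    }

Patch 2 then tightens the regression test from a bare `collect()` to
`checkAnswer(..., Row(5))`, asserting the partition values themselves still
come back correctly: with p = id % 2 over ids 0..9, sum(p) is 5.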
From 39811a34a5f0fb0f2c97bc096e01129b593315fe Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Tue, 26 Jun 2018 07:28:07 +0900
Subject: [PATCH 3/3] Fix

---
 .../spark/sql/execution/datasources/csv/UnivocityParser.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 59be65b89748..aa545e1a0c00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -183,7 +183,7 @@ class UnivocityParser(
     }
   }
 
-  private lazy val doParse = if (schema.nonEmpty) {
+  private val doParse = if (schema.nonEmpty) {
     (input: String) => convert(tokenizer.parseLine(input))
   } else {
     // If `columnPruning` enabled and partition attributes scanned only,
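
Patch 3 drops the `lazy` modifier because `doParse` is read on every input
line: a Scala `lazy val` guards each access with an "already initialized?"
check (double-checked locking under the hood), whereas a plain `val` is
assigned once in the constructor and later reads are plain field reads. The
initializer here only needs `schema`, which is a constructor parameter, so
eager initialization is safe and cheaper on this hot path. A tiny
illustration (the class and member names are hypothetical):

    // Hypothetical micro-example: same initializer, different access cost.
    class ParseFnHolder(schema: Seq[String]) {
      // Assigned eagerly in the constructor; each later read is a field read.
      val eagerFn: String => Boolean = _ => schema.nonEmpty

      // Assigned on first access; every read first consults an internal
      // initialization flag, which adds up when called once per parsed line.
      lazy val deferredFn: String => Boolean = _ => schema.nonEmpty
    }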