Commit cc15399

bersprockets authored and Jackey Lee committed
[SPARK-26372][SQL] Don't reuse value from previous row when parsing bad CSV input field
## What changes were proposed in this pull request?

CSV parsing accidentally uses the previous good value for a bad input field. See example in Jira.

This PR ensures that the associated column is set to null when an input field cannot be converted.

## How was this patch tested?

Added new test.
Ran all SQL unit tests (testOnly org.apache.spark.sql.*).
Ran pyspark tests for pyspark-sql.

Closes apache#23323 from bersprockets/csv-bad-field.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 2e89cbe commit cc15399

3 files changed: 22 additions and 0 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 1 addition & 0 deletions
@@ -239,6 +239,7 @@ class UnivocityParser(
       } catch {
         case NonFatal(e) =>
           badRecordException = badRecordException.orElse(Some(e))
+          row.setNullAt(i)
       }
       i += 1
     }
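
Why a single `row.setNullAt(i)` is enough: the commit message says the previous good value leaks into the bad field, which is what happens when one mutable row buffer is reused across records and a failed conversion leaves the slot untouched. The following is a minimal standalone sketch of that mechanism, not Spark code. `ReusableRow`, `parseAll`, and the `nullOnFailure` flag are hypothetical names introduced for illustration, and `java.time.LocalDate.parse` stands in for the real date converter.

```scala
import java.time.LocalDate
import scala.util.control.NonFatal

// Illustrative sketch only: ReusableRow and parseAll are hypothetical stand-ins,
// not Spark's InternalRow or UnivocityParser.
object StaleValueDemo {

  // One mutable buffer that is shared by every parsed record.
  final class ReusableRow(size: Int) {
    private val values = new Array[Any](size)
    def update(i: Int, v: Any): Unit = values(i) = v
    def setNullAt(i: Int): Unit = values(i) = null
    // Copy the current contents so later records cannot overwrite the result.
    def copyValues: List[Any] = values.toList
  }

  // col1 is kept as a string, col2 is parsed as an ISO date (assumed converter).
  private val converters: Array[String => Any] = Array[String => Any](
    (s: String) => s,
    (s: String) => LocalDate.parse(s)
  )

  def parseAll(records: List[Array[String]], nullOnFailure: Boolean): List[List[Any]] = {
    val row = new ReusableRow(converters.length)
    records.map { tokens =>
      var i = 0
      while (i < converters.length) {
        try {
          row(i) = converters(i)(tokens(i))
        } catch {
          case NonFatal(_) =>
            // Without this reset, slot i still holds the previous record's value.
            if (nullOnFailure) row.setNullAt(i)
        }
        i += 1
      }
      row.copyValues
    }
  }

  // Same two records as the test data added in this commit.
  val input: List[Array[String]] = List(
    Array("good record", "1999-08-01"),
    Array("bad record", "1999-088-01")
  )
}
```

With `nullOnFailure = false` the second record comes back with the first record's date in its second slot; with `nullOnFailure = true` that slot is null, which mirrors the behavior the new test expects.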
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+"good record",1999-08-01
+"bad record",1999-088-01

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -63,6 +63,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
   private val datesFile = "test-data/dates.csv"
   private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
   private val valueMalformedFile = "test-data/value-malformed.csv"
+  private val badAfterGoodFile = "test-data/bad_after_good.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -2012,4 +2013,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       assert(!files.exists(_.getName.endsWith("csv")))
     }
   }
+
+  test("Do not reuse last good value for bad input field") {
+    val schema = StructType(
+      StructField("col1", StringType) ::
+      StructField("col2", DateType) ::
+      Nil
+    )
+    val rows = spark.read
+      .schema(schema)
+      .format("csv")
+      .load(testFile(badAfterGoodFile))
+
+    val expectedRows = Seq(
+      Row("good record", java.sql.Date.valueOf("1999-08-01")),
+      Row("bad record", null))
+
+    checkAnswer(rows, expectedRows)
+  }
 }
