Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ class UnivocityParser(
} catch {
case NonFatal(e) =>
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
// compatibility only if no custom pattern has been set. If there is a custom pattern,
// fail since it may be different from the default pattern.
if (options.timestampFormatInRead.isDefined) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
}
Expand All @@ -222,7 +226,11 @@ class UnivocityParser(
} catch {
case NonFatal(e) =>
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
// compatibility only if no custom pattern has been set. If there is a custom pattern,
// fail since it may be different from the default pattern.
if (options.dateFormatInRead.isDefined) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
DateTimeUtils.stringToDate(str).getOrElse(throw e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,47 @@ abstract class CSVSuite
}
}
}

test("SPARK-39731: Correctly parse dates with yyyyMMdd pattern") {
  withTempPath { path =>
    // Row 1 carries a malformed value ("2020011" has only 7 digits for a
    // yyyyMMdd pattern); row 2 is well-formed.
    Seq(
      "1,2020011,2020011",
      "2,20201203,20201203").toDF("data")
      .repartition(1)
      .write.text(path.getAbsolutePath)
    val schema = new StructType()
      .add("id", IntegerType)
      .add("date", DateType)
      .add("ts", TimestampType)
    val output = spark.read
      .schema(schema)
      .option("dateFormat", "yyyyMMdd")
      .option("timestampFormat", "yyyyMMdd")
      .csv(path.getAbsolutePath)

    // Re-runs the same read under a given LEGACY_TIME_PARSER_POLICY and
    // compares against the expected rows.
    def check(mode: String, res: Seq[Row]): Unit = {
      withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
        checkAnswer(output, res)
      }
    }

    // Legacy parser (SimpleDateFormat) is lenient: "2020011" is accepted and
    // resolves to 2020-01-01.
    check(
      "legacy",
      Seq(
        Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
        Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
      )
    )

    // Corrected parser rejects "2020011"; because a custom pattern is set,
    // there is no fallback to the legacy parsing path, so the malformed
    // fields become null under the default PERMISSIVE mode.
    check(
      "corrected",
      Seq(
        Row(1, null, null),
        Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
      )
    )

    // Under the "exception" policy the parse failure must surface as an
    // error instead of silently producing null (requested in review).
    withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") {
      val msg = intercept[SparkException] {
        output.collect()
      }.getCause.getMessage
      assert(msg.contains("Fail to parse"))
    }
  }
}
}

class CSVv1Suite extends CSVSuite {
Expand Down