@@ -119,6 +119,12 @@ class UnivocityParser(
     new NoopFilters
   }

+  // Flag is needed to distinguish parsing mode when inferring timestamp and date types.
+  // For more information, see the comments for TimestampType and DateType converter functions.
+  // Available for testing.
+  val isLegacyParserPolicy =
+    SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     val currentContent = tokenizer.getContext.currentParsedContent()
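
For context, SQLConf.LegacyBehaviorPolicy is an enumeration with three values, and the new flag checks only for the first. A minimal sketch of the mapping, assuming a live SparkSession named spark (spark.sql.legacy.timeParserPolicy is the public conf key behind SQLConf.legacyTimeParserPolicy):

    // LEGACY    -> keep the pre-3.0 SimpleDateFormat parsing semantics
    // CORRECTED -> use the new java.time parsers; rows that fail to parse come
    //              back as nulls in permissive mode, as the tests below show
    // EXCEPTION -> use the new parsers, but raise SparkUpgradeException when a
    //              value fails that the legacy parser might have accepted
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    assert(SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY)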
@@ -204,7 +210,13 @@ class UnivocityParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility only if no custom pattern has been set.
+          //
+          // If a custom pattern was provided and parser policy is not legacy, throw exception
+          // without applying legacy behavior to avoid producing incorrect results.
+          if (!isLegacyParserPolicy && options.timestampFormatInRead.isDefined) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
           DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
       }
@@ -222,7 +234,13 @@ class UnivocityParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility only if no custom pattern has been set.
+          //
+          // If a custom pattern was provided and parser policy is not legacy, throw exception
+          // without applying legacy behavior to avoid producing incorrect results.
+          if (!isLegacyParserPolicy && options.dateFormatInRead.isDefined) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
           DateTimeUtils.stringToDate(str).getOrElse(throw e)
       }
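
Both CSV converters above, and the mirrored JSON ones below, follow the same decision tree. A paraphrased, self-contained sketch of that flow; convert, parseWithPattern, and legacyFallback are illustrative names, not the patch's own:

    import scala.util.control.NonFatal

    // Paraphrased control flow of the patched converters, not the literal code.
    def convert[T](datum: String,
                   parseWithPattern: String => T,       // user or default pattern
                   legacyFallback: String => Option[T], // 1.x/2.0-style parsing
                   hasCustomPattern: Boolean,
                   isLegacyPolicy: Boolean): T = {
      try parseWithPattern(datum) catch {
        case NonFatal(e) =>
          // A custom pattern under a non-legacy policy means the user asked for
          // strict semantics: surface the error instead of silently falling back.
          if (!isLegacyPolicy && hasCustomPattern) throw e
          legacyFallback(datum).getOrElse(throw e)
      }
    }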
@@ -78,6 +78,12 @@ class JacksonParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)

+  // Flag is needed to distinguish parsing mode when inferring timestamp and date types.
+  // For more information, see the comments for TimestampType and DateType converter functions.
+  // Available for testing.
+  val isLegacyParserPolicy =
+    SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -258,6 +264,12 @@ class JacksonParser(
       case NonFatal(e) =>
         // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
         // compatibility.
+        //
+        // If a custom pattern was provided and parser policy is not legacy, throw exception
+        // without applying legacy behavior to avoid producing incorrect results.
+        if (!isLegacyParserPolicy && options.timestampFormatInRead.isDefined) {
+          throw e
+        }
         val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
         DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
       }
@@ -281,6 +293,12 @@ class JacksonParser(
       case NonFatal(e) =>
         // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
         // compatibility.
+        //
+        // If a custom pattern was provided and parser policy is not legacy, throw exception
+        // without applying legacy behavior to avoid producing incorrect results.
+        if (!isLegacyParserPolicy && options.dateFormatInRead.isDefined) {
+          throw e
+        }
         val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
         DateTimeUtils.stringToDate(str).getOrElse {
           // In Spark 1.5.0, we store the data as number of days since epoch in string.
@@ -356,6 +356,16 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper

     val optionsWithPattern = new CSVOptions(
       Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
-    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+
+    // With legacy parser enabled, we are still able to parse dates and timestamps.
+    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern) {
+      override val isLegacyParserPolicy: Boolean = true
+    })
+
+    // With legacy parser disabled, parsing results in error.
+    val err = intercept[IllegalArgumentException] {
+      check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+    }
+    assert(err.getMessage.contains("Illegal pattern character: n"))
@Jonathancui123 (Contributor), Jul 14, 2022:

Is this technically a breaking change for users who could previously specify an invalid pattern without LEGACY mode?

Before: the invalid pattern was ignored and parsing fell back to DateTimeUtils.stringToTimestamp.
Now: it throws an error.

We don't support invalid patterns, but as a user I would be unhappy to see my code break. I'm unsure whether this actually counts as a breaking change, since it is such an edge case and the user is already doing something invalid. I'm curious to hear your thoughts.

Contributor (Author):

This is a good point. It would be a breaking change for users who were relying on the compatibility fallback.

There could be an alternative fix: maybe we can look into updating DateTimeUtils.stringToDate, but I am not sure.

I can also add a feature flag to control this behaviour in the JSON and CSV connectors so users can always opt in to the legacy behaviour. For example, I could add a data source option such as "useLegacyParsing". The option would be disabled by default, and the exception message would say that you can enable the option to keep the previous behaviour. Maybe this could be a good solution.

Let me know if something like that could work, thanks.

Contributor:

> I can also add a feature flag to control this behaviour in the JSON and CSV connectors so users can always opt in to the legacy behaviour.

I think this should work. It feels weird that users have to opt in to the correct behavior, but hopefully this is a small percentage of users. Maybe @kamcheungting-db or @cloud-fan can weigh in.

> There could be an alternative fix: maybe we can look into updating DateTimeUtils.stringToDate, but I am not sure.

I personally wouldn't be confident updating DateTimeUtils.stringToDate because there are so many usages elsewhere. But if you are familiar with the other use cases of DateTimeUtils.stringToDate, then this could work.

I'll loop back if I think of an alternative.

Contributor:

I think the safest option is to copy-paste the old code of stringToDate from before #32959 and use it here, but that's really ugly and hard to maintain.

I'd like to understand more about the invalid-pattern behavior. Will we trigger the fallback for every input row? That sounds like a big perf problem...

Contributor (Author):

With the invalid pattern and before this PR, yes, the fallback code would be triggered on every pattern mismatch. With the change, we will just throw an exception, so those values end up as nulls. Yes, it does sound like a performance issue, but it has been there for some time.

I agree about the copy-paste of stringToDate; I proposed adding a data source config to keep the old behaviour instead. What do you think?
   }
 }
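
To make the opt-in flag floated in the thread above concrete, here is a hypothetical sketch of how such a read option could look; the name useLegacyParsing is only the author's illustrative suggestion, and nothing like it is implemented in this PR:

    // Hypothetical, not part of this change: per-read opt-in to the old fallback.
    val df = spark.read
      .schema("id INT, date DATE")
      .option("dateFormat", "yyyyMMdd")
      .option("useLegacyParsing", "true") // illustrative name from the discussion
      .csv("/path/to/data")               // hypothetical path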
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level

-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
@@ -2788,6 +2788,52 @@ abstract class CSVSuite
       }
     }
   }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        "1,2020011,2020011",
+        "2,20201203,20201203").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("id", IntegerType)
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .csv(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(1, null, null),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
Comment on lines +2866 to +2879:

Contributor:

For completeness, would you consider adding a check for LEGACY_TIME_PARSER_POLICY = EXCEPTION? Similar to the following?

    val msg = intercept[SparkException] {
      csv.collect()
    }.getCause.getMessage
    assert(msg.contains("Fail to parse"))

Contributor (Author):

Done!
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
 }

 class CSVv1Suite extends CSVSuite
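
The end-to-end effect on CSV reads, as a minimal sketch assuming a SparkSession named spark and a file holding the two rows from the test above; the conf key and read options are the real ones exercised by the test, while the path is hypothetical:

    // "2020011" does not strictly match yyyyMMdd; depending on the policy it is
    // rescued by the legacy parser, nulled out, or turned into an error.
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
    val df = spark.read
      .schema("id INT, date DATE, ts TIMESTAMP")
      .option("dateFormat", "yyyyMMdd")
      .option("timestampFormat", "yyyyMMdd")
      .csv("/tmp/spark-39731-sample") // hypothetical path
    df.show()
    // +---+----------+-------------------+
    // | id|      date|                 ts|
    // +---+----------+-------------------+
    // |  1|      null|               null|
    // |  2|2020-12-03|2020-12-03 00:00:00|
    // +---+----------+-------------------+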
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec

-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -3249,6 +3249,51 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        """{"date": "2020011", "ts": "2020011"}""",
+        """{"date": "20201203", "ts": "20201203"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .json(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(null, null),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
 }

 class JsonV1Suite extends JsonSuite