Commit f982ca0

MaxGekk authored and srowen committed
[SPARK-26178][SQL] Use java.time API for parsing timestamps and dates from CSV
## What changes were proposed in this pull request?

In this PR, I propose to use the **java.time API** for parsing timestamps and dates from CSV content with microsecond precision. The SQL config `spark.sql.legacy.timeParser.enabled` allows switching back to the previous behaviour, which used `java.text.SimpleDateFormat`/`FastDateFormat` for parsing/generating timestamps/dates.

## How was this patch tested?

It was tested by `UnivocityParserSuite`, `CsvExpressionsSuite`, `CsvFunctionsSuite` and `CsvSuite`.

Closes #23150 from MaxGekk/time-parser.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
1 parent 06a3b6a commit f982ca0

File tree

14 files changed, +431 -134 lines changed


docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,8 @@ displayTitle: Spark SQL Upgrading Guide

   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.

+  - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4

   - In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and are illustrated in the table below.
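A minimal sketch of the behaviour change described above, assuming a running `SparkSession` named `spark`; the input path and schema are hypothetical, not from the commit:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("csv-time-parser-demo")
  .getOrCreate()

// Default in 3.0: java.time-based parsing of ISO 8601-style patterns.
val df = spark.read
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  .schema("ts TIMESTAMP")
  .csv("/tmp/events.csv")

// Opt back into the SimpleDateFormat/FastDateFormat-based implementation.
spark.conf.set("spark.sql.legacy.timeParser.enabled", "true")
```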

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala

Lines changed: 9 additions & 6 deletions
@@ -22,10 +22,16 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeFormatter
 import org.apache.spark.sql.types._

-class CSVInferSchema(options: CSVOptions) extends Serializable {
+class CSVInferSchema(val options: CSVOptions) extends Serializable {
+
+  @transient
+  private lazy val timeParser = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)

   private val decimalParser = {
     ExprUtils.getDecimalParser(options.locale)
@@ -154,10 +160,7 @@ class CSVInferSchema(options: CSVOptions) extends Serializable {

   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
-      TimestampType
-    } else if ((allCatch opt DateTimeUtils.stringToTime(field)).isDefined) {
-      // We keep this for backwards compatibility.
+    if ((allCatch opt timeParser.parse(field)).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)
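The inference check above boils down to "can the configured formatter parse the field". A standalone approximation with plain java.time (this mirrors, but is not, the internal Catalyst `DateTimeFormatter`):

```scala
import java.time.format.DateTimeFormatterBuilder
import java.util.Locale
import scala.util.control.Exception.allCatch

// A field is a candidate TimestampType iff the configured pattern parses it.
val formatter = new DateTimeFormatterBuilder()
  .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
  .toFormatter(Locale.US)

def looksLikeTimestamp(field: String): Boolean =
  (allCatch opt formatter.parse(field)).isDefined

looksLikeTimestamp("2018-12-02T11:22:33.123Z") // true
looksLikeTimestamp("hello")                    // false
```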

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 3 additions & 7 deletions
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
 import java.util.{Locale, TimeZone}

 import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
-import org.apache.commons.lang3.time.FastDateFormat

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -146,13 +145,10 @@ class CSVOptions(
   // A language tag in IETF BCP 47 format
   val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")

-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+  val timestampFormat: String =
+    parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
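Since the options now hold raw pattern strings, resolving a pattern into a formatter is deferred to the new formatter classes. A sketch of the lookup logic, with `parameters` as a stand-in map for the reader options:

```scala
// Defaults mirror the diff above; patterns stay plain strings here.
val parameters = Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss")

val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
val timestampFormat: String =
  parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
```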

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala

Lines changed: 9 additions & 5 deletions
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter

 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
 import org.apache.spark.sql.types._

 class UnivocityGenerator(
@@ -41,14 +41,18 @@ class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray

+  private val timeFormatter = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+      (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))

     case TimestampType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+      (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))

     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 12 additions & 26 deletions
@@ -19,15 +19,14 @@ package org.apache.spark.sql.catalyst.csv

 import java.io.InputStream

-import scala.util.Try
 import scala.util.control.NonFatal

 import com.univocity.parsers.csv.CsvParser

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -75,6 +74,12 @@ class UnivocityParser(

   private val row = new GenericInternalRow(requiredSchema.length)

+  private val timeFormatter = DateTimeFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
@@ -100,7 +105,7 @@ class UnivocityParser(
   //
   // output row - ["A", 2]
   private val valueConverters: Array[ValueConverter] = {
-    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+    requiredSchema.map(f => makeConverter(f.name, f.dataType, f.nullable)).toArray
   }

   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
@@ -115,8 +120,7 @@ class UnivocityParser(
   def makeConverter(
       name: String,
       dataType: DataType,
-      nullable: Boolean = true,
-      options: CSVOptions): ValueConverter = dataType match {
+      nullable: Boolean = true): ValueConverter = dataType match {
     case _: ByteType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(_.toByte)

@@ -154,34 +158,16 @@
     }

     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.
-        Try(options.timestampFormat.parse(datum).getTime * 1000L)
-          .getOrElse {
-            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            DateTimeUtils.stringToTime(datum).getTime * 1000L
-          }
-      }
+      nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)

     case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options) { datum =>
-        // This one will lose microseconds parts.
-        // See https://issues.apache.org/jira/browse/SPARK-10681.x
-        Try(DateTimeUtils.millisToDays(options.dateFormat.parse(datum).getTime))
-          .getOrElse {
-            // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-            // compatibility.
-            DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-          }
-      }
+      nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)

     case _: StringType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)

     case udt: UserDefinedType[_] => (datum: String) =>
-      makeConverter(name, udt.sqlType, nullable, options)
+      makeConverter(name, udt.sqlType, nullable)

     // We don't actually hit this exception though, we keep it for understandability
     case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
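The converter for `TimestampType` is now a single call to `timeFormatter.parse`, which keeps the microseconds that the old millisecond-based path dropped (SPARK-10681). A standalone sketch of such a microsecond-preserving parse (names are illustrative, not the Catalyst API):

```scala
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

// Parse a timestamp string to microseconds since the epoch.
def parseMicros(s: String, pattern: String, zone: ZoneId): Long = {
  val formatter = DateTimeFormatter.ofPattern(pattern)
  val instant = LocalDateTime.parse(s, formatter).atZone(zone).toInstant
  instant.getEpochSecond * 1000000L + instant.getNano / 1000L
}

parseMicros("2018-12-02 11:22:33.123456", "yyyy-MM-dd HH:mm:ss.SSSSSS", ZoneId.of("UTC"))
// 1543749753123456L: all six fractional digits survive
```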
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala

Lines changed: 179 additions & 0 deletions

@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.time._
+import java.time.format.DateTimeFormatterBuilder
+import java.time.temporal.{ChronoField, TemporalQueries}
+import java.util.{Locale, TimeZone}
+
+import scala.util.Try
+
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.sql.internal.SQLConf
+
+sealed trait DateTimeFormatter {
+  def parse(s: String): Long // returns microseconds since epoch
+  def format(us: Long): String
+}
+
+class Iso8601DateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateTimeFormatter {
+  val formatter = new DateTimeFormatterBuilder()
+    .appendPattern(pattern)
+    .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+    .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+    .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+    .toFormatter(locale)
+
+  def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    if (temporalAccessor.query(TemporalQueries.offset()) == null) {
+      val localDateTime = LocalDateTime.from(temporalAccessor)
+      val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
+      Instant.from(zonedDateTime)
+    } else {
+      Instant.from(temporalAccessor)
+    }
+  }
+
+  private def instantToMicros(instant: Instant): Long = {
+    val sec = Math.multiplyExact(instant.getEpochSecond, DateTimeUtils.MICROS_PER_SECOND)
+    val result = Math.addExact(sec, instant.getNano / DateTimeUtils.NANOS_PER_MICROS)
+    result
+  }
+
+  def parse(s: String): Long = instantToMicros(toInstant(s))
+
+  def format(us: Long): String = {
+    val secs = Math.floorDiv(us, DateTimeUtils.MICROS_PER_SECOND)
+    val mos = Math.floorMod(us, DateTimeUtils.MICROS_PER_SECOND)
+    val instant = Instant.ofEpochSecond(secs, mos * DateTimeUtils.NANOS_PER_MICROS)
+
+    formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+class LegacyDateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateTimeFormatter {
+  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+
+  protected def toMillis(s: String): Long = format.parse(s).getTime
+
+  def parse(s: String): Long = toMillis(s) * DateTimeUtils.MICROS_PER_MILLIS
+
+  def format(us: Long): String = {
+    format.format(DateTimeUtils.toJavaTimestamp(us))
+  }
+}
+
+class LegacyFallbackDateTimeFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
+  override def toMillis(s: String): Long = {
+    Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
+  }
+}
+
+object DateTimeFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
+    if (SQLConf.get.legacyTimeParserEnabled) {
+      new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+    } else {
+      new Iso8601DateTimeFormatter(format, timeZone, locale)
+    }
+  }
+}
+
+sealed trait DateFormatter {
+  def parse(s: String): Int // returns days since epoch
+  def format(days: Int): String
+}
+
+class Iso8601DateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateFormatter {
+
+  val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
+
+  override def parse(s: String): Int = {
+    val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+    val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
+
+    days.toInt
+  }
+
+  override def format(days: Int): String = {
+    val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
+    dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+  }
+}
+
+class LegacyDateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends DateFormatter {
+  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+
+  def parse(s: String): Int = {
+    val milliseconds = format.parse(s).getTime
+    DateTimeUtils.millisToDays(milliseconds)
+  }
+
+  def format(days: Int): String = {
+    val date = DateTimeUtils.toJavaDate(days)
+    format.format(date)
+  }
+}
+
+class LegacyFallbackDateFormatter(
+    pattern: String,
+    timeZone: TimeZone,
+    locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+  override def parse(s: String): Int = {
+    Try(super.parse(s)).orElse {
+      // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+      // compatibility.
+      Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(s).getTime))
+    }.getOrElse {
+      // In Spark 1.5.0, we store the data as number of days since epoch in string.
+      // So, we just convert it to Int.
+      s.toInt
+    }
+  }
+}
+
+object DateFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
+    if (SQLConf.get.legacyTimeParserEnabled) {
+      new LegacyFallbackDateFormatter(format, timeZone, locale)
+    } else {
+      new Iso8601DateFormatter(format, timeZone, locale)
+    }
+  }
+}
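An illustrative roundtrip through the new formatter (the classes above live in a Catalyst-internal package, so this is a sketch rather than public API):

```scala
import java.util.{Locale, TimeZone}
import org.apache.spark.sql.catalyst.util.Iso8601DateTimeFormatter

// parse returns microseconds since the epoch; format renders them back.
val fmt = new Iso8601DateTimeFormatter(
  "yyyy-MM-dd'T'HH:mm:ss.SSSXXX", TimeZone.getTimeZone("UTC"), Locale.US)

val micros: Long = fmt.parse("2018-12-02T11:22:33.123Z")
val text: String = fmt.format(micros) // formats back via java.time
```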

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ object DateTimeUtils {
   final val MILLIS_PER_SECOND = 1000L
   final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
   final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
-
+  final val NANOS_PER_MICROS = 1000L
   final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

   // number of days in 400 years

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -1618,6 +1618,13 @@ object SQLConf {
       "a SparkConf entry.")
     .booleanConf
     .createWithDefault(true)
+
+  val LEGACY_TIME_PARSER_ENABLED = buildConf("spark.sql.legacy.timeParser.enabled")
+    .doc("When set to true, java.text.SimpleDateFormat is used for formatting and parsing " +
+      "dates/timestamps in a locale-sensitive manner. When set to false, classes from " +
+      "java.time.* packages are used for the same purpose.")
+    .booleanConf
+    .createWithDefault(false)
 }

 /**
@@ -2040,6 +2047,8 @@ class SQLConf extends Serializable with Logging {

   def setCommandRejectsSparkConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CONFS)

+  def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
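The new entry behaves like any other SQL config; a sketch of flipping it per session, assuming a `SparkSession` named `spark`:

```scala
// false (java.time) is the default per the diff above.
spark.sql("SET spark.sql.legacy.timeParser.enabled=true")
spark.conf.get("spark.sql.legacy.timeParser.enabled") // "true"
```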
