Commit 9c0dc28

MaxGekk authored and cloud-fan committed
[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet
### What changes were proposed in this pull request?

Fixed conversions of `java.sql.Timestamp` to milliseconds in `ParquetFilters` by using the existing `DateTimeUtils` functions `fromJavaTimestamp()` and `microsToMillis()`.

### Why are the changes needed?

The changes fix a bug: `ParquetFilters` converted the filter value with `Timestamp.getTime`, which follows the hybrid Julian/Gregorian calendar, while the values written to Parquet files use the Proleptic Gregorian calendar, so filters on old timestamps matched nothing:

```scala
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
scala> Seq(java.sql.Timestamp.valueOf("1000-06-14 08:28:53.123")).toDF("ts").write.mode("overwrite").parquet("/Users/maximgekk/tmp/ts_millis_old_filter")
scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false)
+---+
|ts |
+---+
+---+
```

### Does this PR introduce _any_ user-facing change?

Yes. After the changes (for the example above):

```scala
scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false)
+-----------------------+
|ts                     |
+-----------------------+
|1000-06-14 08:28:53.123|
+-----------------------+
```

### How was this patch tested?

Modified tests in `ParquetFilterSuite` to check old timestamps.

Closes #28693 from MaxGekk/parquet-ts-millis-filter.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 6a895d0 · commit 9c0dc28
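For context, a minimal sketch of the calendar mismatch behind the bug (plain JVM code, not part of the patch; the exact day offset depends on the date):

```scala
import java.sql.Timestamp
import java.time.ZoneId

val ts = Timestamp.valueOf("1000-06-14 08:28:53.123")

// What the old ParquetFilters code pushed down: epoch millis on the
// hybrid Julian/Gregorian calendar used by java.sql.Timestamp.
val hybridMillis = ts.getTime

// Epoch millis for the same wall-clock fields on the Proleptic Gregorian
// calendar, which is what Spark 3.0 stores for TIMESTAMP_MILLIS under
// CORRECTED rebase mode.
val prolepticMillis =
  ts.toLocalDateTime.atZone(ZoneId.systemDefault()).toInstant.toEpochMilli

// For year 1000 the two calendars disagree by several days, so an equality
// predicate built from getTime can never match the stored value.
assert(hybridMillis != prolepticMillis)
```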

File tree

3 files changed: +25 −22 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 13 additions & 14 deletions
```diff
@@ -148,6 +148,13 @@ class ParquetFilters(
     Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
   }
 
+  private def timestampToMillis(v: Any): JLong = {
+    val timestamp = v.asInstanceOf[Timestamp]
+    val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
+    val millis = DateTimeUtils.microsToMillis(micros)
+    millis.asInstanceOf[JLong]
+  }
+
   private val makeEq:
     PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetBooleanType =>
@@ -184,7 +191,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
-        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMillis).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
@@ -235,7 +242,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
-        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMillis).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
@@ -277,9 +284,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.lt(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -318,9 +323,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.ltEq(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -359,9 +362,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gt(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -400,9 +401,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gtEq(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
```
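A self-contained sketch of the old and new conversions side by side (assumes Spark 3.0's `org.apache.spark.sql.catalyst.util.DateTimeUtils` on the classpath; a paraphrase of the change, not a verbatim copy of the internals):

```scala
import java.lang.{Long => JLong}
import java.sql.Timestamp
import org.apache.spark.sql.catalyst.util.DateTimeUtils

val ts = Timestamp.valueOf("1000-06-14 08:28:53.123")

// Old conversion: hybrid-calendar millis straight from java.sql.Timestamp.
val oldMillis: JLong = ts.getTime.asInstanceOf[JLong]

// New conversion: rebase to Proleptic Gregorian microseconds, then divide
// down to milliseconds (floor division, so pre-epoch values stay correct).
val newMillis: JLong = DateTimeUtils
  .microsToMillis(DateTimeUtils.fromJavaTimestamp(ts))
  .asInstanceOf[JLong]

// The two disagree for pre-Gregorian dates, which is why pushed-down
// predicates never matched rows written with the Proleptic calendar.
assert(oldMillis != newMillis)
```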

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 9 additions & 7 deletions
```diff
@@ -589,19 +589,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
-    val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
-      Timestamp.valueOf("2018-06-15 08:28:53.123"),
-      Timestamp.valueOf("2018-06-16 08:28:53.123"),
-      Timestamp.valueOf("2018-06-17 08:28:53.123"))
+    val millisData = Seq(
+      Timestamp.valueOf("1000-06-14 08:28:53.123"),
+      Timestamp.valueOf("1582-06-15 08:28:53.001"),
+      Timestamp.valueOf("1900-06-16 08:28:53.0"),
+      Timestamp.valueOf("2018-06-17 08:28:53.999"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
       testTimestampPushdown(millisData)
     }
 
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
-    val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-15 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-16 08:28:53.123456"),
+    val microsData = Seq(
+      Timestamp.valueOf("1000-06-14 08:28:53.123456"),
+      Timestamp.valueOf("1582-06-15 08:28:53.123456"),
+      Timestamp.valueOf("1900-06-16 08:28:53.123456"),
       Timestamp.valueOf("2018-06-17 08:28:53.123456"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
```
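The new test values appear chosen to straddle the tricky calendar ranges: year 1000 (Julian-only era), 1582 (the Gregorian cutover year), 1900 (post-cutover but pre-epoch), and 2018 (modern). A hypothetical minimal version of what such a pushdown round-trip checks (the suite's actual `testTimestampPushdown` helper is more thorough; `withTempPath` and the DataFrame implicits are assumed to come from the suite):

```scala
// For each timestamp: write it, read it back through an equality filter,
// and require exactly one matching row.
millisData.foreach { ts =>
  withTempPath { path =>
    Seq(ts).toDF("ts").write.parquet(path.getCanonicalPath)
    val matched = spark.read.parquet(path.getCanonicalPath)
      .filter($"ts" === ts)
      .count()
    assert(matched == 1)
  }
}
```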

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -69,7 +69,9 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
       (f: DataFrame => Unit): Unit = {
     withTempPath { file =>
-      df.write.format(dataSourceName).save(file.getCanonicalPath)
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
       readFile(file.getCanonicalPath, testVectorized)(f)
     }
   }
```
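A note on this config: with rebase mode `CORRECTED`, Spark writes Proleptic Gregorian timestamp values to Parquet as-is instead of rebasing them to the legacy hybrid calendar, so files produced by this helper contain exactly the values the fixed filters compare against. The session-level equivalent of the `SQLConf` key used above is the setting from the repro:

```scala
// Same setting as SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE, via the
// public runtime config.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
```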
