diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d89186af8c8e5..7900693a84820 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -148,6 +148,13 @@ class ParquetFilters(
     Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
   }
 
+  private def timestampToMillis(v: Any): JLong = {
+    val timestamp = v.asInstanceOf[Timestamp]
+    val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
+    val millis = DateTimeUtils.microsToMillis(micros)
+    millis.asInstanceOf[JLong]
+  }
+
   private val makeEq:
     PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetBooleanType =>
@@ -184,7 +191,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
-        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMillis).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
@@ -235,7 +242,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
-        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMillis).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
@@ -277,9 +284,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.lt(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -318,9 +323,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.ltEq(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -359,9 +362,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gt(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
      (n: Array[String], v: Any) =>
@@ -400,9 +401,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gtEq(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7b33ceff32e01..c4cf5116c2030 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -589,19 +589,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
-    val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
-      Timestamp.valueOf("2018-06-15 08:28:53.123"),
-      Timestamp.valueOf("2018-06-16 08:28:53.123"),
-      Timestamp.valueOf("2018-06-17 08:28:53.123"))
+    val millisData = Seq(
+      Timestamp.valueOf("1000-06-14 08:28:53.123"),
+      Timestamp.valueOf("1582-06-15 08:28:53.001"),
+      Timestamp.valueOf("1900-06-16 08:28:53.0"),
+      Timestamp.valueOf("2018-06-17 08:28:53.999"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
         ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
       testTimestampPushdown(millisData)
     }
 
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
-    val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-15 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-16 08:28:53.123456"),
+    val microsData = Seq(
+      Timestamp.valueOf("1000-06-14 08:28:53.123456"),
+      Timestamp.valueOf("1582-06-15 08:28:53.123456"),
+      Timestamp.valueOf("1900-06-16 08:28:53.123456"),
       Timestamp.valueOf("2018-06-17 08:28:53.123456"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
         ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index f5726973d0abf..105f025adc0ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -69,7 +69,9 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
       (f: DataFrame => Unit): Unit = {
     withTempPath { file =>
-      df.write.format(dataSourceName).save(file.getCanonicalPath)
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
       readFile(file.getCanonicalPath, testVectorized)(f)
     }
   }
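
Note (not part of the patch): a minimal sketch of the calendar difference the new timestampToMillis helper accounts for. java.sql.Timestamp.getTime converts through the legacy hybrid Julian/Gregorian calendar, while DateTimeUtils.fromJavaTimestamp followed by DateTimeUtils.microsToMillis (the calls the helper makes) yields milliseconds in the Proleptic Gregorian calendar, matching what Parquet stores when the data is written with datetimeRebaseModeInWrite = CORRECTED, as the updated test helper forces. For the pre-Gregorian dates added to the test data the two conversions disagree, so pushed-down filters built from getTime could drop or wrongly keep rows. The date below is taken from the updated millisData; the snippet is illustrative only.

  import java.sql.Timestamp
  import org.apache.spark.sql.catalyst.util.DateTimeUtils

  // A date well before the Gregorian cutover, taken from the updated test data.
  val ts = Timestamp.valueOf("1000-06-14 08:28:53.123")

  // Old pushdown conversion: epoch millis via the hybrid Julian/Gregorian calendar.
  val legacyMillis: Long = ts.getTime

  // New helper's conversion: rebase to Proleptic Gregorian micros, then to millis.
  val rebasedMillis: Long = DateTimeUtils.microsToMillis(DateTimeUtils.fromJavaTimestamp(ts))

  // For dates this old the two values differ (on the order of days' worth of millis),
  // so a predicate built from legacyMillis would not match what Parquet actually stored.
  println(s"legacy=$legacyMillis rebased=$rebasedMillis")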