Commit b9cfad8

MaxGekk authored and chuer committed
[SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet
### What changes were proposed in this pull request?

In the PR, I propose to propagate the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` and/or the Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses these settings when converting date/timestamp instances from datasource filters to values pushed via `FilterApi` to the `parquet-column` lib.

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds were interpreted as offsets in the Proleptic Gregorian calendar and pushed to the parquet library as is. That works fine if timestamp/date values in parquet files were saved in the `CORRECTED` mode, but in the `LEGACY` mode the filter values could not match the actual values.

After the changes, timestamp/date values of filters pushed down to the parquet lib, such as `FilterApi.eq(col1, -719162)`, are rebased according to the rebase settings. For example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is, but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the description of apache#28067 shows the diffs between the two calendars.

### Why are the changes needed?

The changes fix the bug shown by the following example from SPARK-36034:

```python
In [27]: spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```

The result must contain the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?

In some sense, yes. Query results can be different in some cases. For the example above:

```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?

By running the modified test suite `ParquetFilterSuite`:

```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

Closes apache#33347 from MaxGekk/fix-parquet-ts-filter-pushdown.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit b09b7f7)
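To make the quoted numbers concrete, here is a small illustrative snippet (not part of the patch) that reproduces the rebase with the `RebaseDateTime` helper the change imports into `ParquetFilters`; it assumes Spark's catalyst classes are on the classpath, e.g. pasted into `spark-shell`.

```scala
// Illustrative only: reproduce the -719162 / -719164 figures quoted above.
import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianDays

// DATE '0001-01-01' as a day count in the Proleptic Gregorian calendar used by Spark 3.x.
val gregorianDays = DateTimeUtils.localDateToDays(LocalDate.of(1, 1, 1)) // -719162
// Under the LEGACY mode the pushed-down literal is shifted to the hybrid Julian calendar.
val julianDays = rebaseGregorianToJulianDays(gregorianDays)              // -719164
println(s"CORRECTED pushes $gregorianDays, LEGACY pushes $julianDays")
```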
1 parent 8370ad2 commit b9cfad8

File tree

5 files changed: +369 -94 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 12 additions & 5 deletions
```diff
@@ -268,11 +268,21 @@ class ParquetFileFormat
 
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+        footerFileMetaData.getKeyValueMetaData.get,
+        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(
+          parquetSchema,
+          pushDownDate,
+          pushDownTimestamp,
+          pushDownDecimal,
+          pushDownStringStartWith,
+          pushDownInFilterThreshold,
+          isCaseSensitive,
+          datetimeRebaseMode)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -298,9 +308,6 @@ class ParquetFileFormat
         None
       }
 
-      val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       val int96RebaseMode = DataSourceUtils.int96RebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         int96RebaseModeInRead)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 22 additions & 7 deletions
```diff
@@ -34,6 +34,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseGregorianToJulianMicros}
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -47,7 +49,8 @@ class ParquetFilters(
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
-    caseSensitive: Boolean) {
+    caseSensitive: Boolean,
+    datetimeRebaseMode: LegacyBehaviorPolicy.Value) {
   // A map which contains parquet field name and data type, if predicate push down applies.
   //
   // Each key in `nameToParquetField` represents a column; `dots` are used as separators for
@@ -123,14 +126,26 @@ class ParquetFilters(
   private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null)
   private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null)
 
-  private def dateToDays(date: Any): Int = date match {
-    case d: Date => DateTimeUtils.fromJavaDate(d)
-    case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+  private def dateToDays(date: Any): Int = {
+    val gregorianDays = date match {
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianDays(gregorianDays)
+      case _ => gregorianDays
+    }
   }
 
-  private def timestampToMicros(v: Any): JLong = v match {
-    case i: Instant => DateTimeUtils.instantToMicros(i)
-    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+  private def timestampToMicros(v: Any): JLong = {
+    val gregorianMicros = v match {
+      case i: Instant => DateTimeUtils.instantToMicros(i)
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+    }
+    datetimeRebaseMode match {
+      case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
+      case _ => gregorianMicros
+    }
   }
 
   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
```
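For reference, a standalone sketch of the LEGACY-vs-CORRECTED branching that the new `timestampToMicros` performs, meant for pasting into `spark-shell`; the helper name `pushedDownMicros` is hypothetical and only mirrors the patched conversion.

```scala
// Hypothetical helper mirroring the patched conversion above: compute Proleptic
// Gregorian microseconds first, then rebase only when the resolved mode is LEGACY.
import java.time.Instant
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseGregorianToJulianMicros
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

def pushedDownMicros(ts: Instant, mode: LegacyBehaviorPolicy.Value): Long = {
  val gregorianMicros = DateTimeUtils.instantToMicros(ts)
  mode match {
    case LegacyBehaviorPolicy.LEGACY => rebaseGregorianToJulianMicros(gregorianMicros)
    case _ => gregorianMicros
  }
}

// Modern timestamps are unaffected; ancient ones (e.g. year 1) get shifted under LEGACY.
pushedDownMicros(Instant.parse("0001-01-01T00:00:00Z"), LegacyBehaviorPolicy.LEGACY)
```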

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 12 additions & 5 deletions
```diff
@@ -133,11 +133,21 @@ case class ParquetPartitionReaderFactory(
 
     lazy val footerFileMetaData =
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (enableParquetFilterPushDown) {
       val parquetSchema = footerFileMetaData.getSchema
-      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseMode)
       filters
         // Collects all converted Parquet filter predicates. Notice that not all predicates can be
         // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -170,9 +180,6 @@ case class ParquetPartitionReaderFactory(
     if (pushed.isDefined) {
       ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
     }
-    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
-      footerFileMetaData.getKeyValueMetaData.get,
-      datetimeRebaseModeInRead)
     val int96RebaseMode = DataSourceUtils.int96RebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
       int96RebaseModeInRead)
```
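As a usage sketch of the read path touched above: both the session config and the per-read Parquet option named in the description can drive the rebase mode that now reaches the pushed-down filters. The snippet assumes a `spark` session (e.g. `spark-shell`) and reuses the example path; in practice the mode is usually inferred from the Parquet footer metadata, and the config/option matter when that metadata is ambiguous.

```scala
// Usage sketch (assumes a running SparkSession named `spark`).
// Session-wide mode for reads when the file footer does not determine the calendar:
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")

// Per-read Parquet option named in the PR description; path taken from the example above.
spark.read
  .option("datetimeRebaseMode", "LEGACY")
  .parquet("date_written_by_spark3_legacy")
  .where("date = '0001-01-01'")
  .show()
```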

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

Lines changed: 13 additions & 3 deletions
```diff
@@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, SparkToParquetSchemaConverter}
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -50,9 +51,18 @@ case class ParquetScanBuilder(
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetSchema =
-      new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(schema)
-    val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
-      pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      new SparkToParquetSchemaConverter(sparkSession.sessionState.conf).convert(readDataSchema())
+    val parquetFilters = new ParquetFilters(
+      parquetSchema,
+      pushDownDate,
+      pushDownTimestamp,
+      pushDownDecimal,
+      pushDownStringStartWith,
+      pushDownInFilterThreshold,
+      isCaseSensitive,
+      // The rebase mode doesn't matter here because the filters are used to determine
+      // whether they are convertible.
+      LegacyBehaviorPolicy.CORRECTED)
     parquetFilters.convertibleFilters(this.filters).toArray
   }
 
```
