Conversation

@MaxGekk (Member) commented Jul 14, 2021

### What changes were proposed in this pull request?

In the PR, I propose to propagate the SQL config `spark.sql.parquet.datetimeRebaseModeInRead` and/or the Parquet option `datetimeRebaseMode` to `ParquetFilters`. The `ParquetFilters` class uses these settings when converting date/timestamp values from datasource filters to the values pushed via `FilterApi` to the `parquet-column` lib.
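
As a rough sketch of how such a setting might be resolved (the helper below is hypothetical, not Spark's internal API; the per-read Parquet option overrides the session-wide SQL config, whose documented default is `EXCEPTION`):

```scala
// Hypothetical helper: the per-read Parquet option "datetimeRebaseMode"
// takes precedence over the session-wide SQL config; "EXCEPTION" is the
// config's documented default.
def resolveDatetimeRebaseMode(
    parquetOptions: Map[String, String],
    sessionConf: Map[String, String]): String = {
  parquetOptions.getOrElse(
    "datetimeRebaseMode",
    sessionConf.getOrElse("spark.sql.parquet.datetimeRebaseModeInRead", "EXCEPTION"))
}
```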

Before the changes, date/timestamp values expressed as days/microseconds/milliseconds are interpreted as offsets in the Proleptic Gregorian calendar and pushed to the Parquet library as is. That works fine if the date/timestamp values in Parquet files were saved in the `CORRECTED` mode, but in the `LEGACY` mode the filter values might not match the actual stored values.

After the changes, the date/timestamp values of filters pushed down to the Parquet lib, such as `FilterApi.eq(col1, -719162)`, are rebased according to the rebase settings. For example, if the rebase mode is `CORRECTED`, **-719162** is pushed down as is, but if the current rebase mode is `LEGACY`, the number of days is rebased to **-719164**. For more context, the description of PR #28067 shows the diffs between the two calendars.
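
To make the rebasing concrete, here is a minimal self-contained sketch for CE dates (illustrative only; Spark's actual implementation lives in `org.apache.spark.sql.catalyst.util.RebaseDateTime`): reinterpret the proleptic Gregorian date in the hybrid Julian/Gregorian calendar and recompute its epoch-day offset.

```scala
import java.time.LocalDate
import java.util.{GregorianCalendar, TimeZone}
import java.util.concurrent.TimeUnit

// Rebase a day offset from the Proleptic Gregorian calendar to the hybrid
// Julian/Gregorian calendar that legacy writers used (CE dates only).
def rebaseGregorianToJulianDays(days: Int): Int = {
  val localDate = LocalDate.ofEpochDay(days) // e.g. -719162 is 0001-01-01
  val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  cal.clear() // zero the time-of-day fields
  // Re-enter the same year/month/day; GregorianCalendar interprets dates
  // before the 1582 cutover in the Julian calendar.
  cal.set(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
  Math.toIntExact(TimeUnit.MILLISECONDS.toDays(cal.getTimeInMillis))
}

assert(rebaseGregorianToJulianDays(-719162) == -719164)
```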

### Why are the changes needed?

The changes fix the bug demonstrated by the following example from SPARK-36034:

```python
>>> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
>>> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
>>> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show()
+----+
|date|
+----+
+----+
```

The result must contain the date value `0001-01-01`.

### Does this PR introduce _any_ user-facing change?

In some sense, yes, since query results can differ in some cases. For the example above, after the fix:

```scala
scala> spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
scala> spark.sql("SELECT DATE '0001-01-01' AS date").write.mode("overwrite").parquet("date_written_by_spark3_legacy")
scala> spark.read.parquet("date_written_by_spark3_legacy").where("date = '0001-01-01'").show(false)
+----------+
|date      |
+----------+
|0001-01-01|
+----------+
```

### How was this patch tested?

By running the modified test suite `ParquetFilterSuite`:

```
$ build/sbt "test:testOnly *ParquetV1FilterSuite"
$ build/sbt "test:testOnly *ParquetV2FilterSuite"
```

@github-actions bot added the `SQL` label Jul 14, 2021
@SparkQA commented Jul 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45540/

@SparkQA commented Jul 14, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45540/

@MaxGekk marked this pull request as ready for review July 14, 2021 18:58
@MaxGekk changed the title from [WIP][SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet to [SPARK-36034][SQL] Rebase datetime in pushed down filters to parquet Jul 14, 2021
@SparkQA commented Jul 14, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45544/

@SparkQA commented Jul 14, 2021

Test build #141025 has finished for PR 33347 at commit 96a7ca4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 14, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45544/

@SparkQA commented Jul 15, 2021

Test build #141029 has finished for PR 33347 at commit fffc22e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
  pushDownInFilterThreshold,
  isCaseSensitive,
  datetimeRebaseMode)
parquetFilters.convertibleFilters(this.filters).toArray
```

Contributor commented:

We create `ParquetFilters` here only to check which filters are convertible, and the rebase mode doesn't matter for that. Shall we pass `datetimeRebaseMode` as `CORRECTED` here?

@MaxGekk (Member, Author) replied:

Yep. Let me pass `CORRECTED` here.
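
To make the agreed change concrete, here is a hedged sketch with stand-in types (`RebaseMode` and `ParquetFiltersLike` are illustrative names, not Spark's internal API): the instance created only for the convertibility check can safely hard-code `CORRECTED`.

```scala
// Illustrative stand-ins; not Spark's internal API.
object RebaseMode extends Enumeration { val LEGACY, CORRECTED = Value }

case class ParquetFiltersLike(
    pushDownInFilterThreshold: Int,
    isCaseSensitive: Boolean,
    datetimeRebaseMode: RebaseMode.Value) {
  // Whether a filter can be converted depends only on its shape and type,
  // never on the rebase mode, so any mode is safe for this check.
  def convertibleFilters(filters: Seq[String]): Seq[String] = filters
}

// An instance created only to check convertibility can hard-code CORRECTED
// instead of threading the resolved per-read mode through:
val filtersForCheck = ParquetFiltersLike(
  pushDownInFilterThreshold = 10,
  isCaseSensitive = true,
  datetimeRebaseMode = RebaseMode.CORRECTED)
```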

@SparkQA commented Jul 15, 2021

Test build #141085 has started for PR 33347 at commit 37d7ffa.

@SparkQA commented Jul 15, 2021

Kubernetes integration test unable to build dist.

exiting with code: 141
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45599/

@MaxGekk (Member, Author) commented Jul 15, 2021

The failed GitHub Actions check is not related to the changes:

```
starting mypy test...
mypy checks failed:
python/pyspark/mllib/tree.pyi:29: error: Overloaded function signatures 1 and 2 overlap with incompatible return types
python/pyspark/mllib/tree.pyi:38: error: Overloaded function signatures 1 and 2 overlap with incompatible return types
```

I am merging this to master/3.2/3.1.

@MaxGekk closed this in b09b7f7 Jul 15, 2021
MaxGekk added a commit that referenced this pull request Jul 15, 2021
MaxGekk added a commit to MaxGekk/spark that referenced this pull request Jul 15, 2021
@MaxGekk (Member, Author) commented Jul 16, 2021

Here is the backport to 3.0: #33387

domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022