[SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro #27953
Conversation
…-datetime

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
#	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
cc @cloud-fan

Test build #120003 has finished for PR 27953 at commit

…-datetime

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@cloud-fan I cannot test this on Spark 2.4.5:

```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

val timestampSchema = s"""
  {
    "namespace": "logical",
    "type": "record",
    "name": "test",
    "fields": [
      {"name": "ts", "type": {"type": "long","logicalType": "timestamp-millis"}}
    ]
  }
"""

// Exiting paste mode, now interpreting.

scala> val df3 = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df3: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df3.write.format("avro").option("avroSchema", timestampSchema).save("/Users/maxim/tmp/before_1582/2_4_5_ts_millis_avro")
20/03/19 03:00:37 ERROR Utils: Aborting task
org.apache.avro.AvroRuntimeException: Not a union: {"type":"long","logicalType":"timestamp-millis"}
```

The same works on the master.
@gengliangwang Is it possible to save timestamps as `timestamp-millis`?
@cloud-fan @HyukjinKwon Please review the PR.
```scala
val tsStr = "1001-01-01 01:02:03.123456"
val rebased = "1001-01-01 01:02:03.123"
val nonRebased = "1001-01-07 01:09:05.123"
val timestampSchema = """
```
nit: can we use a Scala multi-line string?
"""
|abc
|xyz
""".stripMargin
done
It's probably because the actual column is nullable (after the cast), but the specified schema is not. Maybe we've fixed something in 3.0.
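For illustration, a nullable column maps to an Avro union that includes `null`. A schema of the following shape (the same form used in the PR description below; `nullableTimestampSchema` is just an example name) avoids the `Not a union` error:

```scala
// Wrapping the logical type in a union with "null" (plus a null default)
// matches the nullable `ts` column produced by the cast.
val nullableTimestampSchema: String = """
  |{
  |  "namespace": "logical",
  |  "type": "record",
  |  "name": "test",
  |  "fields": [
  |    {"name": "ts",
  |     "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
  |     "default": null}
  |  ]
  |}
  |""".stripMargin
```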
Test build #120045 has finished for PR 27953 at commit

Test build #120048 has finished for PR 27953 at commit

@cloud-fan You are right. I changed the schema while writing with Spark 2.4 and everything is OK.

Test build #120067 has finished for PR 27953 at commit
thanks, merging to master/3.0!
### What changes were proposed in this pull request?

The PR addresses the issue of compatibility with Spark 2.4 and earlier versions in reading/writing dates and timestamps via the **Avro** datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue pops up for dates/timestamps before 1582-10-15, when the hybrid calendar switches between the Julian and Gregorian calendars. The same local date in different calendars is converted to a different number of days since the epoch 1970-01-01. For example, the date 1001-01-01 is converted to:
- -719164 in the Julian calendar. Spark 2.4 saves the number as a value of DATE type into **Avro** files.
- -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.

The PR proposes rebasing from/to the Proleptic Gregorian calendar to/from the hybrid one under the SQL config:
```
spark.sql.legacy.avro.rebaseDateTime.enabled
```
which is set to `false` by default, meaning the rebasing is not performed by default.

The details of the implementation:
1. Re-use 2 methods of `DateTimeUtils` added by the PR #27915 ([SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet) for rebasing microseconds.
2. Re-use 2 methods of `DateTimeUtils` added by the same PR #27915 for rebasing days.
3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to **Avro** files if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from **Avro** files if the SQL config is on.
5. The SQL config `spark.sql.legacy.avro.rebaseDateTime.enabled` controls conversions from/to dates, and timestamps of the `timestamp-millis` and `timestamp-micros` logical types.

### Why are the changes needed?

For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same result. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.

### Does this PR introduce any user-facing change?

Yes, the timestamp `1001-01-01 01:02:03.123456` saved by Spark 2.4.5 as `timestamp-micros` is interpreted by Spark 3.0.0-preview2 differently:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date      |
+----------+
|1001-01-07|
+----------+
```
After the changes:
```scala
scala> spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date      |
+----------+
|1001-01-01|
+----------+
```

### How was this patch tested?

1. Added tests to `AvroLogicalTypeSuite` to check rebasing in read. The tests read back Avro files saved by Spark 2.4.5 via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]

scala> df.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_date_avro")

scala> val df2 = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df2: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df2.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_ts_avro")

scala> :paste
// Entering paste mode (ctrl-D to finish)

val timestampSchema = s"""
  |  {
  |    "namespace": "logical",
  |    "type": "record",
  |    "name": "test",
  |    "fields": [
  |      {"name": "ts", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
  |    ]
  |  }
  |""".stripMargin

// Exiting paste mode, now interpreting.

scala> df3.write.format("avro").option("avroSchema", timestampSchema).save("/Users/maxim/tmp/before_1582/2_4_5_ts_millis_avro")
```
2. Added the following tests to `AvroLogicalTypeSuite` to check rebasing of dates/timestamps (in microsecond and millisecond precision). The tests write rebased dates/timestamps, read them back with rebasing enabled/disabled, and compare the results:
- `rebasing microseconds timestamps in write`
- `rebasing milliseconds timestamps in write`
- `rebasing dates in write`

Closes #27953 from MaxGekk/rebase-avro-datetime.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4766a36)
Signed-off-by: Wenchen Fan <[email protected]>
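As a side note, here is a minimal sketch of why the day numbers diverge (my own illustration using only standard JDK APIs, not the PR's code): the legacy hybrid calendar and the proleptic Gregorian calendar assign different epoch-day values to the same pre-1582 local date.

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Proleptic Gregorian day count since 1970-01-01, as Spark 3.0 computes it.
val gregorianDays: Long = LocalDate.of(1001, 1, 1).toEpochDay

// The same local date through the legacy hybrid Julian+Gregorian calendar,
// roughly how Spark 2.4 derived the stored day number.
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1001, Calendar.JANUARY, 1)
val hybridDays: Long = Math.floorDiv(cal.getTimeInMillis, 86400000L)

// Without rebasing, the hybrid day number is reinterpreted as proleptic
// Gregorian, which shifts the local date (cf. 1001-01-01 vs 1001-01-07 above).
println(s"hybrid=$hybridDays, gregorian=$gregorianDays")
println(LocalDate.ofEpochDay(hybridDays))
```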
```scala
spark.read.format("avro").load(url.toString)
}

test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
```
Missed one thing: I think the test is not very related to logical types and probably should be put in `AvroSuite`.
@MaxGekk can you move the test in your next PR?
Do you mean only this test?
All the new tests added here. They are more about compatibility, not logical types.
```scala
// (the `null` case), output the timestamp value as with millisecond precision.
case null | _: TimestampMillis => (getter, ordinal) =>
  val micros = getter.getLong(ordinal)
  val rebasedMicros = if (rebaseDateTime) {
```
One more thing: why don't we return a function rather than checking `rebaseDateTime` every time?
- I assumed timestamps in milliseconds are a rare case. By default, Spark writes microseconds.
- Checking the boolean flag shouldn't have significant overhead.
- If the function is hot, the JVM should optimize it.

I can move the flag checking out of the function body in a follow-up PR, or in the same one; see #27953 (comment)
I think it's easy to switch with almost no additional complexity. Seems fine to change it rather than relying on other optimizations like JIT, or leaving a bad example.
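A minimal sketch of the suggested shape (hypothetical names, not the PR's actual code): branch on the flag once while constructing the writer, so the per-record closure never re-checks it.

```scala
// Hypothetical helper: pick the conversion function up front instead of
// testing the `rebase` flag on every record.
def makeMicrosWriter(rebase: Boolean, rebaseMicros: Long => Long): Long => Long =
  if (rebase) rebaseMicros else identity

// Usage sketch with a toy rebase function; the real one would be
// DateTimeUtils.rebaseGregorianToJulianMicros.
val writer = makeMicrosWriter(rebase = true, micros => micros - 42L)
println(writer(1000L)) // 958
```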
…ck the rebase flag out of function bodies

### What changes were proposed in this pull request?

1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
2. Checking of the `rebaseDateTime` flag is moved out of function bodies.

### Why are the changes needed?

1. The tests are moved because they are not directly related to logical types.
2. Checking the flag outside of function bodies should improve performance.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

By running Avro tests via the command `build/sbt avro/test`

Closes #27964 from MaxGekk/rebase-avro-datetime-followup.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…d check the rebase flag out of function bodies

### What changes were proposed in this pull request?

1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
2. Checking of the `rebaseDateTime` flag is moved out of function bodies.

This is a backport of #27964.

### Why are the changes needed?

1. The tests are moved because they are not directly related to logical types.
2. Checking the flag outside of function bodies should improve performance.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

By running Avro tests via the command `build/sbt avro/test`

Closes #27977 from MaxGekk/rebase-avro-datetime-followup-3.0.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…asource

### What changes were proposed in this pull request?

In the PR, I propose to add a new benchmark, `DateTimeRebaseBenchmark`, which measures the performance of rebasing dates/timestamps from/to the hybrid calendar (Julian + Gregorian) to/from the Proleptic Gregorian calendar:
1. In write, it saves dates and timestamps before and after the year 1582 separately, w/ and w/o rebasing.
2. In read, it loads the previously saved parquet files by the vectorized reader and by the regular reader.

Here is the summary of the benchmarking:
- Saving timestamps is **~6 times slower**
- Loading timestamps w/ vectorized **off** is **~4 times slower**
- Loading timestamps w/ vectorized **on** is **~10 times slower**

### Why are the changes needed?

To know the impact of the date-time rebasing introduced by #27915, #27953, #27807.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Ran the `DateTimeRebaseBenchmark` benchmark using Amazon EC2:

| Item | Description |
| ---- | ---- |
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK8/11 |

Closes #28057 from MaxGekk/rebase-bechmark.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>