Skip to content

Conversation

@MaxGekk
Copy link
Member

@MaxGekk MaxGekk commented Apr 4, 2020

What changes were proposed in this pull request?

In the PR, I propose to optimise the DateTimeUtils.rebaseJulianToGregorianMicros() and rebaseGregorianToJulianMicros() functions, and make them faster by using pre-calculated rebasing tables. This approach allows to avoid expensive conversions via local timestamps. For example, the America/Los_Angeles time zone has just a few time points when difference between Proleptic Gregorian calendar and the hybrid calendar (Julian + Gregorian since 1582-10-15) is changed in the time interval 0001-01-01 .. 2100-01-01:

i local timestamp Proleptic Greg. seconds Hybrid (Julian+Greg) seconds difference in minutes
0 0001-01-01 00:00 -62135568422 -62135740800 -2872
1 0100-03-01 00:00 -59006333222 -59006419200 -1432
... ... ... ... ...
13 1582-10-15 00:00 -12219264422 -12219264000 7
14 1883-11-18 12:00 -2717640000 -2717640000 0

The difference in microseconds between Proleptic and hybrid calendars for any local timestamp in time intervals [local timestamp(i), local timestamp(i+1)), and for any microseconds in the time interval [Gregorian micros(i), Gregorian micros(i+1)) is the same. In this way, we can rebase an input micros by following the steps:

  1. Look at the table, and find the time interval where the micros falls to
  2. Take the difference between 2 calendars for this time interval
  3. Add the difference to the input micros. The result is rebased microseconds that has the same local timestamp representation.

Here are details of the implementation:

  • Pre-calculated tables are stored to JSON files gregorian-julian-rebase-micros.json and julian-gregorian-rebase-micros.json in the resource folder of sql/catalyst. The diffs and switch time points are stored as seconds, for example:
[
  {
    "tz" : "America/Los_Angeles",
    "switches" : [ -62135740800, -59006419200, ... , -2717640000 ],
    "diffs" : [ 172378, 85978, ..., 0 ]
  }
]

The JSON files are generated by 2 tests in RebaseDateTimeSuite - generate 'gregorian-julian-rebase-micros.json' and generate 'julian-gregorian-rebase-micros.json'. Both tests are disabled by default.
The switches time points are ordered from old to recent timestamps. This condition is checked by the test validate rebase records in JSON files in RebaseDateTimeSuite. Also sizes of the switches and diffs arrays are the same (this is checked by the same test).

  • The Asia/Tehran, Iran, Africa/Casablanca and Africa/El_Aaiun time zones weren't added to the JSON files, see SPARK-31385

  • The rebase info from the JSON files is placed to hash tables - gregJulianRebaseMap and julianGregRebaseMap. I use AnyRefMap because it is almost 2 times faster than Scala's immutable Map. Also I tried java.util.HashMap but it has worse lookup time than AnyRefMap in our case.
    The hash maps store the switch time points and diffs in microseconds precision to avoid conversions from microseconds to seconds in the runtime.

  • I moved the code related to days and microseconds rebasing to the separate object RebaseDateTime to do not pollute DateTimeUtils. Tests related to date-time rebasing are moved to RebaseDateTimeSuite for the same reason.

  • I placed rebasing via local timestamp to separate methods that require zone id as the first parameter assuming that the caller has zone id already. This allows to void unnecessary retrieving the default time zone. The methods are marked as private[sql] because they are used in RebaseDateTimeSuite as reference implementation.

  • Modified the rebaseGregorianToJulianMicros() and rebaseJulianToGregorianMicros() methods in RebaseDateTime to look up the rebase tables first of all. If hash maps don't contain rebasing info for the given time zone id, the methods falls back to the implementation via local timestamps. This allows to support time zones specified as zone offsets like '-08:00'.

Why are the changes needed?

To make timestamps rebasing faster:

  • Saving timestamps to parquet files is ~ x3.8 faster
  • Loading timestamps from parquet files is ~x2.8 faster.
  • Loading timestamps by Vectorized reader ~x4.6 faster.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Added the test validate rebase records in JSON files to RebaseDateTimeSuite. The test validates 2 json files from the resource folder - gregorian-julian-rebase-micros.json and julian-gregorian-rebase-micros.json, and it checks per each time zone records that
    • the number of switch points is equal to the number of diffs between calendars. If the numbers are different, this will violate the assumption made in RebaseDateTime.rebaseMicros.
    • swith points are ordered from old to recent timestamps. This pre-condition is required for linear search in the rebaseMicros function.
  • Added the test optimization of micros rebasing - Gregorian to Julian to RebaseDateTimeSuite which iterates over timestamps from 0001-01-01 to 2100-01-01 with the steps 1 ± 0.5 months, and checks that optimised function RebaseDateTime.rebaseGregorianToJulianMicros() returns the same result as non-optimised one. The check is performed for the UTC, PST, CET, Africa/Dakar, America/Los_Angeles, Antarctica/Vostok, Asia/Hong_Kong, Europe/Amsterdam time zones.
  • Added the test optimization of micros rebasing - Julian to Gregorian to RebaseDateTimeSuite which does similar checks as the test above but for rebasing from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar.
  • The tests for days rebasing are moved from DateTimeUtilsSuite to RebaseDateTimeSuite because the rebasing related code is moved from DateTimeUtils to the separate object RebaseDateTime.
  • Re-run DateTimeRebaseBenchmark at the America/Los_Angeles time zone (it is set explicitly in the PR [SPARK-31353][SQL] Set a time zone in DateTimeBenchmark and DateTimeRebaseBenchmark #28127):
Item Description
Region us-west-2 (Oregon)
Instance r3.xlarge
AMI ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5)
Java OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10

@SparkQA
Copy link

SparkQA commented Apr 4, 2020

Test build #120812 has finished for PR 28119 at commit 52bf1d4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 5, 2020

Test build #120819 has finished for PR 28119 at commit 4b89d12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 5, 2020

Test build #120824 has finished for PR 28119 at commit ac2beef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 5, 2020

Test build #120833 has finished for PR 28119 at commit 2b21ad7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 5, 2020

Test build #120842 has finished for PR 28119 at commit a52175d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper

@SparkQA
Copy link

SparkQA commented Apr 6, 2020

Test build #120845 has finished for PR 28119 at commit 2b4a437.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

MaxGekk added 3 commits April 6, 2020 08:53
…base-micros

# Conflicts:
#	sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt
#	sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt
#	sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala
@SparkQA
Copy link

SparkQA commented Apr 6, 2020

Test build #120859 has finished for PR 28119 at commit d39c835.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk MaxGekk changed the title [WIP][SQL] Speed up timestamps rebasing [WIP][SPARK-31359][SQL] Speed up timestamps rebasing Apr 6, 2020
@SparkQA
Copy link

SparkQA commented Apr 6, 2020

Test build #120864 has finished for PR 28119 at commit d5800f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


var micros = start
var diff = Long.MaxValue
val maxStep = DAYS_PER_WEEK * MICROS_PER_DAY
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it from month to week

@SparkQA
Copy link

SparkQA commented Apr 8, 2020

Test build #120969 has finished for PR 28119 at commit 9402336.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 8, 2020

Test build #120970 has finished for PR 28119 at commit 25f0996.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 8, 2020

Test build #120971 has finished for PR 28119 at commit 864734d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk MaxGekk changed the title [WIP][SPARK-31359][SQL] Speed up timestamps rebasing [SPARK-31359][SQL] Speed up timestamps rebasing Apr 8, 2020
@SparkQA
Copy link

SparkQA commented Apr 8, 2020

Test build #120980 has finished for PR 28119 at commit 4281d4b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

cloud-fan commented Apr 9, 2020

thanks, merging to master!

@cloud-fan cloud-fan closed this in e2d9399 Apr 9, 2020
MaxGekk added a commit to MaxGekk/spark that referenced this pull request Apr 9, 2020
In the PR, I propose to optimise the `DateTimeUtils`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` functions, and make them faster by using pre-calculated rebasing tables. This approach allows to avoid expensive conversions via local timestamps. For example, the `America/Los_Angeles` time zone has just a few time points when difference between Proleptic Gregorian calendar and the hybrid calendar (Julian + Gregorian since 1582-10-15) is changed in the time interval 0001-01-01 .. 2100-01-01:

| i | local  timestamp | Proleptic Greg. seconds | Hybrid (Julian+Greg) seconds | difference in minutes|
| -- | ------- |----|----| ---- |
|0|0001-01-01 00:00|-62135568422|-62135740800|-2872|
|1|0100-03-01 00:00|-59006333222|-59006419200|-1432|
|...|...|...|...|...|
|13|1582-10-15 00:00|-12219264422|-12219264000|7|
|14|1883-11-18 12:00|-2717640000|-2717640000|0|

The difference in microseconds between Proleptic and hybrid calendars for any local timestamp in time intervals `[local timestamp(i), local timestamp(i+1))`, and for any microseconds in the time interval `[Gregorian micros(i), Gregorian micros(i+1))` is the same. In this way, we can rebase an input micros by following the steps:
1. Look at the table, and find the time interval where the micros falls to
2. Take the difference between 2 calendars for this time interval
3. Add the difference to the input micros. The result is rebased microseconds that has the same local timestamp representation.

Here are details of the implementation:
- Pre-calculated tables are stored to JSON files `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json` in the resource folder of `sql/catalyst`. The diffs and switch time points are stored as seconds, for example:
```json
[
  {
    "tz" : "America/Los_Angeles",
    "switches" : [ -62135740800, -59006419200, ... , -2717640000 ],
    "diffs" : [ 172378, 85978, ..., 0 ]
  }
]
```
  The JSON files are generated by 2 tests in `RebaseDateTimeSuite` - `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'`. Both tests are disabled by default.
  The `switches` time points are ordered from old to recent timestamps. This condition is checked by the test `validate rebase records in JSON files` in `RebaseDateTimeSuite`. Also sizes of the `switches` and `diffs` arrays are the same (this is checked by the same test).

- The **_Asia/Tehran, Iran, Africa/Casablanca and Africa/El_Aaiun_** time zones weren't added to the JSON files, see [SPARK-31385](https://issues.apache.org/jira/browse/SPARK-31385)
- The rebase info from the JSON files is placed to hash tables - `gregJulianRebaseMap` and `julianGregRebaseMap`. I use `AnyRefMap` because it is almost 2 times faster than Scala's immutable Map. Also I tried `java.util.HashMap` but it has worse lookup time than `AnyRefMap` in our case.
The hash maps store the switch time points and diffs in microseconds precision to avoid conversions from microseconds to seconds in the runtime.

- I moved the code related to days and microseconds rebasing to the separate object `RebaseDateTime` to do not pollute `DateTimeUtils`. Tests related to date-time rebasing are moved to `RebaseDateTimeSuite` for the same reason.

- I placed rebasing via local timestamp to separate methods that require zone id as the first parameter assuming that the caller has zone id already. This allows to void unnecessary retrieving the default time zone. The methods are marked as `private[sql]` because they are used in `RebaseDateTimeSuite` as reference implementation.

- Modified the `rebaseGregorianToJulianMicros()` and `rebaseJulianToGregorianMicros()` methods in `RebaseDateTime` to look up the rebase tables first of all. If hash maps don't contain rebasing info for the given time zone id, the methods falls back to the implementation via local timestamps. This allows to support time zones specified as zone offsets like '-08:00'.

To make timestamps rebasing faster:
- Saving timestamps to parquet files is ~ **x3.8 faster**
- Loading timestamps from parquet files is ~**x2.8 faster**.
- Loading timestamps by Vectorized reader ~**x4.6 faster**.

No

- Added the test `validate rebase records in JSON files` to `RebaseDateTimeSuite`. The test validates 2 json files from the resource folder - `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`, and it checks per each time zone records that
  - the number of switch points is equal to the number of diffs between calendars. If the numbers are different, this will violate the assumption made in `RebaseDateTime.rebaseMicros`.
  - swith points are ordered from old to recent timestamps. This pre-condition is required for linear search in the `rebaseMicros` function.
- Added the test `optimization of micros rebasing - Gregorian to Julian` to `RebaseDateTimeSuite` which iterates over timestamps from 0001-01-01 to 2100-01-01 with the steps 1 ± 0.5 months, and checks that optimised function `RebaseDateTime`.`rebaseGregorianToJulianMicros()` returns the same result as non-optimised one. The check is performed for the UTC, PST, CET, Africa/Dakar, America/Los_Angeles, Antarctica/Vostok, Asia/Hong_Kong, Europe/Amsterdam time zones.
- Added the test `optimization of micros rebasing - Julian to Gregorian` to `RebaseDateTimeSuite` which does similar checks as the test above but for rebasing from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar.
- The tests for days rebasing are moved from `DateTimeUtilsSuite` to `RebaseDateTimeSuite` because the rebasing related code is moved from `DateTimeUtils` to the separate object `RebaseDateTime`.
- Re-run `DateTimeRebaseBenchmark` at the America/Los_Angeles time zone (it is set explicitly in the PR apache#28127):

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10 |

Closes apache#28119 from MaxGekk/optimize-rebase-micros.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e2d9399)
Signed-off-by: Max Gekk <[email protected]>
cloud-fan pushed a commit that referenced this pull request Apr 14, 2020
…mestamp

### What changes were proposed in this pull request?
Reuse the `rebaseGregorianToJulianMicros()` and `rebaseJulianToGregorianMicros()` functions introduced by the PR #28119 in `DateTimeUtils`.`toJavaTimestamp()` and `fromJavaTimestamp()`. Actually, new implementation is derived from Spark 2.4 + rebasing via pre-calculated rebasing maps.

### Why are the changes needed?
The changes speed up conversions to/from java.sql.Timestamp, and as a consequence the PR improve performance of ORC datasource in loading/saving timestamps:
- Saving ~ **x2.8 faster** in master, and -11% against Spark 2.4.6
- Loading - **x3.2-4.5 faster** in master, -5% against Spark 2.4.6

Before:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        59877          59877           0          1.7         598.8       0.0X
before 1582                                       61361          61361           0          1.6         613.6       0.0X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               48197          48288         118          2.1         482.0       1.0X
after 1582, vec on                                38247          38351         128          2.6         382.5       1.3X
before 1582, vec off                              53179          53359         249          1.9         531.8       0.9X
before 1582, vec on                               44076          44268         269          2.3         440.8       1.1X
```

After:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        21250          21250           0          4.7         212.5       0.1X
before 1582                                       22105          22105           0          4.5         221.0       0.1X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               14903          14933          40          6.7         149.0       1.0X
after 1582, vec on                                 8342           8426          73         12.0          83.4       1.8X
before 1582, vec off                              15528          15575          76          6.4         155.3       1.0X
before 1582, vec on                                9025           9075          61         11.1          90.2       1.7X
```

Spark 2.4.6-SNAPSHOT:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        18858          18858           0          5.3         188.6       1.0X
before 1582                                       18508          18508           0          5.4         185.1       1.0X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               14063          14177         143          7.1         140.6       1.0X
after 1582, vec on                                 5955           6029         100         16.8          59.5       2.4X
before 1582, vec off                              14119          14126           7          7.1         141.2       1.0X
before 1582, vec on                                5991           6007          25         16.7          59.9       2.3X
```

### Does this PR introduce any user-facing change?
Yes, the `to_utc_timestamp` function returns the later local timestamp in the case of overlapping local timestamps at daylight saving time. it's changed back to the 2.4 behavior.

### How was this patch tested?
- By existing test suite `DateTimeUtilsSuite`, `RebaseDateTimeSuite`, `DateFunctionsSuite`, `DateExpressionsSuites`, `ParquetIOSuite`, `OrcHadoopFsRelationSuite`.
- Re-generating results of the benchmarks `DateTimeBenchmark` and `DateTimeRebaseBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10 |

Closes #28189 from MaxGekk/optimize-to-from-java-timestamp.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Apr 14, 2020
…mestamp

### What changes were proposed in this pull request?
Reuse the `rebaseGregorianToJulianMicros()` and `rebaseJulianToGregorianMicros()` functions introduced by the PR #28119 in `DateTimeUtils`.`toJavaTimestamp()` and `fromJavaTimestamp()`. Actually, new implementation is derived from Spark 2.4 + rebasing via pre-calculated rebasing maps.

### Why are the changes needed?
The changes speed up conversions to/from java.sql.Timestamp, and as a consequence the PR improve performance of ORC datasource in loading/saving timestamps:
- Saving ~ **x2.8 faster** in master, and -11% against Spark 2.4.6
- Loading - **x3.2-4.5 faster** in master, -5% against Spark 2.4.6

Before:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        59877          59877           0          1.7         598.8       0.0X
before 1582                                       61361          61361           0          1.6         613.6       0.0X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               48197          48288         118          2.1         482.0       1.0X
after 1582, vec on                                38247          38351         128          2.6         382.5       1.3X
before 1582, vec off                              53179          53359         249          1.9         531.8       0.9X
before 1582, vec on                               44076          44268         269          2.3         440.8       1.1X
```

After:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        21250          21250           0          4.7         212.5       0.1X
before 1582                                       22105          22105           0          4.5         221.0       0.1X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               14903          14933          40          6.7         149.0       1.0X
after 1582, vec on                                 8342           8426          73         12.0          83.4       1.8X
before 1582, vec off                              15528          15575          76          6.4         155.3       1.0X
before 1582, vec on                                9025           9075          61         11.1          90.2       1.7X
```

Spark 2.4.6-SNAPSHOT:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        18858          18858           0          5.3         188.6       1.0X
before 1582                                       18508          18508           0          5.4         185.1       1.0X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               14063          14177         143          7.1         140.6       1.0X
after 1582, vec on                                 5955           6029         100         16.8          59.5       2.4X
before 1582, vec off                              14119          14126           7          7.1         141.2       1.0X
before 1582, vec on                                5991           6007          25         16.7          59.9       2.3X
```

### Does this PR introduce any user-facing change?
Yes, the `to_utc_timestamp` function returns the later local timestamp in the case of overlapping local timestamps at daylight saving time. it's changed back to the 2.4 behavior.

### How was this patch tested?
- By existing test suite `DateTimeUtilsSuite`, `RebaseDateTimeSuite`, `DateFunctionsSuite`, `DateExpressionsSuites`, `ParquetIOSuite`, `OrcHadoopFsRelationSuite`.
- Re-generating results of the benchmarks `DateTimeBenchmark` and `DateTimeRebaseBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10 |

Closes #28189 from MaxGekk/optimize-to-from-java-timestamp.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a0f8cc0)
Signed-off-by: Wenchen Fan <[email protected]>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
### What changes were proposed in this pull request?
In the PR, I propose to optimise the `DateTimeUtils`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` functions, and make them faster by using pre-calculated rebasing tables. This approach allows to avoid expensive conversions via local timestamps. For example, the `America/Los_Angeles` time zone has just a few time points when difference between Proleptic Gregorian calendar and the hybrid calendar (Julian + Gregorian since 1582-10-15) is changed in the time interval 0001-01-01 .. 2100-01-01:

| i | local  timestamp | Proleptic Greg. seconds | Hybrid (Julian+Greg) seconds | difference in minutes|
| -- | ------- |----|----| ---- |
|0|0001-01-01 00:00|-62135568422|-62135740800|-2872|
|1|0100-03-01 00:00|-59006333222|-59006419200|-1432|
|...|...|...|...|...|
|13|1582-10-15 00:00|-12219264422|-12219264000|7|
|14|1883-11-18 12:00|-2717640000|-2717640000|0|

The difference in microseconds between Proleptic and hybrid calendars for any local timestamp in time intervals `[local timestamp(i), local timestamp(i+1))`, and for any microseconds in the time interval `[Gregorian micros(i), Gregorian micros(i+1))` is the same. In this way, we can rebase an input micros by following the steps:
1. Look at the table, and find the time interval where the micros falls to
2. Take the difference between 2 calendars for this time interval
3. Add the difference to the input micros. The result is rebased microseconds that has the same local timestamp representation.

Here are details of the implementation:
- Pre-calculated tables are stored to JSON files `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json` in the resource folder of `sql/catalyst`. The diffs and switch time points are stored as seconds, for example:
```json
[
  {
    "tz" : "America/Los_Angeles",
    "switches" : [ -62135740800, -59006419200, ... , -2717640000 ],
    "diffs" : [ 172378, 85978, ..., 0 ]
  }
]
```
  The JSON files are generated by 2 tests in `RebaseDateTimeSuite` - `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'`. Both tests are disabled by default.
  The `switches` time points are ordered from old to recent timestamps. This condition is checked by the test `validate rebase records in JSON files` in `RebaseDateTimeSuite`. Also sizes of the `switches` and `diffs` arrays are the same (this is checked by the same test).

- The **_Asia/Tehran, Iran, Africa/Casablanca and Africa/El_Aaiun_** time zones weren't added to the JSON files, see [SPARK-31385](https://issues.apache.org/jira/browse/SPARK-31385)
- The rebase info from the JSON files is placed to hash tables - `gregJulianRebaseMap` and `julianGregRebaseMap`. I use `AnyRefMap` because it is almost 2 times faster than Scala's immutable Map. Also I tried `java.util.HashMap` but it has worse lookup time than `AnyRefMap` in our case.
The hash maps store the switch time points and diffs in microseconds precision to avoid conversions from microseconds to seconds in the runtime.

- I moved the code related to days and microseconds rebasing to the separate object `RebaseDateTime` to do not pollute `DateTimeUtils`. Tests related to date-time rebasing are moved to `RebaseDateTimeSuite` for the same reason.

- I placed rebasing via local timestamp to separate methods that require zone id as the first parameter assuming that the caller has zone id already. This allows to void unnecessary retrieving the default time zone. The methods are marked as `private[sql]` because they are used in `RebaseDateTimeSuite` as reference implementation.

- Modified the `rebaseGregorianToJulianMicros()` and `rebaseJulianToGregorianMicros()` methods in `RebaseDateTime` to look up the rebase tables first of all. If hash maps don't contain rebasing info for the given time zone id, the methods falls back to the implementation via local timestamps. This allows to support time zones specified as zone offsets like '-08:00'.

### Why are the changes needed?
To make timestamps rebasing faster:
- Saving timestamps to parquet files is ~ **x3.8 faster**
- Loading timestamps from parquet files is ~**x2.8 faster**.
- Loading timestamps by Vectorized reader ~**x4.6 faster**.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- Added the test `validate rebase records in JSON files` to `RebaseDateTimeSuite`. The test validates 2 json files from the resource folder - `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`, and it checks per each time zone records that
  - the number of switch points is equal to the number of diffs between calendars. If the numbers are different, this will violate the assumption made in `RebaseDateTime.rebaseMicros`.
  - swith points are ordered from old to recent timestamps. This pre-condition is required for linear search in the `rebaseMicros` function.
- Added the test `optimization of micros rebasing - Gregorian to Julian` to `RebaseDateTimeSuite` which iterates over timestamps from 0001-01-01 to 2100-01-01 with the steps 1 ± 0.5 months, and checks that optimised function `RebaseDateTime`.`rebaseGregorianToJulianMicros()` returns the same result as non-optimised one. The check is performed for the UTC, PST, CET, Africa/Dakar, America/Los_Angeles, Antarctica/Vostok, Asia/Hong_Kong, Europe/Amsterdam time zones.
- Added the test `optimization of micros rebasing - Julian to Gregorian` to `RebaseDateTimeSuite` which does similar checks as the test above but for rebasing from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar.
- The tests for days rebasing are moved from `DateTimeUtilsSuite` to `RebaseDateTimeSuite` because the rebasing related code is moved from `DateTimeUtils` to the separate object `RebaseDateTime`.
- Re-run `DateTimeRebaseBenchmark` at the America/Los_Angeles time zone (it is set explicitly in the PR apache#28127):

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10 |

Closes apache#28119 from MaxGekk/optimize-rebase-micros.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…mestamp

### What changes were proposed in this pull request?
Reuse the `rebaseGregorianToJulianMicros()` and `rebaseJulianToGregorianMicros()` functions introduced by the PR apache#28119 in `DateTimeUtils`.`toJavaTimestamp()` and `fromJavaTimestamp()`. Actually, new implementation is derived from Spark 2.4 + rebasing via pre-calculated rebasing maps.

### Why are the changes needed?
The changes speed up conversions to/from java.sql.Timestamp, and as a consequence the PR improve performance of ORC datasource in loading/saving timestamps:
- Saving ~ **x2.8 faster** in master, and -11% against Spark 2.4.6
- Loading - **x3.2-4.5 faster** in master, -5% against Spark 2.4.6

Before:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        59877          59877           0          1.7         598.8       0.0X
before 1582                                       61361          61361           0          1.6         613.6       0.0X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               48197          48288         118          2.1         482.0       1.0X
after 1582, vec on                                38247          38351         128          2.6         382.5       1.3X
before 1582, vec off                              53179          53359         249          1.9         531.8       0.9X
before 1582, vec on                               44076          44268         269          2.3         440.8       1.1X
```

After:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        21250          21250           0          4.7         212.5       0.1X
before 1582                                       22105          22105           0          4.5         221.0       0.1X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               14903          14933          40          6.7         149.0       1.0X
after 1582, vec on                                 8342           8426          73         12.0          83.4       1.8X
before 1582, vec off                              15528          15575          76          6.4         155.3       1.0X
before 1582, vec on                                9025           9075          61         11.1          90.2       1.7X
```

Spark 2.4.6-SNAPSHOT:
```
Save timestamps to ORC:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582                                        18858          18858           0          5.3         188.6       1.0X
before 1582                                       18508          18508           0          5.4         185.1       1.0X

Load timestamps from ORC:                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off                               14063          14177         143          7.1         140.6       1.0X
after 1582, vec on                                 5955           6029         100         16.8          59.5       2.4X
before 1582, vec off                              14119          14126           7          7.1         141.2       1.0X
before 1582, vec on                                5991           6007          25         16.7          59.9       2.3X
```

### Does this PR introduce any user-facing change?
Yes, the `to_utc_timestamp` function returns the later local timestamp in the case of overlapping local timestamps at daylight saving time. it's changed back to the 2.4 behavior.

### How was this patch tested?
- By existing test suite `DateTimeUtilsSuite`, `RebaseDateTimeSuite`, `DateFunctionsSuite`, `DateExpressionsSuites`, `ParquetIOSuite`, `OrcHadoopFsRelationSuite`.
- Re-generating results of the benchmarks `DateTimeBenchmark` and `DateTimeRebaseBenchmark` in the environment:

| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge |
| AMI | ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1 (ami-06f2f779464715dc5) |
| Java | OpenJDK 64-Bit Server VM 1.8.0_242 and OpenJDK 64-Bit Server VM 11.0.6+10 |

Closes apache#28189 from MaxGekk/optimize-to-from-java-timestamp.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
// Loads rebasing info from an JSON file. JSON records in the files should conform to
// `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
// it is 2 times faster in DateTimeRebaseBenchmark.
private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
Copy link
Member

@HyukjinKwon HyukjinKwon Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try to don't private[sql] per SPARK-16813 next time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason "It simply makes debugging harder when Spark developers need to inspect the plans at runtime." is not applicable here, I believe. I would make the method private but I needed to access it from tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it's the same package org.apache.spark.sql.catalyst.util. I guess private[util] works.

@MaxGekk MaxGekk deleted the optimize-rebase-micros branch June 5, 2020 19:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants