Skip to content
Closed
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
354965f
Optimize rebaseGregorianToJulianMicros
MaxGekk Apr 4, 2020
c6a70b2
Optimize rebaseJulianToGregorianMicros
MaxGekk Apr 4, 2020
52bf1d4
Set LA as the default time zone
MaxGekk Apr 4, 2020
bf08bd2
Re-gen DateTimeBenchmark results on JDK 8
MaxGekk Apr 4, 2020
4b89d12
Re-gen DateTimeBenchmark results on JDK 11
MaxGekk Apr 4, 2020
ac2beef
Optimize rebaseMicros
MaxGekk Apr 5, 2020
66a8af4
Re-gen DateTimeBenchmark results on JDK 11
MaxGekk Apr 5, 2020
2b21ad7
Re-gen DateTimeBenchmark results on JDK 8
MaxGekk Apr 5, 2020
17f2684
Move days rebase functions to RebaseDateTime
MaxGekk Apr 5, 2020
ddd1794
Move micros rebase functions to RebaseDateTime
MaxGekk Apr 5, 2020
743d111
Load rebase records from resource files
MaxGekk Apr 5, 2020
a52175d
Move rebasing tests to RebaseDateTimeSuite
MaxGekk Apr 5, 2020
2b4a437
Add comments
MaxGekk Apr 5, 2020
6202535
Add comments to julian-gregorian rebase functions
MaxGekk Apr 6, 2020
d39c835
Merge remote-tracking branch 'remotes/origin/master' into optimize-re…
MaxGekk Apr 6, 2020
248b9a5
Fix DaysWritable v1.2
MaxGekk Apr 6, 2020
d5800f9
Fix OrcColumnVector v1.2
MaxGekk Apr 6, 2020
d87ebec
Validate JSON files
MaxGekk Apr 6, 2020
5d417b7
Add tests for rebasing micros
MaxGekk Apr 7, 2020
ba96e3c
Re-gen JSON files
MaxGekk Apr 7, 2020
f9d0faa
Assume seconds in JSON files
MaxGekk Apr 7, 2020
377ef8a
Add JSON generate function
MaxGekk Apr 7, 2020
44df0dc
Don't check that zone id is resolvable
MaxGekk Apr 7, 2020
be672b6
Try AnyRefMap
MaxGekk Apr 7, 2020
8cd02d9
Try Java HashMap
MaxGekk Apr 7, 2020
be37b16
Revert "Try Java HashMap"
MaxGekk Apr 7, 2020
f6cf352
Put switches and diff to one array
MaxGekk Apr 7, 2020
88ff829
Revert "Put switches and diff to one array"
MaxGekk Apr 7, 2020
0ffd2a6
Fix comments
MaxGekk Apr 7, 2020
9a38c62
Random step in tests
MaxGekk Apr 7, 2020
03e2ed3
Intro JsonRebaseRecord
MaxGekk Apr 7, 2020
5204265
Add note about the system JVM time zone
MaxGekk Apr 7, 2020
b9951b9
Add a comment for RebaseDateTime
MaxGekk Apr 7, 2020
8c3570d
Re-gen DateTimeRebaseBenchmark results on JDK 8
MaxGekk Apr 8, 2020
9402336
Blacklist some time zones
MaxGekk Apr 8, 2020
90dc0bd
Re-gen DateTimeRebaseBenchmark results on JDK 11
MaxGekk Apr 8, 2020
25f0996
Merge branch 'optimize-rebase-micros' of github.com:MaxGekk/spark int…
MaxGekk Apr 8, 2020
864734d
Disable JSON generation
MaxGekk Apr 8, 2020
a84d40e
Add comments for refRebaseGregorianToJulianDays() and refRebaseJulian…
MaxGekk Apr 8, 2020
bb12a81
Fix the comment for RebaseDateTime
MaxGekk Apr 8, 2020
d2d03b2
Fix indentation and rename params
MaxGekk Apr 8, 2020
4281d4b
Address Wenchen's review comments
MaxGekk Apr 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -95,7 +96,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {

case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
val days = value.asInstanceOf[Int]
val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
val rebasedDays = rebaseJulianToGregorianDays(days)
updater.setInt(ordinal, rebasedDays)

case (INT, DateType) => (updater, ordinal, value) =>
Expand All @@ -110,15 +111,15 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
val millis = value.asInstanceOf[Long]
val micros = DateTimeUtils.millisToMicros(millis)
val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
val rebasedMicros = rebaseJulianToGregorianMicros(micros)
updater.setLong(ordinal, rebasedMicros)
case null | _: TimestampMillis => (updater, ordinal, value) =>
val millis = value.asInstanceOf[Long]
val micros = DateTimeUtils.millisToMicros(millis)
updater.setLong(ordinal, micros)
case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
val micros = value.asInstanceOf[Long]
val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
val rebasedMicros = rebaseJulianToGregorianMicros(micros)
updater.setLong(ordinal, rebasedMicros)
case _: TimestampMicros => (updater, ordinal, value) =>
val micros = value.asInstanceOf[Long]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -142,7 +143,7 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
(getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))

case (DateType, INT) if rebaseDateTime =>
(getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
(getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal))

case (DateType, INT) =>
(getter, ordinal) => getter.getInt(ordinal)
Expand All @@ -152,12 +153,12 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
// (the `null` case), output the timestamp value as with millisecond precision.
case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
val micros = getter.getLong(ordinal)
val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
val rebasedMicros = rebaseGregorianToJulianMicros(micros)
DateTimeUtils.microsToMillis(rebasedMicros)
case null | _: TimestampMillis => (getter, ordinal) =>
DateTimeUtils.microsToMillis(getter.getLong(ordinal))
case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
rebaseGregorianToJulianMicros(getter.getLong(ordinal))
case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
case other => throw new IncompatibleSchemaException(
s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
Expand Down
2,385 changes: 2,385 additions & 0 deletions sql/catalyst/src/main/resources/gregorian-julian-rebase-micros.json

Large diffs are not rendered by default.

2,385 changes: 2,385 additions & 0 deletions sql/catalyst/src/main/resources/julian-gregorian-rebase-micros.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time._
import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
import java.util.{Calendar, Locale, TimeZone}
import java.util.{Locale, TimeZone}
import java.util.concurrent.TimeUnit._

import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

Expand Down Expand Up @@ -981,153 +982,4 @@ object DateTimeUtils {
val days = period.getDays
new CalendarInterval(months, days, 0)
}

/**
* Rebases microseconds since the epoch from Proleptic Gregorian calendar to the hybrid
* (Julian + Gregorian) calendar.
*
* Converts the given microseconds to a local date-time in the system default time zone
* (`ZoneId.systemDefault`) in Proleptic Gregorian calendar, interprets the same local
* date-time fields in the hybrid `gregory` calendar, and takes microseconds since the epoch
* from the resulting timestamp.
*
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
* @return The rebased microseconds since the epoch in Julian calendar.
*/
def rebaseGregorianToJulianMicros(micros: Long): Long = {
val instant = microsToInstant(micros)
val zoneId = ZoneId.systemDefault
val ldt = instant.atZone(zoneId).toLocalDateTime
val cal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
// `java.util.Calendar` months are zero-based while `java.time` months are one-based.
.setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
.setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
// The local time-line can overlap, such as at an autumn daylight savings cutover.
// This setting selects the original local timestamp mapped to the given `micros`.
.set(Calendar.DST_OFFSET, zoneId.getRules.getDaylightSavings(instant).toMillis.toInt)
.build()
// Only whole seconds were passed to the calendar above, so the microsecond-of-second
// fraction of the original timestamp is added back here.
millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
}

/**
* Rebases microseconds since the epoch from the hybrid (Julian + Gregorian) calendar to
* Proleptic Gregorian calendar.
*
* Converts the given microseconds to local date-time fields in the hybrid `gregory` calendar,
* interprets those fields as a local date-time in Proleptic Gregorian calendar in the system
* default time zone (`ZoneId.systemDefault`), and takes microseconds since the epoch from
* the Gregorian timestamp.
*
* NOTE(review): the `Calendar` is built without an explicit time zone; presumably it falls
* back to the JVM default time zone, matching `ZoneId.systemDefault` below - confirm.
*
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
* @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
*/
def rebaseJulianToGregorianMicros(micros: Long): Long = {
val cal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setInstant(microsToMillis(micros))
.build()
val localDateTime = LocalDateTime.of(
cal.get(Calendar.YEAR),
// `java.util.Calendar` months are zero-based while `java.time` months are one-based.
cal.get(Calendar.MONTH) + 1,
// The number of days will be added later to handle non-existing
// Julian dates in Proleptic Gregorian calendar.
// For example, 1000-02-29 exists in Julian calendar because 1000
// is a leap year but it is not a leap year in Gregorian calendar.
1,
cal.get(Calendar.HOUR_OF_DAY),
cal.get(Calendar.MINUTE),
cal.get(Calendar.SECOND),
// The sub-second part is taken from `micros` directly and converted to nanoseconds.
// `Math.floorMod` is used so that the fraction is computed with floor semantics
// for negative `micros` (timestamps before the epoch).
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
.plusDays(cal.get(Calendar.DAY_OF_MONTH) - 1)
val zonedDateTime = localDateTime.atZone(ZoneId.systemDefault)
// Zero DST offset means that local clocks have switched to the winter time already.
// So, clocks go back one hour. We should correct zoned date-time and change
// the zone offset to the later of the two valid offsets at a local time-line overlap.
val adjustedZdt = if (cal.get(Calendar.DST_OFFSET) == 0) {
zonedDateTime.withLaterOffsetAtOverlap()
} else {
zonedDateTime
}
instantToMicros(adjustedZdt.toInstant)
}

/**
 * Rebases days since the epoch from an original calendar to a target calendar, for instance
 * from a hybrid (Julian + Gregorian) to Proleptic Gregorian calendar.
 *
 * Scans `switchDays` backwards for the latest switch day that is not greater than `days`
 * and adds the difference stored at the same index of `diffs` to `days`. When `days`
 * precedes every switch day, the first difference is applied. The scan is linear and starts
 * from the most recent switch day, so modern dates need the fewest comparisons.
 *
 * @param switchDays The sorted days at which the difference in days between the original
 *                   and the target calendar changes.
 * @param diffs The differences in days between the calendars, one per switch day.
 * @param days The number of days since the epoch 1970-01-01 to be rebased to the
 *             target calendar.
 * @return The rebased number of days.
 */
private def rebaseDays(switchDays: Array[Int], diffs: Array[Int], days: Int): Int = {
  var pos = switchDays.length - 1
  // Skip every switch day that lies strictly after `days`.
  while (pos >= 0 && switchDays(pos) > days) pos -= 1
  // `pos` is -1 when `days` is before the first switch day; fall back to the first diff.
  days + diffs(math.max(pos, 0))
}

// The differences in days between Julian and Proleptic Gregorian dates.
// The diff at the index `i` is applicable for all days in the date interval:
// [julianGregDiffSwitchDay(i), julianGregDiffSwitchDay(i+1))
// This array has one entry per element of `julianGregDiffSwitchDay`; the last entry is 0,
// i.e. no adjustment is applied from the last switch day onwards.
private val julianGregDiffs = Array(2, 1, 0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, 0)
// The sorted days in Julian calendar when difference in days between Julian and
// Proleptic Gregorian calendars was changed.
// The starting point is the `0001-01-01` (-719164 days since the epoch in
// Julian calendar). All dates before the starting point have the same difference
// of 2 days in Julian and Proleptic Gregorian calendars.
private val julianGregDiffSwitchDay = Array(
-719164, -682945, -646420, -609895, -536845, -500320, -463795,
-390745, -354220, -317695, -244645, -208120, -171595, -141427)

/**
 * Rebases a day count from Julian calendar to Proleptic Gregorian calendar.
 *
 * The given number of days since the epoch day 1970-01-01 is treated as a local date in
 * Julian calendar; the same local date is then interpreted in Proleptic Gregorian calendar
 * and its number of days since the epoch is returned. The shift is looked up in the
 * `julianGregDiffSwitchDay` / `julianGregDiffs` tables.
 *
 * @param days The number of days since the epoch in Julian calendar.
 * @return The rebased number of days in Gregorian calendar.
 */
def rebaseJulianToGregorianDays(days: Int): Int =
  rebaseDays(julianGregDiffSwitchDay, julianGregDiffs, days)

// The differences in days between Proleptic Gregorian and Julian dates.
// The diff at the index `i` is applicable for all days in the date interval:
// [gregJulianDiffSwitchDay(i), gregJulianDiffSwitchDay(i+1))
// This array has one entry per element of `gregJulianDiffSwitchDay`; the last entry is 0,
// i.e. no adjustment is applied from the last switch day onwards.
private val gregJulianDiffs = Array(-2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0)
// The sorted days in Proleptic Gregorian calendar when difference in days between
// Proleptic Gregorian and Julian was changed.
// The starting point is the `0001-01-01` (-719162 days since the epoch in
// Proleptic Gregorian calendar). All dates before the starting point have the same
// difference of -2 days in Proleptic Gregorian and Julian calendars.
private val gregJulianDiffSwitchDay = Array(
  -719162, -682944, -646420, -609896, -536847, -500323, -463799,
  -390750, -354226, -317702, -244653, -208129, -171605, -141427)

/**
 * Rebasing days since the epoch to store the same number of days
 * as by Spark 2.4 and earlier versions. Spark 3.0 switched to
 * Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
 * this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
 * Julian calendar for dates before 1582-10-15. So, the same local date may
 * be mapped to different number of days since the epoch in different calendars.
 *
 * For example:
 *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
 *   Julian calendar: 1582-01-01 -> -141704
 * The code below converts -141714 to -141704.
 *
 * @param days The number of days since the epoch 1970-01-01. It can be negative.
 * @return The rebased number of days since the epoch in Julian calendar.
 */
def rebaseGregorianToJulianDays(days: Int): Int = {
  rebaseDays(gregJulianDiffSwitchDay, gregJulianDiffs, days)
}
}
Loading