Closed
Changes from 24 commits
41 commits
ae83fdc
Add rebaseGregorianToJulianMicros()
MaxGekk Mar 13, 2020
4c35ddb
Add rebaseJulianToGregorianMicros()
MaxGekk Mar 14, 2020
74774fc
Add a comment to rebaseJulianToGregorianMicros()
MaxGekk Mar 14, 2020
8d74214
Add comment about calendar
MaxGekk Mar 14, 2020
56ca744
Add round trip test for micros
MaxGekk Mar 14, 2020
0cfeed5
Rename to JULIAN_CUTOVER_MICROS
MaxGekk Mar 14, 2020
78cbd6c
Add rebaseJulianToGregorianDays and rebaseGregorianToJulianDays
MaxGekk Mar 14, 2020
13aad60
Minor fix code style
MaxGekk Mar 14, 2020
96573a9
Add comments for rebaseGregorianToJulianDays()
MaxGekk Mar 14, 2020
36c0400
Add the SQL config spark.sql.legacy.parquet.rebaseDateTime.enabled
MaxGekk Mar 14, 2020
f0a2df6
Perform rebase in write
MaxGekk Mar 14, 2020
9e3c201
Perform rebase dates in write
MaxGekk Mar 14, 2020
053861c
Perform rebase dates/timestamps in read
MaxGekk Mar 14, 2020
1624756
Rewrite days rebasing using Java 7 API
MaxGekk Mar 15, 2020
e3bbcb5
Rewrite micros rebasing using Java 7 API
MaxGekk Mar 15, 2020
d1e6d84
Extract common code
MaxGekk Mar 15, 2020
acd33f1
Revert "Extract common code"
MaxGekk Mar 16, 2020
41fc33f
Revert "Rewrite micros rebasing using Java 7 API"
MaxGekk Mar 16, 2020
fe9f130
Revert "Rewrite days rebasing using Java 7 API"
MaxGekk Mar 16, 2020
c2c53b8
Remove branching by cutover days in rebase functions
MaxGekk Mar 16, 2020
8e94359
Rebasing via system time zone
MaxGekk Mar 16, 2020
d6f7e6b
Rebase dates via UTC local date
MaxGekk Mar 16, 2020
81d342a
Check more time zones in days rebasing
MaxGekk Mar 16, 2020
63428ab
More dates/timestamps for testing
MaxGekk Mar 16, 2020
a34a9ce
Rename utcCal to cal
MaxGekk Mar 16, 2020
e590d36
Test multiple time zones in rebasing timestamps
MaxGekk Mar 16, 2020
262f744
Test reading parquet files written by Spark 2.4
MaxGekk Mar 16, 2020
8947298
Remove .asInstanceOf[DateType#InternalType]
MaxGekk Mar 16, 2020
276d159
Change SQL config description
MaxGekk Mar 16, 2020
167b463
Rebase timestamp INT96
MaxGekk Mar 16, 2020
bbc4a1a
Support rebasing in VectorizedColumnReader
MaxGekk Mar 16, 2020
a1b34cb
Bug fix in write
MaxGekk Mar 16, 2020
d7debb4
Add test for write
MaxGekk Mar 16, 2020
6bebf3b
Read SQL config in place
MaxGekk Mar 17, 2020
67cec02
Remove a gap
MaxGekk Mar 17, 2020
8fa19a6
Remove config settings from ParquetWriteBuilder
MaxGekk Mar 17, 2020
a061870
Initialize rebaseDateTime in default constructor in ParquetWriteSupport
MaxGekk Mar 18, 2020
ae49cc4
Check INT96 rebasing regardless of SQL config settings
MaxGekk Mar 18, 2020
a96392c
Add JIRA id
MaxGekk Mar 18, 2020
5b52735
Test INT96 w/ and w/o vectorized reader
MaxGekk Mar 18, 2020
184fcd8
Merge remote-tracking branch 'remotes/origin/master' into rebase-parq…
MaxGekk Mar 18, 2020
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time._
import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
import java.util.{Locale, TimeZone}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit._

import scala.util.control.NonFatal
@@ -974,4 +974,102 @@ object DateTimeUtils {
}
}.mkString("'")
}

/**
* Converts the given microseconds to a local date-time in the default system time zone in
* Proleptic Gregorian calendar, interprets the result as a local date-time in Julian calendar
* in the same time zone, and takes microseconds since the epoch from the Julian timestamp.
*
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
* @return The rebased microseconds since the epoch in Julian calendar.
*/
def rebaseGregorianToJulianMicros(micros: Long): Long = {
val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
val utcCal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
.setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
.build()
millisToMicros(utcCal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
}

/**
* Converts the given microseconds to a local date-time in the default system time zone in
* Julian calendar, interprets the result as a local date-time in Proleptic Gregorian calendar
* in the same time zone, and takes microseconds since the epoch from the Gregorian timestamp.
*
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
* @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
*/
def rebaseJulianToGregorianMicros(micros: Long): Long = {
val utcCal = new Calendar.Builder()
Contributor

shall we set timezone of it?

Member Author

It is set to the default system time zone. If we set it to a particular time zone, the conversion will be incorrect.

Let me rename utcCal to cal.

Member Author

For example, if I set UTC, conversions in UTC are OK but conversions in PST are not:

time zone = PST ts = 0001-01-01 01:02:03.654321 -62135564276345679 did not equal -62135564698345679
ScalaTestFailureLocation: org.apache.spark.sql.catalyst.util.DateTimeUtilsSuite at (DateTimeUtilsSuite.scala:711)
Expected :-62135564698345679
Actual   :time zone = PST ts = 0001-01-01 01:02:03.654321 -62135564276345679
<Click to see difference>

org.scalatest.exceptions.TestFailedException: time zone = PST ts = 0001-01-01 01:02:03.654321 -62135564276345679 did not equal -62135564698345679

Member Author

(Default) Time zone should be involved in the conversion to avoid the problem of different time zone offsets returned by Java 7 and Java 8 APIs:

scala> java.time.ZoneId.systemDefault
res16: java.time.ZoneId = America/Los_Angeles
scala> java.sql.Timestamp.valueOf("1883-11-10 00:00:00").getTimezoneOffset / 60.0
warning: there was one deprecation warning; re-run with -deprecation for details
res17: Double = 8.0
scala> java.time.ZoneId.of("America/Los_Angeles").getRules.getOffset(java.time.LocalDateTime.parse("1883-11-10T00:00:00"))
res18: java.time.ZoneOffset = -07:52:58
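
A quick arithmetic check, not part of the PR, suggests that the failed assertion quoted earlier in this thread is consistent with this explanation: the actual and expected values differ by 422 seconds, which equals the gap between the -08:00 offset from the legacy API and the -07:52:58 offset from `java.time`. The object name `OffsetGapCheck` is made up for illustration.

```scala
// Hypothetical helper, not part of the PR: compares the numbers quoted in the thread.
object OffsetGapCheck {
  val MicrosPerSecond = 1000000L

  // Values copied from the failed assertion quoted in the thread.
  val actualMicros = -62135564276345679L
  val expectedMicros = -62135564698345679L
  val diffSeconds: Long = (actualMicros - expectedMicros) / MicrosPerSecond

  // Offsets copied from the REPL session: 8.0 hours vs. -07:52:58.
  val legacyOffsetSeconds: Int = 8 * 3600
  val javaTimeOffsetSeconds: Int = 7 * 3600 + 52 * 60 + 58
  val offsetGapSeconds: Int = legacyOffsetSeconds - javaTimeOffsetSeconds

  def main(args: Array[String]): Unit = {
    println(s"assertion diff = $diffSeconds s, offset gap = $offsetGapSeconds s")
  }
}
```

Both quantities come out equal, which points at the zone offset rather than the calendar arithmetic as the source of the mismatch.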

// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setInstant(microsToMillis(micros))
.build()
val localDateTime = LocalDateTime.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
utcCal.get(Calendar.DAY_OF_MONTH),
utcCal.get(Calendar.HOUR_OF_DAY),
utcCal.get(Calendar.MINUTE),
utcCal.get(Calendar.SECOND),
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
}
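
The two micros functions above can be sketched as a standalone program. This is a minimal sketch, not the PR's code: it takes an explicit `zone` parameter (an assumption made here so that the result does not depend on the machine's default time zone), inlines the `microsToMillis`/`instantToMicros` helpers, and ignores the calendar era, so it is only meaningful for year 1 and later. The object and method names are hypothetical.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.temporal.ChronoField
import java.util.{Calendar, TimeZone}

// Hypothetical standalone sketch of micros rebasing with an explicit time zone.
object RebaseMicrosSketch {
  private val MicrosPerSecond = 1000000L
  private val MicrosPerMilli = 1000L
  private val NanosPerMicro = 1000L

  // Hybrid Julian/Gregorian calendar builder bound to the given zone.
  private def hybridBuilder(zone: ZoneId): Calendar.Builder =
    new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(TimeZone.getTimeZone(zone))

  // Reads the instant as local fields in the hybrid calendar, then
  // re-interprets those fields in the Proleptic Gregorian calendar.
  def julianToGregorianMicros(micros: Long, zone: ZoneId): Long = {
    val cal = hybridBuilder(zone)
      .setInstant(Math.floorDiv(micros, MicrosPerMilli))
      .build()
    val ldt = LocalDateTime.of(
      cal.get(Calendar.YEAR),
      cal.get(Calendar.MONTH) + 1,
      cal.get(Calendar.DAY_OF_MONTH),
      cal.get(Calendar.HOUR_OF_DAY),
      cal.get(Calendar.MINUTE),
      cal.get(Calendar.SECOND),
      (Math.floorMod(micros, MicrosPerSecond) * NanosPerMicro).toInt)
    val instant = ldt.atZone(zone).toInstant
    Math.addExact(
      Math.multiplyExact(instant.getEpochSecond, MicrosPerSecond),
      instant.getNano / NanosPerMicro)
  }

  // Reads the instant as local fields in the Proleptic Gregorian calendar,
  // then re-interprets those fields in the hybrid calendar.
  def gregorianToJulianMicros(micros: Long, zone: ZoneId): Long = {
    val instant = Instant.ofEpochSecond(
      Math.floorDiv(micros, MicrosPerSecond),
      Math.floorMod(micros, MicrosPerSecond) * NanosPerMicro)
    val ldt = instant.atZone(zone).toLocalDateTime
    val cal = hybridBuilder(zone)
      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
      .build()
    cal.getTimeInMillis * MicrosPerMilli + ldt.get(ChronoField.MICRO_OF_SECOND)
  }

  def main(args: Array[String]): Unit = {
    val utc = ZoneId.of("UTC")
    // 0001-01-01T00:00:00 as a Julian local date-time in UTC.
    val julian = -62135769600000000L
    println(julianToGregorianMicros(julian, utc))
  }
}
```

In UTC the local date-time 0001-01-01T00:00:00 is two days apart in the two calendars, so the rebased values differ by 172800 seconds. With a zone such as America/Los_Angeles the result also depends on which offset each API reports, which is the problem discussed in the thread.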

/**
* Converts the given number of days since the epoch day 1970-01-01 to
* a local date in Julian calendar, interprets the result as a local
* date in Proleptic Gregorian calendar, and takes the number of days
* since the epoch from the Gregorian date.
*
* @param days The number of days since the epoch in Julian calendar.
* @return The rebased number of days in Gregorian calendar.
*/
def rebaseJulianToGregorianDays(days: Int): Int = {
val utcCal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setTimeZone(TimeZoneUTC)
Member Author

We can use a particular time zone here because the conversion of "logical" days is actually independent of the time zone. UTC is selected to avoid the problem of rounding micros to/from days, because the zone offset in UTC is 0.

.setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
.build()
val localDate = LocalDate.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
utcCal.get(Calendar.DAY_OF_MONTH))
Math.toIntExact(localDate.toEpochDay)
}

/**
* Rebases days since the epoch to store the same number of days
* as Spark 2.4 and earlier versions did. Spark 3.0 switched to the
* Proleptic Gregorian calendar (see SPARK-26651), which affects
* dates before 1582-10-15. Spark 2.4 and earlier versions use the
* Julian calendar for dates before 1582-10-15. So, the same local date may
* be mapped to a different number of days since the epoch in different calendars.
*
* For example:
* Proleptic Gregorian calendar: 1582-01-01 -> -141714
* Julian calendar: 1582-01-01 -> -141704
* The code below converts -141714 to -141704.
*
* @param days The number of days since the epoch 1970-01-01. It can be negative.
* @return The rebased number of days since the epoch in Julian calendar.
*/
def rebaseGregorianToJulianDays(days: Int): Int = {
val localDate = LocalDate.ofEpochDay(days)
val utcCal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setTimeZone(TimeZoneUTC)
.setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
.build()
Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
}
}
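
The worked example in the doc comment above (1582-01-01 maps to -141714 in the Proleptic Gregorian calendar and to -141704 in the Julian calendar) can be reproduced outside Spark. The following is a minimal sketch that mirrors the two days functions in this diff; the object name and constants are hypothetical, and the sketch ignores the calendar era, so it assumes dates in year 1 or later.

```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

// Hypothetical standalone sketch of days rebasing via a UTC hybrid calendar.
object RebaseDaysSketch {
  private val MillisPerDay = 24L * 60 * 60 * 1000
  private val Utc = TimeZone.getTimeZone("UTC")

  // Proleptic Gregorian days -> local date -> same local date in the
  // hybrid Julian/Gregorian calendar -> days since the epoch.
  def gregorianToJulianDays(days: Int): Int = {
    val localDate = LocalDate.ofEpochDay(days.toLong)
    val cal = new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(Utc)
      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
      .build()
    Math.toIntExact(Math.floorDiv(cal.getTimeInMillis, MillisPerDay))
  }

  // Hybrid-calendar days -> local date fields -> same local date in the
  // Proleptic Gregorian calendar -> days since the epoch.
  def julianToGregorianDays(days: Int): Int = {
    val cal = new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(Utc)
      .setInstant(Math.multiplyExact(days.toLong, MillisPerDay))
      .build()
    val localDate = LocalDate.of(
      cal.get(Calendar.YEAR),
      cal.get(Calendar.MONTH) + 1,
      cal.get(Calendar.DAY_OF_MONTH))
    Math.toIntExact(localDate.toEpochDay)
  }

  def main(args: Array[String]): Unit = {
    println(gregorianToJulianDays(-141714)) // 1582-01-01 in Proleptic Gregorian
    println(julianToGregorianDays(-141704)) // 1582-01-01 in Julian
  }
}
```

Because the calendar is pinned to UTC, the zone offset is zero and the division by the number of milliseconds per day is exact for midnight values. Dates on or after 1582-10-15 pass through unchanged.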
Original file line number Diff line number Diff line change
@@ -2496,6 +2496,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_PARQUET_REBASE_DATETIME =
buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
.internal()
.doc("When true, rebase dates/timestamps before 1582-10-15 from Proleptic " +
Contributor

shall we remove before 1582-10-15?

Member Author

Removed

"Gregorian calendar to Julian calendar in write and from Julian to Proleptic " +
"Gregorian calendar in read. The rebasing is performed by converting micros/days to " +
"a local date/timestamp in the source calendar, interpreting the resulting date/" +
"timestamp in the target calendar, and getting the number of days/micros since " +
"the epoch 1970-01-01 00:00:00Z.")
.version("3.0.0")
.booleanConf
.createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
@@ -3072,6 +3085,8 @@ class SQLConf extends Serializable with Logging {

def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)

def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Original file line number Diff line number Diff line change
@@ -686,4 +686,61 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
assert(convertIncompatiblePattern("yyyy-MM-dd'T'HH:mm:ss.SSSz G")
=== "yyyy-MM-dd'T'HH:mm:ss.SSSz G")
}

test("rebase julian to/from gregorian micros") {
val timeZone = DateTimeUtils.TimeZoneUTC
withDefaultTimeZone(timeZone) {
Seq(
"0001-01-01 01:02:03.654321",
"1000-01-01 03:02:01.123456",
"1582-10-04 00:00:00.000000",
"1582-10-15 00:00:00.999999", // Gregorian cutover day
"1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
"1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
"1969-12-31 11:22:33.000100",
"1970-01-01 00:00:00.000001", // The epoch day
"2020-03-14 09:33:01.500000").foreach { ts =>
val julianTs = Timestamp.valueOf(ts)
val julianMicros = millisToMicros(julianTs.getTime) +
((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
.atZone(timeZone.toZoneId)
.toInstant)

assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
}
}
}

test("rebase gregorian to/from julian days") {
// millisToDays() and fromJavaDate() are taken from Spark 2.4
def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
}
def fromJavaDate(date: Date): Int = {
millisToDays(date.getTime, defaultTimeZone())
}
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
Seq(
"0001-01-01",
"1000-01-01",
"1582-10-04",
"1582-10-15", // Gregorian cutover day
"1883-11-10", // America/Los_Angeles -7:52:58 zone offset
"1883-11-20", // America/Los_Angeles -08:00 zone offset
"1969-12-31",
"1970-01-01", // The epoch day
"2020-03-14").foreach { date =>
val julianDays = fromJavaDate(Date.valueOf(date))
val gregorianDays = localDateToDays(LocalDate.parse(date))

assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -136,6 +136,10 @@ class ParquetFileFormat
s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
}

conf.set(
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key,
Contributor

Can we use SQLConf.get in the place we need to read the config directly? It can simplify the code.

Member Author

We can, but I am just following the style of the method. See the lines above.

sparkSession.sessionState.conf.parquetRebaseDateTimeEnabled.toString)

new OutputWriterFactory {
// This OutputWriterFactory instance is deserialized when writing Parquet files on the
// executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
Original file line number Diff line number Diff line change
@@ -54,6 +54,7 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
enableVectorizedReader: Boolean)
extends ReadSupport[InternalRow] with Logging {
private var catalystRequestedSchema: StructType = _
private var rebaseDateTime: Boolean = _

def this() {
// We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only
@@ -109,6 +110,10 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
|Catalyst requested schema:
|${catalystRequestedSchema.treeString}
""".stripMargin)

rebaseDateTime = conf.getBoolean(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key,
SQLConf.LEGACY_PARQUET_REBASE_DATETIME.defaultValue.get)

new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}

@@ -127,7 +132,8 @@ class ParquetReadSupport(val convertTz: Option[ZoneId],
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetToSparkSchemaConverter(conf),
convertTz)
convertTz,
rebaseDateTime)
}
}

Original file line number Diff line number Diff line change
@@ -31,16 +31,23 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
* @param rebaseDateTime Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
*/
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetToSparkSchemaConverter,
convertTz: Option[ZoneId])
convertTz: Option[ZoneId],
rebaseDateTime: Boolean)
extends RecordMaterializer[InternalRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater)
private val rootConverter = new ParquetRowConverter(
schemaConverter,
parquetSchema,
catalystSchema,
convertTz,
NoopUpdater,
rebaseDateTime)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord

Original file line number Diff line number Diff line change
@@ -119,14 +119,16 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
* types should have been expanded.
* @param convertTz the optional time zone to convert to for int96 data
* @param updater An updater which propagates converted field values to the parent container
* @param updater An updater which propagates converted field values to the parent container
* @param rebaseDateTime Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
*/
private[parquet] class ParquetRowConverter(
schemaConverter: ParquetToSparkSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
convertTz: Option[ZoneId],
updater: ParentContainerUpdater)
updater: ParentContainerUpdater,
rebaseDateTime: Boolean)
extends ParquetGroupConverter(updater) with Logging {

assert(
@@ -263,16 +265,35 @@ private[parquet] class ParquetRowConverter(
new ParquetStringConverter(updater)

case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(value)
if (rebaseDateTime) {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
updater.setLong(rebased)
}
}
} else {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(value)
}
}
}

case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(DateTimeUtils.millisToMicros(value))
if (rebaseDateTime) {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
val micros = DateTimeUtils.millisToMicros(value)
val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
updater.setLong(rebased)
}
}
} else {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(DateTimeUtils.millisToMicros(value))
}
}
}

@@ -296,6 +317,15 @@ private[parquet] class ParquetRowConverter(
}
}

case DateType if rebaseDateTime =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
val rebased = DateTimeUtils.rebaseJulianToGregorianDays(value)
// DateType is not specialized in `SpecificMutableRow`, have to box it here.
updater.set(rebased.asInstanceOf[DateType#InternalType])
Contributor

what's the error if we simply write updater.set(rebased) here?

Member Author

No error, I will remove .asInstanceOf[DateType#InternalType]

}
}

case DateType =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
@@ -348,7 +378,7 @@ private[parquet] class ParquetRowConverter(
}
}
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater)
schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater, rebaseDateTime)

case t =>
throw new RuntimeException(