Rebase on new field - days
MaxGekk committed Nov 1, 2019
commit 712378c6001c352d76c28c6cdf6176b694ea5355
@@ -89,7 +89,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int field
       } else if (t instanceof CalendarIntervalType) {
         CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
         col.getChild(0).putInts(0, capacity, c.months);
-        col.getChild(1).putLongs(0, capacity, c.microseconds);
+        col.getChild(1).putInts(0, capacity, c.days);
+        col.getChild(2).putLongs(0, capacity, c.microseconds);
       } else if (t instanceof DateType) {
         col.putInts(0, capacity, row.getInt(fieldIdx));
       } else if (t instanceof TimestampType) {

@@ -333,13 +333,10 @@ private[parquet] class ParquetRowConverter(
           s"but got a ${value.length()}-byte array.")

       val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-      val milliseconds = buf.getInt
-      var microseconds = milliseconds * DateTimeUtils.MICROS_PER_MILLIS
+      val microseconds = buf.getInt * DateTimeUtils.MICROS_PER_MILLIS
       val days = buf.getInt
-      val daysInUs = Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY)
-      microseconds = Math.addExact(microseconds, daysInUs)
       val months = buf.getInt
-      updater.set(new CalendarInterval(months, microseconds))
+      updater.set(new CalendarInterval(months, days, microseconds))
     }
   }

@@ -211,13 +211,10 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       case CalendarIntervalType =>
         (row: SpecializedGetters, ordinal: Int) =>
           val interval = row.getInterval(ordinal)
-          val microseconds = interval.microseconds % DateTimeUtils.MICROS_PER_DAY
-          val milliseconds: Int = (microseconds / DateTimeUtils.MICROS_PER_MILLIS).toInt
-          val days: Int = Math.toIntExact(interval.microseconds / DateTimeUtils.MICROS_PER_DAY)
           val buf = ByteBuffer.wrap(reusableBuffer)
           buf.order(ByteOrder.LITTLE_ENDIAN)
-            .putInt(milliseconds)
-            .putInt(days)
+            .putInt((interval.milliseconds()).toInt)
Member

@HyukjinKwon HyukjinKwon Nov 15, 2019

@MaxGekk the doc (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval) says:

"three little-endian unsigned integers"

What happens if some parts of the interval are negative and those negative values are written here?

Member Author

Spark will read them back as negative values: https://github.com/apache/spark/pull/26102/files#diff-35a70bb270f17ea3a1d964c4bec0e0a2R912 . I don't know about other systems.
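
To make this exchange concrete, here is a minimal Java sketch of the 12-byte layout that the write path in this diff produces: milliseconds, days, and months as little-endian 32-bit ints. The class and method names (`IntervalBytesSketch`, `encode`, `readSigned`, `readUnsigned`) are illustrative assumptions and are not part of Spark or Parquet. It suggests why a negative part survives a signed read unchanged, while a reader that treats the same bytes as unsigned would see a large positive number.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class IntervalBytesSketch {
    // Packs (milliseconds, days, months) as three little-endian 32-bit ints,
    // in the order the write path in this diff uses.
    static byte[] encode(int millis, int days, int months) {
        return ByteBuffer.allocate(12)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putInt(millis)
            .putInt(days)
            .putInt(months)
            .array();
    }

    // Reads field 0, 1, or 2 back as a signed int, as ByteBuffer.getInt does.
    static int readSigned(byte[] bytes, int fieldIndex) {
        return ByteBuffer.wrap(bytes)
            .order(ByteOrder.LITTLE_ENDIAN)
            .getInt(fieldIndex * 4);
    }

    // Interprets the same four bytes as an unsigned 32-bit value.
    static long readUnsigned(byte[] bytes, int fieldIndex) {
        return Integer.toUnsignedLong(readSigned(bytes, fieldIndex));
    }

    public static void main(String[] args) {
        byte[] bytes = encode(-1000, -2, 3);
        System.out.println(readSigned(bytes, 1));   // -2
        System.out.println(readUnsigned(bytes, 1)); // 4294967294
    }
}
```

The bytes on disk are identical in both cases; only the reader's interpretation differs, which is the interoperability concern raised above.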

Member

Hm, shouldn't we maybe add an assert to reject negative parts for now? It seems this doesn't comply with the Parquet format. I'm just worried that we will have to explain this to users multiple times later, as with https://issues.apache.org/jira/browse/SPARK-20937 and https://issues.apache.org/jira/browse/SPARK-20297

Do you think it is common to use negative parts? If not, let's just disallow them.
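
One possible shape for such a check, as a sketch only: the helper `requireNonNegativeParts` and the class `NonNegativeIntervalSketch` are hypothetical and do not appear in the PR. The parameter names mirror the `months`, `days`, and `microseconds` fields used in the diff.

```java
class NonNegativeIntervalSketch {
    // Hypothetical guard (not from the PR): rejects an interval if any part is
    // negative, so only values representable as unsigned integers reach the writer.
    static void requireNonNegativeParts(int months, int days, long microseconds) {
        if (months < 0 || days < 0 || microseconds < 0) {
            throw new IllegalArgumentException(
                "Negative interval parts are not supported: months=" + months
                    + ", days=" + days + ", microseconds=" + microseconds);
        }
    }

    public static void main(String[] args) {
        requireNonNegativeParts(1, 2, 3_000L); // passes silently
        try {
            requireNonNegativeParts(1, -2, 3_000L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether to fail with an exception or an assertion is a design choice the thread leaves open; an exception is used here because assertions can be disabled at runtime.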

Contributor

We should use a strict toInt so that we fail earlier if the value is out of range.
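
The difference between a lenient and a strict narrowing can be sketched in Java as follows. Scala's `Long.toInt`, used on `interval.milliseconds()` in the line under review, behaves like a Java `(int)` cast and silently wraps; `Math.toIntExact`, which the removed lines already used for `days`, throws instead. The class name, method names, and local constant below are stand-ins for illustration, not Spark code.

```java
class StrictToIntSketch {
    // Local stand-in for DateTimeUtils.MICROS_PER_MILLIS.
    static final long MICROS_PER_MILLIS = 1000L;

    // Narrowing cast: silently keeps only the low 32 bits.
    static int lenientMillis(long micros) {
        return (int) (micros / MICROS_PER_MILLIS);
    }

    // Strict conversion: throws ArithmeticException if the value does not fit in an int.
    static int strictMillis(long micros) {
        return Math.toIntExact(micros / MICROS_PER_MILLIS);
    }

    public static void main(String[] args) {
        long micros = 3_000_000_000_000L; // 3,000,000,000 ms, above Integer.MAX_VALUE
        System.out.println(lenientMillis(micros)); // -1294967296 (wrapped)
        try {
            strictMillis(micros);
        } catch (ArithmeticException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```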

Member Author

fixed

+            .putInt(interval.days)
             .putInt(interval.months)
           recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer))