[SPARK-29448][SQL] Support the INTERVAL type by Parquet datasource (#26102)
```diff
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
```
```diff
@@ -325,6 +325,26 @@ private[parquet] class ParquetRowConverter(
         override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
       })

+    case CalendarIntervalType
+        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY =>
+      new ParquetPrimitiveConverter(updater) {
+        override def addBinary(value: Binary): Unit = {
+          assert(
+            value.length() == 12,
+            "Intervals are expected to be stored in 12-byte fixed len byte array, " +
+              s"but got a ${value.length()}-byte array.")
+
+          val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+          val milliseconds = buf.getInt
+          var microseconds = milliseconds * DateTimeUtils.MICROS_PER_MILLIS
+          val days = buf.getInt
+          val daysInUs = Math.multiplyExact(days, DateTimeUtils.MICROS_PER_DAY)
+          microseconds = Math.addExact(microseconds, daysInUs)
+          val months = buf.getInt
+          updater.set(new CalendarInterval(months, microseconds))
+        }
+      }

     case t =>
       throw new RuntimeException(
         s"Unable to create Parquet converter for data type ${t.json} " +
```
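The converter added above decodes a 12-byte fixed-length byte array as three little-endian 32-bit integers, read in the order milliseconds, days, months, and folds the first two into a single microsecond count. A minimal self-contained sketch of the same decoding (plus an inverse for round-trip checking) is below; the constants and helper names are illustrative, not Spark's actual API:

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Constants mirroring DateTimeUtils.MICROS_PER_MILLIS / MICROS_PER_DAY,
// defined locally so the sketch stands alone.
val MICROS_PER_MILLIS = 1000L
val MICROS_PER_DAY = 24L * 60 * 60 * 1000 * 1000

// Decode the 12-byte little-endian layout used in the diff above:
// 4 bytes milliseconds, 4 bytes days, 4 bytes months.
def decodeInterval(bytes: Array[Byte]): (Int, Long) = {
  require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val millis = buf.getInt
  var micros = millis * MICROS_PER_MILLIS
  val days = buf.getInt
  // Overflow-checked, like Math.multiplyExact/addExact in the converter.
  micros = Math.addExact(micros, Math.multiplyExact(days.toLong, MICROS_PER_DAY))
  val months = buf.getInt
  (months, micros)
}

// Inverse direction, for round-trip checks; note sub-millisecond
// precision is lost, since the layout only stores whole milliseconds.
def encodeInterval(months: Int, micros: Long): Array[Byte] = {
  val days = (micros / MICROS_PER_DAY).toInt
  val millis = ((micros % MICROS_PER_DAY) / MICROS_PER_MILLIS).toInt
  ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    .putInt(millis).putInt(days).putInt(months).array()
}
```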
Let's do this change after we officially make `CalendarIntervalType` public, i.e. move it to a public package.
Just wondering, what's the relation between this PR and opening up `CalendarIntervalType`? An `INTERVAL` column could appear as the result of subtracting two datetime columns, and a user may want to store it to a file system.
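To illustrate the point about datetime subtraction: the difference between two timestamps is naturally expressed in microseconds, the same unit `CalendarInterval` uses for its time part. A plain-Scala sketch using `java.time` (illustrative only, not Spark's implementation):

```scala
import java.time.{Duration, Instant}

// Subtracting two timestamps yields a duration; convert it to
// microseconds with overflow-checked arithmetic.
def timestampDiffMicros(end: Instant, start: Instant): Long = {
  val d = Duration.between(start, end)
  Math.addExact(Math.multiplyExact(d.getSeconds, 1000000L), (d.getNano / 1000).toLong)
}

val diff = timestampDiffMicros(
  Instant.parse("2019-10-12T01:00:00.000500Z"),
  Instant.parse("2019-10-12T00:00:00Z"))
// one hour plus 500 microseconds
```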
The interval type is kind of an internal type for now. It's a big decision whether we can read/write it from/to data sources.
And Python and R need a proper conversion for both reads and writes as well.
How are Python and R involved in reading/writing Parquet?
For instance, suppose the Scala API saves interval types via `df.write.parquet("...")` and Python reads the result back. There's currently no way to map the values on the Python side via `collect`. The date type, by contrast, is mapped to a `datetime.date` instance in Python.
We gotta make it all supported before exposing all the related interval features (see #25022 (comment)).