[SPARK-35139][SQL] Support ANSI intervals as Arrow Column vectors #32340
Conversation
ok to test
cc @MaxGekk @cloud-fan @BryanCutler FYI
Kubernetes integration test starting
Kubernetes integration test status failure
| @Override |
| int getInt(int rowId) { |
| int months = accessor.get(rowId); |
nit: return accessor.get(rowId);
resolved
| long getLong(int rowId) { |
| accessor.get(rowId, intervalDayHolder); |
| final long microseconds = intervalDayHolder.days * MICROS_PER_DAY |
| + (long) intervalDayHolder.milliseconds * MICROS_PER_MILLIS; |
should we handle overflow?
return Math.addExact(
    intervalDayHolder.days * MICROS_PER_DAY,
    intervalDayHolder.milliseconds * MICROS_PER_MILLIS);
Resolved, thanks very much.
| override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { |
| val totalMicroseconds = input.getLong(ordinal) |
| val days = totalMicroseconds / MICROS_PER_DAY |
| val millis = (totalMicroseconds - days * MICROS_PER_DAY) / MICROS_PER_MILLIS |
nit: `(totalMicroseconds % MICROS_PER_DAY) / MICROS_PER_MILLIS`
Resolved, thanks very much.
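The nit rests on a standard identity: `totalMicroseconds - days * MICROS_PER_DAY` equals `totalMicroseconds % MICROS_PER_DAY` when `days = totalMicroseconds / MICROS_PER_DAY`, because Java division truncates toward zero. A minimal sketch of the decomposition (the constant values are assumed to match Spark's `DateTimeConstants`; `split` is an illustrative helper, not the PR's actual method):

```java
public class MicrosSplit {
    static final long MICROS_PER_MILLIS = 1_000L;
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // Decompose total microseconds into whole days and a millisecond remainder,
    // mirroring the setValue snippet quoted above.
    static long[] split(long totalMicroseconds) {
        long days = totalMicroseconds / MICROS_PER_DAY;
        // Same value as (totalMicroseconds - days * MICROS_PER_DAY) / MICROS_PER_MILLIS
        long millis = (totalMicroseconds % MICROS_PER_DAY) / MICROS_PER_MILLIS;
        return new long[] {days, millis};
    }

    public static void main(String[] args) {
        long total = 2 * MICROS_PER_DAY + 1_500L; // 2 days + 1.5 ms
        long[] parts = split(total);
        System.out.println(parts[0] + " days, " + parts[1] + " ms"); // 2 days, 1 ms
    }
}
```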
Test build #137928 has finished for PR 32340 at commit
Both multiplications can overflow too. Could you use Math.multiplyExact, please?
OK, thanks @MaxGekk
@MaxGekk done
Kubernetes integration test starting
Kubernetes integration test status failure
@MaxGekk intervalDayHolder.days and intervalDayHolder.milliseconds are int, can they really overflow?
I think so:
scala> Long.MaxValue / MICROS_PER_DAY
res0: Long = 106751991
scala> (106751991 + 1) * MICROS_PER_DAY
res1: Long = -9223371964909551616
but Int.MaxValue * MICROS_PER_MILLIS won't overflow, right?
Right
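To make the two halves of this exchange concrete: `days * MICROS_PER_DAY` can exceed `Long.MaxValue` (any day count above 106751991 wraps, as the REPL session above shows), while `milliseconds * MICROS_PER_MILLIS` is at most roughly 2^31 * 10^3, far below 2^63. A sketch of the guarded conversion (constant values assumed from Spark's `DateTimeConstants`; `toMicros` is an illustrative helper, not the PR's actual method):

```java
public class OverflowCheck {
    static final long MICROS_PER_MILLIS = 1_000L;
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // Guard only the days term with multiplyExact; the millis term cannot
    // overflow a long because it is an int scaled by 1000.
    static long toMicros(int days, int milliseconds) {
        return Math.addExact(
            Math.multiplyExact((long) days, MICROS_PER_DAY),
            milliseconds * MICROS_PER_MILLIS);
    }

    public static void main(String[] args) {
        System.out.println(toMicros(106_751_991, 0)); // largest day count that fits
        try {
            toMicros(106_751_992, 0); // one more day would wrap around
        } catch (ArithmeticException e) {
            System.out.println("overflow detected: " + e.getMessage());
        }
    }
}
```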
I used "intervalDayHolder.days * MICROS_PER_DAY" instead of Math.multiplyExact
@Peng-Lei Wenchen asked about the milliseconds part; why did you change the days multiplication? I showed above that it can overflow Long.
Sorry, my mistake. I get it. I'll fix it right now.
Maybe use the following code, for consistency with the cases above:
| case YearMonthIntervalType => Types.MinorType.INTERVALYEAR.getType |
| case DayTimeIntervalType => Types.MinorType.INTERVALDAY.getType |
| case YearMonthIntervalType => new ArrowType.Interval(IntervalUnit.YEAR_MONTH) |
| case DayTimeIntervalType => new ArrowType.Interval(IntervalUnit.DAY_TIME) |
done
MaxGekk
left a comment
@Peng-Lei Could you add checks to ArrowUtilsSuite:
spark/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
Line 50 in 0494dc9
| roundtrip(DateType) |
Kubernetes integration test starting
Kubernetes integration test status failure
Remove the blank line.
done
Use Int.MaxValue
| check(YearMonthIntervalType, |
| Seq(null, 0, 1, -1, scala.Int.MaxValue, scala.Int.MinValue)) |
| check(YearMonthIntervalType, Seq(null, 0, 1, -1, Int.MaxValue, Int.MinValue)) |
done
nit: scala.Long.MinValue -> Long.MinValue
done
Kubernetes integration test status failure

Test build #137940 has finished for PR 32340 at commit
days has the Int type and MICROS_PER_DAY is Long, but intervalDayHolder.days * MICROS_PER_DAY can overflow Long in the general case. @Peng-Lei If you believe the overflow never happens, could you prove that and/or add an assert?
Yeah, I tested it. It did overflow. Thank you very much.
Such a negative test could be useful; can you add it to the PR? Then we could catch the behavior change if someone modifies your code in the future.
OK, I'll try to add it
done
@Peng-Lei I wonder why you make changes in your fork's master and merge them to
Sorry, I didn't clarify well
| return Math.addExact(Math.multiplyExact(intervalDayHolder.days, MICROS_PER_DAY), |
| Math.multiplyExact(intervalDayHolder.milliseconds, MICROS_PER_MILLIS)); |
| return Math.addExact( |
| Math.multiplyExact(intervalDayHolder.days, MICROS_PER_DAY), |
| intervalDayHolder.milliseconds * MICROS_PER_MILLIS); |
Test build #137946 has finished for PR 32340 at commit

@MaxGekk That's what I did.
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
@Peng-Lei You can skip a few steps I think: You can merge/rebase on the master only if you see conflicts in the PR. |
|
Test build #137957 has finished for PR 32340 at commit
|
|
Test build #137960 has finished for PR 32340 at commit
|
|
Kubernetes integration test starting |
|
Kubernetes integration test status failure |
|
thanks, merging to master! |
MaxGekk
left a comment
LGTM
Looks like we can remove the note description on the ArrowColumnVector [1]:
Test build #137982 has finished for PR 32340 at commit
| } else if (vector instanceof IntervalYearVector) { |
| accessor = new IntervalYearAccessor((IntervalYearVector) vector); |
| } else if (vector instanceof IntervalDayVector) { |
| accessor = new IntervalDayAccessor((IntervalDayVector) vector); |
Hm, there's something wrong here. We mapped Spark's DayTimeIntervalType to java.time.Duration on the Java/Scala side, but here we map it to Arrow's IntervalType, which represents a calendar instance (see also https://github.com/apache/arrow/blob/master/format/Schema.fbs).
I think we should map it to Arrow's DurationType (Python's datetime.timedelta). I am working on SPARK-37277 to support this in Arrow conversion in PySpark, but this became a blocker for me. I am preparing a PR to change this, but please let me know if you have different thoughts.
good catch!
I'm not quite sure why DayTimeIntervalType maps to Arrow's IntervalType here; I just followed ArrowUtils.scala#L60. I tried to learn about the Arrow types; IntervalType looks SQL style. And in Hive, INTERVAL_DAY_TIME maps to Arrow's IntervalType with the IntervalUnit.DAY_TIME unit. If we map DayTimeIntervalType to Arrow's DurationType, then which type should YearMonthIntervalType map to?
At the very least, Duration cannot be mapped to YearMonthIntervalType. For DayTimeIntervalType, Arrow-wise, mapping to IntervalType makes sense, but it makes less sense in Spark SQL because we're already mapping to Duration.
I am not saying either way is 100% correct, but I would pick the one that makes it coherent from Spark's perspective if I have to pick one of the two.
And YearMonthIntervalType is mapped to java.time.Period, which is a calendar instance:
A date-based amount of time in the ISO-8601 calendar system, such as '2 years, 3 months and 4 days'.
So YearMonthIntervalType seems fine.
| override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { |
| val totalMicroseconds = input.getLong(ordinal) |
| val days = totalMicroseconds / MICROS_PER_DAY |
| val millis = (totalMicroseconds % MICROS_PER_DAY) / MICROS_PER_MILLIS |
Hm, do we lose the microseconds part? I think this is another reason to use Duration.
Yeah, we lose the microseconds part and end up with millisecond precision. It's inconsistent with the conversion from java.time.Duration to DayTimeIntervalType, which drops any excess precision finer than microseconds.
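The loss is easy to demonstrate by round-tripping through the (days, milliseconds) representation that Arrow's IntervalDayVector stores: any sub-millisecond part disappears. A small sketch (constant values assumed from Spark's `DateTimeConstants`; `roundTrip` is an illustrative helper, not code from the PR):

```java
public class PrecisionLoss {
    static final long MICROS_PER_MILLIS = 1_000L;
    static final long MICROS_PER_DAY = 86_400_000_000L;

    // Convert total microseconds to (days, millis) and back, as the writer
    // and accessor quoted in this thread effectively do together.
    static long roundTrip(long totalMicroseconds) {
        long days = totalMicroseconds / MICROS_PER_DAY;
        long millis = (totalMicroseconds % MICROS_PER_DAY) / MICROS_PER_MILLIS;
        return days * MICROS_PER_DAY + millis * MICROS_PER_MILLIS;
    }

    public static void main(String[] args) {
        long total = 123_456L; // 123 ms + 456 µs
        System.out.println(roundTrip(total)); // 123000: the 456 µs are gone
    }
}
```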
…and Arrow optimization
### What changes were proposed in this pull request?
This PR proposes to support `DayTimeIntervalType` in pandas UDF and Arrow optimization.
- Change the mapping of Arrow's `IntervalType` to `DurationType` for `DayTimeIntervalType` (migration guide updated for Arrow developer APIs).
- Add a type mapping for other code paths: `numpy.timedelta64` <> `pyarrow.duration("us")` <> `DayTimeIntervalType`
### Why are the changes needed?
For changing the mapping of Arrow's `Interval` type to `Duration` type for `DayTimeIntervalType`, please refer to #32340 (comment).
`DayTimeIntervalType` is already mapped to the concept of duration instead of a calendar instance: it is matched to `pyarrow.duration("us")`, `datetime.timedelta`, and `java.time.Duration`.
**Spark SQL example**
```scala
scala> sql("SELECT timestamp '2029-01-01 00:00:00' - timestamp '2019-01-01 00:00:00'").show()
```
```
+-------------------------------------------------------------------+
|(TIMESTAMP '2029-01-01 00:00:00' - TIMESTAMP '2019-01-01 00:00:00')|
+-------------------------------------------------------------------+
| INTERVAL '3653 00...|
+-------------------------------------------------------------------+
```
```scala
scala> sql("SELECT timestamp '2029-01-01 00:00:00' - timestamp '2019-01-01 00:00:00'").printSchema()
```
```
root
|-- (TIMESTAMP '2029-01-01 00:00:00' - TIMESTAMP '2019-01-01 00:00:00'): interval day to second (nullable = false)
```
**Python example:**
```python
>>> import datetime
>>> datetime.datetime.now() - datetime.datetime.now()
datetime.timedelta(days=-1, seconds=86399, microseconds=999996)
```
**pandas / PyArrow example:**
```python
>>> import pyarrow as pa
>>> import pandas as pd
>>> pdf = pd.DataFrame({'a': [datetime.datetime.now() - datetime.datetime.now()]})
>>> tbl = pa.Table.from_pandas(pdf)
>>> tbl.schema
a: duration[ns]
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 368
```
### Does this PR introduce _any_ user-facing change?
Yes, after this change, users can use `DayTimeIntervalType` in `SparkSession.createDataFrame(pandas_df)`, `DataFrame.to_pandas`, and pandas UDFs:
```python
>>> import datetime
>>> import pandas as pd
>>> from pyspark.sql.functions import pandas_udf
>>>
>>> @pandas_udf("interval day to second")
... def noop(s: pd.Series) -> pd.Series:
... assert s.iloc[0] == datetime.timedelta(microseconds=123)
... return s
...
>>> df = spark.createDataFrame(pd.DataFrame({"a": [pd.Timedelta(microseconds=123)]}))
>>> df.toPandas()
a
0 0 days 00:00:00.000123
```
### How was this patch tested?
Manually tested, and unittests were added.
Closes #34631 from HyukjinKwon/SPARK-37277-1.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
What changes were proposed in this pull request?
Extend ArrowColumnVector to support YearMonthIntervalType and DayTimeIntervalType.
Why are the changes needed?
https://issues.apache.org/jira/browse/SPARK-35139
Does this PR introduce any user-facing change?
No
How was this patch tested?
$ ./dev/scalastyle
$ ./dev/lint-java