Conversation

Member

@HyukjinKwon HyukjinKwon commented Nov 17, 2021

What changes were proposed in this pull request?

This PR proposes to support DayTimeIntervalType in pandas UDF and Arrow optimization.

  • Change the mapping of Arrow's IntervalType to DurationType for DayTimeIntervalType (migration guide updated for Arrow developer APIs).
  • Add a type mapping for other code paths: numpy.timedelta64 <> pyarrow.duration("us") <> DayTimeIntervalType

Why are the changes needed?

For the rationale behind changing the mapping of Arrow's Interval type to the Duration type for DayTimeIntervalType, please refer to #32340 (comment).

DayTimeIntervalType is already mapped to the concept of a duration instead of a calendar instance: it is matched to pyarrow.duration("us"), datetime.timedelta, and java.time.Duration.

Spark SQL example:

scala> sql("SELECT timestamp '2029-01-01 00:00:00' - timestamp '2019-01-01 00:00:00'").show()
+-------------------------------------------------------------------+
|(TIMESTAMP '2029-01-01 00:00:00' - TIMESTAMP '2019-01-01 00:00:00')|
+-------------------------------------------------------------------+
|                                               INTERVAL '3653 00...|
+-------------------------------------------------------------------+
scala> sql("SELECT timestamp '2029-01-01 00:00:00' - timestamp '2019-01-01 00:00:00'").printSchema()
root
 |-- (TIMESTAMP '2029-01-01 00:00:00' - TIMESTAMP '2019-01-01 00:00:00'): interval day to second (nullable = false)

Python example:

>>> import datetime
>>> datetime.datetime.now() - datetime.datetime.now()
datetime.timedelta(days=-1, seconds=86399, microseconds=999996)

pandas / PyArrow example:

>>> import datetime
>>> import pyarrow as pa
>>> import pandas as pd
>>> pdf = pd.DataFrame({'a': [datetime.datetime.now() - datetime.datetime.now()]})
>>> tbl = pa.Table.from_pandas(pdf)
>>> tbl.schema
a: duration[ns]
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 368

Does this PR introduce any user-facing change?

Yes. After this change, users can use DayTimeIntervalType in SparkSession.createDataFrame(pandas_df), DataFrame.toPandas, and pandas UDFs:

>>> import datetime
>>> import pandas as pd
>>> from pyspark.sql.functions import pandas_udf
>>>
>>> @pandas_udf("interval day to second")
... def noop(s: pd.Series) -> pd.Series:
...     assert s.iloc[0] == datetime.timedelta(microseconds=123)
...     return s
...
>>> df = spark.createDataFrame(pd.DataFrame({"a": [pd.Timedelta(microseconds=123)]}))
>>> df.toPandas()
                       a
0 0 days 00:00:00.000123

How was this patch tested?

Manually tested, and unit tests were added.

@HyukjinKwon
Member Author

HyukjinKwon commented Nov 18, 2021


- private final IntervalDayVector accessor;
- private final NullableIntervalDayHolder intervalDayHolder = new NullableIntervalDayHolder();
+ private final DurationVector accessor;
Contributor

Do we need to care about precision here (microseconds)?

Contributor

Oh, so it's just an int64 physically, and the type annotation (or logical type) decides its semantics.

@Peng-Lei
Contributor

LGTM for *.scala and *.java

@SparkQA

SparkQA commented Nov 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49844/


if t is not None:
    # No need to cast for empty series for timedelta.
    should_check_timedelta = is_timedelta64_dtype(t) and len(pdf) == 0
Member

Do you actually mean len(pdf) != 0? Or did I misread the code/comment?

Member Author

Oh, the comment is wrong. Let me rewrite it.

Member Author

This is, BTW, a workaround for a bug in the Arrow <> pandas interaction.

For some reason, a pd.Series(pd.Timedelta(...), dtype="object") created from Arrow becomes float64 when you cast it with series.astype("timedelta64[us]") and the data is non-empty; this cannot be reproduced with a plain pandas Series.

So here I avoid it by skipping the cast, because the dtype already comes out correct when the series is non-empty. When the data is empty, the dtype becomes object, and it has to be cast explicitly.
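A condensed sketch of that workaround (the helper name and shape are hypothetical, not the actual Spark code): the explicit cast is applied only to an empty series, since a non-empty series created from Arrow already carries the right timedelta64 dtype.

```python
import pandas as pd
from pandas.api.types import is_timedelta64_dtype

def cast_timedelta_if_empty(series: pd.Series, dtype) -> pd.Series:
    # An empty series created from Arrow comes back as dtype object,
    # so it still needs the explicit cast; a non-empty one is already
    # timedelta64, and casting it again can corrupt it to float64.
    if is_timedelta64_dtype(dtype) and len(series) == 0:
        return series.astype(dtype)
    return series
```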

Member

Thanks for updating it. Looks good now.

Comment on lines -33 to -34
* A column vector backed by Apache Arrow. Currently calendar interval type and map type are not
* supported.
Member

Don't we want to keep map type here ...?

Member Author
@HyukjinKwon HyukjinKwon Nov 18, 2021

Map type support was added in #30393, but the doc fix was missed 😂

@SparkQA

SparkQA commented Nov 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49844/

@SparkQA

SparkQA commented Nov 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49850/

@SparkQA

SparkQA commented Nov 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49850/

@HyukjinKwon
Member Author

Thanks guys.

Merged to master.

@SparkQA

SparkQA commented Nov 18, 2021

Test build #145371 has finished for PR 34631 at commit d90d8bf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 18, 2021

Test build #145377 has finished for PR 34631 at commit eb2a55e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
