2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -24,6 +24,8 @@ license: |

## Upgrading from Spark SQL 3.2 to 3.3

- Since Spark 3.3, `DayTimeIntervalType` in Spark SQL is mapped to Arrow's `Duration` type in the `ArrowWriter` and `ArrowColumnVector` developer APIs. Previously, `DayTimeIntervalType` was mapped to Arrow's `Interval` type, which does not match the types Spark SQL maps to in other languages. For example, `DayTimeIntervalType` is mapped to `java.time.Duration` in Java.

- Since Spark 3.3, the functions `lpad` and `rpad` have been overloaded to support byte sequences. When the first argument is a byte sequence, the optional padding pattern must also be a byte sequence and the result is a BINARY value. The default padding pattern in this case is the zero byte.

- Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields.
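As a quick illustration of the new mapping described above, a DayTimeIntervalType value travels through Arrow as a microsecond-resolution duration. The sketch below uses PyArrow directly; it is illustrative only, assumes a local pyarrow installation, and is not part of this change:

```python
import datetime
import pyarrow as pa

# Under the new mapping, a DayTimeIntervalType column is represented in Arrow
# as a duration array with microsecond resolution rather than an interval array.
arr = pa.array([datetime.timedelta(days=1, microseconds=123)], type=pa.duration("us"))

print(arr.type)         # duration[us]
print(arr.to_pylist())  # [datetime.timedelta(days=1, microseconds=123)]
```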
27 changes: 24 additions & 3 deletions python/pyspark/sql/pandas/conversion.py
@@ -21,8 +21,8 @@

from pyspark.rdd import _load_from_socket # type: ignore[attr-defined]
from pyspark.sql.pandas.serializers import ArrowCollectSerializer
from pyspark.sql.types import IntegralType
from pyspark.sql.types import (
IntegralType,
ByteType,
ShortType,
IntegerType,
@@ -33,6 +33,7 @@
MapType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
StructType,
DataType,
)
@@ -85,6 +86,7 @@ def toPandas(self) -> "PandasDataFrameLike":

import numpy as np
import pandas as pd
from pandas.core.dtypes.common import is_timedelta64_dtype

timezone = self.sql_ctx._conf.sessionLocalTimeZone() # type: ignore[attr-defined]

@@ -225,7 +227,10 @@ def toPandas(self) -> "PandasDataFrameLike":
else:
series = pdf[column_name]

if t is not None:
# For timedelta, non-empty series do not need the cast; their dtype is already correct.
should_check_timedelta = is_timedelta64_dtype(t) and len(pdf) == 0
Member: Are you actually meaning len(pdf) != 0? Or did I misread the code/comment?

Member Author: Oh, the comment is wrong. Let me rewrite it.

Member Author: This is, by the way, to work around a bug in the Arrow <> pandas conversion. For some reason, a pd.Series(pd.Timedelta(...), dtype="object") created from Arrow becomes float64 when you cast it with series.astype("timedelta64[us]") and the data is non-empty; this cannot be reproduced with a plain pandas Series. So I avoided it here by simply skipping the cast, because the dtype is already correct when the series is non-empty. When the data is empty, the dtype is object and it has to be cast.

Member: Thanks for updating it. Looks good now.


if (t is not None and not is_timedelta64_dtype(t)) or should_check_timedelta:
series = series.astype(t, copy=False)

# `insert` API makes copy of data, we only do it for Series of duplicate column names.
@@ -278,6 +283,8 @@ def _to_corrected_pandas_type(dt: DataType) -> Optional[Type]:
return np.datetime64
elif type(dt) == TimestampNTZType:
return np.datetime64
elif type(dt) == DayTimeIntervalType:
return np.timedelta64
else:
return None

@@ -424,13 +431,14 @@ def _convert_from_pandas(
list
list of records
"""
import pandas as pd
from pyspark.sql import SparkSession

assert isinstance(self, SparkSession)

if timezone is not None:
from pyspark.sql.pandas.types import _check_series_convert_timestamps_tz_local
from pandas.core.dtypes.common import is_datetime64tz_dtype
from pandas.core.dtypes.common import is_datetime64tz_dtype, is_timedelta64_dtype

copied = False
if isinstance(schema, StructType):
Expand Down Expand Up @@ -459,6 +467,19 @@ def _convert_from_pandas(
copied = True
pdf[column] = s

for column, series in pdf.iteritems():
if is_timedelta64_dtype(series):
if not copied:
pdf = pdf.copy()
copied = True
# Explicitly cast the timedelta column to object dtype so that the numpy
# records output can hold the timedelta instances as they are. Otherwise,
# they are converted to their internal numeric values.
ser = pdf[column]
pdf[column] = pd.Series(
ser.dt.to_pytimedelta(), index=ser.index, dtype="object", name=ser.name
)

# Convert pandas.DataFrame to list of numpy records
np_records = pdf.to_records(index=False)

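The review thread above explains why the cast is skipped for non-empty timedelta series. Below is a rough sketch of the two paths using plain pandas (hand-built series rather than Arrow output, so it only illustrates the distinction, not the float64 quirk itself):

```python
import datetime
import pandas as pd

# Non-empty case: pandas already infers timedelta64[ns], so toPandas can skip
# the astype call (casting the Arrow-produced object series would yield float64).
non_empty = pd.Series([datetime.timedelta(microseconds=123)])
print(non_empty.dtype)  # timedelta64[ns]

# Empty case: there are no values to infer a dtype from, so the object series
# must be cast explicitly to end up with a timedelta64 column.
empty = pd.Series([], dtype="object")
print(empty.astype("timedelta64[ns]").dtype)  # timedelta64[ns]
```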
5 changes: 5 additions & 0 deletions python/pyspark/sql/pandas/types.py
@@ -35,6 +35,7 @@
DateType,
TimestampType,
TimestampNTZType,
DayTimeIntervalType,
ArrayType,
MapType,
StructType,
@@ -81,6 +82,8 @@ def to_arrow_type(dt: DataType) -> "pa.DataType":
arrow_type = pa.timestamp("us", tz="UTC")
elif type(dt) == TimestampNTZType:
arrow_type = pa.timestamp("us", tz=None)
elif type(dt) == DayTimeIntervalType:
arrow_type = pa.duration("us")
elif type(dt) == ArrayType:
if type(dt.elementType) in [StructType, TimestampType]:
raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
@@ -153,6 +156,8 @@ def from_arrow_type(at: "pa.DataType", prefer_timestamp_ntz: bool = False) -> DataType:
spark_type = TimestampNTZType()
elif types.is_timestamp(at):
spark_type = TimestampType()
elif types.is_duration(at):
spark_type = DayTimeIntervalType()
elif types.is_list(at):
if types.is_timestamp(at.value_type):
raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
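Assuming a PySpark build that already contains this change, the two new branches can be exercised directly; this is a sketch rather than a test from the PR:

```python
import pyarrow as pa
from pyspark.sql.pandas.types import to_arrow_type, from_arrow_type
from pyspark.sql.types import DayTimeIntervalType

# DayTimeIntervalType is now sent through Arrow as duration[us] ...
print(to_arrow_type(DayTimeIntervalType()))  # duration[us]

# ... and an Arrow duration[us] comes back as a DAY TO SECOND interval type.
print(from_arrow_type(pa.duration("us")).simpleString())  # interval day to second
```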
14 changes: 13 additions & 1 deletion python/pyspark/sql/tests/test_arrow.py
@@ -26,7 +26,7 @@

from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import rand, udf
from pyspark.sql.functions import rand, udf, assert_true, lit
from pyspark.sql.types import (
StructType,
StringType,
@@ -241,6 +241,18 @@ def test_create_data_frame_to_pandas_timestamp_ntz(self):
assert_frame_equal(origin, pdf)
assert_frame_equal(pdf, pdf_arrow)

def test_create_data_frame_to_pandas_day_time_interval(self):
# SPARK-37279: Test DayTimeInterval in createDataFrame and toPandas
origin = pd.DataFrame({"a": [datetime.timedelta(microseconds=123)]})
df = self.spark.createDataFrame(origin)
df.select(
assert_true(lit("INTERVAL '0 00:00:00.000123' DAY TO SECOND") == df.a.cast("string"))
).collect()

pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
assert_frame_equal(origin, pdf)
assert_frame_equal(pdf, pdf_arrow)

def test_toPandas_respect_session_timezone(self):
df = self.spark.createDataFrame(self.data, schema=self.schema)

33 changes: 26 additions & 7 deletions python/pyspark/sql/tests/test_dataframe.py
@@ -37,6 +37,7 @@
TimestampType,
TimestampNTZType,
FloatType,
DayTimeIntervalType,
)
from pyspark.sql.utils import AnalysisException, IllegalArgumentException
from pyspark.testing.sqlutils import (
@@ -678,7 +679,7 @@ def test_cache(self):
)

def _to_pandas(self):
from datetime import datetime, date
from datetime import datetime, date, timedelta

schema = (
StructType()
@@ -689,6 +690,7 @@ def _to_pandas(self):
.add("dt", DateType())
.add("ts", TimestampType())
.add("ts_ntz", TimestampNTZType())
.add("dt_interval", DayTimeIntervalType())
)
data = [
(
@@ -699,8 +701,9 @@ def _to_pandas(self):
date(1969, 1, 1),
datetime(1969, 1, 1, 1, 1, 1),
datetime(1969, 1, 1, 1, 1, 1),
timedelta(days=1),
),
(2, "foo", True, 5.0, None, None, None),
(2, "foo", True, 5.0, None, None, None, None),
(
3,
"bar",
@@ -709,6 +712,7 @@ def _to_pandas(self):
date(2012, 3, 3),
datetime(2012, 3, 3, 3, 3, 3),
datetime(2012, 3, 3, 3, 3, 3),
timedelta(hours=-1, milliseconds=421),
),
(
4,
@@ -718,6 +722,7 @@ def _to_pandas(self):
date(2100, 4, 4),
datetime(2100, 4, 4, 4, 4, 4),
datetime(2100, 4, 4, 4, 4, 4),
timedelta(microseconds=123),
),
]
df = self.spark.createDataFrame(data, schema)
Expand All @@ -736,6 +741,7 @@ def test_to_pandas(self):
self.assertEqual(types[4], np.object) # datetime.date
self.assertEqual(types[5], "datetime64[ns]")
self.assertEqual(types[6], "datetime64[ns]")
self.assertEqual(types[7], "timedelta64[ns]")

@unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore
def test_to_pandas_with_duplicated_column_names(self):
@@ -808,7 +814,8 @@ def test_to_pandas_from_empty_dataframe(self):
CAST(1 AS BOOLEAN) AS boolean,
CAST('foo' AS STRING) AS string,
CAST('2019-01-01' AS TIMESTAMP) AS timestamp,
CAST('2019-01-01' AS TIMESTAMP_NTZ) AS timestamp_ntz
CAST('2019-01-01' AS TIMESTAMP_NTZ) AS timestamp_ntz,
INTERVAL '1563:04' MINUTE TO SECOND AS day_time_interval
"""
dtypes_when_nonempty_df = self.spark.sql(sql).toPandas().dtypes
dtypes_when_empty_df = self.spark.sql(sql).filter("False").toPandas().dtypes
@@ -830,7 +837,8 @@ def test_to_pandas_from_null_dataframe(self):
CAST(NULL AS BOOLEAN) AS boolean,
CAST(NULL AS STRING) AS string,
CAST(NULL AS TIMESTAMP) AS timestamp,
CAST(NULL AS TIMESTAMP_NTZ) AS timestamp_ntz
CAST(NULL AS TIMESTAMP_NTZ) AS timestamp_ntz,
INTERVAL '1563:04' MINUTE TO SECOND AS day_time_interval
"""
pdf = self.spark.sql(sql).toPandas()
types = pdf.dtypes
@@ -844,6 +852,7 @@ def test_to_pandas_from_null_dataframe(self):
self.assertEqual(types[7], np.object)
self.assertTrue(np.can_cast(np.datetime64, types[8]))
self.assertTrue(np.can_cast(np.datetime64, types[9]))
self.assertTrue(np.can_cast(np.timedelta64, types[10]))

@unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore
def test_to_pandas_from_mixed_dataframe(self):
@@ -861,9 +870,10 @@ def test_to_pandas_from_mixed_dataframe(self):
CAST(col7 AS BOOLEAN) AS boolean,
CAST(col8 AS STRING) AS string,
timestamp_seconds(col9) AS timestamp,
timestamp_seconds(col10) AS timestamp_ntz
FROM VALUES (1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
timestamp_seconds(col10) AS timestamp_ntz,
INTERVAL '1563:04' MINUTE TO SECOND AS day_time_interval
FROM VALUES (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
(NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
"""
pdf_with_some_nulls = self.spark.sql(sql).toPandas()
pdf_with_only_nulls = self.spark.sql(sql).filter("tinyint is null").toPandas()
@@ -937,6 +947,15 @@ def test_create_dataframe_from_pandas_with_dst(self):
os.environ["TZ"] = orig_env_tz
time.tzset()

@unittest.skipIf(not have_pandas, pandas_requirement_message) # type: ignore
def test_create_dataframe_from_pandas_with_day_time_interval(self):
# SPARK-37277: Test DayTimeIntervalType in createDataFrame without Arrow.
import pandas as pd
from datetime import timedelta

df = self.spark.createDataFrame(pd.DataFrame({"a": [timedelta(microseconds=123)]}))
self.assertEqual(df.toPandas().a.iloc[0], timedelta(microseconds=123))

def test_repr_behaviors(self):
import re

23 changes: 21 additions & 2 deletions python/pyspark/sql/tests/test_pandas_udf.py
@@ -19,8 +19,8 @@
import datetime
from typing import cast

from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType, StructType, StructField, LongType
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType, assert_true, lit
from pyspark.sql.types import DoubleType, StructType, StructField, LongType, DayTimeIntervalType
from pyspark.sql.utils import ParseException, PythonException
from pyspark.rdd import PythonEvalType
from pyspark.testing.sqlutils import (
@@ -272,6 +272,25 @@ def noop(s):
self.assertEqual(df.schema[0].dataType.typeName(), "timestamp_ntz")
self.assertEqual(df.first()[0], datetime.datetime(1970, 1, 1, 0, 0))

def test_pandas_udf_day_time_interval_type(self):
# SPARK-37277: Test DayTimeIntervalType in pandas UDF
import pandas as pd

@pandas_udf(DayTimeIntervalType(DayTimeIntervalType.DAY, DayTimeIntervalType.SECOND))
def noop(s: pd.Series) -> pd.Series:
assert s.iloc[0] == datetime.timedelta(microseconds=123)
return s

df = self.spark.createDataFrame(
[(datetime.timedelta(microseconds=123),)], schema="td interval day to second"
).select(noop("td").alias("td"))

df.select(
assert_true(lit("INTERVAL '0 00:00:00.000123' DAY TO SECOND") == df.td.cast("string"))
).collect()
self.assertEqual(df.schema[0].dataType.simpleString(), "interval day to second")
self.assertEqual(df.first()[0], datetime.timedelta(microseconds=123))


if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf import * # noqa: F401
ArrowColumnVector.java
@@ -19,19 +19,14 @@

import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.*;
import org.apache.arrow.vector.holders.NullableIntervalDayHolder;
import org.apache.arrow.vector.holders.NullableVarCharHolder;

import org.apache.spark.sql.util.ArrowUtils;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY;
import static org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS;

/**
* A column vector backed by Apache Arrow. Currently calendar interval type and map type are not
* supported.
Comment on lines -33 to -34

Member: Don't we want to keep the map type mention?

Member Author (@HyukjinKwon, Nov 18, 2021): Map type support was added in #30393, but the doc fix was missed 😂

* A column vector backed by Apache Arrow.
*/
public final class ArrowColumnVector extends ColumnVector {

@@ -180,8 +175,8 @@ public ArrowColumnVector(ValueVector vector) {
accessor = new NullAccessor((NullVector) vector);
} else if (vector instanceof IntervalYearVector) {
accessor = new IntervalYearAccessor((IntervalYearVector) vector);
} else if (vector instanceof IntervalDayVector) {
accessor = new IntervalDayAccessor((IntervalDayVector) vector);
} else if (vector instanceof DurationVector) {
accessor = new DurationAccessor((DurationVector) vector);
} else {
throw new UnsupportedOperationException();
}
@@ -549,21 +544,18 @@ int getInt(int rowId) {
}
}

private static class IntervalDayAccessor extends ArrowVectorAccessor {
private static class DurationAccessor extends ArrowVectorAccessor {

private final IntervalDayVector accessor;
private final NullableIntervalDayHolder intervalDayHolder = new NullableIntervalDayHolder();
private final DurationVector accessor;
Contributor: Do we need to care about precision here (microseconds)?

Contributor: Oh, so it's just an int64 physically, and the type annotation (the logical type) decides its semantics.


IntervalDayAccessor(IntervalDayVector vector) {
DurationAccessor(DurationVector vector) {
super(vector);
this.accessor = vector;
}

@Override
long getLong(int rowId) {
accessor.get(rowId, intervalDayHolder);
return Math.addExact(Math.multiplyExact(intervalDayHolder.days, MICROS_PER_DAY),
intervalDayHolder.milliseconds * MICROS_PER_MILLIS);
final long getLong(int rowId) {
return DurationVector.get(accessor.getDataBuffer(), rowId);
}
}
}
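For context on the reviewer exchange above: an Arrow duration vector stores plain 64-bit integers, and the microsecond unit is only part of the logical type, which is why DurationAccessor can read the value straight back as a long. A small PyArrow sketch (illustrative only, assuming pyarrow is installed):

```python
import datetime
import pyarrow as pa

# The physical storage of duration[us] is int64 microsecond counts; the unit
# lives in the logical type annotation.
arr = pa.array([datetime.timedelta(microseconds=123)], type=pa.duration("us"))
print(arr.type)                          # duration[us]
print(arr.cast(pa.int64()).to_pylist())  # [123]
```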
ArrowUtils.scala
@@ -57,7 +57,7 @@ private[sql] object ArrowUtils {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
case NullType => ArrowType.Null.INSTANCE
case _: YearMonthIntervalType => new ArrowType.Interval(IntervalUnit.YEAR_MONTH)
case _: DayTimeIntervalType => new ArrowType.Interval(IntervalUnit.DAY_TIME)
case _: DayTimeIntervalType => new ArrowType.Duration(TimeUnit.MICROSECOND)
case _ =>
throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString)
}
@@ -81,7 +81,7 @@ private[sql] object ArrowUtils {
case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => TimestampType
case ArrowType.Null.INSTANCE => NullType
case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType()
case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType()
case di: ArrowType.Duration if di.getUnit == TimeUnit.MICROSECOND => DayTimeIntervalType()
case _ => throw QueryExecutionErrors.unsupportedDataTypeError(dt.toString)
}

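Putting the Python and JVM changes together, a timedelta column should now round-trip through Arrow-enabled conversions. A usage sketch under the assumption of a Spark build containing this change, with Arrow conversion enabled:

```python
import datetime
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# The timedelta column is inferred as DayTimeIntervalType, goes through Arrow
# as duration[us], and comes back to pandas as timedelta64[ns].
pdf = pd.DataFrame({"td": [datetime.timedelta(hours=1, microseconds=123)]})
df = spark.createDataFrame(pdf)
print(df.dtypes)                  # [('td', 'interval day to second')]
print(df.toPandas()["td"].dtype)  # timedelta64[ns]
```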