Commit 1af19a7

HyukjinKwon authored and Bryan Cutler committed
[SPARK-32098][PYTHON] Use iloc for positional slicing instead of direct slicing in createDataFrame with Arrow
### What changes were proposed in this pull request?

When floats are used as the index of a pandas DataFrame, `createDataFrame` produces a Spark DataFrame with wrong results, as shown below, when Arrow is enabled:

```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```

```python
>>> import pandas as pd
>>> spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+
```

This happens because direct slicing treats the slice bounds as index labels when the index contains floats:

```python
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])[2:]
     a
2.0  1
3.0  2
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.]).iloc[2:]
     a
4.0  3
>>> pd.DataFrame({'a': [1,2,3]}, index=[2, 3, 4])[2:]
   a
4  3
```

This PR proposes to explicitly use `iloc` to slice positionally when we create a DataFrame from a pandas DataFrame with Arrow enabled.

FWIW, I was trying to investigate why direct slicing sometimes refers to the index label and sometimes to the position, but I stopped investigating further after reading this: https://pandas.pydata.org/pandas-docs/stable/getting_started/10min.html#selection

> While standard Python / Numpy expressions for selecting and setting are intuitive and come in handy for interactive work, for production code, we recommend the optimized pandas data access methods, `.at`, `.iat`, `.loc` and `.iloc`.

### Why are the changes needed?

To create the correct Spark DataFrame from a pandas DataFrame without data loss.

### Does this PR introduce _any_ user-facing change?

Yes, it is a bug fix.

```bash
./bin/pyspark --conf spark.sql.execution.arrow.pyspark.enabled=true
```

```python
import pandas as pd
spark.createDataFrame(pd.DataFrame({'a': [1,2,3]}, index=[2., 3., 4.])).show()
```

Before:

```
+---+
|  a|
+---+
|  1|
|  1|
|  2|
+---+
```

After:

```
+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+
```

### How was this patch tested?

Manually tested, and a unit test was added.

Closes apache#28928 from HyukjinKwon/SPARK-32098.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Bryan Cutler <[email protected]>
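To make the label-vs-position distinction concrete, here is a small self-contained sketch (not part of the patch) contrasting `.loc` and `.iloc` on the same float-indexed DataFrame:

```python
import pandas as pd

pdf = pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])

# .loc slices by label and is inclusive of both endpoints:
# rows whose index label falls in [2.0, 3.0].
assert list(pdf.loc[2.0:3.0, 'a']) == [1, 2]

# .iloc slices by position, ignoring the labels entirely:
# the first two rows, then everything from position 2 on.
assert list(pdf.iloc[0:2]['a']) == [1, 2]
assert list(pdf.iloc[2:]['a']) == [3]
```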
1 parent d06604f commit 1af19a7

File tree

2 files changed: 7 additions, 1 deletion

python/pyspark/sql/pandas/conversion.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -413,7 +413,7 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):

         # Slice the DataFrame to be batched
         step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
-        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+        pdf_slices = (pdf.iloc[start:start + step] for start in xrange(0, len(pdf), step))

         # Create list of Arrow (columns, type) for serializer dump_stream
         arrow_data = [[(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]
```
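As a standalone illustration of the patched line, the sketch below mimics the batching with a plain integer standing in for `self.sparkContext.defaultParallelism` (an assumption for the example; the real value comes from the SparkContext, and the Spark 2.x code path uses `xrange` where this sketch uses `range`):

```python
import pandas as pd

pdf = pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])
parallelism = 2  # stand-in for self.sparkContext.defaultParallelism

# Same ceiling-division trick as the patch: round len(pdf) / parallelism up.
step = -(-len(pdf) // parallelism)

# With .iloc the slices are positional, so together they cover every row
# exactly once, regardless of the float index labels.
pdf_slices = [pdf.iloc[start:start + step] for start in range(0, len(pdf), step)]
assert sum(len(s) for s in pdf_slices) == len(pdf)
```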

python/pyspark/sql/tests/test_arrow.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -442,6 +442,12 @@ def test_createDateFrame_with_category_type(self):
         self.assertIsInstance(arrow_first_category_element, str)
         self.assertIsInstance(spark_first_category_element, str)

+    def test_createDataFrame_with_float_index(self):
+        # SPARK-32098: float index should not produce duplicated or truncated Spark DataFrame
+        self.assertEqual(
+            self.spark.createDataFrame(
+                pd.DataFrame({'a': [1, 2, 3]}, index=[2., 3., 4.])).distinct().count(), 3)
+

 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
```
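To exercise the new test in isolation, something like the following should work with plain `unittest`, assuming pyspark, pandas, and pyarrow are importable and that the test lives in the `ArrowTests` class suggested by the surrounding methods (Spark's own CI drives these through `python/run-tests` instead):

```python
import unittest

# Load just the new test by dotted name; the ArrowTests class name is an
# assumption based on the surrounding test methods in test_arrow.py.
suite = unittest.defaultTestLoader.loadTestsFromName(
    'pyspark.sql.tests.test_arrow.ArrowTests.test_createDataFrame_with_float_index')
unittest.TextTestRunner(verbosity=2).run(suite)
```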
