
Commit 9305cc7

sadikovi authored and HyukjinKwon committed
[SPARK-39084][PYSPARK] Fix df.rdd.isEmpty() by using TaskContext to stop iterator on task completion
### What changes were proposed in this pull request?

This PR fixes the issue described in https://issues.apache.org/jira/browse/SPARK-39084, where calling `df.rdd.isEmpty()` on a particular dataset could result in a JVM crash and/or executor failure. The issue was caused by the Python iterator not being synchronised with the Java iterator, so when the task completed, the Python iterator continued to process data. We introduced ContextAwareIterator as part of https://issues.apache.org/jira/browse/SPARK-33277, but did not apply it in all of the places where it should be used.

### Why are the changes needed?

Fixes the JVM crash when checking isEmpty() on a dataset.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a test case that reproduces the issue 100% of the time. I confirmed that the test fails without the fix and passes with the fix.

Closes #36425 from sadikovi/fix-pyspark-iter-2.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 6479455 · commit 9305cc7
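For context, the ContextAwareIterator wrapper referenced in the commit message (introduced in Spark core for SPARK-33277) ties the delegate iterator to the task's lifecycle: once the TaskContext reports the task as completed or interrupted, hasNext returns false, so the downstream consumer (here, the pickler feeding the Python worker) stops pulling rows from a torn-down iterator. Below is a minimal sketch of that behaviour in Scala; it is an approximation with a hypothetical class name, not the exact Spark source.

import org.apache.spark.TaskContext

// Sketch: stop yielding elements as soon as the owning task has
// completed or been interrupted, instead of draining the delegate.
class ContextAwareIteratorSketch[T](context: TaskContext, delegate: Iterator[T])
  extends Iterator[T] {

  override def hasNext: Boolean =
    !context.isCompleted() && !context.isInterrupted() && delegate.hasNext

  override def next(): T = delegate.next()
}

Wrapping the partition iterator in this way before handing it to SerDeUtil.AutoBatchedPickler is what the EvaluatePython.scala change below does.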

File tree

2 files changed: +38 −1 lines changed


python/pyspark/sql/tests/test_dataframe.py

Lines changed: 36 additions & 0 deletions
@@ -22,6 +22,7 @@
 import tempfile
 import time
 import unittest
+import uuid
 from typing import cast

 from pyspark.sql import SparkSession, Row
@@ -1176,6 +1177,41 @@ def test_df_show(self):
         with self.assertRaisesRegex(TypeError, "Parameter 'truncate=foo'"):
             df.show(truncate="foo")

+    def test_df_is_empty(self):
+        # SPARK-39084: Fix df.rdd.isEmpty() resulting in JVM crash.
+
+        # This particular example of DataFrame reproduces an issue in isEmpty call
+        # which could result in JVM crash.
+        data = []
+        for t in range(0, 10000):
+            id = str(uuid.uuid4())
+            if t == 0:
+                for i in range(0, 99):
+                    data.append((id,))
+            elif t < 10:
+                for i in range(0, 75):
+                    data.append((id,))
+            elif t < 100:
+                for i in range(0, 50):
+                    data.append((id,))
+            elif t < 1000:
+                for i in range(0, 25):
+                    data.append((id,))
+            else:
+                for i in range(0, 10):
+                    data.append((id,))
+
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        try:
+            df = self.spark.createDataFrame(data, ["col"])
+            df.coalesce(1).write.parquet(tmpPath)
+
+            res = self.spark.read.parquet(tmpPath).groupBy("col").count()
+            self.assertFalse(res.rdd.isEmpty())
+        finally:
+            shutil.rmtree(tmpPath)
+
     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
         cast(str, pandas_requirement_message or pyarrow_requirement_message),

sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}

+import org.apache.spark.{ContextAwareIterator, TaskContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -301,7 +302,7 @@ object EvaluatePython {
   def javaToPython(rdd: RDD[Any]): RDD[Array[Byte]] = {
     rdd.mapPartitions { iter =>
       registerPicklers()  // let it called in executor
-      new SerDeUtil.AutoBatchedPickler(iter)
+      new SerDeUtil.AutoBatchedPickler(new ContextAwareIterator(TaskContext.get, iter))
     }
   }
 }
