
Commit 1f66a40

JoshRosen authored and HeartSaVioR committed
[SPARK-47734][PYTHON][TESTS][3.4] Fix flaky DataFrame.writeStream doctest by stopping streaming query
### What changes were proposed in this pull request?

Backport of #45885.

This PR deflakes the `pyspark.sql.dataframe.DataFrame.writeStream` doctest.

PR #45298 aimed to fix that test but misdiagnosed the root issue. The problem is not that concurrent tests were colliding on a temporary directory. Rather, the issue is specific to the `DataFrame.writeStream` test's logic: that test starts a streaming query that writes files to the temporary directory, then exits the temp directory context manager without first stopping the streaming query. That creates a race condition where the context manager might be deleting the directory while the streaming query is writing new files into it, leading to the following type of error during cleanup:

```
File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream
Failed example:
    with tempfile.TemporaryDirectory() as d:
        # Create a table with Rate source.
        df.writeStream.toTable(
            "my_table", checkpointLocation=d)
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib/python3.11/doctest.py", line 1353, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line 1, in <module>
        with tempfile.TemporaryDirectory() as d:
      File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
        self.cleanup()
      File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
        self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
      File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
        _rmtree(name, onerror=onerror)
      File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
        onerror(os.rmdir, path, sys.exc_info())
      File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
        os.rmdir(path, dir_fd=dir_fd)
    OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
```

In this PR, I update the doctest to properly stop the streaming query.

### Why are the changes needed?

Fixes a flaky test.

### Does this PR introduce _any_ user-facing change?

No, test-only. There is a small user-facing doc change, but one that is consistent with other doctest examples.

### How was this patch tested?

Manually ran the updated test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45908 from JoshRosen/fix-flaky-writestream-doctest-3.4.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
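To make the race concrete, here is a minimal standalone sketch of the pattern the fix adopts (assuming a local SparkSession; the `local[2]` master and the `try`/`finally` wrapper are illustrative additions, not part of the doctest):

```python
import tempfile
import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.readStream.format("rate").load()

with tempfile.TemporaryDirectory() as d:
    # Start a streaming query that checkpoints into the temporary directory.
    query = df.writeStream.toTable("my_table", checkpointLocation=d)
    try:
        time.sleep(3)  # let the query run for a few micro-batches
    finally:
        # Stop the query before TemporaryDirectory's cleanup deletes `d`;
        # otherwise rmtree can race with the query writing new checkpoint
        # files, producing "OSError: Directory not empty".
        query.stop()
```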
1 parent 6ab31d4 · commit 1f66a40

File tree: 1 file changed (+5, -3 lines)


python/pyspark/sql/dataframe.py

Lines changed: 5 additions & 3 deletions
```diff
@@ -527,16 +527,18 @@ def writeStream(self) -> DataStreamWriter:
 
         Examples
         --------
+        >>> import time
         >>> import tempfile
         >>> df = spark.readStream.format("rate").load()
         >>> type(df.writeStream)
         <class 'pyspark.sql.streaming.readwriter.DataStreamWriter'>
 
         >>> with tempfile.TemporaryDirectory() as d:
         ...     # Create a table with Rate source.
-        ...     df.writeStream.toTable(
-        ...         "my_table", checkpointLocation=d) # doctest: +ELLIPSIS
-        <pyspark.sql.streaming.query.StreamingQuery object at 0x...>
+        ...     query = df.writeStream.toTable(
+        ...         "my_table", checkpointLocation=d)
+        ...     time.sleep(3)
+        ...     query.stop()
         """
         return DataStreamWriter(self)
 
```
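Note that assigning the returned `StreamingQuery` to `query`, rather than evaluating `toTable(...)` as a bare expression, also suppresses the `<pyspark.sql.streaming.query.StreamingQuery object at 0x...>` repr line that the doctest previously had to match, which is why the `# doctest: +ELLIPSIS` directive could be dropped.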

0 commit comments
