
Conversation

@JoshRosen (Contributor) commented Apr 5, 2024

### What changes were proposed in this pull request?

This PR deflakes the `pyspark.sql.dataframe.DataFrame.writeStream` doctest.

PR #45298 aimed to fix that test but misdiagnosed the root issue. The problem is not that concurrent tests were colliding on a temporary directory. Rather, the issue is specific to the `DataFrame.writeStream` test's logic: that test starts a streaming query that writes files to the temporary directory, then exits the temp directory context manager without first stopping the streaming query. That creates a race condition where the context manager might be deleting the directory while the streaming query is writing new files into it, leading to the following type of error during cleanup:

File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream
Failed example:
    with tempfile.TemporaryDirectory() as d:
        # Create a table with Rate source.
        df.writeStream.toTable(
            "my_table", checkpointLocation=d)
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib/python3.11/doctest.py", line 1353, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line 1, in <module>
        with tempfile.TemporaryDirectory() as d:
      File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
        self.cleanup()
      File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
        self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
      File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
        _rmtree(name, onerror=onerror)
      File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
        onerror(os.rmdir, path, sys.exc_info())
      File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
        os.rmdir(path, dir_fd=dir_fd)
    OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
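The same failure mode can be reproduced without Spark: if a background thread keeps creating files while `TemporaryDirectory` cleanup runs, `shutil.rmtree` can delete the entries it listed and then fail on `os.rmdir` because a new file has since appeared. Below is a minimal standalone sketch of that race (not taken from the PR; `keep_writing` is a hypothetical helper, and since it is a race it only fails intermittently):

```python
import os
import tempfile
import threading
import time

def keep_writing(path: str, stop: threading.Event) -> None:
    """Continuously create files in `path` until asked to stop (hypothetical helper)."""
    i = 0
    while not stop.is_set():
        try:
            with open(os.path.join(path, f"part-{i}"), "w") as f:
                f.write("data")
        except FileNotFoundError:
            return  # The directory was already removed.
        i += 1

stop = threading.Event()
writer = None
try:
    with tempfile.TemporaryDirectory() as d:
        writer = threading.Thread(target=keep_writing, args=(d, stop))
        writer.start()
        time.sleep(0.1)
        # Exiting here without stop.set() races cleanup against the writer,
        # which can surface as OSError: [Errno 39] Directory not empty.
except OSError as e:
    print("cleanup raced with the writer:", e)
finally:
    stop.set()
    if writer is not None:
        writer.join()
```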

In this PR, I update the doctest to properly stop the streaming query.
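Concretely, the updated doctest keeps a handle on the query and stops it before the `with` block exits, roughly along these lines (a sketch based on the diff excerpt quoted later in the thread, not the exact patch; `df` is assumed to be the Rate-source streaming DataFrame from the surrounding doctest):

```python
>>> import tempfile
>>> import time
>>> with tempfile.TemporaryDirectory() as d:
...     # Create a table with Rate source.
...     query = df.writeStream.toTable(
...         "my_table", checkpointLocation=d)
...     time.sleep(3)
...     # Stop the query before the context manager deletes the directory.
...     query.stop()
```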

### Why are the changes needed?

Fix flaky test.

### Does this PR introduce _any_ user-facing change?

No, test-only. There is a small user-facing doc change, but it is consistent with other doctest examples.

### How was this patch tested?

Manually ran the updated test.

### Was this patch authored or co-authored using generative AI tooling?

No.

```
...     query = df.writeStream.toTable(
...         "my_table", checkpointLocation=d)
<...streaming.query.StreamingQuery object at 0x...>
...     time.sleep(3)
```
@JoshRosen (Contributor, Author)

Copied this from:

```
>>> query = (df
...     .withWatermark("timestamp", "10 minutes")
...     .groupBy(
...         window(df.timestamp, "10 minutes", "5 minutes"),
...         df.value)
...     ).count().writeStream.outputMode("complete").format("console").start()
>>> time.sleep(3)
>>> query.stop()
```
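In both snippets, the `time.sleep(3)` just gives the streaming query a few seconds to make progress; the part that matters for the fix is that `query.stop()` runs before the temporary checkpoint directory is deleted.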

HyukjinKwon pushed a commit that referenced this pull request Apr 5, 2024
…by stopping streaming query


Closes #45885 from JoshRosen/fix-flaky-writestream-doctest.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 0107435)
Signed-off-by: Hyukjin Kwon <[email protected]>
@HyukjinKwon (Member)

Merged to master and branch-3.5.

@dongjoon-hyun (Member) left a comment

+1, LGTM, too.

Thank you for fixing the root cause, @JoshRosen.

@dongjoon-hyun (Member)

Can we have this in branch-3.4 too?

@HyukjinKwon (Member)

@JoshRosen would you mind opening a backport to branch-3.4?

@JoshRosen (Contributor, Author)

Opened a backport PR here: #45908

HeartSaVioR pushed a commit that referenced this pull request Apr 7, 2024
…test by stopping streaming query

### What changes were proposed in this pull request?

Backport of #45885.


Closes #45908 from JoshRosen/fix-flaky-writestream-doctest-3.4.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>