Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
doc update
  • Loading branch information
grundprinzip committed Apr 2, 2024
commit ac919827b67ffec73a9fff62be2ad4e159f2fd98
43 changes: 6 additions & 37 deletions python/pyspark/sql/connect/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,51 +328,20 @@ def readStream(self) -> "DataStreamReader":
readStream.__doc__ = PySparkSession.readStream.__doc__

def registerProgressHandler(self, handler: "ProgressHandler") -> None:
    """
    Register a progress handler to be called when a progress update is received from the server.

    .. versionadded:: 4.0

    Parameters
    ----------
    handler : ProgressHandler
        A callable that follows the ProgressHandler interface. This handler will be called
        on every progress update.

    Examples
    --------
    The number of progress updates and the stage counts they report depend on timing,
    so the printed output below is illustrative and the query is not executed as a doctest.

    >>> def progress_handler(stages, inflight_tasks, done):
    ...     print(f"{len(stages)} Stages known, Done: {done}")
    >>> spark.registerProgressHandler(progress_handler)
    >>> res = spark.range(10).repartition(1).collect()  # doctest: +SKIP
    3 Stages known, Done: False
    3 Stages known, Done: True
    >>> spark.clearProgressHandlers()
    """
    # Registration is delegated to the session's client object.
    self._client.register_progress_handler(handler)

def removeProgressHandler(self, handler: "ProgressHandler") -> None:
"""
Remove a progress handler that was previously registered.

.. versionadded:: 4.0
registerProgressHandler.__doc__ = PySparkSession.registerProgressHandler.__doc__

Parameters
----------
handler : ProgressHandler
The handler to remove if present in the list of progress handlers.
"""
def removeProgressHandler(self, handler: "ProgressHandler") -> None:
self._client.remove_progress_handler(handler)

def clearProgressHandlers(self) -> None:
"""
Clear all registered progress handlers.
removeProgressHandler.__doc__ = PySparkSession.removeProgressHandler.__doc__

.. versionadded:: 4.0
"""
def clearProgressHandlers(self) -> None:
self._client.clear_progress_handlers()

clearProgressHandlers.__doc__ = PySparkSession.clearProgressHandlers.__doc__

def _inferSchemaFromList(
self, data: Iterable[Any], names: Optional[List[str]] = None
) -> StructType:
Expand Down
59 changes: 59 additions & 0 deletions python/pyspark/sql/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
# Running MyPy type checks will always require pandas and
# other dependencies so importing here is fine.
from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.shell.progress import ProgressHandler

try:
import memory_profiler # noqa: F401
Expand Down Expand Up @@ -1967,6 +1968,8 @@ def client(self) -> "SparkConnectClient":
message_parameters={"feature": "SparkSession.client"},
)

def

def addArtifacts(
self, *path: str, pyfile: bool = False, archive: bool = False, file: bool = False
) -> None:
Expand Down Expand Up @@ -2002,6 +2005,62 @@ def addArtifacts(

addArtifact = addArtifacts

def registerProgressHandler(self, handler: "ProgressHandler") -> None:
"""
Register a progress handler to be called when a progress update is received from the server.

.. versionadded:: 4.0
Comment on lines +2008 to +2010
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially silly question, but: When you look at the docs for this, it's not obvious that Spark Connect supports this method. Should this be explicitly noted in the docstring somehow? Or are users supposed to assume that everything supports Spark Connect unless explicitly noted otherwise?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another related question: Should there be narrative documentation of ProgressHandler on the monitoring page, or are we happy with it just being tucked away in the API docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a PR with documentation updates would be very much appreciated!


Parameters
----------
handler : ProgressHandler
A callable that follows the ProgressHandler interface. This handler will be called
on every progress update.

Examples
--------

>>> def progress_handler(stages, inflight_tasks, done):
... print(f"{len(stages)} Stages known, Done: {done}")
>>> spark.registerProgressHandler(progress_handler)
>>> res = spark.range(10).repartition(1).collect()
3 Stages known, Done: False
3 Stages known, Done: True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is flaky:

File "/__w/spark/spark/python/pyspark/sql/connect/session.py", line 346, in pyspark.sql.connect.session.SparkSession.registerProgressHandler
Failed example:
    res = spark.range(10).repartition(1).collect()
Expected:
    3 Stages known, Done: False
    3 Stages known, Done: True
Got:
    0 Stages known, Done: True

https://github.com/apache/spark/actions/runs/8564043093/job/23470007059.

Let me skip it for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll unflake it. Thanks!

>>> spark.clearProgressHandlers()
"""
raise PySparkRuntimeError(
error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
message_parameters={"feature": "SparkSession.registerProgressHandler"},
)

def removeProgressHandler(self, handler: "ProgressHandler") -> None:
    """
    Unregister a progress handler that was registered earlier.

    .. versionadded:: 4.0

    Parameters
    ----------
    handler : ProgressHandler
        The handler to drop from the list of progress handlers, if it is present.

    Notes
    -----
    This API is only supported with Spark Connect.
    """
    # This implementation always raises; the feature name is reported in the error.
    unsupported_feature = "SparkSession.removeProgressHandler"
    raise PySparkRuntimeError(
        error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
        message_parameters={"feature": unsupported_feature},
    )

def clearProgressHandlers(self) -> None:
    """
    Remove every progress handler that has been registered.

    .. versionadded:: 4.0

    Notes
    -----
    This API is only supported with Spark Connect.
    """
    # This implementation always raises; build the error parameters first, then raise.
    error_parameters = {"feature": "SparkSession.clearProgressHandlers"}
    raise PySparkRuntimeError(
        error_class="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
        message_parameters=error_parameters,
    )


def copyFromLocalToFs(self, local_path: str, dest_path: str) -> None:
"""
Copy file from local to cloud storage file system.
Expand Down