[SPARK-48638][CONNECT] Add ExecutionInfo support for DataFrame #46996
```diff
@@ -101,6 +101,7 @@
 from pyspark.sql.connect.observation import Observation
 from pyspark.sql.connect.session import SparkSession
 from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
+from pyspark.sql.metrics import ExecutionInfo


 class DataFrame(ParentDataFrame):
@@ -137,6 +138,7 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._execution_info: Optional["ExecutionInfo"] = None

     def __reduce__(self) -> Tuple:
         """
@@ -206,7 +208,10 @@ def _repr_html_(self) -> Optional[str]:

     @property
     def write(self) -> "DataFrameWriter":
-        return DataFrameWriter(self._plan, self._session)
+        def cb(qe: "ExecutionInfo") -> None:
+            self._execution_info = qe
+
+        return DataFrameWriter(self._plan, self._session, cb)
```
Contributor

Looks like `writeStream` is not overridden here, so I imagine streaming queries are not supported yet. In streaming, a query can have multiple DataFrames; what we do in Scala is access it with `query.explain()`, which uses `lastExecution` (spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala, line 192 at 9476343). That is, as its name suggests, the `QueryExecution`. We could also add a similar mechanism to the `StreamingQuery` object. This sounds like an interesting follow-up that I'm interested in.

Contributor (Author)

Yes, we should look at streaming as a follow-up.
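As an aside for readers: the batch path above hands execution metrics back through a callback because the writer, not the DataFrame, triggers the execution. Below is a minimal self-contained sketch of that pattern; `ExecutionInfo`, `Writer`, and `Frame` here are simplified stand-ins, not the real Spark Connect classes.

```python
# Minimal sketch of the callback pattern used by `write` above.
# All classes are simplified stand-ins for the real pyspark.sql.connect types.
from typing import Callable, Optional


class ExecutionInfo:
    """Stand-in for pyspark.sql.metrics.ExecutionInfo."""


class Writer:
    def __init__(self, callback: Optional[Callable[[ExecutionInfo], None]] = None):
        self._callback = callback

    def save(self) -> None:
        ei = ExecutionInfo()  # produced when the write command actually runs
        if self._callback is not None:
            self._callback(ei)  # report metrics back to the owning DataFrame


class Frame:
    def __init__(self) -> None:
        self._execution_info: Optional[ExecutionInfo] = None

    @property
    def write(self) -> Writer:
        def cb(ei: ExecutionInfo) -> None:
            self._execution_info = ei

        return Writer(cb)


df = Frame()
df.write.save()
assert df._execution_info is not None  # the callback populated it
```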
```diff
     @functools.cache
     def isEmpty(self) -> bool:
@@ -1839,7 +1844,9 @@ def collect(self) -> List[Row]:

     def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]:
         query = self._plan.to_proto(self._session.client)
-        table, schema = self._session.client.to_table(query, self._plan.observations)
+        table, schema, self._execution_info = self._session.client.to_table(
+            query, self._plan.observations
+        )
         assert table is not None
         return (table, schema)

@@ -1850,7 +1857,9 @@ def toArrow(self) -> "pa.Table":

     def toPandas(self) -> "PandasDataFrameLike":
         query = self._plan.to_proto(self._session.client)
-        return self._session.client.to_pandas(query, self._plan.observations)
+        pdf, ei = self._session.client.to_pandas(query, self._plan.observations)
+        self._execution_info = ei
+        return pdf

     @property
     def schema(self) -> StructType:
```
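A hedged usage sketch of the behavior these hunks introduce: once any action runs, the DataFrame caches the `ExecutionInfo` from that execution. The remote address below is a placeholder, not part of this PR.

```python
# Hypothetical usage against a Spark Connect server; the remote URL is a
# placeholder. After an action such as toPandas() runs, the DataFrame caches
# the ExecutionInfo of that execution.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
df = spark.range(100)

print(df.executionInfo)  # None: no action has executed yet
pdf = df.toPandas()      # action executes the plan on the server
print(df.executionInfo)  # now populated by the action above
```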
```diff
@@ -1976,25 +1985,29 @@ def createTempView(self, name: str) -> None:
         command = plan.CreateView(
             child=self._plan, name=name, is_global=False, replace=False
         ).command(session=self._session.client)
-        self._session.client.execute_command(command, self._plan.observations)
+        _, _, ei = self._session.client.execute_command(command, self._plan.observations)
+        self._execution_info = ei

     def createOrReplaceTempView(self, name: str) -> None:
         command = plan.CreateView(
             child=self._plan, name=name, is_global=False, replace=True
         ).command(session=self._session.client)
-        self._session.client.execute_command(command, self._plan.observations)
+        _, _, ei = self._session.client.execute_command(command, self._plan.observations)
+        self._execution_info = ei

     def createGlobalTempView(self, name: str) -> None:
         command = plan.CreateView(
             child=self._plan, name=name, is_global=True, replace=False
         ).command(session=self._session.client)
-        self._session.client.execute_command(command, self._plan.observations)
+        _, _, ei = self._session.client.execute_command(command, self._plan.observations)
+        self._execution_info = ei

     def createOrReplaceGlobalTempView(self, name: str) -> None:
         command = plan.CreateView(
             child=self._plan, name=name, is_global=True, replace=True
         ).command(session=self._session.client)
-        self._session.client.execute_command(command, self._plan.observations)
+        _, _, ei = self._session.client.execute_command(command, self._plan.observations)
+        self._execution_info = ei

     def cache(self) -> ParentDataFrame:
         return self.persist()
```
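These hunks show that commands, not just result-returning queries, report execution info. A hedged continuation of the usage sketch above:

```python
# Continuation of the sketch above: view-creation commands also record
# execution info on the DataFrame that issued them.
df.createOrReplaceTempView("my_view")
print(df.executionInfo)  # reflects the most recent execution on this DataFrame
```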
```diff
@@ -2169,22 +2182,29 @@ def semanticHash(self) -> int:
         )

     def writeTo(self, table: str) -> "DataFrameWriterV2":
-        return DataFrameWriterV2(self._plan, self._session, table)
+        def cb(ei: "ExecutionInfo") -> None:
+            self._execution_info = ei
+
+        return DataFrameWriterV2(self._plan, self._session, table, cb)

     def offset(self, n: int) -> ParentDataFrame:
         return DataFrame(plan.Offset(child=self._plan, offset=n), session=self._session)

     def checkpoint(self, eager: bool = True) -> "DataFrame":
         cmd = plan.Checkpoint(child=self._plan, local=False, eager=eager)
-        _, properties = self._session.client.execute_command(cmd.command(self._session.client))
+        _, properties, self._execution_info = self._session.client.execute_command(
+            cmd.command(self._session.client)
+        )
         assert "checkpoint_command_result" in properties
         checkpointed = properties["checkpoint_command_result"]
         assert isinstance(checkpointed._plan, plan.CachedRemoteRelation)
         return checkpointed

     def localCheckpoint(self, eager: bool = True) -> "DataFrame":
         cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager)
-        _, properties = self._session.client.execute_command(cmd.command(self._session.client))
+        _, properties, self._execution_info = self._session.client.execute_command(
+            cmd.command(self._session.client)
+        )
         assert "checkpoint_command_result" in properties
         checkpointed = properties["checkpoint_command_result"]
         assert isinstance(checkpointed._plan, plan.CachedRemoteRelation)
```
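One nuance visible in the checkpoint hunks: the command assigns `self._execution_info` on the source DataFrame, while the returned DataFrame wraps the cached relation. A hedged continuation of the sketch:

```python
# Continuation of the sketch: an eager checkpoint runs a command, so the
# *source* DataFrame records the execution info; the returned DataFrame
# wraps the server-side cached relation.
cp = df.checkpoint(eager=True)
print(df.executionInfo)  # info for the checkpoint command itself
```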
```diff
@@ -2205,6 +2225,10 @@ def rdd(self) -> "RDD[Row]":
             message_parameters={"feature": "rdd"},
         )

+    @property
+    def executionInfo(self) -> Optional["ExecutionInfo"]:
+        return self._execution_info
+

 class DataFrameNaFunctions(ParentDataFrameNaFunctions):
     def __init__(self, df: ParentDataFrame):
```
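Finally, a hedged sketch of reading the cached `ExecutionInfo` through the new property. The `metrics` attribute below is an assumption about the surface of `pyspark.sql.metrics.ExecutionInfo`, not something shown in this diff; consult that module for the exact API.

```python
# Hedged sketch: read the cached ExecutionInfo after an action. The
# `metrics` attribute is an *assumed* part of pyspark.sql.metrics.ExecutionInfo;
# check that module for the exact shape.
rows = df.collect()     # any action populates the execution info
info = df.executionInfo
if info is not None:
    print(info.metrics)  # server-side metrics of the last execution (assumed attribute)
```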