[SPARK-48638][CONNECT] Add ExecutionInfo support for DataFrame #46996
Conversation
python/pyspark/sql/dataframe.py (outdated)

```python
@property
def queryExecution(self) -> Optional["QueryExecution"]:
```
Should probably have a docstring here, with the added version.
And I wouldn't make it Optional.
I added the version. For the optional part: in Scala, `QueryExecution` is always present, but then you have to check for `executedPlan`. The thing I'm worried about is bringing this complexity to the client. The `QueryExecution` object allows too much direct manipulation of the query, which is not ideal.
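For reference, a minimal sketch of what the documented property could look like on a toy stand-in class; the `versionadded` value and the docstring wording are assumptions, not the merged code:

```python
from typing import Optional


class DataFrame:
    """Toy stand-in for pyspark's DataFrame, for illustration only."""

    def __init__(self) -> None:
        # Assumed backing field, filled by a completion callback after the
        # query runs (see the writer callback discussed further down).
        self._query_execution: Optional["QueryExecution"] = None

    @property
    def queryExecution(self) -> Optional["QueryExecution"]:
        """Returns the query execution of this DataFrame, or ``None``
        if the query has not been executed yet.

        .. versionadded:: 4.0.0
        """
        return self._query_execution
```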
I see, so it has to be set after execution. Should we have a dedicated Spark Connect API? I think it'd be confusing if it has the same name as the Scala-side `df.queryExecution`.
@HyukjinKwon thanks for the review, I'll look into it.
Co-authored-by: allisonwang-db <[email protected]>
@HyukjinKwon @allisonwang-db @zhengruifeng Can you please have another look?
```python
        An instance of the graphviz.Digraph object.
        """
        try:
            import graphviz
```
According to the error message, we expect the minimum version to be 0.20? If so, should we check the version here to avoid unexpected results? Should this also be documented?
The dot interface to graphviz is really stable. I did a quick check, but we don't seem to assert the versions of packages beyond providing the error message. For the extra packages in setup.py we do provide specific versions, but if the user installs them manually, there is no enforcement.
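If enforcement were desired, here is a hedged sketch of guarding the import. The 0.20 minimum is taken from the error message mentioned above; the message text and the strictness of the check are illustrative, not the PR's actual behavior:

```python
MIN_GRAPHVIZ_VERSION = "0.20"  # assumed minimum, per the error message

try:
    import graphviz
except ImportError as e:
    raise ImportError(
        f"toDot() requires the 'graphviz' package (>= {MIN_GRAPHVIZ_VERSION}). "
        "Install it with: pip install graphviz"
    ) from e

# Optional stricter check; graphviz exposes __version__ as a plain string
# such as "0.20.1" (pre-release suffixes would need extra handling).
if tuple(int(p) for p in graphviz.__version__.split(".")[:2]) < (0, 20):
    raise ImportError(f"graphviz >= {MIN_GRAPHVIZ_VERSION} is required.")
```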
```python
def cb(qe: "ExecutionInfo") -> None:
    # Completion callback: stash the execution info of the finished
    # query on the DataFrame so it can be inspected afterwards.
    self._execution_info = qe

return DataFrameWriter(self._plan, self._session, cb)
```
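For illustration, a self-contained sketch of this completion-callback pattern with simplified, hypothetical class names (not the PR's actual classes): the writer invokes the callback once the write finishes, and the frame caches the result.

```python
from typing import Callable, Optional


class ExecutionInfo:
    def __init__(self, metrics: dict) -> None:
        self.metrics = metrics


class Writer:
    def __init__(self, callback: Callable[[ExecutionInfo], None]) -> None:
        self._callback = callback

    def save(self) -> None:
        # ... execute the write, then report metrics back to the frame.
        self._callback(ExecutionInfo({"rows": 42}))


class Frame:
    def __init__(self) -> None:
        self._execution_info: Optional[ExecutionInfo] = None

    @property
    def write(self) -> Writer:
        def cb(qe: ExecutionInfo) -> None:
            self._execution_info = qe

        return Writer(cb)


f = Frame()
f.write.save()
print(f._execution_info.metrics)  # {'rows': 42}
```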
Looks like writeStream is not overridden here, so I imagine streaming queries are not supported yet.

In streaming, a query could have multiple DataFrames. What we do in Scala is access it with query.explain(), which uses this lastExecution:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala (line 192 in 9476343):

```scala
def lastExecution: IncrementalExecution = getLatestExecutionContext().executionPlan
```

That is, as its name suggests, the QueryExecution (an IncrementalExecution) of the last execution.

We could also add a similar mechanism to the StreamingQuery object. This sounds like an interesting follow-up that I'm interested in.
Yes, we should look at streaming as a follow-up.
Also, would this added "QueryExecution" object make implementing a …
Interestingly, the Spark Connect ML / PyTorch Distributor tests are crashing for me locally in both Spark Classic and Spark Connect mode.
cloud-fan left a comment:
LGTM. One minor comment: shall we follow the EXPLAIN format to render the plan tree string in the text mode? e.g.

```
Aggregate ...
+- Project ...
   +- Relation ...
```
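For illustration, a minimal sketch (not the PR's code) of rendering a nested plan as an EXPLAIN-style text tree. It handles only linear single-child chains; real EXPLAIN output also uses `:-` connectors when a node has multiple children.

```python
def render(node: dict, depth: int = 0) -> list:
    # Root has no connector; each deeper level is indented three spaces
    # per ancestor and prefixed with the "+- " connector.
    prefix = "" if depth == 0 else "   " * (depth - 1) + "+- "
    lines = [prefix + node["name"]]
    for child in node.get("children", []):
        lines.extend(render(child, depth + 1))
    return lines


plan = {
    "name": "Aggregate ...",
    "children": [{"name": "Project ...", "children": [{"name": "Relation ..."}]}],
}
print("\n".join(render(plan)))
# Aggregate ...
# +- Project ...
#    +- Relation ...
```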
@cloud-fan adjusted the format to …
HyukjinKwon left a comment:
Would be good to document this at `python/docs/source/reference/pyspark.sql`.
Co-authored-by: Hyukjin Kwon <[email protected]>
Merged to master.
…ed tests

### What changes were proposed in this pull request?
This PR is a followup of #46996 that installs the `graphviz` dependency so it runs the tests.

### Why are the changes needed?
To run the tests added in #46996.

### Does this PR introduce _any_ user-facing change?
No, test-only.

### How was this patch tested?
CI in this PR should validate it.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47155 from HyukjinKwon/SPARK-48638-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
What changes were proposed in this pull request?
One of the interesting shortcomings of Spark Connect is that the query execution metrics are not easily accessible directly. In Spark Classic, the query execution is only accessible via the `_jdf` private variable, and this is not available in Spark Connect. However, since the first release of Spark Connect, the response messages have already contained the metrics from the executed plan.

This patch makes them accessible directly and provides a way to visualize them.
The `toDot()` method returns an instance of the `graphviz.Digraph` object that can be either directly displayed in a notebook or further manipulated.

The purpose of the `executionInfo` property and the associated `ExecutionInfo` class is not to provide equivalence to the `QueryExecution` class used internally by Spark (and, for example, access to the analyzed, optimized, and executed plan), but rather to provide a convenient way of accessing execution-related information.
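A hedged usage sketch based on this description; the exact attribute layout (e.g. whether `toDot()` hangs off a `metrics` object) is an assumption, and the connection URL is a placeholder:

```python
from pyspark.sql import SparkSession

# Spark Connect session; "sc://localhost" is a placeholder endpoint.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

df = spark.range(100).groupBy().count()
df.collect()  # metrics are attached once the query has executed

info = df.executionInfo      # ExecutionInfo for the last execution
dot = info.metrics.toDot()   # assumed location of toDot(); returns graphviz.Digraph
dot.render("query_metrics", format="png")  # or display `dot` directly in a notebook
```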
Why are the changes needed?
User experience.
Does this PR introduce any user-facing change?
Yes, it adds a new API for accessing the query execution information of a Spark SQL execution.
How was this patch tested?
Added new unit tests.
Was this patch authored or co-authored using generative AI tooling?
No