[SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI

### What changes were proposed in this pull request?

In `Dataset.collectAsArrowToR` and `Dataset.collectAsArrowToPython`, the code block passed to `serveToStream` runs in a separate thread, so `withAction` finishes as soon as it starts that thread. As a result, it doesn't collect the metrics of the actual action, and the Query UI shows the plan graph without metrics.

We should call `serveToStream` first, then `withAction` inside it, so that the action wrapper brackets the actual execution (illustrated by the sketch below).
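The threading issue can be reproduced in isolation. Below is a minimal Scala sketch, with hypothetical `withMetrics` and `serveAsync` helpers standing in for `withAction` and `serveToStream` (these are not the actual Spark APIs):

```scala
object MetricsOrderingDemo {
  // Hypothetical stand-in for Spark's withAction: measures the work done
  // inside its block and reports a duration afterwards.
  def withMetrics[A](name: String)(body: => A): A = {
    val start = System.nanoTime()
    try body
    finally println(f"$name took ${(System.nanoTime() - start) / 1e6}%.1f ms")
  }

  // Hypothetical stand-in for serveToStream: spawns a thread to run the
  // block and returns immediately, before the work is done.
  def serveAsync(body: => Unit): Unit =
    new Thread(() => body).start()

  def main(args: Array[String]): Unit = {
    // Buggy ordering: withMetrics returns as soon as the thread is started,
    // so it reports ~0 ms and never observes the real work.
    withMetrics("outer-wrapper") {
      serveAsync { Thread.sleep(100) /* stands in for the actual collect */ }
    }

    // Fixed ordering: the metrics wrapper runs inside the serving thread,
    // so it brackets the actual computation and reports ~100 ms.
    serveAsync {
      withMetrics("inner-wrapper") { Thread.sleep(100) }
    }
  }
}
```

The first call prints a near-zero duration because the wrapper exits before the work runs; the second prints roughly 100 ms, matching the pattern of the fix in the diff below.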

### Why are the changes needed?

When calling `toPandas`, the Query UI usually shows each plan node's metrics and the corresponding Stage IDs and Task IDs:

```py
>>> df = spark.createDataFrame([(1, 10, 'abc'), (2, 20, 'def')], schema=['x', 'y', 'z'])
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

![Screen Shot 2020-06-03 at 4 47 07 PM](https://user-images.githubusercontent.com/506656/83815735-bec22380-a675-11ea-8ecc-bf2954731f35.png)

but if Arrow execution is enabled, it shows only the plan nodes, and the reported duration is not correct:

```py
>>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

![Screen Shot 2020-06-03 at 4 47 27 PM](https://user-images.githubusercontent.com/506656/83815804-de594c00-a675-11ea-933a-d0ffc0f534dd.png)

### Does this PR introduce _any_ user-facing change?

Yes, the Query UI will show the plan with the correct metrics.

### How was this patch tested?

I checked it manually in my local environment.

![Screen Shot 2020-06-04 at 3 19 41 PM](https://user-images.githubusercontent.com/506656/83816265-d77f0900-a676-11ea-84b8-2a8d80428bc6.png)

Closes apache#28730 from ueshin/issues/SPARK-31903/to_pandas_with_arrow_query_ui.

Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
ueshin authored and HyukjinKwon committed Jun 5, 2020
commit 632b5bce23c94d25712b43be83252b34ebfd3e72
8 changes: 4 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
```diff
@@ -3509,8 +3509,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToR(): Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
-    withAction("collectAsArrowToR", queryExecution) { plan =>
-      RRDD.serveToStream("serve-Arrow") { outputStream =>
+    RRDD.serveToStream("serve-Arrow") { outputStream =>
+      withAction("collectAsArrowToR", queryExecution) { plan =>
         val buffer = new ByteArrayOutputStream()
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, buffer, timeZoneId)
@@ -3563,8 +3563,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToPython: Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
-    withAction("collectAsArrowToPython", queryExecution) { plan =>
-      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+    PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+      withAction("collectAsArrowToPython", queryExecution) { plan =>
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
```