Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
adding text support
  • Loading branch information
grundprinzip committed Jun 18, 2024
commit f25a9e6e3b6bc8e9727bf381b15775367e59e3d4
7 changes: 5 additions & 2 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,9 @@ def to_table(
assert table is not None
return table, schema, qe

def to_pandas(self, plan: pb2.Plan, observations: Dict[str, Observation]) -> "pd.DataFrame":
def to_pandas(
self, plan: pb2.Plan, observations: Dict[str, Observation]
) -> Tuple["pd.DataFrame", "QueryExecution"]:
"""
Return given plan as a pandas DataFrame.
"""
Expand All @@ -903,6 +905,7 @@ def to_pandas(self, plan: pb2.Plan, observations: Dict[str, Observation]) -> "pd
req, observations, self_destruct=self_destruct
)
assert table is not None
qe = QueryExecution(metrics, observed_metrics)

schema = schema or from_arrow_schema(table.schema, prefer_timestamp_ntz=True)
assert schema is not None and isinstance(schema, StructType)
Expand Down Expand Up @@ -969,7 +972,7 @@ def to_pandas(self, plan: pb2.Plan, observations: Dict[str, Observation]) -> "pd
pdf.attrs["metrics"] = metrics
if len(observed_metrics) > 0:
pdf.attrs["observed_metrics"] = observed_metrics
return pdf
return pdf, qe

def _proto_to_string(self, p: google.protobuf.message.Message) -> str:
"""
Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/sql/connect/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1854,7 +1854,9 @@ def toArrow(self) -> "pa.Table":

def toPandas(self) -> "PandasDataFrameLike":
    """
    Collect this DataFrame to the driver as a pandas DataFrame.

    In addition to returning the collected data, this stores the
    ``QueryExecution`` handed back by the Spark Connect client on
    ``self._query_execution`` so execution metrics (e.g. via
    ``df.queryExecution``) remain inspectable after collection.

    Returns
    -------
    pd.DataFrame
        The collected rows as a pandas DataFrame.
    """
    query = self._plan.to_proto(self._session.client)
    # The client now returns a (pandas DataFrame, QueryExecution) pair;
    # the stale single-return call site was removed (diff residue made it
    # shadow the lines below as unreachable code).
    pdf, qe = self._session.client.to_pandas(query, self._plan.observations)
    # Retain metrics/observed metrics for later inspection by the user.
    self._query_execution = qe
    return pdf

@property
def schema(self) -> StructType:
Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/sql/tests/connect/test_df_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def test_query_execution_text_format(self):
df.collect()
self.assertIn("HashAggregate", df.queryExecution.metrics.toText())

# Different execution mode.
df: DataFrame = self.connect.range(100).repartition(10).groupBy("id").count()
df.toPandas()
self.assertIn("HashAggregate", df.queryExecution.metrics.toText())

@unittest.skipIf(not have_graphviz, graphviz_requirement_message)
def test_df_query_execution_metrics_to_dot(self):
df: DataFrame = self.connect.range(100).repartition(10).groupBy("id").count()
Expand Down