@@ -70,6 +70,7 @@ private[connect] object MetricGenerator extends AdaptiveSparkPlanHelper
.newBuilder()
.setName(p.nodeName)
.setPlanId(p.id)
.setParent(parentId)
.putAllExecutionMetrics(mv.asJava)
.build()
Seq(mo) ++ transformChildren(p)
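
The newly propagated parent id is what lets a client rebuild the plan hierarchy from the otherwise flat metric list. A minimal client-side sketch of that reconstruction, assuming the PlanMetrics class this PR moves into pyspark.sql.metrics (it exposes name, plan_id, parent_plan_id, and metrics, as visible in the core.py diff below) and a caller-supplied root parent id:

from collections import defaultdict
from typing import Dict, List

from pyspark.sql.metrics import PlanMetrics  # module introduced by this PR


def print_plan_tree(plans: List[PlanMetrics], root_parent_id: int) -> None:
    # Group nodes by the parent id that setParent(parentId) now populates.
    children: Dict[int, List[PlanMetrics]] = defaultdict(list)
    for p in plans:
        children[p.parent_plan_id].append(p)

    seen = set()  # guards against a root node that reports itself as its own parent

    def walk(parent_id: int, indent: int) -> None:
        for node in children[parent_id]:
            if node.plan_id in seen:
                continue
            seen.add(node.plan_id)
            rendered = ", ".join(f"{m.name}={m.value}" for m in node.metrics)
            print("  " * indent + f"{node.name} (id={node.plan_id}) [{rendered}]")
            walk(node.plan_id, indent + 1)

    walk(root_parent_id, 0)
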
3 changes: 3 additions & 0 deletions dev/requirements.txt
@@ -60,6 +60,9 @@ mypy-protobuf==3.3.0
googleapis-common-protos-stubs==2.2.0
grpc-stubs==1.24.11

# Debug for Spark and Spark Connect
graphviz==0.20.3

# TorchDistributor dependencies
torch
torchvision
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -1062,6 +1062,7 @@ def __hash__(self):
"pyspark.sql.tests.connect.test_parity_pandas_udf_window",
"pyspark.sql.tests.connect.test_resources",
"pyspark.sql.tests.connect.shell.test_progress",
"pyspark.sql.tests.connect.test_df_debug",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
5 changes: 5 additions & 0 deletions python/pyspark/errors/error-conditions.json
@@ -149,6 +149,11 @@
"Cannot <condition1> without <condition2>."
]
},
"CLASSIC_OPERATION_NOT_SUPPORTED_ON_DF": {
"message": [
"Calling property or member <member> is not supported in PySpark Classic, please use Spark Connect instead."
]
},
"COLLATION_INVALID_PROVIDER" : {
"message" : [
"The value <provider> does not represent a correct collation provider. Supported providers are: [<supportedProviders>]."
8 changes: 8 additions & 0 deletions python/pyspark/sql/classic/dataframe.py
@@ -94,6 +94,7 @@
from pyspark.sql.session import SparkSession
from pyspark.sql.group import GroupedData
from pyspark.sql.observation import Observation
from pyspark.sql.metrics import QueryExecution


class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
@@ -1835,6 +1836,13 @@ def toArrow(self) -> "pa.Table":
def toPandas(self) -> "PandasDataFrameLike":
return PandasConversionMixin.toPandas(self)

@property
def queryExecution(self) -> Optional["QueryExecution"]:
raise PySparkValueError(
error_class="CLASSIC_OPERATION_NOT_SUPPORTED_ON_DF",
message_parameters={"member": "queryExecution"},
)


def _to_scala_map(sc: "SparkContext", jm: Dict) -> "JavaObject":
"""
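
On the Classic path the property only raises. A minimal sketch of what that looks like from user code, assuming a local Classic session; the error class is the CLASSIC_OPERATION_NOT_SUPPORTED_ON_DF entry added to error-conditions.json above:

from pyspark.errors import PySparkValueError
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.range(5)

try:
    _ = df.queryExecution  # not supported on a Classic DataFrame
except PySparkValueError as e:
    # Expected to carry CLASSIC_OPERATION_NOT_SUPPORTED_ON_DF and point users to Spark Connect.
    print(e)
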
68 changes: 15 additions & 53 deletions python/pyspark/sql/connect/client/core.py
@@ -61,6 +61,7 @@
from pyspark.loose_version import LooseVersion
from pyspark.version import __version__
from pyspark.resource.information import ResourceInformation
from pyspark.sql.metrics import MetricValue, PlanMetrics, QueryExecution, ObservedMetrics
from pyspark.sql.connect.client.artifact import ArtifactManager
from pyspark.sql.connect.client.logging import logger
from pyspark.sql.connect.profiler import ConnectProfilerCollector
@@ -447,56 +448,7 @@ def toChannel(self) -> grpc.Channel:
return self._secure_channel(self.endpoint, creds)


class MetricValue:
def __init__(self, name: str, value: Union[int, float], type: str):
self._name = name
self._type = type
self._value = value

def __repr__(self) -> str:
return f"<{self._name}={self._value} ({self._type})>"

@property
def name(self) -> str:
return self._name

@property
def value(self) -> Union[int, float]:
return self._value

@property
def metric_type(self) -> str:
return self._type


class PlanMetrics:
def __init__(self, name: str, id: int, parent: int, metrics: List[MetricValue]):
self._name = name
self._id = id
self._parent_id = parent
self._metrics = metrics

def __repr__(self) -> str:
return f"Plan({self._name})={self._metrics}"

@property
def name(self) -> str:
return self._name

@property
def plan_id(self) -> int:
return self._id

@property
def parent_plan_id(self) -> int:
return self._parent_id

@property
def metrics(self) -> List[MetricValue]:
return self._metrics


class PlanObservedMetrics:
class PlanObservedMetrics(ObservedMetrics):
def __init__(self, name: str, metrics: List[pb2.Expression.Literal], keys: List[str]):
self._name = name
self._metrics = metrics
@@ -513,6 +465,13 @@ def name(self) -> str:
def metrics(self) -> List[pb2.Expression.Literal]:
return self._metrics

@property
def pairs(self) -> dict[str, Any]:
result = {}
for x in range(len(self._metrics)):
result[self.keys[x]] = LiteralExpression._to_value(self.metrics[x])
return result

@property
def keys(self) -> List[str]:
return self._keys
@@ -920,16 +879,19 @@ def to_table_as_iterator(

def to_table(
self, plan: pb2.Plan, observations: Dict[str, Observation]
) -> Tuple["pa.Table", Optional[StructType]]:
) -> Tuple["pa.Table", Optional[StructType], QueryExecution]:
"""
Return given plan as a PyArrow Table.
"""
logger.info(f"Executing plan {self._proto_to_string(plan)}")
req = self._execute_plan_request_with_metadata()
req.plan.CopyFrom(plan)
table, schema, _, _, _ = self._execute_and_fetch(req, observations)
table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)

# Create a query execution object.
qe = QueryExecution(metrics, observed_metrics)
assert table is not None
return table, schema
return table, schema, qe

def to_pandas(self, plan: pb2.Plan, observations: Dict[str, Observation]) -> "pd.DataFrame":
"""
19 changes: 18 additions & 1 deletion python/pyspark/sql/connect/dataframe.py
@@ -101,6 +101,7 @@
from pyspark.sql.connect.observation import Observation
from pyspark.sql.connect.session import SparkSession
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
from pyspark.sql.metrics import QueryExecution


class DataFrame(ParentDataFrame):
@@ -137,6 +138,7 @@ def __init__(
# by __repr__ and _repr_html_ while eager evaluation opens.
self._support_repr_html = False
self._cached_schema: Optional[StructType] = None
self._query_execution: Optional["QueryExecution"] = None

def __reduce__(self) -> Tuple:
"""
@@ -1836,7 +1838,9 @@ def collect(self) -> List[Row]:

def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]:
query = self._plan.to_proto(self._session.client)
table, schema = self._session.client.to_table(query, self._plan.observations)
table, schema, self._query_execution = self._session.client.to_table(
query, self._plan.observations
)
assert table is not None
return (table, schema)

@@ -2202,6 +2206,19 @@ def rdd(self) -> "RDD[Row]":
message_parameters={"feature": "rdd"},
)

@property
def queryExecution(self) -> Optional["QueryExecution"]:
"""
The queryExecution property allows introspecting information about the actual
query execution after it has completed successfully. Accessing this member
before the query has been executed will return None.

Returns
-------
An instance of QueryExecution, or None if the query has not been executed yet.
"""
return self._query_execution


class DataFrameNaFunctions(ParentDataFrameNaFunctions):
def __init__(self, df: ParentDataFrame):
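
With the client now returning a QueryExecution from to_table, the property becomes usable after any action. A minimal usage sketch, assuming a reachable Spark Connect endpoint (the address is hypothetical) and only what this diff shows about QueryExecution:

from pyspark.sql import SparkSession

# Hypothetical Spark Connect endpoint; adjust to the actual server address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.range(100).selectExpr("id % 10 AS bucket").groupBy("bucket").count()

# No action has run yet, so the property is still None.
assert df.queryExecution is None

df.collect()  # triggers execution; _to_table() stores the QueryExecution

qe = df.queryExecution
if qe is not None:
    # What QueryExecution exposes is defined in pyspark.sql.metrics (not shown in this
    # excerpt); it is constructed from the per-plan and observed metrics returned above.
    print(qe)
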
5 changes: 5 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -64,6 +64,7 @@
ArrowMapIterFunction,
DataFrameLike as PandasDataFrameLike,
)
from pyspark.sql.metrics import QueryExecution


__all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]
Expand Down Expand Up @@ -6281,6 +6282,10 @@ def toPandas(self) -> "PandasDataFrameLike":
"""
...

@property
def queryExecution(self) -> Optional["QueryExecution"]:
Member:

Should probably have a docstring here, with the added version.

Member:

And I wouldn't make it Optional.

Contributor Author:

I added the version. For the optional part:

  • In Scala, QueryExecution is always present, but then you have to check executedPlan. The thing I'm worried about is bringing that complexity to the client; the QueryExecution object allows too much direct manipulation of the query, which is not ideal.

Member:

IC, so it has to be set after execution. Should we have a dedicated Spark Connect API instead? I think it'd be confusing if it has the same name as the Scala-side df.queryExecution.

...


class DataFrameNaFunctions:
"""Functionality for working with missing data in :class:`DataFrame`.