Commit 2fe0692

liuzqt authored and cloud-fan committed
[SPARK-48466][SQL] Create dedicated node for EmptyRelation in AQE
### What changes were proposed in this pull request?

Add a dedicated node to represent an empty relation (**AQE only**, i.e., `AQEPropagateEmptyRelation.scala`):

- `logical.EmptyRelation`
- `execution.EmptyRelationExec`

Both are leaf nodes and store the eliminated logical plan as a field. In order to display the plan in the Spark UI, I extended `SparkPlanInfo` to support a mix of logical and physical plans.

#### Spark UI

<img width="818" alt="Screenshot 2024-06-07 at 12 50 33 PM" src="https://github.com/apache/spark/assets/22358241/07fe34f4-7e0d-429c-b8b8-6bbf2ec01565">

#### String representation

```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   EmptyRelation [plan_id=260]
   +- Join Inner, (key#3 = a#23)
      :- LogicalQueryStage SerializeFromObject [knownnotnull(assertnotnull(input[0, TestData, true])).key AS key#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).value, true, false, true) AS value#4], ShuffleQueryStage 0
      +- LogicalQueryStage Filter (isnotnull(b#24) AND (b#24 = 1)), ShuffleQueryStage 1
+- == Initial Plan ==
   SortMergeJoin [key#3], [a#23], Inner
   :- Sort [key#3 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#3, 200), ENSURE_REQUIREMENTS, [plan_id=204]
   :     ...
```

### Why are the changes needed?

Currently we replace the eliminated subtree with a `LocalTableScan` when propagating an empty relation, which loses the information about the original query plan and makes the result less human-readable. The idea is to create a dedicated `EmptyRelation` node that is a leaf node but wraps the original query plan inside.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

NO

Closes #46830 from liuzqt/SPARK-48466.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent b0e2cb5 commit 2fe0692
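
For readers who want to see the new node in action, here is a minimal repro sketch. It is not part of the commit; it assumes a build containing this patch, AQE enabled (the default in recent releases), and broadcast joins disabled so a shuffle stage materializes and produces runtime statistics:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")          // AQE on (default in recent releases)
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")  // force a shuffle join
  .getOrCreate()
import spark.implicits._

// The left side cannot be proven empty at planning time, only AQE's runtime
// statistics can detect that the filter eliminates every row.
val left = spark.range(100).filter($"id" > 1000)
val joined = left.join(spark.range(100), "id")

joined.collect()   // empty result; AQE re-optimizes using runtime stats
joined.explain()   // the final plan should show an EmptyRelation leaf wrapping the join
```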

File tree

8 files changed: +173 −13 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -58,7 +58,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
     case _ => false
   }

-  protected def empty(plan: LogicalPlan): LocalRelation =
+  protected def empty(plan: LogicalPlan): LogicalPlan =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

   // Construct a project list from plan's output, while the value is always NULL.
```
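
This one-line signature change carries the rest of the commit: the default still builds an empty `LocalRelation`, but subclasses may now substitute any empty-equivalent plan. A hedged, self-contained sketch of the pattern with stand-in names (the real classes are `PropagateEmptyRelationBase` and `AQEPropagateEmptyRelation`, further down in this commit):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, LocalRelation, LogicalPlan}

trait EmptyPlaceholderFactory {
  // Default behavior, as in PropagateEmptyRelationBase: an empty LocalRelation
  // with the original schema. Before this commit the return type was
  // LocalRelation, which made the override below impossible.
  protected def empty(plan: LogicalPlan): LogicalPlan =
    LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
}

object AqeStylePlaceholderFactory extends EmptyPlaceholderFactory {
  // AQE's override, verbatim from AQEPropagateEmptyRelation in this commit:
  // keep the eliminated plan around instead of discarding it.
  override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan)
}
```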
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala

Lines changed: 35 additions & 0 deletions (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder

case class EmptyRelation(logical: LogicalPlan) extends LeafNode {
  override val isStreaming: Boolean = logical.isStreaming

  override val outputOrdering: Seq[SortOrder] = logical.outputOrdering

  override def output: Seq[Attribute] = logical.output

  override def computeStats(): Statistics = Statistics(sizeInBytes = 0, rowCount = Some(0))

  override def maxRows: Option[Long] = Some(0)

  override def maxRowsPerPartition: Option[Long] = Some(0)
}
```
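
The zeroed statistics and row bounds are the point of the node: downstream rules can prove emptiness without inspecting the wrapped plan. A minimal hedged sketch, where an empty `LocalRelation` stands in for the eliminated subtree:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, LocalRelation, Statistics}

val eliminated = LocalRelation()   // hypothetical placeholder for the removed subtree
val rel = EmptyRelation(eliminated)

// Emptiness is advertised three ways:
assert(rel.computeStats() == Statistics(sizeInBytes = 0, rowCount = Some(0)))
assert(rel.maxRows.contains(0L))
assert(rel.maxRowsPerPartition.contains(0L))
// ...while the original schema stays visible to parent operators:
assert(rel.output == eliminated.output)
```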
sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala

Lines changed: 92 additions & 0 deletions (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * A leaf node wrapper for a propagated empty relation, which preserves the eliminated logical
 * plan. The logical plan might be partially executed, i.e., it may contain LogicalQueryStage.
 */
case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNode
  with InputRDDCodegen {
  private val rdd = sparkContext.emptyRDD[InternalRow]

  // This must be a val, not a def: `logical` is not serialized to executors,
  // but this member is accessed there.
  override val output: Seq[Attribute] = logical.output

  override protected def doExecute(): RDD[InternalRow] = rdd

  override def executeCollect(): Array[InternalRow] = Array.empty

  override def executeTake(limit: Int): Array[InternalRow] = Array.empty

  override def executeTail(limit: Int): Array[InternalRow] = Array.empty

  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = sparkContext.emptyRDD

  override def inputRDD: RDD[InternalRow] = rdd

  override protected val createUnsafeProjection: Boolean = false

  protected override def stringArgs: Iterator[Any] = Iterator(s"[plan_id=$id]")

  override def generateTreeString(
      depth: Int,
      lastChildren: java.util.ArrayList[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int,
      printNodeId: Boolean,
      indent: Int = 0): Unit = {
    super.generateTreeString(depth,
      lastChildren,
      append,
      verbose,
      prefix,
      addSuffix,
      maxFields,
      printNodeId,
      indent)
    lastChildren.add(true)
    logical.generateTreeString(
      depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
    lastChildren.remove(lastChildren.size() - 1)
  }

  override def doCanonicalize(): SparkPlan = {
    this.copy(logical = LocalRelation(logical.output).canonicalized)
  }

  override protected[sql] def cleanupResources(): Unit = {
    logical.foreach {
      case LogicalQueryStage(_, physical) =>
        physical.cleanupResources()
      case _ => // other nodes hold no materialized resources
    }
    super.cleanupResources()
  }
}
```
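
Two things worth noting about the overrides. Because the result is provably empty, every driver-side materialization path returns immediately without scheduling a job; and canonicalization deliberately swaps the wrapped history for a bare `LocalRelation`, so two nodes that eliminated different plans with the same schema compare equal. A hedged behavioral sketch (requires an active SparkSession, since `sparkContext` is captured at construction):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.EmptyRelationExec

// `eliminated` stands in for whatever subtree AQE removed; it is never executed.
val eliminated = LocalRelation()
val exec = EmptyRelationExec(eliminated)

assert(exec.executeCollect().isEmpty)  // short-circuits on the driver: no job runs
assert(exec.executeTake(5).isEmpty)
assert(exec.executeTail(5).isEmpty)

// Canonicalization ignores the wrapped history, so plan equality depends only
// on the output schema:
val exec2 = EmptyRelationExec(LocalRelation())
assert(exec.canonicalized == exec2.canonicalized)
```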

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 24 additions & 1 deletion
```diff
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -51,13 +53,27 @@ class SparkPlanInfo(

 private[execution] object SparkPlanInfo {

+  private def fromLogicalPlan(plan: LogicalPlan): SparkPlanInfo = {
+    val childrenInfo = plan match {
+      case LogicalQueryStage(_, physical) => Seq(fromSparkPlan(physical))
+      case _ => (plan.children ++ plan.subqueries).map(fromLogicalPlan)
+    }
+    new SparkPlanInfo(
+      plan.nodeName,
+      plan.simpleString(SQLConf.get.maxToStringFields),
+      childrenInfo,
+      Map[String, String](),
+      Seq.empty)
+  }
+
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
       case ReusedSubqueryExec(child) => child :: Nil
       case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
       case stage: QueryStageExec => stage.plan :: Nil
       case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
+      case EmptyRelationExec(logical) => (logical :: Nil)
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -69,10 +85,17 @@ private[execution] object SparkPlanInfo {
       case fileScan: FileSourceScanLike => fileScan.metadata
       case _ => Map[String, String]()
     }
+    val childrenInfo = children.flatMap {
+      case child: SparkPlan =>
+        Some(fromSparkPlan(child))
+      case child: LogicalPlan =>
+        Some(fromLogicalPlan(child))
+      case _ => None
+    }
     new SparkPlanInfo(
       plan.nodeName,
       plan.simpleString(SQLConf.get.maxToStringFields),
-      children.map(fromSparkPlan),
+      childrenInfo,
       metadata,
       metrics)
   }
```
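
The recursion now ping-pongs between the two worlds: `fromSparkPlan` hands an `EmptyRelationExec`'s preserved logical plan to `fromLogicalPlan`, which hands control back at every `LogicalQueryStage`, whose materialized stage is rendered by `fromSparkPlan` again. That is what produces the mixed tree in the UI screenshot above. A hedged sketch (this only compiles inside `org.apache.spark.sql.execution`, since the companion object is `private[execution]`; `finalPlan` is hypothetical):

```scala
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanInfo}

// Hypothetical: a final adaptive plan rooted at an EmptyRelationExec,
// e.g. the one from the repro sketch near the top of this page.
val finalPlan: SparkPlan = ???
val info = SparkPlanInfo.fromSparkPlan(finalPlan)

assert(info.nodeName == "EmptyRelation")    // physical root; the "Exec" suffix is stripped
assert(info.children.size == 1)             // the preserved *logical* join subtree
assert(info.children.head.metrics.isEmpty)  // logical nodes carry no SQL metrics
// Each LogicalQueryStage underneath hands back to fromSparkPlan, so the
// materialized ShuffleQueryStages reappear with their metrics intact.
```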

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -959,6 +959,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
+      case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil
       case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
       // We should match the combination of limit and offset first, to get the optimal physical
       // plan, instead of planning limit and offset separately.
```
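
A hedged sketch of the planning hand-off this case adds (assumes code in the `org.apache.spark.sql` package with an active `spark` session, since `sessionState` is `private[sql]`; the wrapped plan is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, LocalRelation}
import org.apache.spark.sql.execution.EmptyRelationExec

val eliminated = LocalRelation()   // hypothetical eliminated subtree
// BasicOperators now maps the logical node straight to the physical wrapper:
val physical = spark.sessionState.planner.plan(EmptyRelation(eliminated)).next()
assert(physical.isInstanceOf[EmptyRelationExec])
```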

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -955,6 +955,10 @@ case class CollapseCodegenStages(
       // Do not make LogicalTableScanExec the root of WholeStageCodegen
       // to support the fast driver-local collect/take paths.
       plan
+    case plan: EmptyRelationExec =>
+      // Do not make EmptyRelationExec the root of WholeStageCodegen
+      // to support the fast driver-local collect/take paths.
+      plan
     case plan: CommandResultExec =>
       // Do not make CommandResultExec the root of WholeStageCodegen
       // to support the fast driver-local collect/take paths.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive

 import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -39,6 +40,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
   override protected def nonEmpty(plan: LogicalPlan): Boolean =
     super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)

+  override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan)
+
   private def isRootRepartition(plan: LogicalPlan): Boolean = plan match {
     case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true
     case _ => false
@@ -61,6 +64,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
       None
     }

+    case _: EmptyRelation => Some(0)
+
     case _ => None
   }
```
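
The `getEstimatedRowCount` addition closes the loop: a node the rule has already produced is itself classified as empty, so on later AQE re-optimization rounds the emptiness keeps propagating upward through newly materialized stages instead of stopping at the wrapper. A hedged sketch of the intended effect (plans are hypothetical stand-ins):

```scala
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, Join, JoinHint, LocalRelation}
import org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation

// A previous rule application already collapsed some subtree:
val collapsed = EmptyRelation(LocalRelation())
val join = Join(collapsed, LocalRelation(), Inner, None, JoinHint.NONE)

// On the next application, the new case lets `collapsed` count as empty
// (estimated row count Some(0)), so the inner join folds upward too:
assert(AQEPropagateEmptyRelation(join).isInstanceOf[EmptyRelation])
```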

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 11 additions & 11 deletions
```diff
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -1650,32 +1650,32 @@ class AdaptiveQueryExecSuite
       val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
         "SELECT key FROM testData WHERE key = 0 ORDER BY key, value")
       assert(findTopLevelSort(plan1).size == 1)
-      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

       val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
         "SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" +
           " WHERE key > rand()")
       assert(findTopLevelSort(plan2).size == 1)
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
     }
   }

   test("SPARK-35442: Support propagate empty relation through aggregate") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
         "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
-      assert(!plan1.isInstanceOf[LocalTableScanExec])
-      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+      assert(!plan1.isInstanceOf[EmptyRelationExec])
+      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

       val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
         "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1")
-      assert(!plan2.isInstanceOf[LocalTableScanExec])
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(!plan2.isInstanceOf[EmptyRelationExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])

       val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult(
         "SELECT count(*) FROM testData WHERE value = 'no_match'")
-      assert(!plan3.isInstanceOf[LocalTableScanExec])
-      assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec])
+      assert(!plan3.isInstanceOf[EmptyRelationExec])
+      assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[EmptyRelationExec])
     }
   }

@@ -1696,7 +1696,7 @@ class AdaptiveQueryExecSuite
           |""".stripMargin)
       checkNumUnion(plan1, 1)
       checkNumUnion(adaptivePlan1, 0)
-      assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+      assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

       val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
         """
@@ -1706,7 +1706,7 @@ class AdaptiveQueryExecSuite
           |""".stripMargin)
       checkNumUnion(plan2, 1)
       checkNumUnion(adaptivePlan2, 0)
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
     }
   }
```
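
Distilled, the behavioral change the suite now asserts (a hedged sketch using the suite's own helpers; it only runs inside `AdaptiveQueryExecSuite`):

```scala
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
  val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
    "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
  // The initial plan is an ordinary aggregate; only the final adaptive plan
  // collapses to the new node, which wraps the eliminated aggregate inside.
  assert(!plan.isInstanceOf[EmptyRelationExec])
  assert(stripAQEPlan(adaptivePlan).isInstanceOf[EmptyRelationExec])
}
```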
