From 7f4b874e493acc53519d1cb9586a6adf58523a6c Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Wed, 29 May 2024 16:29:11 -0700 Subject: [PATCH 1/9] dedicated node for EmptyRelation --- .../optimizer/PropagateEmptyRelation.scala | 2 +- .../sql/catalyst/planning/QueryPlanner.scala | 6 +- .../plans/logical/EmptyRelation.scala | 35 +++++++ .../sql/execution/EmptyRelationExec.scala | 92 +++++++++++++++++++ .../spark/sql/execution/SparkPlanInfo.scala | 1 + .../spark/sql/execution/SparkPlanner.scala | 15 ++- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../sql/execution/WholeStageCodegenExec.scala | 4 + .../adaptive/AQEPropagateEmptyRelation.scala | 6 +- .../EmptyRelationPropagationStrategy.scala | 37 ++++++++ .../adaptive/PlanAdaptiveSubqueries.scala | 8 +- 11 files changed, 206 insertions(+), 6 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 21679703093a..832af340c339 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -58,7 +58,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup case _ => false } - protected def empty(plan: LogicalPlan): LocalRelation = + protected def empty(plan: LogicalPlan): LogicalPlan = LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) // Construct a project list from plan's output, while the value is always NULL. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 6fa5203a06f7..e4b883e41630 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -75,7 +75,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { placeholders.iterator.foldLeft(Iterator(candidate)) { case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => // Plan the logical plan for the placeholder. - val childPlans = this.plan(logicalPlan) + val childPlans = planPlaceHolder(placeholder, logicalPlan) candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => childPlans.map { childPlan => @@ -94,6 +94,10 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { pruned } + protected def planPlaceHolder( + placeHolder: PhysicalPlan, + logical: LogicalPlan): Iterator[PhysicalPlan] = this.plan(logical) + /** * Collects placeholders marked using [[GenericStrategy#planLater planLater]] * by [[strategies]]. 
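A note on the QueryPlanner change above: plan() still drives the recursion, but each planLater placeholder is now planned through the new overridable planPlaceHolder hook, which is what lets SparkPlanner (further down in this patch) intercept placeholders produced for empty relations and wrap the planned subtree. Below is a minimal, self-contained sketch of that hook pattern; every name in it (Planner, EmptyRel, EmptyExec, and so on) is a hypothetical stand-in, not one of Spark's actual classes.

    sealed trait Logical
    case class Scan(name: String) extends Logical
    case class EmptyRel(inner: Logical) extends Logical

    sealed trait Physical
    case class ScanExec(name: String) extends Physical
    case class EmptyExec(child: Physical) extends Physical
    case class EmptyPlanLaterNode(l: Logical) extends Physical // placeholder node

    class Planner {
      def plan(l: Logical): Iterator[Physical] = l match {
        case Scan(n) => Iterator(ScanExec(n))
        case EmptyRel(inner) => Iterator(EmptyPlanLaterNode(inner)) // planned later
      }
      // The hook: the default behavior simply re-plans the logical subtree,
      // matching what QueryPlanner did before this patch.
      protected def planPlaceHolder(p: Physical, l: Logical): Iterator[Physical] = plan(l)
    }

    class EmptyAwarePlanner extends Planner {
      // Placeholders that came from an empty relation are planned normally and
      // then wrapped, so the emptiness survives into the physical plan.
      override protected def planPlaceHolder(p: Physical, l: Logical): Iterator[Physical] =
        p match {
          case EmptyPlanLaterNode(_) => super.planPlaceHolder(p, l).map(EmptyExec)
          case _ => super.planPlaceHolder(p, l)
        }
    }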
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala new file mode 100644 index 000000000000..3910d51b1868 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.SortOrder + +case class EmptyRelation(logical: LogicalPlan) extends LeafNode { + override val isStreaming: Boolean = logical.isStreaming + + override val outputOrdering: Seq[SortOrder] = logical.outputOrdering + + override def output: Seq[Attribute] = logical.output + + override def computeStats(): Statistics = Statistics(sizeInBytes = 0, rowCount = Some(0)) + + override def maxRows: Option[Long] = Some(0) + + override def maxRowsPerPartition: Option[Long] = Some(0) +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala new file mode 100644 index 000000000000..17e2d7c48fe1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * An intermediate placeholder for EmptyRelation planning, which will be replaced with
+ * EmptyRelationExec eventually.
+ */
+case class EmptyRelationPlanLater(plan: LogicalPlan) extends PlanLaterBase
+
+/**
+ * A leaf node wrapper for a propagated empty relation, which preserves the physical plan.
+ */
+case class EmptyRelationExec(plan: SparkPlan) extends LeafExecNode with InputRDDCodegen {
+  private val rdd = sparkContext.emptyRDD[InternalRow]
+
+  override def output: Seq[Attribute] = plan.output
+
+  override protected def doExecute(): RDD[InternalRow] = rdd
+
+  override def executeCollect(): Array[InternalRow] = Array.empty
+
+  override def executeTake(limit: Int): Array[InternalRow] = Array.empty
+
+  override def executeTail(limit: Int): Array[InternalRow] = Array.empty
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = sparkContext.emptyRDD
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  override protected val createUnsafeProjection: Boolean = false
+
+  protected override def stringArgs: Iterator[Any] = Iterator(s"[plan_id=$id]")
+
+  override def generateTreeString(
+    depth: Int,
+    lastChildren: java.util.ArrayList[Boolean],
+    append: String => Unit,
+    verbose: Boolean,
+    prefix: String = "",
+    addSuffix: Boolean = false,
+    maxFields: Int,
+    printNodeId: Boolean,
+    indent: Int = 0): Unit = {
+    super.generateTreeString(depth,
+      lastChildren,
+      append,
+      verbose,
+      prefix,
+      addSuffix,
+      maxFields,
+      printNodeId,
+      indent)
+    lastChildren.add(true)
+    plan.generateTreeString(
+      depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
+    lastChildren.remove(lastChildren.size() - 1)
+  }
+
+  override def doCanonicalize(): SparkPlan = {
+    this.copy(plan = LocalTableScanExec(plan.output, Nil))
+  }
+
+  override protected[sql] def cleanupResources(): Unit = {
+    plan.cleanupResources()
+    super.cleanupResources()
+  }
+}
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 7c45b02ee846..e28255f5bf47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -58,6 +58,7 @@ private[execution] object SparkPlanInfo {
       case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
       case stage: QueryStageExec => stage.plan :: Nil
       case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
+      case EmptyRelationExec(plan) => plan :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index da3159319f98..7fbea585b44b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -21,6
+21,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.adaptive.EmptyRelationPropagationStrategy import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy import org.apache.spark.sql.execution.command.v2.V2CommandStrategy import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} @@ -34,6 +35,7 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen override def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( + EmptyRelationPropagationStrategy :: LogicalQueryStageStrategy :: PythonEvals :: new DataSourceV2Strategy(session) :: @@ -57,7 +59,18 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { plan.collect { - case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan + case placeholder: PlanLaterBase => placeholder -> placeholder.plan + } + } + + override protected def planPlaceHolder( + placeHolder: SparkPlan, + logical: LogicalPlan): Iterator[SparkPlan] = { + placeHolder match { + case EmptyRelationPlanLater(p) => + super.plan(logical).map(EmptyRelationExec) + case _ => + super.plan(logical) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f0682e6b9afc..787b765ff7d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -54,7 +54,9 @@ abstract class SparkStrategy extends GenericStrategy[SparkPlan] { override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan) } -case class PlanLater(plan: LogicalPlan) extends LeafExecNode { +abstract class PlanLaterBase extends LeafExecNode { + + def plan: LogicalPlan override def output: Seq[Attribute] = plan.output @@ -63,6 +65,8 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode { } } +case class PlanLater(plan: LogicalPlan) extends PlanLaterBase + abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 382f8cf8861a..336cedc46e9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -953,6 +953,10 @@ case class CollapseCodegenStages( // Do not make LogicalTableScanExec the root of WholeStageCodegen // to support the fast driver-local collect/take paths. plan + case plan: EmptyRelationExec => + // Do not make EmptyRelationExec the root of WholeStageCodegen + // to support the fast driver-local collect/take paths. + plan case plan: CommandResultExec => // Do not make CommandResultExec the root of WholeStageCodegen // to support the fast driver-local collect/take paths. 
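The CollapseCodegenStages exclusion above matters because EmptyRelationExec answers executeCollect/executeTake/executeTail directly on the driver without submitting a job; if the node became the root of a whole-stage-codegen subtree, those calls would instead go through the generic RDD path. A rough sketch of the fast-path idea, using simplified hypothetical interfaces rather than Spark's actual SparkPlan API:

    trait Exec {
      def executeRdd(): Seq[Array[Any]] // stand-in for launching a distributed job
      // Generic path: materializes the result by running the job.
      def executeCollect(): Array[Array[Any]] = executeRdd().toArray
    }

    object EmptyExec extends Exec {
      def executeRdd(): Seq[Array[Any]] = Seq.empty
      // Driver-local fast path: the answer is known statically, no job needed.
      override def executeCollect(): Array[Array[Any]] = Array.empty
    }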
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 858130fae32b..1de63cddd745 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec @@ -34,11 +35,14 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def isEmpty(plan: LogicalPlan): Boolean = - super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) + super.isEmpty(plan) || plan.isInstanceOf[EmptyRelation] || + (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) override protected def nonEmpty(plan: LogicalPlan): Boolean = super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) + override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan) + private def isRootRepartition(plan: LogicalPlan): Boolean = plan match { case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala new file mode 100644 index 000000000000..d4a7f9c7a90b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.EmptyRelationPlanLater
+import org.apache.spark.sql.execution.PlanLater
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * A strategy that plans logical [[EmptyRelation]] to physical [[EmptyRelationPlanLater]], which
+ * will be planned later.
+ */
+object EmptyRelationPropagationStrategy extends Strategy {
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case EmptyRelation(logical) =>
+      EmptyRelationPlanLater(logical) :: Nil
+    case _ => Nil
+  }
+}
\ No newline at end of file
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
index df4d89586758..bd213d176c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
@@ -23,11 +23,17 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY, SCALAR_SUBQUERY}
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.{InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryExec}
+import org.apache.spark.sql.execution.EmptyRelationExec
 
 case class PlanAdaptiveSubqueries(
     subqueryMap: Map[Long, SparkPlan]) extends Rule[SparkPlan] {
 
-  def apply(plan: SparkPlan): SparkPlan = {
+  def apply(plan: SparkPlan): SparkPlan = applyInternal(plan.transformUp {
+    case emptyRelation@EmptyRelationExec(p) =>
+      emptyRelation.copy(plan = apply(p))
+  })
+
+  def applyInternal(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressionsWithPruning(
       _.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
       case expressions.ScalarSubquery(_, _, exprId, _, _, _) =>

From 192a00a4bb6bee7e5be6a6138aa95b808d2ce786 Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Fri, 31 May 2024 17:38:48 -0700
Subject: [PATCH 2/9] fix

---
 .../org/apache/spark/sql/execution/EmptyRelationExec.scala    | 3 ---
 .../execution/adaptive/EmptyRelationPropagationStrategy.scala | 1 -
 2 files changed, 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
index 17e2d7c48fe1..48796019b497 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -20,10 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala index d4a7f9c7a90b..cb0a64981d6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.EmptyRelationPlanLater -import org.apache.spark.sql.execution.PlanLater import org.apache.spark.sql.execution.SparkPlan /** From 9e65fa502a4b54e56c2941d7e5cbb520c0540800 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Sat, 1 Jun 2024 18:21:06 -0700 Subject: [PATCH 3/9] fix --- .../apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala | 2 +- .../org/apache/spark/sql/execution/EmptyRelationExec.scala | 2 +- .../execution/adaptive/EmptyRelationPropagationStrategy.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala index 3910d51b1868..9e055ae7f3bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala @@ -32,4 +32,4 @@ case class EmptyRelation(logical: LogicalPlan) extends LeafNode { override def maxRows: Option[Long] = Some(0) override def maxRowsPerPartition: Option[Long] = Some(0) -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala index 48796019b497..a0c2df01f545 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala @@ -86,4 +86,4 @@ case class EmptyRelationExec(plan: SparkPlan) extends LeafExecNode with InputRDD plan.cleanupResources() super.cleanupResources() } -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala index cb0a64981d6b..9e572ea538d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala @@ -33,4 +33,4 @@ object EmptyRelationPropagationStrategy extends Strategy { EmptyRelationPlanLater(logical) :: Nil case _ => Nil } -} \ No newline at end of file +} From 76a9c2603204ec811bdc76a04d8ded7b7d5f9c24 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Mon, 3 Jun 2024 14:01:46 -0700 Subject: [PATCH 4/9] fix test --- .../adaptive/AdaptiveQueryExecSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 4e1e171c8a84..c71149644d0f 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec} +import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec} import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike} import org.apache.spark.sql.execution.command.DataWritingCommandExec @@ -1650,13 +1650,13 @@ class AdaptiveQueryExecSuite val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( "SELECT key FROM testData WHERE key = 0 ORDER BY key, value") assert(findTopLevelSort(plan1).size == 1) - assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) + assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec]) val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( "SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" + " WHERE key > rand()") assert(findTopLevelSort(plan2).size == 1) - assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) + assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec]) } } @@ -1664,18 +1664,18 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult( "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key") - assert(!plan1.isInstanceOf[LocalTableScanExec]) - assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) + assert(!plan1.isInstanceOf[EmptyRelationExec]) + assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec]) val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1") - assert(!plan2.isInstanceOf[LocalTableScanExec]) - assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec]) + assert(!plan2.isInstanceOf[EmptyRelationExec]) + assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec]) val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult( "SELECT count(*) FROM testData WHERE value = 'no_match'") - assert(!plan3.isInstanceOf[LocalTableScanExec]) - assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec]) + assert(!plan3.isInstanceOf[EmptyRelationExec]) + assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[EmptyRelationExec]) } } @@ -1696,7 +1696,7 @@ class AdaptiveQueryExecSuite |""".stripMargin) checkNumUnion(plan1, 1) checkNumUnion(adaptivePlan1, 0) - assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec]) + assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec]) val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult( """ @@ -1706,7 +1706,7 @@ class AdaptiveQueryExecSuite |""".stripMargin) checkNumUnion(plan2, 1) 
checkNumUnion(adaptivePlan2, 0)
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
     }
   }

From d4b21ec3a2b0e0b426c6ba9f0a29c5c4b4fcf4af Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Thu, 6 Jun 2024 17:35:59 -0700
Subject: [PATCH 5/9] refactor: store logical plan in EmptyRelationExec

---
 .../sql/catalyst/planning/QueryPlanner.scala  |  6 +---
 .../sql/execution/EmptyRelationExec.scala     | 25 ++++++-------
 .../spark/sql/execution/SparkPlanInfo.scala   | 26 ++++++++++++--
 .../spark/sql/execution/SparkPlanner.scala    | 15 +-------
 .../spark/sql/execution/SparkStrategies.scala |  7 ++--
 .../EmptyRelationPropagationStrategy.scala    | 36 -------------------
 .../adaptive/PlanAdaptiveSubqueries.scala     |  8 +----
 7 files changed, 42 insertions(+), 81 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index e4b883e41630..6fa5203a06f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -75,7 +75,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
         placeholders.iterator.foldLeft(Iterator(candidate)) {
           case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
             // Plan the logical plan for the placeholder.
-            val childPlans = planPlaceHolder(placeholder, logicalPlan)
+            val childPlans = this.plan(logicalPlan)
 
             candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
               childPlans.map { childPlan =>
@@ -94,10 +94,6 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
     pruned
   }
 
-  protected def planPlaceHolder(
-      placeHolder: PhysicalPlan,
-      logical: LogicalPlan): Iterator[PhysicalPlan] = this.plan(logical)
-
   /**
    * Collects placeholders marked using [[GenericStrategy#planLater planLater]]
    * by [[strategies]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
index a0c2df01f545..07a6d941b647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -20,22 +20,20 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
- * An intermediate placeholder for EmptyRelation planning, which will be replaced with
- * EmptyRelationExec eventually.
+ * A leaf node wrapper for a propagated empty relation, which preserves the eliminated logical plan.
+ * The logical plan might be partially executed, i.e., it may contain LogicalQueryStage.
  */
-case class EmptyRelationPlanLater(plan: LogicalPlan) extends PlanLaterBase
-
-/**
- * A leaf node wrapper for a propagated empty relation, which preserves the physical plan.
- */
-case class EmptyRelationExec(plan: SparkPlan) extends LeafExecNode with InputRDDCodegen {
+case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNode
+  with InputRDDCodegen {
   private val rdd = sparkContext.emptyRDD[InternalRow]
 
-  override def output: Seq[Attribute] = plan.output
+  override def output: Seq[Attribute] = logical.output
 
   override protected def doExecute(): RDD[InternalRow] = rdd
 
@@ -73,17 +71,21 @@
       printNodeId,
       indent)
     lastChildren.add(true)
-    plan.generateTreeString(
+    logical.generateTreeString(
      depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
     lastChildren.remove(lastChildren.size() - 1)
   }
 
   override def doCanonicalize(): SparkPlan = {
-    this.copy(plan = LocalTableScanExec(plan.output, Nil))
+    this.copy(logical = LocalRelation(logical.output))
   }
 
   override protected[sql] def cleanupResources(): Unit = {
-    plan.cleanupResources()
+    logical.foreach {
+      case LogicalQueryStage(_, physical) =>
+        physical.cleanupResources()
+      case _ =>
+    }
     super.cleanupResources()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index e28255f5bf47..b272cd10519c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -51,6 +53,19 @@ class SparkPlanInfo(
 
 private[execution] object SparkPlanInfo {
 
+  private def fromSparkPlan(plan: LogicalPlan): SparkPlanInfo = {
+    val childrenInfo = plan match {
+      case LogicalQueryStage(_, physical) => Seq(fromSparkPlan(physical))
+      case _ => (plan.children ++ plan.subqueries).map(fromSparkPlan)
+    }
+    new SparkPlanInfo(
+      plan.nodeName,
+      plan.simpleString(SQLConf.get.maxToStringFields),
+      childrenInfo,
+      Map[String, String](),
+      Seq.empty)
+  }
+
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
@@ -58,7 +73,7 @@ private[execution] object SparkPlanInfo {
       case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
       case stage: QueryStageExec => stage.plan :: Nil
       case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
-      case EmptyRelationExec(plan) => plan :: Nil
+      case EmptyRelationExec(logical) => (logical :: Nil)
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -70,10 +85,17 @@ private[execution] object SparkPlanInfo {
       case fileScan: FileSourceScanLike => fileScan.metadata
       case _ => Map[String, String]()
     }
+    val childrenInfo = children.flatMap {
+      case child: SparkPlan =>
+        Some(fromSparkPlan(child))
+      case child: LogicalPlan =>
+        Some(fromSparkPlan(child))
+      case _ => None
+    }
     new SparkPlanInfo(
       plan.nodeName,
       plan.simpleString(SQLConf.get.maxToStringFields),
-      children.map(fromSparkPlan),
+      childrenInfo,
       metadata,
       metrics)
   }
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 7fbea585b44b..da3159319f98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.adaptive.EmptyRelationPropagationStrategy import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy import org.apache.spark.sql.execution.command.v2.V2CommandStrategy import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} @@ -35,7 +34,6 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen override def strategies: Seq[Strategy] = experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ ( - EmptyRelationPropagationStrategy :: LogicalQueryStageStrategy :: PythonEvals :: new DataSourceV2Strategy(session) :: @@ -59,18 +57,7 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { plan.collect { - case placeholder: PlanLaterBase => placeholder -> placeholder.plan - } - } - - override protected def planPlaceHolder( - placeHolder: SparkPlan, - logical: LogicalPlan): Iterator[SparkPlan] = { - placeHolder match { - case EmptyRelationPlanLater(p) => - super.plan(logical).map(EmptyRelationExec) - case _ => - super.plan(logical) + case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 787b765ff7d7..f8392012ab9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -54,9 +54,7 @@ abstract class SparkStrategy extends GenericStrategy[SparkPlan] { override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan) } -abstract class PlanLaterBase extends LeafExecNode { - - def plan: LogicalPlan +case class PlanLater(plan: LogicalPlan) extends LeafExecNode { override def output: Seq[Attribute] = plan.output @@ -65,8 +63,6 @@ abstract class PlanLaterBase extends LeafExecNode { } } -case class PlanLater(plan: LogicalPlan) extends PlanLaterBase - abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => @@ -963,6 +959,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data, _) => LocalTableScanExec(output, data) :: Nil + case logical.EmptyRelation(logical) => EmptyRelationExec(logical) :: Nil case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil // We should match the combination of limit and offset first, to get the optimal physical // plan, instead of planning limit and offset separately. 
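A note on the @transient logical field this patch introduces: the wrapped logical plan stays on the driver, so anything executors need must be captured eagerly before the node is serialized (patch 6 below turns output into a val for exactly this reason). A small self-contained illustration of the pattern; DriverOnlyPlan and EmptyNode are hypothetical stand-ins for LogicalPlan and EmptyRelationExec:

    import java.io._

    // Stands in for LogicalPlan; deliberately not Serializable.
    class DriverOnlyPlan(val output: Seq[String])

    case class EmptyNode(@transient logical: DriverOnlyPlan) {
      // Captured eagerly on the driver; the value is serialized with the node.
      val output: Seq[String] = logical.output
      // A def would dereference `logical`, which is null after deserialization.
    }

    object RoundTrip extends App {
      val node = EmptyNode(new DriverOnlyPlan(Seq("a", "b")))
      val buf = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buf)
      out.writeObject(node)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      val copied = in.readObject().asInstanceOf[EmptyNode]
      assert(copied.output == Seq("a", "b")) // the eager val survived the trip
      assert(copied.logical == null)         // the @transient field did not
    }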
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala
deleted file mode 100644
index 9e572ea538d9..000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EmptyRelationPropagationStrategy.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.spark.sql.Strategy
-import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.EmptyRelationPlanLater
-import org.apache.spark.sql.execution.SparkPlan
-
-/**
- * A strategy that plans logical [[EmptyRelation]] to physical [[EmptyRelationPlanLater]], which
- * will be planned later.
- */
-object EmptyRelationPropagationStrategy extends Strategy {
-  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case EmptyRelation(logical) =>
-      EmptyRelationPlanLater(logical) :: Nil
-    case _ => Nil
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
index bd213d176c93..df4d89586758 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
@@ -23,17 +23,11 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY, SCALAR_SUBQUERY}
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.{InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryExec}
-import org.apache.spark.sql.execution.EmptyRelationExec
 
 case class PlanAdaptiveSubqueries(
     subqueryMap: Map[Long, SparkPlan]) extends Rule[SparkPlan] {
 
-  def apply(plan: SparkPlan): SparkPlan = applyInternal(plan.transformUp {
-    case emptyRelation@EmptyRelationExec(p) =>
-      emptyRelation.copy(plan = apply(p))
-  })
-
-  def applyInternal(plan: SparkPlan): SparkPlan = {
+  def apply(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressionsWithPruning(
       _.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
       case expressions.ScalarSubquery(_, _, exprId, _, _, _) =>

From d435954808ad451dd5fddfc0327f4847a0cc3528 Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Tue, 11 Jun 2024 23:21:46 -0700
Subject: [PATCH 6/9] address comment

---
 .../sql/execution/EmptyRelationExec.scala     | 24 ++++++++++---------
 .../spark/sql/execution/SparkPlanInfo.scala   |  6 ++---
 .../spark/sql/execution/SparkStrategies.scala |  2 +-
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
index 07a6d941b647..fc90d72b90de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -33,7 +33,9 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
   with InputRDDCodegen {
   private val rdd = sparkContext.emptyRDD[InternalRow]
 
-  override def output: Seq[Attribute] = logical.output
+  // We cannot use a def here, because `logical` won't be serialized to executors
+  // while this method will be called on executors.
+  override val output: Seq[Attribute] = logical.output
 
   override protected def doExecute(): RDD[InternalRow] = rdd
 
@@ -52,15 +54,15 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
   protected override def stringArgs: Iterator[Any] = Iterator(s"[plan_id=$id]")
 
   override def generateTreeString(
-    depth: Int,
-    lastChildren: java.util.ArrayList[Boolean],
-    append: String => Unit,
-    verbose: Boolean,
-    prefix: String = "",
-    addSuffix: Boolean = false,
-    maxFields: Int,
-    printNodeId: Boolean,
-    indent: Int = 0): Unit = {
+      depth: Int,
+      lastChildren: java.util.ArrayList[Boolean],
+      append: String => Unit,
+      verbose: Boolean,
+      prefix: String = "",
+      addSuffix: Boolean = false,
+      maxFields: Int,
+      printNodeId: Boolean,
+      indent: Int = 0): Unit = {
     super.generateTreeString(depth,
       lastChildren,
       append,
@@ -77,7 +79,7 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
   }
 
   override def doCanonicalize(): SparkPlan = {
-    this.copy(logical = LocalRelation(logical.output))
+    this.copy(logical = LocalRelation(logical.output).canonicalized)
   }
 
   override protected[sql] def cleanupResources(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index b272cd10519c..615c8746a3e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -53,10 +53,10 @@ class SparkPlanInfo(
 
 private[execution] object SparkPlanInfo {
 
-  private def fromSparkPlan(plan: LogicalPlan): SparkPlanInfo = {
+  private def fromLogicalPlan(plan: LogicalPlan): SparkPlanInfo = {
     val childrenInfo = plan match {
       case LogicalQueryStage(_, physical) => Seq(fromSparkPlan(physical))
-      case _ => (plan.children ++ plan.subqueries).map(fromSparkPlan)
+      case _ => (plan.children ++ plan.subqueries).map(fromLogicalPlan)
     }
     new SparkPlanInfo(
       plan.nodeName,
@@ -89,7 +89,7 @@ private[execution] object SparkPlanInfo {
       case child: SparkPlan =>
         Some(fromSparkPlan(child))
       case child: LogicalPlan =>
-        Some(fromSparkPlan(child))
+        Some(fromLogicalPlan(child))
       case _ => None
     }
     new SparkPlanInfo(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f8392012ab9a..ed7a6162cc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -959,7 +959,7 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] { execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data, _) => LocalTableScanExec(output, data) :: Nil - case logical.EmptyRelation(logical) => EmptyRelationExec(logical) :: Nil + case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil // We should match the combination of limit and offset first, to get the optimal physical // plan, instead of planning limit and offset separately. From 85328c37776f6aa9b6e0ae538dc33252beb9a76c Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Tue, 11 Jun 2024 23:23:44 -0700 Subject: [PATCH 7/9] minor --- .../sql/execution/adaptive/AQEPropagateEmptyRelation.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 1de63cddd745..88d695148ce3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -35,8 +35,7 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def isEmpty(plan: LogicalPlan): Boolean = - super.isEmpty(plan) || plan.isInstanceOf[EmptyRelation] || - (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) + super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) override protected def nonEmpty(plan: LogicalPlan): Boolean = super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) From 61aba248608e6407f6e5411fb4662a6e1363e86a Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Wed, 12 Jun 2024 12:56:01 -0700 Subject: [PATCH 8/9] fix --- .../sql/execution/adaptive/AQEPropagateEmptyRelation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 88d695148ce3..1de63cddd745 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -35,7 +35,8 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def isEmpty(plan: LogicalPlan): Boolean = - super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) + super.isEmpty(plan) || plan.isInstanceOf[EmptyRelation] || + (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) override protected def nonEmpty(plan: LogicalPlan): Boolean = super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) From d0ca18748b6212d9ac0f7ac873bec3e292d21488 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Tue, 18 Jun 2024 21:45:55 -0700 Subject: [PATCH 9/9] address comment --- .../sql/execution/adaptive/AQEPropagateEmptyRelation.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 1de63cddd745..7b3e0cd549b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -35,8 +35,7 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { override protected def isEmpty(plan: LogicalPlan): Boolean = - super.isEmpty(plan) || plan.isInstanceOf[EmptyRelation] || - (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) + super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) override protected def nonEmpty(plan: LogicalPlan): Boolean = super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) @@ -65,6 +64,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { None } + case _: EmptyRelation => Some(0) + case _ => None }
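The net effect of patches 7 through 9: instead of special-casing EmptyRelation inside isEmpty, the node reports an estimated row count of zero and the existing zero-row-count check picks it up. A small sketch of that flow, again with hypothetical stand-in types:

    sealed trait Plan
    case class EmptyRel(inner: Plan) extends Plan
    case class Stage(rows: Option[Long]) extends Plan

    object Demo extends App {
      def estimatedRowCount(p: Plan): Option[Long] = p match {
        case _: EmptyRel => Some(0L) // the case patch 9 adds
        case Stage(rows) => rows
      }
      def isEmpty(p: Plan): Boolean = estimatedRowCount(p).contains(0L)

      assert(isEmpty(EmptyRel(Stage(None))))  // a propagated empty counts as empty
      assert(!isEmpty(Stage(Some(3L))))       // a stage with rows does not
    }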