diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 21679703093a..832af340c339 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -58,7 +58,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
     case _ => false
   }
 
-  protected def empty(plan: LogicalPlan): LocalRelation =
+  protected def empty(plan: LogicalPlan): LogicalPlan =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
 
   // Construct a project list from plan's output, while the value is always NULL.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala
new file mode 100644
index 000000000000..9e055ae7f3bd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EmptyRelation.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+
+case class EmptyRelation(logical: LogicalPlan) extends LeafNode {
+  override val isStreaming: Boolean = logical.isStreaming
+
+  override val outputOrdering: Seq[SortOrder] = logical.outputOrdering
+
+  override def output: Seq[Attribute] = logical.output
+
+  override def computeStats(): Statistics = Statistics(sizeInBytes = 0, rowCount = Some(0))
+
+  override def maxRows: Option[Long] = Some(0)
+
+  override def maxRowsPerPartition: Option[Long] = Some(0)
+}
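Reviewer note: the widened return type of `empty` is what lets the AQE subclass further down substitute its own replacement node. The new `EmptyRelation` leaf keeps the eliminated subtree around while advertising itself as statically empty through its statistics and row-count bounds. A minimal sketch of those invariants, using the existing Catalyst DSL (the object and value names here are illustrative, not part of this patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{EmptyRelation, LocalRelation}

    object EmptyRelationSketch extends App {
      // Wrap an arbitrary logical plan; a two-column LocalRelation stands in here.
      val relation = LocalRelation($"a".int, $"b".string)
      val emptied = EmptyRelation(relation)

      assert(emptied.output == relation.output)  // schema of the original plan is kept
      assert(emptied.maxRows.contains(0L))       // statically known to produce no rows
      assert(emptied.computeStats().rowCount.contains(BigInt(0)))  // zero-row statistics
    }

Because `maxRows` and `computeStats()` both report zero, any stats- or bound-based emptiness check treats the wrapper exactly like an empty `LocalRelation`.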
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
new file mode 100644
index 000000000000..fc90d72b90de
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/EmptyRelationExec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A leaf node wrapper for a propagated empty relation, which preserves the eliminated
+ * logical plan. The logical plan might be partially executed, i.e., contain LogicalQueryStage.
+ */
+case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNode
+  with InputRDDCodegen {
+  private val rdd = sparkContext.emptyRDD[InternalRow]
+
+  // This must be a val rather than a def: `logical` is marked @transient and is not
+  // serialized to executors, but this member may still be accessed there.
+  override val output: Seq[Attribute] = logical.output
+
+  override protected def doExecute(): RDD[InternalRow] = rdd
+
+  override def executeCollect(): Array[InternalRow] = Array.empty
+
+  override def executeTake(limit: Int): Array[InternalRow] = Array.empty
+
+  override def executeTail(limit: Int): Array[InternalRow] = Array.empty
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = sparkContext.emptyRDD
+
+  override def inputRDD: RDD[InternalRow] = rdd
+
+  override protected val createUnsafeProjection: Boolean = false
+
+  protected override def stringArgs: Iterator[Any] = Iterator(s"[plan_id=$id]")
+
+  override def generateTreeString(
+      depth: Int,
+      lastChildren: java.util.ArrayList[Boolean],
+      append: String => Unit,
+      verbose: Boolean,
+      prefix: String = "",
+      addSuffix: Boolean = false,
+      maxFields: Int,
+      printNodeId: Boolean,
+      indent: Int = 0): Unit = {
+    super.generateTreeString(depth,
+      lastChildren,
+      append,
+      verbose,
+      prefix,
+      addSuffix,
+      maxFields,
+      printNodeId,
+      indent)
+    lastChildren.add(true)
+    logical.generateTreeString(
+      depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
+    lastChildren.remove(lastChildren.size() - 1)
+  }
+
+  override def doCanonicalize(): SparkPlan = {
+    this.copy(logical = LocalRelation(logical.output).canonicalized)
+  }
+
+  override protected[sql] def cleanupResources(): Unit = {
+    logical.foreach {
+      case LogicalQueryStage(_, physical) =>
+        physical.cleanupResources()
+      case _ => // other nodes hold no physical resources to release
+    }
+    super.cleanupResources()
+  }
+}
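Every row-producing path above answers directly on the driver, so a query whose final adaptive plan collapses to this node does no cluster work for the final stage. A hedged end-to-end sketch (names are illustrative, and the exact plan string printed depends on the build; the expectation is that the join is replaced by `EmptyRelationExec` once AQE observes the empty side):

    import org.apache.spark.sql.SparkSession

    object EmptyRelationExecDemo extends App {
      val spark = SparkSession.builder()
        .master("local[2]")
        .appName("empty-relation-demo")
        .config("spark.sql.adaptive.enabled", "true")
        .getOrCreate()
      import spark.implicits._

      val left = Seq((1, "a"), (2, "b")).toDF("k", "v")
      // The filter matches nothing, which AQE only discovers once that stage has run.
      val right = Seq((1, "x")).toDF("k", "w").filter($"w" === "no_match")
      val joined = left.join(right, "k")

      // executeCollect() on the collapsed plan returns Array.empty on the driver,
      // so no extra job is needed to produce the (empty) final result.
      assert(joined.collect().isEmpty)

      // After execution, the final adaptive plan should show EmptyRelationExec.
      println(joined.queryExecution.executedPlan)
      spark.stop()
    }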
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 7c45b02ee846..615c8746a3e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -51,6 +53,19 @@ class SparkPlanInfo(
 
 private[execution] object SparkPlanInfo {
 
+  private def fromLogicalPlan(plan: LogicalPlan): SparkPlanInfo = {
+    val childrenInfo = plan match {
+      case LogicalQueryStage(_, physical) => Seq(fromSparkPlan(physical))
+      case _ => (plan.children ++ plan.subqueries).map(fromLogicalPlan)
+    }
+    new SparkPlanInfo(
+      plan.nodeName,
+      plan.simpleString(SQLConf.get.maxToStringFields),
+      childrenInfo,
+      Map[String, String](),
+      Seq.empty)
+  }
+
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
@@ -58,6 +73,7 @@ private[execution] object SparkPlanInfo {
       case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
       case stage: QueryStageExec => stage.plan :: Nil
       case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
+      case EmptyRelationExec(logical) => logical :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -69,10 +85,17 @@ private[execution] object SparkPlanInfo {
       case fileScan: FileSourceScanLike => fileScan.metadata
       case _ => Map[String, String]()
     }
+    val childrenInfo = children.flatMap {
+      case child: SparkPlan =>
+        Some(fromSparkPlan(child))
+      case child: LogicalPlan =>
+        Some(fromLogicalPlan(child))
+      case _ => None
+    }
     new SparkPlanInfo(
       plan.nodeName,
       plan.simpleString(SQLConf.get.maxToStringFields),
-      children.map(fromSparkPlan),
+      childrenInfo,
       metadata,
       metrics)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f0682e6b9afc..ed7a6162cc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -959,6 +959,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
+      case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil
       case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
       // We should match the combination of limit and offset first, to get the optimal physical
       // plan, instead of planning limit and offset separately.
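The `SparkPlanInfo` change is what keeps the SQL UI informative: the eliminated subtree is rendered beneath `EmptyRelationExec`, and whenever the walk reaches a `LogicalQueryStage` it crosses back into the already-planned physical tree via `fromSparkPlan`. A toy model of that mutual recursion, pasteable into a Scala REPL (all types below are simplified stand-ins invented for illustration, not Spark's classes):

    sealed trait Physical { def name: String; def children: Seq[Physical] = Nil }
    sealed trait Logical { def name: String; def children: Seq[Logical] = Nil }

    case class Scan(name: String) extends Physical
    case class Stage(physical: Physical) extends Logical { def name = "LogicalQueryStage" }
    case class Filter(child: Logical) extends Logical {
      def name = "Filter"
      override def children: Seq[Logical] = Seq(child)
    }

    case class Info(name: String, children: Seq[Info])

    def fromPhysical(p: Physical): Info = Info(p.name, p.children.map(fromPhysical))

    def fromLogical(l: Logical): Info = l match {
      // At a materialized stage, resume the physical traversal so the UI can show
      // the stages that actually ran before the subtree was eliminated.
      case Stage(physical) => Info(l.name, Seq(fromPhysical(physical)))
      case _ => Info(l.name, l.children.map(fromLogical))
    }

    // Example: fromLogical(Filter(Stage(Scan("ShuffleQueryStage")))) nests the physical
    // scan's Info beneath the LogicalQueryStage's Info, mirroring the rendered UI tree.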
+      plan
     case plan: CommandResultExec =>
       // Do not make CommandResultExec the root of WholeStageCodegen
       // to support the fast driver-local collect/take paths.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
index 858130fae32b..7b3e0cd549b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -39,6 +40,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
   override protected def nonEmpty(plan: LogicalPlan): Boolean =
     super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)
 
+  override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan)
+
   private def isRootRepartition(plan: LogicalPlan): Boolean = plan match {
     case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true
     case _ => false
@@ -61,6 +64,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
         None
       }
 
+    case _: EmptyRelation => Some(0)
+
     case _ => None
   }
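The AQE variant changes only two hooks of the shared rule: how emptiness is detected (runtime row-count statistics, or an `EmptyRelation` already substituted by an earlier pass) and what replaces an empty subtree (an `EmptyRelation` wrapper rather than a bare empty `LocalRelation`, so the eliminated stages stay visible). A toy rendering of that division of labor, pasteable into a Scala REPL (simplified stand-ins, not Spark's classes):

    sealed trait Plan
    case class Join(left: Plan, right: Plan) extends Plan
    case class MaterializedStage(rowCount: Option[Long]) extends Plan
    case class EmptyMarker(original: Plan) extends Plan

    // The shared skeleton decides where emptiness propagates (an inner join with an
    // empty side produces no rows); subclasses decide how emptiness is detected and
    // what the empty subtree is replaced with.
    abstract class PropagateEmptyBase {
      protected def rowCount(p: Plan): Option[Long]
      protected def empty(p: Plan): Plan
      private def isEmpty(p: Plan): Boolean = rowCount(p).contains(0L)
      def apply(p: Plan): Plan = p match {
        case j @ Join(l, r) if isEmpty(l) || isEmpty(r) => empty(j)
        case other => other
      }
    }

    // AQE flavor: trust runtime statistics, and keep the eliminated subtree inside
    // the marker so later passes (and the UI) can still see it.
    object AqeFlavor extends PropagateEmptyBase {
      protected def rowCount(p: Plan): Option[Long] = p match {
        case MaterializedStage(rc) => rc  // runtime stats from a finished stage
        case EmptyMarker(_) => Some(0L)   // already proven empty by an earlier pass
        case _ => None
      }
      protected def empty(p: Plan): Plan = EmptyMarker(p)
    }

    // AqeFlavor(Join(MaterializedStage(Some(5L)), MaterializedStage(Some(0L))))
    // returns an EmptyMarker wrapping the whole Join.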
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 4e1e171c8a84..c71149644d0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -1650,13 +1650,13 @@ class AdaptiveQueryExecSuite
       val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
         "SELECT key FROM testData WHERE key = 0 ORDER BY key, value")
       assert(findTopLevelSort(plan1).size == 1)
-      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])
 
       val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
         "SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" +
           " WHERE key > rand()")
       assert(findTopLevelSort(plan2).size == 1)
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
     }
   }
 
@@ -1664,18 +1664,18 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
         "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
-      assert(!plan1.isInstanceOf[LocalTableScanExec])
-      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+      assert(!plan1.isInstanceOf[EmptyRelationExec])
+      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])
 
       val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
         "SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1")
-      assert(!plan2.isInstanceOf[LocalTableScanExec])
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(!plan2.isInstanceOf[EmptyRelationExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
 
       val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult(
         "SELECT count(*) FROM testData WHERE value = 'no_match'")
-      assert(!plan3.isInstanceOf[LocalTableScanExec])
-      assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec])
+      assert(!plan3.isInstanceOf[EmptyRelationExec])
+      assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[EmptyRelationExec])
     }
   }
 
@@ -1696,7 +1696,7 @@ class AdaptiveQueryExecSuite
         |""".stripMargin)
       checkNumUnion(plan1, 1)
      checkNumUnion(adaptivePlan1, 0)
-      assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+      assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])
 
       val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
         """
@@ -1706,7 +1706,7 @@ class AdaptiveQueryExecSuite
         |""".stripMargin)
       checkNumUnion(plan2, 1)
       checkNumUnion(adaptivePlan2, 0)
-      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
     }
   }
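One detail these tests pin down: a global aggregate over a provably empty input must not be propagated, because `SELECT count(*)` over zero rows still returns one row, which is why the `plan3` assertions expect the plan to survive. A spark-shell sketch of the distinction (assumes an active `spark` session with AQE enabled; names are illustrative):

    import org.apache.spark.sql.functions.count
    import spark.implicits._

    val noMatch = Seq((1, "a")).toDF("key", "value").filter($"value" === "no_match")

    // Grouped aggregate over empty input: zero output rows, so the plan may collapse.
    assert(noMatch.groupBy($"key").count().collect().isEmpty)

    // Global aggregate over empty input: exactly one row with count = 0, so the rule
    // must leave the plan alone, matching the plan3 assertions above.
    assert(noMatch.agg(count("*")).head().getLong(0) == 0L)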