@@ -58,7 +58,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport
case _ => false
}

- protected def empty(plan: LogicalPlan): LocalRelation =
+ protected def empty(plan: LogicalPlan): LogicalPlan =
LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

// Construct a project list from plan's output, while the value is always NULL.
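Widening the return type of empty() from LocalRelation to LogicalPlan is what allows the AQE rule further down in this change to substitute its own empty node. A self-contained sketch of that override pattern, using toy stand-in types rather than Spark's actual classes:

// Toy illustration only; Local and Preserved stand in for LocalRelation and EmptyRelation.
sealed trait Plan
case class Local(output: Seq[String]) extends Plan
case class Preserved(original: Plan) extends Plan

abstract class PropagateBase {
  // Base rule: replace an empty subtree with a plain empty relation.
  protected def empty(plan: Plan): Plan = Local(Nil)
}

object AqePropagate extends PropagateBase {
  // AQE variant: keep the eliminated plan so it can still be reported later.
  override protected def empty(plan: Plan): Plan = Preserved(plan)
}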
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder

case class EmptyRelation(logical: LogicalPlan) extends LeafNode {
override val isStreaming: Boolean = logical.isStreaming

override val outputOrdering: Seq[SortOrder] = logical.outputOrdering

override def output: Seq[Attribute] = logical.output

override def computeStats(): Statistics = Statistics(sizeInBytes = 0, rowCount = Some(0))

override def maxRows: Option[Long] = Some(0)

override def maxRowsPerPartition: Option[Long] = Some(0)
}
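The hard-coded statistics and row-count bounds above are what let later rules and the planner treat this wrapper as provably empty without inspecting the preserved plan. A minimal illustrative helper (not part of this PR) showing the kind of check those bounds enable:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative only: emptiness can be proven from the node's own bounds/statistics.
def isProvablyEmpty(plan: LogicalPlan): Boolean =
  plan.maxRows.contains(0L) || plan.stats.rowCount.contains(BigInt(0))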
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A leaf node wrapper for a propagated empty relation, which preserves the eliminated logical plan.
* The logical plan might be partially executed, i.e., it may contain LogicalQueryStage nodes.
*/
case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNode
with InputRDDCodegen {
private val rdd = sparkContext.emptyRDD[InternalRow]

// This must be a val, not a def: `logical` is not serialized to executors, while this member
// may still be accessed on executors.
override val output: Seq[Attribute] = logical.output

override protected def doExecute(): RDD[InternalRow] = rdd

override def executeCollect(): Array[InternalRow] = Array.empty

override def executeTake(limit: Int): Array[InternalRow] = Array.empty

override def executeTail(limit: Int): Array[InternalRow] = Array.empty

protected override def doExecuteColumnar(): RDD[ColumnarBatch] = sparkContext.emptyRDD

override def inputRDD: RDD[InternalRow] = rdd

override protected val createUnsafeProjection: Boolean = false

protected override def stringArgs: Iterator[Any] = Iterator(s"[plan_id=$id]")

override def generateTreeString(
depth: Int,
lastChildren: java.util.ArrayList[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(depth,
lastChildren,
append,
verbose,
prefix,
addSuffix,
maxFields,
printNodeId,
indent)
lastChildren.add(true)
logical.generateTreeString(
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
lastChildren.remove(lastChildren.size() - 1)
}

override def doCanonicalize(): SparkPlan = {
this.copy(logical = LocalRelation(logical.output).canonicalized)
}

override protected[sql] def cleanupResources(): Unit = {
logical.foreach {
case LogicalQueryStage(_, physical) => physical.cleanupResources()
case _ => // other nodes hold no physical resources to release here
}
super.cleanupResources()
}
}
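For context, a rough end-to-end sketch of where this node shows up (illustrative only; it assumes an existing SparkSession named spark, and the exact plan depends on the Spark version, join strategy, and configuration):

// Illustrative sketch, assuming an existing SparkSession `spark`.
spark.conf.set("spark.sql.adaptive.enabled", "true")

val emptySide = spark.range(100).where("id < 0")   // zero rows, but only known at runtime
val joined = emptySide.join(spark.range(10), "id")

// Once a runtime stage materializes with zero rows, AQEPropagateEmptyRelation can replace
// the subtree with EmptyRelation, which SparkStrategies plans as EmptyRelationExec, so
// collect() returns an empty result without executing the remaining stages.
assert(joined.collect().isEmpty)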
@@ -18,7 +18,9 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
+ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+ import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -51,13 +53,27 @@ class SparkPlanInfo(

private[execution] object SparkPlanInfo {

+ private def fromLogicalPlan(plan: LogicalPlan): SparkPlanInfo = {
+ val childrenInfo = plan match {
+ case LogicalQueryStage(_, physical) => Seq(fromSparkPlan(physical))
+ case _ => (plan.children ++ plan.subqueries).map(fromLogicalPlan)
+ }
+ new SparkPlanInfo(
+ plan.nodeName,
+ plan.simpleString(SQLConf.get.maxToStringFields),
+ childrenInfo,
+ Map[String, String](),
+ Seq.empty)
+ }
+
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case ReusedSubqueryExec(child) => child :: Nil
case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
+ case EmptyRelationExec(logical) => (logical :: Nil)
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -69,10 +85,17 @@ private[execution] object SparkPlanInfo {
case fileScan: FileSourceScanLike => fileScan.metadata
case _ => Map[String, String]()
}
+ val childrenInfo = children.flatMap {
+ case child: SparkPlan =>
+ Some(fromSparkPlan(child))
+ case child: LogicalPlan =>
+ Some(fromLogicalPlan(child))
+ case _ => None
+ }
new SparkPlanInfo(
plan.nodeName,
plan.simpleString(SQLConf.get.maxToStringFields),
- children.map(fromSparkPlan),
+ childrenInfo,
metadata,
metrics)
}
@@ -959,6 +959,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data, _) =>
LocalTableScanExec(output, data) :: Nil
+ case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil
case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
// We should match the combination of limit and offset first, to get the optimal physical
// plan, instead of planning limit and offset separately.
@@ -953,6 +953,10 @@ case class CollapseCodegenStages(
// Do not make LocalTableScanExec the root of WholeStageCodegen
// to support the fast driver-local collect/take paths.
plan
+ case plan: EmptyRelationExec =>
+ // Do not make EmptyRelationExec the root of WholeStageCodegen
+ // to support the fast driver-local collect/take paths.
+ plan
case plan: CommandResultExec =>
// Do not make CommandResultExec the root of WholeStageCodegen
// to support the fast driver-local collect/take paths.
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+ import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -39,6 +40,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
override protected def nonEmpty(plan: LogicalPlan): Boolean =
super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)

+ override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan)
+
private def isRootRepartition(plan: LogicalPlan): Boolean = plan match {
case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true
case _ => false
@@ -61,6 +64,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
None
}

+ case _: EmptyRelation => Some(0)
+
case _ => None
}

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
- import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
+ import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -1650,32 +1650,32 @@ class AdaptiveQueryExecSuite
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
"SELECT key FROM testData WHERE key = 0 ORDER BY key, value")
assert(findTopLevelSort(plan1).size == 1)
- assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+ assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
"SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" +
" WHERE key > rand()")
assert(findTopLevelSort(plan2).size == 1)
- assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+ assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
}
}

test("SPARK-35442: Support propagate empty relation through aggregate") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
- assert(!plan1.isInstanceOf[LocalTableScanExec])
- assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+ assert(!plan1.isInstanceOf[EmptyRelationExec])
+ assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1")
- assert(!plan2.isInstanceOf[LocalTableScanExec])
- assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+ assert(!plan2.isInstanceOf[EmptyRelationExec])
+ assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])

val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult(
"SELECT count(*) FROM testData WHERE value = 'no_match'")
- assert(!plan3.isInstanceOf[LocalTableScanExec])
- assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec])
+ assert(!plan3.isInstanceOf[EmptyRelationExec])
+ assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[EmptyRelationExec])
}
}

@@ -1696,7 +1696,7 @@ class AdaptiveQueryExecSuite
|""".stripMargin)
checkNumUnion(plan1, 1)
checkNumUnion(adaptivePlan1, 0)
- assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+ assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
"""
@@ -1706,7 +1706,7 @@
|""".stripMargin)
checkNumUnion(plan2, 1)
checkNumUnion(adaptivePlan2, 0)
- assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+ assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
}
}
