@@ -58,7 +58,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
case _ => false
}

- protected def empty(plan: LogicalPlan): LocalRelation =
+ protected def empty(plan: LogicalPlan): LogicalPlan =
LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)

// Construct a project list from plan's output, while the value is always NULL.

@@ -75,7 +75,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
placeholders.iterator.foldLeft(Iterator(candidate)) {
case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
// Plan the logical plan for the placeholder.
- val childPlans = this.plan(logicalPlan)
+ val childPlans = planPlaceHolder(placeholder, logicalPlan)

candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
childPlans.map { childPlan =>

@@ -94,6 +94,10 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
pruned
}

+ protected def planPlaceHolder(
+     placeHolder: PhysicalPlan,
+     logical: LogicalPlan): Iterator[PhysicalPlan] = this.plan(logical)

/**
* Collects placeholders marked using [[GenericStrategy#planLater planLater]]
* by [[strategies]].
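The new planPlaceHolder hook above lets a planner subclass intercept how an individual placeholder is planned, while the default keeps the old behavior of planning the placeholder's logical plan directly. A minimal self-contained sketch of the pattern, with toy types rather than Spark's real signatures:

    // Toy illustration of the hook; ToyPlanner and its types are stand-ins,
    // not Spark classes.
    abstract class ToyPlanner[Plan] {
      def plan(logical: String): Iterator[Plan]

      // Default: placeholders are planned exactly as before this change.
      protected def planPlaceHolder(placeholder: Plan, logical: String): Iterator[Plan] =
        plan(logical)
    }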
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder

case class EmptyRelation(logical: LogicalPlan) extends LeafNode {
override val isStreaming: Boolean = logical.isStreaming

override val outputOrdering: Seq[SortOrder] = logical.outputOrdering

override def output: Seq[Attribute] = logical.output

override def computeStats(): Statistics = Statistics(sizeInBytes = 0, rowCount = Some(0))

override def maxRows: Option[Long] = Some(0)

override def maxRowsPerPartition: Option[Long] = Some(0)
}
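As a quick illustration (not part of this diff): the node reports zero rows while preserving the wrapped plan's schema and ordering, which is what the stats-based empty-propagation checks later in this PR rely on. A hedged sketch, assuming some resolved child: LogicalPlan is in scope:

    // Hypothetical usage; `child` stands for any resolved LogicalPlan.
    val rel = EmptyRelation(child)
    assert(rel.maxRows.contains(0L))                    // provably empty
    assert(rel.output == child.output)                  // schema preserved
    assert(rel.outputOrdering == child.outputOrdering)  // ordering preserved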
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * An intermediate placeholder for EmptyRelation planning; it will eventually be
 * replaced with EmptyRelationExec.
 */
case class EmptyRelationPlanLater(plan: LogicalPlan) extends PlanLaterBase

/**
 * A leaf node wrapper for a propagated empty relation, which preserves the original
 * physical plan.
 */
case class EmptyRelationExec(plan: SparkPlan) extends LeafExecNode with InputRDDCodegen {
private val rdd = sparkContext.emptyRDD[InternalRow]

override def output: Seq[Attribute] = plan.output

override protected def doExecute(): RDD[InternalRow] = rdd

override def executeCollect(): Array[InternalRow] = Array.empty

override def executeTake(limit: Int): Array[InternalRow] = Array.empty

override def executeTail(limit: Int): Array[InternalRow] = Array.empty

protected override def doExecuteColumnar(): RDD[ColumnarBatch] = sparkContext.emptyRDD

override def inputRDD: RDD[InternalRow] = rdd

override protected val createUnsafeProjection: Boolean = false

protected override def stringArgs: Iterator[Any] = Iterator(s"[plan_id=$id]")

override def generateTreeString(
depth: Int,
lastChildren: java.util.ArrayList[Boolean],
append: String => Unit,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(depth,
lastChildren,
append,
verbose,
prefix,
addSuffix,
maxFields,
printNodeId,
indent)
lastChildren.add(true)
plan.generateTreeString(
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
lastChildren.remove(lastChildren.size() - 1)
}

override def doCanonicalize(): SparkPlan = {
this.copy(plan = LocalTableScanExec(plan.output, Nil))
}

override protected[sql] def cleanupResources(): Unit = {
plan.cleanupResources()
super.cleanupResources()
}
}
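A hedged sanity-check sketch of the contract above (hypothetical physical: SparkPlan assumed in scope; not part of the diff): every execution path returns an empty result without running the wrapped plan, which is kept only so that EXPLAIN and the UI stay informative.

    // Hypothetical usage; `physical` stands for any planned SparkPlan.
    val exec = EmptyRelationExec(physical)
    assert(exec.executeCollect().isEmpty)    // no rows, no job run on the wrapped plan
    assert(exec.output == physical.output)   // schema preserved for downstream operators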

@@ -58,6 +58,7 @@ private[execution] object SparkPlanInfo {
case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil
+ case EmptyRelationExec(plan) => plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>

@@ -21,6 +21,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+ import org.apache.spark.sql.execution.adaptive.EmptyRelationPropagationStrategy
import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy
import org.apache.spark.sql.execution.command.v2.V2CommandStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}

@@ -34,6 +35,7 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
+ EmptyRelationPropagationStrategy ::
LogicalQueryStageStrategy ::
PythonEvals ::
new DataSourceV2Strategy(session) ::

@@ -57,7 +59,18 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen

override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
plan.collect {
- case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
+ case placeholder: PlanLaterBase => placeholder -> placeholder.plan
}
}

+ override protected def planPlaceHolder(
+     placeHolder: SparkPlan,
+     logical: LogicalPlan): Iterator[SparkPlan] = {
+   placeHolder match {
+     case EmptyRelationPlanLater(p) =>
+       super.plan(logical).map(EmptyRelationExec)
+     case _ =>
+       super.plan(logical)
+   }
+ }


@@ -54,7 +54,9 @@ abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
}

- case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+ abstract class PlanLaterBase extends LeafExecNode {
+
+   def plan: LogicalPlan

override def output: Seq[Attribute] = plan.output


@@ -63,6 +65,8 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
}
}

+ case class PlanLater(plan: LogicalPlan) extends PlanLaterBase

abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SparkPlanner =>


@@ -953,6 +953,10 @@ case class CollapseCodegenStages(
// Do not make LogicalTableScanExec the root of WholeStageCodegen
// to support the fast driver-local collect/take paths.
plan
+ case plan: EmptyRelationExec =>
+   // Do not make EmptyRelationExec the root of WholeStageCodegen
+   // to support the fast driver-local collect/take paths.
+   plan
case plan: CommandResultExec =>
// Do not make CommandResultExec the root of WholeStageCodegen
// to support the fast driver-local collect/take paths.

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+ import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec

@@ -34,11 +35,14 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
*/
object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
override protected def isEmpty(plan: LogicalPlan): Boolean =
- super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0))
+ super.isEmpty(plan) || plan.isInstanceOf[EmptyRelation] ||
+   (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0))

[Inline review thread on the isEmpty change above]
cloud-fan (Contributor): do we need to check the type explicitly? It has row count, which should be sufficient.
liuzqt (Contributor, Author, Jun 12, 2024): @cloud-fan we still need to check this explicitly. getEstimatedRowCount only matches specific patterns; before this change, the check worked because super.isEmpty explicitly matched an empty LocalRelation. We could alternatively match EmptyRelation in getEstimatedRowCount; I don't have a preference.
cloud-fan (Contributor): I think it's more reasonable to make getEstimatedRowCount recognize this new EmptyRelation.

override protected def nonEmpty(plan: LogicalPlan): Boolean =
super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)

+ override protected def empty(plan: LogicalPlan): LogicalPlan = EmptyRelation(plan)

private def isRootRepartition(plan: LogicalPlan): Boolean = plan match {
case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true
case _ => false
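Following the reviewer's suggestion in the thread above, a hedged sketch (not part of this diff) of how getEstimatedRowCount could recognize the new node directly, which would make the explicit isInstanceOf check in isEmpty unnecessary:

    // Sketch only, not the actual Spark code; the elided cases are the
    // existing ones for materialized query stages.
    private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
      case _: EmptyRelation => Some(0)  // provably empty by construction
      // ... existing cases for materialized query stages ...
      case _ => None
    }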
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.EmptyRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.EmptyRelationPlanLater
import org.apache.spark.sql.execution.SparkPlan

/**
 * A strategy that plans a logical [[EmptyRelation]] into a physical
 * [[EmptyRelationPlanLater]], which will be planned later.
 */
object EmptyRelationPropagationStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case EmptyRelation(logical) =>
EmptyRelationPlanLater(logical) :: Nil
case _ => Nil
}
}
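For context, a hedged sketch of the end-to-end flow using only names introduced in this PR (the planNormally parameter is illustrative shorthand for the planner's usual planning of a logical plan, not a real API):

    // Phase 1 (this strategy):                EmptyRelation(p)          => EmptyRelationPlanLater(p)
    // Phase 2 (SparkPlanner.planPlaceHolder): EmptyRelationPlanLater(p) => EmptyRelationExec(planned p)
    def sketchPlanEmpty(rel: EmptyRelation, planNormally: LogicalPlan => SparkPlan): SparkPlan =
      EmptyRelationPlanLater(rel.logical) match {
        case EmptyRelationPlanLater(p) => EmptyRelationExec(planNormally(p))
      }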

@@ -23,11 +23,17 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY, SCALAR_SUBQUERY}
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.{InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryExec}
+ import org.apache.spark.sql.execution.EmptyRelationExec

case class PlanAdaptiveSubqueries(
subqueryMap: Map[Long, SparkPlan]) extends Rule[SparkPlan] {

- def apply(plan: SparkPlan): SparkPlan = {
+ def apply(plan: SparkPlan): SparkPlan = applyInternal(plan.transformUp {
+   case emptyRelation@EmptyRelationExec(p) =>
+     emptyRelation.copy(plan = apply(p))
+ })
+
+ def applyInternal(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressionsWithPruning(
_.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
case expressions.ScalarSubquery(_, _, exprId, _, _, _) =>
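The explicit recursion in apply above is needed because EmptyRelationExec keeps the preserved plan in an ordinary field rather than as a child, so generic tree transforms never descend into it. A toy illustration of the pitfall (toy types, not Spark classes):

    // Transforms walk `children`, not arbitrary fields.
    sealed trait Node { def children: Seq[Node] }
    case class Leaf(label: String) extends Node { def children: Seq[Node] = Nil }
    case class Wrapper(hidden: Node) extends Node {
      // `hidden` is deliberately NOT a child, mirroring EmptyRelationExec.plan.
      def children: Seq[Node] = Nil
    }
    // A rule written as a traversal over `children` never reaches `hidden`;
    // the rule must re-apply itself to that field explicitly, as apply() does above.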

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
- import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
+ import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
import org.apache.spark.sql.execution.command.DataWritingCommandExec

@@ -1650,32 +1650,32 @@ class AdaptiveQueryExecSuite
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
"SELECT key FROM testData WHERE key = 0 ORDER BY key, value")
assert(findTopLevelSort(plan1).size == 1)
- assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+ assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
"SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" +
" WHERE key > rand()")
assert(findTopLevelSort(plan2).size == 1)
- assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+ assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
}
}

test("SPARK-35442: Support propagate empty relation through aggregate") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
- assert(!plan1.isInstanceOf[LocalTableScanExec])
- assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+ assert(!plan1.isInstanceOf[EmptyRelationExec])
+ assert(stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key limit 1")
- assert(!plan2.isInstanceOf[LocalTableScanExec])
- assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+ assert(!plan2.isInstanceOf[EmptyRelationExec])
+ assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])

val (plan3, adaptivePlan3) = runAdaptiveAndVerifyResult(
"SELECT count(*) FROM testData WHERE value = 'no_match'")
- assert(!plan3.isInstanceOf[LocalTableScanExec])
- assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[LocalTableScanExec])
+ assert(!plan3.isInstanceOf[EmptyRelationExec])
+ assert(!stripAQEPlan(adaptivePlan3).isInstanceOf[EmptyRelationExec])
}
}


@@ -1696,7 +1696,7 @@ class AdaptiveQueryExecSuite
|""".stripMargin)
checkNumUnion(plan1, 1)
checkNumUnion(adaptivePlan1, 0)
- assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+ assert(!stripAQEPlan(adaptivePlan1).isInstanceOf[EmptyRelationExec])

val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
"""
Expand All @@ -1706,7 +1706,7 @@ class AdaptiveQueryExecSuite
|""".stripMargin)
checkNumUnion(plan2, 1)
checkNumUnion(adaptivePlan2, 0)
- assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+ assert(stripAQEPlan(adaptivePlan2).isInstanceOf[EmptyRelationExec])
}
}
