From 5231eb9a41c4e5cc418e1662eabc0e1b207c14c3 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 5 Jun 2023 10:29:18 +0800
Subject: [PATCH 1/5] lazily construct subqueries to improve subquery reuse

---
 .../adaptive/InsertAdaptiveSparkPlan.scala | 28 +++++++++++--------
 .../adaptive/PlanAdaptiveSubqueries.scala  |  8 +++---
 .../adaptive/ReuseAdaptiveSubquery.scala   | 15 ++++------
 3 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 1f05adc57a4b..f308a5ef375d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -126,9 +126,12 @@ case class InsertAdaptiveSparkPlan(
    * Returns an expression-id-to-execution-plan map for all the sub-queries.
    * For each sub-query, generate the adaptive execution plan for each sub-query by applying this
    * rule.
+   * The reason to return a subquery build function is to avoid reusing one instance across
+   * expressions when applying [[PlanAdaptiveSubqueries]], so that [[ReuseAdaptiveSubquery]] can
+   * rely on reference equality to decide whether to wrap a subquery in [[ReusedSubqueryExec]].
    */
-  private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] = {
-    val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec]
+  private def buildSubqueryMap(plan: SparkPlan): Map[Long, () => BaseSubqueryExec] = {
+    val subqueryMap = mutable.HashMap.empty[Long, () => BaseSubqueryExec]
     if (!plan.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
       return subqueryMap.toMap
     }
@@ -137,15 +140,17 @@ case class InsertAdaptiveSparkPlan(
         if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(p)
         verifyAdaptivePlan(executedPlan, p)
-        val subquery = SubqueryExec.createForScalarSubquery(
-          s"subquery#${exprId.id}", executedPlan)
-        subqueryMap.put(exprId.id, subquery)
+        subqueryMap.put(exprId.id, () => {
+          SubqueryExec.createForScalarSubquery(
+            s"subquery#${exprId.id}", executedPlan)
+        })
       case expressions.InSubquery(_, ListQuery(query, _, exprId, _, _, _))
         if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(query)
         verifyAdaptivePlan(executedPlan, query)
-        val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan)
-        subqueryMap.put(exprId.id, subquery)
+        subqueryMap.put(exprId.id, () => {
+          SubqueryExec(s"subquery#${exprId.id}", executedPlan)
+        })
       case expressions.DynamicPruningSubquery(value, buildPlan,
         buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId, _)
         if !subqueryMap.contains(exprId.id) =>
@@ -153,10 +158,11 @@ case class InsertAdaptiveSparkPlan(
         verifyAdaptivePlan(executedPlan, buildPlan)

         val name = s"dynamicpruning#${exprId.id}"
-        val subquery = SubqueryAdaptiveBroadcastExec(
-          name, broadcastKeyIndex, onlyInBroadcast,
-          buildPlan, buildKeys, executedPlan)
-        subqueryMap.put(exprId.id, subquery)
+        subqueryMap.put(exprId.id, () => {
+          SubqueryAdaptiveBroadcastExec(
+            name, broadcastKeyIndex, onlyInBroadcast,
+            buildPlan, buildKeys, executedPlan)
+        })
       case _ =>
     }))

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
index c3f427405835..3fb2270bc6a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
@@ -26,13 +26,13 @@ import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.{BaseSubqueryExec, InSubqueryExec, SparkPlan}

 case class PlanAdaptiveSubqueries(
-    subqueryMap: Map[Long, BaseSubqueryExec]) extends Rule[SparkPlan] {
+    subqueryMap: Map[Long, () => BaseSubqueryExec]) extends Rule[SparkPlan] {

   def apply(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressionsWithPruning(
       _.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
       case expressions.ScalarSubquery(_, _, exprId, _, _, _) =>
-        execution.ScalarSubquery(subqueryMap(exprId.id), exprId)
+        execution.ScalarSubquery(subqueryMap(exprId.id).apply(), exprId)
       case expressions.InSubquery(values, ListQuery(_, _, exprId, _, _, _)) =>
         val expr = if (values.length == 1) {
           values.head
@@ -43,9 +43,9 @@ case class PlanAdaptiveSubqueries(
           }
         )
       }
-      InSubqueryExec(expr, subqueryMap(exprId.id), exprId, shouldBroadcast = true)
+      InSubqueryExec(expr, subqueryMap(exprId.id).apply(), exprId, shouldBroadcast = true)
       case expressions.DynamicPruningSubquery(value, _, _, _, _, exprId, _) =>
-        DynamicPruningExpression(InSubqueryExec(value, subqueryMap(exprId.id), exprId))
+        DynamicPruningExpression(InSubqueryExec(value, subqueryMap(exprId.id).apply(), exprId))
     }
   }
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
index df6849447215..c1d0e93e3b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
@@ -33,16 +33,11 @@ case class ReuseAdaptiveSubquery(
     plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case sub: ExecSubqueryExpression =>
-        // The subquery can be already reused (the same Java object) due to filter pushdown
-        // of table cache. If it happens, we just need to wrap the current subquery with
-        // `ReusedSubqueryExec` and no need to update the `reuseMap`.
-        reuseMap.get(sub.plan.canonicalized).map { subquery =>
-          sub.withNewPlan(ReusedSubqueryExec(subquery))
-        }.getOrElse {
-          reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) match {
-            case Some(subquery) => sub.withNewPlan(ReusedSubqueryExec(subquery))
-            case None => sub
-          }
+        val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, sub.plan)
+        if (newPlan.ne(sub.plan)) {
+          sub.withNewPlan(ReusedSubqueryExec(newPlan))
+        } else {
+          sub
         }
     }
   }

From 0ea7763805e95c3aeb18e7b28999b4b5ac40e7b8 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 5 Jun 2023 11:01:09 +0800
Subject: [PATCH 2/5] hold executed plan

---
 .../adaptive/InsertAdaptiveSparkPlan.scala | 29 +++++--------------
 .../adaptive/PlanAdaptiveSubqueries.scala  | 22 +++++++++-----
 2 files changed, 22 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index f308a5ef375d..af4b7ac32bbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -126,12 +126,11 @@ case class InsertAdaptiveSparkPlan(
    * Returns an expression-id-to-execution-plan map for all the sub-queries.
    * For each sub-query, generate the adaptive execution plan for each sub-query by applying this
    * rule.
-   * The reason to return a subquery build function is to avoid reusing one instance across
-   * expressions when applying [[PlanAdaptiveSubqueries]], so that [[ReuseAdaptiveSubquery]] can
-   * rely on reference equality to decide whether to wrap a subquery in [[ReusedSubqueryExec]].
+   * The returned subquery map holds the executed plans, which [[PlanAdaptiveSubqueries]] can
+   * then take to create the new subquery nodes.
    */
-  private def buildSubqueryMap(plan: SparkPlan): Map[Long, () => BaseSubqueryExec] = {
-    val subqueryMap = mutable.HashMap.empty[Long, () => BaseSubqueryExec]
+  private def buildSubqueryMap(plan: SparkPlan): Map[Long, SparkPlan] = {
+    val subqueryMap = mutable.HashMap.empty[Long, SparkPlan]
     if (!plan.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
       return subqueryMap.toMap
     }
@@ -140,29 +139,17 @@ case class InsertAdaptiveSparkPlan(
         if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(p)
         verifyAdaptivePlan(executedPlan, p)
-        subqueryMap.put(exprId.id, () => {
-          SubqueryExec.createForScalarSubquery(
-            s"subquery#${exprId.id}", executedPlan)
-        })
+        subqueryMap.put(exprId.id, executedPlan)
       case expressions.InSubquery(_, ListQuery(query, _, exprId, _, _, _))
         if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(query)
         verifyAdaptivePlan(executedPlan, query)
-        subqueryMap.put(exprId.id, () => {
-          SubqueryExec(s"subquery#${exprId.id}", executedPlan)
-        })
-      case expressions.DynamicPruningSubquery(value, buildPlan,
-        buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId, _)
+        subqueryMap.put(exprId.id, executedPlan)
+      case expressions.DynamicPruningSubquery(_, buildPlan, _, _, _, exprId, _)
         if !subqueryMap.contains(exprId.id) =>
         val executedPlan = compileSubquery(buildPlan)
         verifyAdaptivePlan(executedPlan, buildPlan)
-
-        val name = s"dynamicpruning#${exprId.id}"
-        subqueryMap.put(exprId.id, () => {
-          SubqueryAdaptiveBroadcastExec(
-            name, broadcastKeyIndex, onlyInBroadcast,
-            buildPlan, buildKeys, executedPlan)
-        })
+        subqueryMap.put(exprId.id, executedPlan)
       case _ =>
     }))

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
index 3fb2270bc6a3..5b4a7e50db71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveSubqueries.scala
@@ -20,19 +20,20 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, DynamicPruningExpression, ListQuery, Literal}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY,
-  SCALAR_SUBQUERY}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY, SCALAR_SUBQUERY}
 import org.apache.spark.sql.execution
-import org.apache.spark.sql.execution.{BaseSubqueryExec, InSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.{InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec, SubqueryExec}

 case class PlanAdaptiveSubqueries(
-    subqueryMap: Map[Long, () => BaseSubqueryExec]) extends Rule[SparkPlan] {
+    subqueryMap: Map[Long, SparkPlan]) extends Rule[SparkPlan] {

   def apply(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressionsWithPruning(
       _.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) {
       case expressions.ScalarSubquery(_, _, exprId, _, _, _) =>
-        execution.ScalarSubquery(subqueryMap(exprId.id).apply(), exprId)
+        val subquery = SubqueryExec.createForScalarSubquery(
+          s"subquery#${exprId.id}", subqueryMap(exprId.id))
+        execution.ScalarSubquery(subquery, exprId)
       case expressions.InSubquery(values, ListQuery(_, _, exprId, _, _, _)) =>
         val expr = if (values.length == 1) {
           values.head
@@ -43,9 +44,14 @@ case class PlanAdaptiveSubqueries(
           }
         )
       }
-      InSubqueryExec(expr, subqueryMap(exprId.id).apply(), exprId, shouldBroadcast = true)
-      case expressions.DynamicPruningSubquery(value, _, _, _, _, exprId, _) =>
-        DynamicPruningExpression(InSubqueryExec(value, subqueryMap(exprId.id).apply(), exprId))
+      val subquery = SubqueryExec(s"subquery#${exprId.id}", subqueryMap(exprId.id))
+      InSubqueryExec(expr, subquery, exprId, shouldBroadcast = true)
+      case expressions.DynamicPruningSubquery(value, buildPlan,
+        buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId, _) =>
+        val name = s"dynamicpruning#${exprId.id}"
+        val subquery = SubqueryAdaptiveBroadcastExec(name, broadcastKeyIndex, onlyInBroadcast,
+          buildPlan, buildKeys, subqueryMap(exprId.id))
+        DynamicPruningExpression(InSubqueryExec(value, subquery, exprId))
     }
   }
 }

From eba1234335399f0431d9cda768bd8fcad5d997aa Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 5 Jun 2023 22:34:15 +0800
Subject: [PATCH 3/5] improve table cache

---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 1b2e802ae939..26c95c0fedd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -590,7 +590,12 @@ case class AdaptiveSparkPlanExec(
       // Apply `queryStageOptimizerRules` so that we can reuse subquery.
       // No need to apply `postStageCreationRules` for `InMemoryTableScanExec`
       // as it's a leaf node.
-      TableCacheQueryStageExec(currentStageId, optimizeQueryStage(i, isFinalStage = false))
+      val newPlan = optimizeQueryStage(i, isFinalStage = false)
+      if (!newPlan.isInstanceOf[InMemoryTableScanExec]) {
+        throw SparkException.internalError(
+          "Custom AQE rules cannot transform table scan node to something else.")
+      }
+      TableCacheQueryStageExec(currentStageId, newPlan)
     }
     currentStageId += 1
     setLogicalLinkForNewQueryStage(queryStage, plan)

From c231b92953391c7d99d7e5f7961e4906ca8faf59 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Tue, 6 Jun 2023 09:59:47 +0800
Subject: [PATCH 4/5] address comment

---
 .../adaptive/InsertAdaptiveSparkPlan.scala | 22 +++++--------------
 1 file changed, 5 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index af4b7ac32bbf..06a82e0d0a13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution.adaptive

 import scala.collection.mutable

-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{ListQuery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -135,21 +134,10 @@ case class InsertAdaptiveSparkPlan(
       return subqueryMap.toMap
     }
     plan.foreach(_.expressions.filter(_.containsPattern(PLAN_EXPRESSION)).foreach(_.foreach {
-      case expressions.ScalarSubquery(p, _, exprId, _, _, _)
-        if !subqueryMap.contains(exprId.id) =>
-        val executedPlan = compileSubquery(p)
-        verifyAdaptivePlan(executedPlan, p)
-        subqueryMap.put(exprId.id, executedPlan)
-      case expressions.InSubquery(_, ListQuery(query, _, exprId, _, _, _))
-        if !subqueryMap.contains(exprId.id) =>
-        val executedPlan = compileSubquery(query)
-        verifyAdaptivePlan(executedPlan, query)
-        subqueryMap.put(exprId.id, executedPlan)
-      case expressions.DynamicPruningSubquery(_, buildPlan, _, _, _, exprId, _)
-        if !subqueryMap.contains(exprId.id) =>
-        val executedPlan = compileSubquery(buildPlan)
-        verifyAdaptivePlan(executedPlan, buildPlan)
-        subqueryMap.put(exprId.id, executedPlan)
+      case subquery: SubqueryExpression if !subqueryMap.contains(subquery.exprId.id) =>
+        val executedPlan = compileSubquery(subquery.plan)
+        verifyAdaptivePlan(executedPlan, subquery.plan)
+        subqueryMap.put(subquery.exprId.id, executedPlan)
       case _ =>
     }))

From ee7a476c1ff48a54259662313fc4ed4f0f763060 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Wed, 7 Jun 2023 09:43:40 +0800
Subject: [PATCH 5/5] address comment

---
 .../adaptive/InsertAdaptiveSparkPlan.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 06a82e0d0a13..9986e5d47870 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.adaptive

 import scala.collection.mutable

-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, ListQuery, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -134,10 +135,13 @@ case class InsertAdaptiveSparkPlan(
       return subqueryMap.toMap
     }
     plan.foreach(_.expressions.filter(_.containsPattern(PLAN_EXPRESSION)).foreach(_.foreach {
-      case subquery: SubqueryExpression if !subqueryMap.contains(subquery.exprId.id) =>
-        val executedPlan = compileSubquery(subquery.plan)
-        verifyAdaptivePlan(executedPlan, subquery.plan)
-        subqueryMap.put(subquery.exprId.id, executedPlan)
+      case e @ (_: expressions.ScalarSubquery | _: ListQuery | _: DynamicPruningSubquery) =>
+        val subquery = e.asInstanceOf[SubqueryExpression]
+        if (!subqueryMap.contains(subquery.exprId.id)) {
+          val executedPlan = compileSubquery(subquery.plan)
+          verifyAdaptivePlan(executedPlan, subquery.plan)
+          subqueryMap.put(subquery.exprId.id, executedPlan)
+        }
      case _ =>
    }))
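
Taken together, the series makes AQE subquery reuse hinge on reference identity:
PlanAdaptiveSubqueries builds a fresh subquery node for every expression from the
executed plan held in the map, and ReuseAdaptiveSubquery wraps a node in
ReusedSubqueryExec exactly when the canonicalized-plan map already holds a different
instance. The sketch below is a minimal, self-contained model of that pattern, not
Spark code: Subquery and Reused are hypothetical stand-ins for BaseSubqueryExec and
ReusedSubqueryExec, and the String key stands in for plan canonicalization.

import scala.collection.concurrent.TrieMap

object SubqueryReuseSketch {
  // Hypothetical stand-ins for BaseSubqueryExec / ReusedSubqueryExec.
  sealed trait PlannedSubquery
  case class Subquery(name: String, canonicalized: String) extends PlannedSubquery
  case class Reused(original: Subquery) extends PlannedSubquery

  // Mirrors the rewritten ReuseAdaptiveSubquery: getOrElseUpdate keyed by the
  // canonicalized plan, then a reference check (ne) decides whether to wrap.
  def reuse(reuseMap: TrieMap[String, Subquery], sub: Subquery): PlannedSubquery = {
    val planned = reuseMap.getOrElseUpdate(sub.canonicalized, sub)
    if (planned ne sub) Reused(planned) // another instance was planned first: reuse it
    else sub                            // first occurrence: keep the node as-is
  }

  def main(args: Array[String]): Unit = {
    val reuseMap = TrieMap.empty[String, Subquery]
    // Two semantically equal subqueries become two distinct node instances,
    // mirroring how PlanAdaptiveSubqueries now creates a fresh node per expression.
    val first = Subquery("subquery#1", canonicalized = "plan-A")
    val second = Subquery("subquery#1", canonicalized = "plan-A")
    println(reuse(reuseMap, first))  // Subquery(subquery#1,plan-A): kept as-is
    println(reuse(reuseMap, second)) // Reused(Subquery(subquery#1,plan-A)): wrapped
  }
}

If a single shared instance were handed to every expression instead, planned ne sub
would never hold and no ReusedSubqueryExec would ever be inserted; that is why the
map yields an executed plan (or, in PATCH 1/5, a build function) rather than one
prebuilt subquery node.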