resolve the comments and fix the failed test
JkSelf committed Apr 15, 2021
commit 657c61b7c0e08d085cc505ce0a699fa45fe3e078
@@ -91,10 +91,15 @@ case class AdaptiveSparkPlanExec(
     DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules
 
+  @transient private val initialPlan = context.session.withActive {
+    applyPhysicalRules(
+      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
+  }
+
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveDynamicPruningFilters(inputPlan),
+    PlanAdaptiveDynamicPruningFilters(initialPlan),
Contributor Author:

We need to use initialPlan rather than inputPlan, because inputPlan has not yet had the queryStagePreparationRules (e.g. EnsureRequirements) applied.
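For background, the same before/after-preparation distinction is visible from the public API. A hypothetical local-session demo (illustrative only, not part of this change): QueryExecution.sparkPlan is the physical plan before the preparation rules run, while executedPlan has them applied, which is when the exchange nodes that DPP matches against appear.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val dim = Seq((1, "a"), (2, "b")).toDF("k", "v")
val fact = Seq((1, 10), (2, 20)).toDF("k", "m")
val joined = fact.join(broadcast(dim), "k")

// Before preparation rules: no BroadcastExchangeExec in the plan yet.
println(joined.queryExecution.sparkPlan)
// After preparation rules (EnsureRequirements et al.): the build side is
// wrapped in a BroadcastExchangeExec.
println(joined.queryExecution.executedPlan)
```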

Contributor:

I think it's better to pass this as the root plan. AdaptiveSparkPlanExec keeps changing as more and more query stages complete, so it's better for PlanAdaptiveDynamicPruningFilters to always look at the latest plan.
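A minimal sketch of that suggestion (the parameter type and body below are assumptions, not the final shape of the change): the rule keeps a reference to the live AdaptiveSparkPlanExec, passed as this from the queryStageOptimizerRules list, and re-reads its current plan on every invocation instead of using a snapshot taken at construction time.

```scala
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch only: `rootPlan` is the enclosing node, wired up inside
// AdaptiveSparkPlanExec as PlanAdaptiveDynamicPruningFilters(this).
case class PlanAdaptiveDynamicPruningFilters(
    rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    // executedPlan reflects the latest re-optimization, so the broadcast
    // reuse check sees query stages that completed after rule construction.
    val latest: SparkPlan = rootPlan.executedPlan
    // ... match DPP filters against `latest` as in the diff below ...
    plan
  }
}
```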

     ReuseAdaptiveSubquery(context.subqueryCache),
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -129,11 +134,6 @@ case class AdaptiveSparkPlanExec(

   @transient private val costEvaluator = SimpleCostEvaluator
 
-  @transient private val initialPlan = context.session.withActive {
-    applyPhysicalRules(
-      inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
-  }
-
   @volatile private var currentPhysicalPlan = initialPlan
 
   private var isFinalPlan = false
@@ -311,8 +311,7 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    val broadcastPlan = getFinalPhysicalPlan()
-    broadcastPlan.doExecuteBroadcast()
+    getFinalPhysicalPlan().doExecuteBroadcast()
   }
 
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
@@ -481,7 +480,7 @@ case class AdaptiveSparkPlanExec(
           throw new IllegalStateException(
             "Custom columnar rules cannot transform shuffle node to something else.")
         }
-        ShuffleQueryStageExec(currentStageId, newShuffle, s.child.canonicalized)
+        ShuffleQueryStageExec(currentStageId, newShuffle, s.canonicalized)
       case b: BroadcastExchangeLike =>
         val newBroadcast = applyPhysicalRules(
           b.withNewChildren(Seq(optimizedPlan)),
@@ -491,7 +490,7 @@
           throw new IllegalStateException(
             "Custom columnar rules cannot transform broadcast node to something else.")
         }
-        BroadcastQueryStageExec(currentStageId, newBroadcast, b.child.canonicalized)
+        BroadcastQueryStageExec(currentStageId, newBroadcast, b.canonicalized)
     }
     currentStageId += 1
     setLogicalLinkForNewQueryStage(queryStage, e)
@@ -27,8 +27,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati
 /**
  * A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
  */
-case class PlanAdaptiveDynamicPruningFilters(
-    originalPlan: SparkPlan) extends Rule[SparkPlan] {
+case class PlanAdaptiveDynamicPruningFilters(rootPlan: SparkPlan) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
     if (!conf.dynamicPartitionPruningEnabled) {
       return plan
@@ -40,20 +39,20 @@ case class PlanAdaptiveDynamicPruningFilters(
           adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
         val packedKeys = BindReferences.bindReferences(
Contributor:
We can move this into if (canReuseExchange).

           HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
+        val mode = HashedRelationBroadcastMode(packedKeys)
+        // plan a broadcast exchange of the build side of the join
+        val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
 
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
-          originalPlan.find {
+          rootPlan.find {
             case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _, _) =>
-              left.sameResult(adaptivePlan.executedPlan)
+              left.sameResult(exchange)
             case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right, _) =>
-              right.sameResult(adaptivePlan.executedPlan)
+              right.sameResult(exchange)
             case _ => false
           }.isDefined
 
-        if(canReuseExchange) {
-          val mode = HashedRelationBroadcastMode(packedKeys)
-          // plan a broadcast exchange of the build side of the join
-          val exchange = BroadcastExchangeExec(mode, adaptivePlan.executedPlan)
+        if (canReuseExchange) {
           exchange.setLogicalLink(adaptivePlan.executedPlan.logicalLink.get)
           val newAdaptivePlan = AdaptiveSparkPlanExec(
             exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
Contributor:
ditto: adaptivePlan.copy(inputPlan = exchange)
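Sketched out, the suggestion swaps the explicit constructor call for a case-class copy (a hedged sketch; it relies only on AdaptiveSparkPlanExec being a case class, which the diff above shows):

```scala
// Instead of re-invoking the constructor and restating every argument:
//   AdaptiveSparkPlanExec(
//     exchange, adaptivePlan.context, adaptivePlan.preprocessingRules, true)
// reuse the existing instance and replace only the input plan.
val newAdaptivePlan = adaptivePlan.copy(inputPlan = exchange)
```

The copy keeps the remaining fields, such as the context and preprocessing rules, in sync with the instance it came from.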

Contributor Author:
Updated.

@@ -197,6 +197,11 @@ abstract class DynamicPartitionPruningSuiteBase
             case _ => false
           }.isDefined
           assert(hasReuse, s"$s\nshould have been reused in\n$plan")
+        case a: AdaptiveSparkPlanExec =>
+          val hasReuse = collect(a) {
+            case r: ReusedExchangeExec => r
+          }.nonEmpty
+          assert(hasReuse, s"$s\nshould have been reused in\n$plan")
         case _ =>
           fail(s"Invalid child node found in\n$s")
       }