Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
49e8bd9
Support optimize skew join even if introduce extra shuffle
ulysses-you Jun 8, 2021
db77ddd
EnsureRequirements
ulysses-you Jun 8, 2021
a63cd72
make a new rules
ulysses-you Jun 9, 2021
59a5e4a
fix local reader number
ulysses-you Jun 10, 2021
9c985da
more cost
ulysses-you Jul 2, 2021
e2102a5
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Jul 9, 2021
8bc22ad
nit
ulysses-you Jul 9, 2021
7734d3e
nit
ulysses-you Jul 9, 2021
cbc7553
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Aug 3, 2021
3dc61a3
force optimize skewed join
ulysses-you Aug 3, 2021
30b7de0
style
ulysses-you Aug 3, 2021
6caa4a3
name
ulysses-you Aug 3, 2021
cd1a379
final stage
ulysses-you Aug 13, 2021
2b3bfe6
style
ulysses-you Aug 13, 2021
d305894
conflick
ulysses-you Aug 13, 2021
6725f97
checkDistribution
ulysses-you Aug 13, 2021
7a0448b
SimpleCostEvaluator
ulysses-you Aug 19, 2021
60b7b9d
address comment
ulysses-you Aug 19, 2021
fbf9727
cost
ulysses-you Aug 19, 2021
b54e9c2
plan twice
ulysses-you Aug 20, 2021
f5ad40e
nit
ulysses-you Aug 20, 2021
8058fe9
nit
ulysses-you Aug 20, 2021
369bf33
ensureRequiredDistribution
ulysses-you Aug 25, 2021
d93c3df
remove dead code
ulysses-you Aug 25, 2021
b215e2d
simplify code
ulysses-you Aug 25, 2021
5b63e4d
address comment
ulysses-you Aug 25, 2021
3ccc29b
style
ulysses-you Aug 25, 2021
bc45d70
fix order
ulysses-you Aug 25, 2021
580a0a4
address comment
ulysses-you Aug 26, 2021
bc39694
address comment
ulysses-you Aug 26, 2021
bb2e713
address comment
ulysses-you Sep 2, 2021
d3f0131
nit
ulysses-you Sep 2, 2021
4712986
nit
ulysses-you Sep 2, 2021
23ebea0
address comment
ulysses-you Sep 5, 2021
ef0765f
pass EnsureRequirements
ulysses-you Sep 7, 2021
76c363d
simplify
ulysses-you Sep 7, 2021
8961084
nit
ulysses-you Sep 7, 2021
5ba73c4
EnsureRequirements
ulysses-you Sep 7, 2021
ca63321
pull out shuffle origin
ulysses-you Sep 9, 2021
f5e4b91
address comment
ulysses-you Sep 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
plan twice
  • Loading branch information
ulysses-you committed Aug 20, 2021
commit b54e9c232a43ce05e320d5459b16b770261ee38b
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,27 @@ case class AdaptiveSparkPlanExec(
// A list of physical plan rules to be applied before creation of query stages. The physical
// plan should reach a final status of query stages (i.e., no more addition or removal of
// Exchange nodes) after running these rules.
@transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
RemoveRedundantProjects,
// For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
// the final plan, but we do need to respect the user-specified repartition. Here we ask
// `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
// around this case.
EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
RemoveRedundantSorts,
DisableUnnecessaryBucketedScan
) ++ context.session.sessionState.queryStagePrepRules

// This list rules are applied between queryStagePreparationRules and estimate physical plan cost
// so that we can support introduce extra shuffle
@transient private val optimizeSkewedJoinWithExtraShuffleRules: Seq[Rule[SparkPlan]] = {
val ensureExtraShuffleRule = if (forceOptimizeSkewedJoin) {
// Add the EnsureRequirements rule here and don't optimize out repartition so that we can
// ensure the output partitioning of OptimizeSkewedJoin is always expected.
Seq(EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined))
private def queryStagePreparationRules(
optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need this default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is mainly for initalPlan which is not needed optimize skewed join

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im OK to remove it

val optimizeSkewedJoinRules = if (optimizeSkewedJoin) {
Seq(OptimizeSkewedJoin,
// Add the EnsureRequirements rule here and don't optimize out repartition so that we can
// ensure the output partitioning of OptimizeSkewedJoin is always expected.
EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined))
} else {
Nil
}
Seq(OptimizeSkewedJoin) ++ ensureExtraShuffleRule

Seq(
RemoveRedundantProjects,
// For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
// the final plan, but we do need to respect the user-specified repartition. Here we ask
// `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
// around this case.
EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
RemoveRedundantSorts,
DisableUnnecessaryBucketedScan
) ++ optimizeSkewedJoinRules ++ context.session.sessionState.queryStagePrepRules
}

// A list of physical optimizer rules to be applied to a new stage before its execution. These
Expand Down Expand Up @@ -200,7 +199,7 @@ case class AdaptiveSparkPlanExec(

@transient val initialPlan = context.session.withActive {
applyPhysicalRules(
inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
inputPlan, queryStagePreparationRules(), Some((planChangeLogger, "AQE Preparations")))
}

@volatile private var currentPhysicalPlan = initialPlan
Expand Down Expand Up @@ -332,7 +331,7 @@ case class AdaptiveSparkPlanExec(
}
}._1

if (!newPhysicalPlan.fastEquals(currentPhysicalPlan)) {
if (newPhysicalPlan.ne(currentPhysicalPlan)) {
logOnLevel(s"Plan changed from\n$currentPhysicalPlan\nto\n$newPhysicalPlan")
cleanUpTempTags(newPhysicalPlan)
currentPhysicalPlan = newPhysicalPlan
Expand Down Expand Up @@ -667,6 +666,25 @@ case class AdaptiveSparkPlanExec(
logicalPlan
}

private def isFinalStage(sparkPlan: SparkPlan): Boolean = {
sparkPlan match {
// avoid top level node is Exchange
case _: Exchange => false
case plan =>
// Plan is regarded as a final plan iff all shuffle nodes are wrapped inside query stage
// and all query stages are materialized.
plan.find {
case p if p.children.exists(
child => child.isInstanceOf[Exchange] || child.isInstanceOf[ReusedExchangeExec]) =>
p match {
case stage: QueryStageExec if stage.isMaterialized => false
case _ => true
}
case _ => false
}.isEmpty
}
}

/**
* Re-optimize and run physical planning on the current logical plan based on the latest stats.
*/
Expand All @@ -675,11 +693,25 @@ case class AdaptiveSparkPlanExec(
logicalPlan.invalidateStatsCache()
val optimized = optimizer.execute(logicalPlan)
val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()

val optimizedPhysicalPlan = applyPhysicalRules(
sparkPlan,
preprocessingRules ++ queryStagePreparationRules,
preprocessingRules ++ queryStagePreparationRules(),
Some((planChangeLogger, "AQE Replanning")))

val optimizedWithSkewedJoin = applyPhysicalRules(
sparkPlan,
preprocessingRules ++ queryStagePreparationRules(true),
Some((planChangeLogger, "AQE Replanning With Optimize Skewed Join")))

// respect the requiredDistribution for final stage
val validatedWithSkewedJoin =
checkDistribution(
optimizedWithSkewedJoin,
optimizedPhysicalPlan,
isFinalStage(optimizedWithSkewedJoin),
OptimizeSkewedJoin.ruleName)

// When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
// add the `BroadcastExchangeExec` node manually in the DPP subquery,
// not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
Expand All @@ -693,23 +725,10 @@ case class AdaptiveSparkPlanExec(
case _ => plan
}

val optimizedWithSkewedJoin = applyPhysicalRules(
optimizedPhysicalPlan,
optimizeSkewedJoinWithExtraShuffleRules,
Some((planChangeLogger, "AQE Optimize Skewed Join With Extra Shuffle"))
)
val validatedWithSkewedJoin =
checkDistribution(
optimizedWithSkewedJoin,
optimizedPhysicalPlan,
isFinalStage(optimizedWithSkewedJoin),
OptimizeSkewedJoin.ruleName)

// here are three reasons if validatedWithSkewedJoin is equal to optimizedPhysicalPlan:
// here are two reasons if validatedWithSkewedJoin is equal to optimizedPhysicalPlan:
// 1. no skewed join optimized
// 2. optimize skewed join introduce extra shuffle and force optimize is disabled
// 3. optimize skewed join change final stage output partitioning
val newPhysicalPlans = if (validatedWithSkewedJoin.fastEquals(optimizedPhysicalPlan)) {
// 2. optimize skewed join doesn't satisfy requiredDistribution for final stage
val newPhysicalPlans = if (optimizedPhysicalPlan.fastEquals(validatedWithSkewedJoin)) {
updateBroadcastExchange(optimizedPhysicalPlan) :: Nil
} else {
updateBroadcastExchange(optimizedPhysicalPlan) ::
Expand All @@ -718,25 +737,6 @@ case class AdaptiveSparkPlanExec(
(newPhysicalPlans, optimized)
}

private def isFinalStage(sparkPlan: SparkPlan): Boolean = {
sparkPlan match {
// avoid top level node is Exchange
case _: Exchange => false
case plan =>
// Plan is regarded as a final plan iff all shuffle nodes are wrapped inside query stage
// and all query stages are materialized.
plan.find {
case p if p.children.exists(
child => child.isInstanceOf[Exchange] || child.isInstanceOf[ReusedExchangeExec]) =>
p match {
case stage: QueryStageExec if stage.isMaterialized => false
case _ => true
}
case _ => false
}.isEmpty
}
}

/**
* Recursively set `TEMP_LOGICAL_PLAN_TAG` for the current `plan` node.
*/
Expand Down