40 commits
49e8bd9
Support optimize skew join even if introduce extra shuffle
ulysses-you Jun 8, 2021
db77ddd
EnsureRequirements
ulysses-you Jun 8, 2021
a63cd72
make a new rules
ulysses-you Jun 9, 2021
59a5e4a
fix local reader number
ulysses-you Jun 10, 2021
9c985da
more cost
ulysses-you Jul 2, 2021
e2102a5
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Jul 9, 2021
8bc22ad
nit
ulysses-you Jul 9, 2021
7734d3e
nit
ulysses-you Jul 9, 2021
cbc7553
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Aug 3, 2021
3dc61a3
force optimize skewed join
ulysses-you Aug 3, 2021
30b7de0
style
ulysses-you Aug 3, 2021
6caa4a3
name
ulysses-you Aug 3, 2021
cd1a379
final stage
ulysses-you Aug 13, 2021
2b3bfe6
style
ulysses-you Aug 13, 2021
d305894
conflick
ulysses-you Aug 13, 2021
6725f97
checkDistribution
ulysses-you Aug 13, 2021
7a0448b
SimpleCostEvaluator
ulysses-you Aug 19, 2021
60b7b9d
address comment
ulysses-you Aug 19, 2021
fbf9727
cost
ulysses-you Aug 19, 2021
b54e9c2
plan twice
ulysses-you Aug 20, 2021
f5ad40e
nit
ulysses-you Aug 20, 2021
8058fe9
nit
ulysses-you Aug 20, 2021
369bf33
ensureRequiredDistribution
ulysses-you Aug 25, 2021
d93c3df
remove dead code
ulysses-you Aug 25, 2021
b215e2d
simplify code
ulysses-you Aug 25, 2021
5b63e4d
address comment
ulysses-you Aug 25, 2021
3ccc29b
style
ulysses-you Aug 25, 2021
bc45d70
fix order
ulysses-you Aug 25, 2021
580a0a4
address comment
ulysses-you Aug 26, 2021
bc39694
address comment
ulysses-you Aug 26, 2021
bb2e713
address comment
ulysses-you Sep 2, 2021
d3f0131
nit
ulysses-you Sep 2, 2021
4712986
nit
ulysses-you Sep 2, 2021
23ebea0
address comment
ulysses-you Sep 5, 2021
ef0765f
pass EnsureRequirements
ulysses-you Sep 7, 2021
76c363d
simplify
ulysses-you Sep 7, 2021
8961084
nit
ulysses-you Sep 7, 2021
5ba73c4
EnsureRequirements
ulysses-you Sep 7, 2021
ca63321
pull out shuffle origin
ulysses-you Sep 9, 2021
f5e4b91
address comment
ulysses-you Sep 11, 2021
Support optimize skew join even if introduce extra shuffle
ulysses-you committed Jul 2, 2021
commit 49e8bd9d7be91be1f6b23e1e929623cffbd126d2
@@ -653,6 +653,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ADAPTIVE_FORCE_ENABLE_SKEW_JOIN = buildConf("spark.sql.adaptive.forceEnableSkewJoin")
.doc("When true, force enable OptimizeSkewedJoin even if it introduces extra shuffle.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
.internal()
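For context, turning the new flag on is just a session-level config change. A minimal sketch, assuming an existing SparkSession named `spark` (not part of this diff):

  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.forceEnableSkewJoin", "true")
  // AQE may now keep a skew-optimized join even when it introduces extra shuffles.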
@@ -88,6 +88,12 @@ case class AdaptiveSparkPlanExec(
private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
RemoveRedundantProjects,
EnsureRequirements,
// Apply the OptimizeSkewedJoin rule during stage preparation so that we can compare
// the cost of the skew join against the extra shuffle nodes it may introduce.
OptimizeSkewedJoin,
// Add the EnsureRequirements rule here since OptimizeSkewedJoin may change the
// output partitioning
EnsureRequirements,
RemoveRedundantSorts,
DisableUnnecessaryBucketedScan
) ++ context.session.sessionState.queryStagePrepRules
@@ -97,8 +103,6 @@ case class AdaptiveSparkPlanExec(
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
PlanAdaptiveDynamicPruningFilters(this),
ReuseAdaptiveSubquery(context.subqueryCache),
// Skew join does not handle `CustomShuffleReader` so needs to be applied first.
OptimizeSkewedJoin,
OptimizeSkewInRebalancePartitions,
CoalesceShufflePartitions(context.session),
// `OptimizeLocalShuffleReader` needs to make use of 'CustomShuffleReaderExec.partitionSpecs'
@@ -113,6 +117,19 @@ case class AdaptiveSparkPlanExec(
CollapseCodegenStages()
)

// OptimizeSkewedJoin has moved into the preparation rules, so finalPreparationStageRules
// needs to mirror finalStageOptimizerRules: for the final stage, keep only the rules that
// support every shuffle origin present in the plan.
private def finalPreparationStageRules: Seq[Rule[SparkPlan]] = {
Contributor: Is it mainly to exclude OptimizeSkewedJoin in the final stage?

Contributor Author: Yeah, currently it's only for OptimizeSkewedJoin.

val origins = inputPlan.collect {
case s: ShuffleExchangeLike => s.shuffleOrigin
}
(preprocessingRules ++ queryStagePreparationRules).filter {
case c: CustomShuffleReaderRule =>
origins.forall(c.supportedShuffleOrigins.contains)
case _ => true
}
}
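Concretely, this filter is what drops OptimizeSkewedJoin for final stages that contain unsupported shuffles: its supportedShuffleOrigins is just Seq(ENSURE_REQUIREMENTS) (see the OptimizeSkewedJoin diff below), so a single non-matching origin excludes the rule. A small sketch of the check in isolation; REPARTITION_BY_COL is assumed here only as an example of another shuffle origin:

  import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REPARTITION_BY_COL}

  val origins = Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL)
  // forall/contains mirrors the filter above: every origin must be supported.
  val keepSkewJoinRule = origins.forall(OptimizeSkewedJoin.supportedShuffleOrigins.contains)
  // keepSkewJoinRule == false, so OptimizeSkewedJoin is filtered out for this plan.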

// The partitioning of the query output depends on the shuffle(s) in the final stage. If the
// original plan contains a repartition operator, we need to preserve the specified partitioning,
// whether or not the repartition-introduced shuffle is optimized out because of an underlying
@@ -130,7 +147,12 @@ case class AdaptiveSparkPlanExec(
}
}

@transient private val costEvaluator = SimpleCostEvaluator
@transient private val costEvaluator =
if (conf.getConf(SQLConf.ADAPTIVE_FORCE_ENABLE_SKEW_JOIN)) {
SkewJoinAwareCostEvaluator
} else {
SimpleCostEvaluator
}

@transient val initialPlan = context.session.withActive {
applyPhysicalRules(
@@ -593,16 +615,41 @@ case class AdaptiveSparkPlanExec(
logicalPlan
}

private def isFinalStage(sparkPlan: SparkPlan): Boolean = {
sparkPlan match {
// If the top-level node is an Exchange, this is not the final stage.
case _: Exchange => false
case plan =>
// The plan is regarded as the final-stage plan iff every shuffle node is wrapped
// inside a query stage and all query stages are materialized.
plan.find {
case p if p.children.exists(
child => child.isInstanceOf[Exchange] || child.isInstanceOf[ReusedExchangeExec]) =>
p match {
case stage: QueryStageExec if stage.isMaterialized => false
case _ => true
}
case _ => false
}.isEmpty
}
}

/**
* Re-optimize and run physical planning on the current logical plan based on the latest stats.
*/
private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
logicalPlan.invalidateStatsCache()
val optimized = optimizer.execute(logicalPlan)
val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
val rules = if (isFinalStage(sparkPlan)) {
finalPreparationStageRules
} else {
preprocessingRules ++ queryStagePreparationRules
}

val newPlan = applyPhysicalRules(
sparkPlan,
preprocessingRules ++ queryStagePreparationRules,
rules,
Some((planChangeLogger, "AQE Replanning")))

// When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
@@ -23,7 +23,7 @@ import org.apache.commons.io.FileUtils

import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements, ShuffleExchangeExec, ShuffleOrigin}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleOrigin}
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

@@ -52,8 +52,6 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {

override val supportedShuffleOrigins: Seq[ShuffleOrigin] = Seq(ENSURE_REQUIREMENTS)

private val ensureRequirements = EnsureRequirements

/**
* A partition is considered as a skewed partition if its size is larger than the median
* partition size * SKEW_JOIN_SKEWED_PARTITION_FACTOR and also larger than
@@ -248,18 +246,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {
// Shuffle
// Sort
// Shuffle
val optimizePlan = optimizeSkewJoin(plan)
val numShuffles = ensureRequirements.apply(optimizePlan).collect {
case e: ShuffleExchangeExec => e
}.length

if (numShuffles > 0) {
logDebug("OptimizeSkewedJoin rule is not applied due" +
" to additional shuffles will be introduced.")
plan
} else {
optimizePlan
}
optimizeSkewJoin(plan)
} else {
plan
}
@@ -268,7 +255,7 @@ object OptimizeSkewedJoin extends CustomShuffleReaderRule {

private object ShuffleStage {
def unapply(plan: SparkPlan): Option[ShuffleQueryStageExec] = plan match {
case s: ShuffleQueryStageExec if s.mapStats.isDefined &&
case s: ShuffleQueryStageExec if s.isMaterialized && s.mapStats.isDefined &&
OptimizeSkewedJoin.supportedShuffleOrigins.contains(s.shuffle.shuffleOrigin) =>
Some(s)
case _ => None
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.ShuffledJoin

/**
 * A skew-join-aware implementation of [[Cost]], which considers the number of shuffles
 * and the number of skew joins.
 */
case class SkewJoinAwareCost(numShuffles: Int, numSkewJoins: Int) extends Cost {
override def compare(that: Cost): Int = that match {
case other: SkewJoinAwareCost =>
if (numSkewJoins > other.numSkewJoins || numShuffles < other.numShuffles) {
Contributor: If numSkewJoins < other.numSkewJoins && numShuffles < other.numShuffles, the return value is -1; is this expected?

Contributor Author: Good point. I think we should always pick the bigger skew join number first.

// More optimized skew joins, or fewer shuffle nodes, means a lower cost.
-1
} else if (numShuffles > other.numShuffles) {
1
} else {
0
}
case _ =>
throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
}
}
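Following the review thread above, one way to make the ordering strictly lexicographic, always preferring the plan that optimizes more skew joins and only then comparing shuffle counts, could look like this. This is a sketch of the suggested fix under the same Cost API; LexicographicSkewJoinCost is a hypothetical name, not the class committed here:

  case class LexicographicSkewJoinCost(numShuffles: Int, numSkewJoins: Int) extends Cost {
    override def compare(that: Cost): Int = that match {
      case other: LexicographicSkewJoinCost =>
        if (numSkewJoins != other.numSkewJoins) {
          // The plan with more optimized skew joins always wins, regardless of shuffles.
          other.numSkewJoins.compare(numSkewJoins)
        } else {
          // Tie-break on shuffles: fewer shuffle nodes means a lower cost.
          numShuffles.compare(other.numShuffles)
        }
      case _ =>
        throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
    }
  }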

/**
 * A skew-join-aware implementation of [[CostEvaluator]], which counts the number of
 * [[ShuffleExchangeLike]] nodes and skew join nodes in the plan.
*/
object SkewJoinAwareCostEvaluator extends CostEvaluator {
override def evaluateCost(plan: SparkPlan): Cost = {
val shuffleNumber = plan.collect {
case s: ShuffleExchangeLike => s
}.size
val skewJoinNumber = plan.collect {
case j: ShuffledJoin if j.isSkewJoin => j
}.size
SkewJoinAwareCost(shuffleNumber, skewJoinNumber)
}
}
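As a quick sanity check of the intended ordering, with hypothetical counts rather than numbers taken from a real plan:

  val unoptimized = SkewJoinAwareCost(numShuffles = 2, numSkewJoins = 0)
  val skewOptimized = SkewJoinAwareCost(numShuffles = 3, numSkewJoins = 1)
  // The skew-optimized plan compares as cheaper despite its extra shuffle.
  assert(skewOptimized.compare(unoptimized) < 0)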
@@ -1786,6 +1786,7 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-35650: Coalesce number of partitions by AEQ") {
withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
Seq("REPARTITION", "REBALANCE(key)")
@@ -1885,4 +1886,38 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-33832: Support optimize skew join even if introduce extra shuffle") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.SHUFFLE_PARTITIONS.key -> "10",
SQLConf.ADAPTIVE_FORCE_ENABLE_SKEW_JOIN.key -> "true") {
withTempView("skewData1", "skewData2") {
spark
.range(0, 1000, 1, 10)
.selectExpr("id % 3 as key1", "id as value1")
.createOrReplaceTempView("skewData1")
spark
.range(0, 1000, 1, 10)
.selectExpr("id % 1 as key2", "id as value2")
.createOrReplaceTempView("skewData2")

val (_, adaptive) = runAdaptiveAndVerifyResult(
"SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 GROUP BY key1")
val smj = findTopLevelSortMergeJoin(adaptive)
assert(smj.size == 1 && smj.forall(_.isSkewJoin))
checkNumLocalShuffleReaders(adaptive, 3)

val (_, adaptive2) = runAdaptiveAndVerifyResult(
"SELECT /*+ repartition */ key1 FROM skewData1 JOIN skewData2 ON key1 = key2")
val smj2 = findTopLevelSortMergeJoin(adaptive2)
assert(smj2.size == 1 && smj2.forall(_.isSkewJoin))
checkNumLocalShuffleReaders(adaptive2, 3)
}
}
}
}
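To run just the new test locally, the standard Spark sbt workflow should work; this invocation is an assumption about the dev setup rather than part of the change: build/sbt "sql/testOnly *AdaptiveQueryExecSuite -- -z SPARK-33832"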