Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
49e8bd9
Support optimize skew join even if introduce extra shuffle
ulysses-you Jun 8, 2021
db77ddd
EnsureRequirements
ulysses-you Jun 8, 2021
a63cd72
make a new rules
ulysses-you Jun 9, 2021
59a5e4a
fix local reader number
ulysses-you Jun 10, 2021
9c985da
more cost
ulysses-you Jul 2, 2021
e2102a5
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Jul 9, 2021
8bc22ad
nit
ulysses-you Jul 9, 2021
7734d3e
nit
ulysses-you Jul 9, 2021
cbc7553
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Aug 3, 2021
3dc61a3
force optimize skewed join
ulysses-you Aug 3, 2021
30b7de0
style
ulysses-you Aug 3, 2021
6caa4a3
name
ulysses-you Aug 3, 2021
cd1a379
final stage
ulysses-you Aug 13, 2021
2b3bfe6
style
ulysses-you Aug 13, 2021
d305894
conflick
ulysses-you Aug 13, 2021
6725f97
checkDistribution
ulysses-you Aug 13, 2021
7a0448b
SimpleCostEvaluator
ulysses-you Aug 19, 2021
60b7b9d
address comment
ulysses-you Aug 19, 2021
fbf9727
cost
ulysses-you Aug 19, 2021
b54e9c2
plan twice
ulysses-you Aug 20, 2021
f5ad40e
nit
ulysses-you Aug 20, 2021
8058fe9
nit
ulysses-you Aug 20, 2021
369bf33
ensureRequiredDistribution
ulysses-you Aug 25, 2021
d93c3df
remove dead code
ulysses-you Aug 25, 2021
b215e2d
simplify code
ulysses-you Aug 25, 2021
5b63e4d
address comment
ulysses-you Aug 25, 2021
3ccc29b
style
ulysses-you Aug 25, 2021
bc45d70
fix order
ulysses-you Aug 25, 2021
580a0a4
address comment
ulysses-you Aug 26, 2021
bc39694
address comment
ulysses-you Aug 26, 2021
bb2e713
address comment
ulysses-you Sep 2, 2021
d3f0131
nit
ulysses-you Sep 2, 2021
4712986
nit
ulysses-you Sep 2, 2021
23ebea0
address comment
ulysses-you Sep 5, 2021
ef0765f
pass EnsureRequirements
ulysses-you Sep 7, 2021
76c363d
simplify
ulysses-you Sep 7, 2021
8961084
nit
ulysses-you Sep 7, 2021
5ba73c4
EnsureRequirements
ulysses-you Sep 7, 2021
ca63321
pull out shuffle origin
ulysses-you Sep 9, 2021
f5e4b91
address comment
ulysses-you Sep 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
SimpleCostEvaluator
  • Loading branch information
ulysses-you committed Aug 19, 2021
commit 7a0448b0a80b52bf226f219bea515a5d0f7e96c4
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ object SQLConf {
val ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN =
buildConf("spark.sql.adaptive.forceOptimizeSkewedJoin")
.doc("When true, force enable OptimizeSkewedJoin even if it introduces extra shuffle.")
.version("3.2.0")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ case class AdaptiveSparkPlanExec(
conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf)
case _ =>
SkewJoinAwareCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just move this code up


@transient val initialPlan = context.session.withActive {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.ShuffledJoin

/**
* A simple implementation of [[Cost]], which takes a number of [[Long]] as the cost value.
Expand All @@ -35,15 +36,44 @@ case class SimpleCost(value: Long) extends Cost {
}

/**
* A simple implementation of [[CostEvaluator]], which counts the number of
* [[ShuffleExchangeLike]] nodes in the plan.
* A skew join aware implementation of [[Cost]], which consider shuffle number and skew join number
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add more description on how the cost is calculated in the presence of skew joins? What happens if there's two or more extra shuffles by adding a skew join optimization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, add more comment to show how we pick the cost with skew join and shuffle.

*/
object SimpleCostEvaluator extends CostEvaluator {
case class SkewJoinAwareCost(
numShuffles: Int,
numSkewJoins: Int) extends Cost {
override def compare(that: Cost): Int = that match {
case other: SkewJoinAwareCost =>
if (numSkewJoins > other.numSkewJoins || numShuffles < other.numShuffles) {
// If more skew joins are optimized or less shuffle nodes, it means the cost is lower
-1
} else if (numShuffles > other.numShuffles) {
1
} else {
0
}

case _ =>
throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
}
}

/**
* A skew join aware implementation of [[CostEvaluator]], which counts the number of
* [[ShuffleExchangeLike]] nodes and skew join nodes in the plan.
*/
case class SimpleCostEvaluator(forceOptimizeSkewedJoin: Boolean) extends CostEvaluator {
override def evaluateCost(plan: SparkPlan): Cost = {
val cost = plan.collect {
val shuffleNumber = plan.collect {
case s: ShuffleExchangeLike => s
}.size
SimpleCost(cost)

if (forceOptimizeSkewedJoin) {
val skewJoinNumber = plan.collect {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: numShuffles and numSkewJoins

case j: ShuffledJoin if j.isSkewJoin => j
}.size
SkewJoinAwareCost(shuffleNumber, skewJoinNumber)
} else {
SimpleCost(shuffleNumber)
}
}
}