Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
49e8bd9
Support optimize skew join even if introduce extra shuffle
ulysses-you Jun 8, 2021
db77ddd
EnsureRequirements
ulysses-you Jun 8, 2021
a63cd72
make a new rules
ulysses-you Jun 9, 2021
59a5e4a
fix local reader number
ulysses-you Jun 10, 2021
9c985da
more cost
ulysses-you Jul 2, 2021
e2102a5
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Jul 9, 2021
8bc22ad
nit
ulysses-you Jul 9, 2021
7734d3e
nit
ulysses-you Jul 9, 2021
cbc7553
Merge branch 'master' of https://github.com/apache/spark into support…
ulysses-you Aug 3, 2021
3dc61a3
force optimize skewed join
ulysses-you Aug 3, 2021
30b7de0
style
ulysses-you Aug 3, 2021
6caa4a3
name
ulysses-you Aug 3, 2021
cd1a379
final stage
ulysses-you Aug 13, 2021
2b3bfe6
style
ulysses-you Aug 13, 2021
d305894
conflick
ulysses-you Aug 13, 2021
6725f97
checkDistribution
ulysses-you Aug 13, 2021
7a0448b
SimpleCostEvaluator
ulysses-you Aug 19, 2021
60b7b9d
address comment
ulysses-you Aug 19, 2021
fbf9727
cost
ulysses-you Aug 19, 2021
b54e9c2
plan twice
ulysses-you Aug 20, 2021
f5ad40e
nit
ulysses-you Aug 20, 2021
8058fe9
nit
ulysses-you Aug 20, 2021
369bf33
ensureRequiredDistribution
ulysses-you Aug 25, 2021
d93c3df
remove dead code
ulysses-you Aug 25, 2021
b215e2d
simplify code
ulysses-you Aug 25, 2021
5b63e4d
address comment
ulysses-you Aug 25, 2021
3ccc29b
style
ulysses-you Aug 25, 2021
bc45d70
fix order
ulysses-you Aug 25, 2021
580a0a4
address comment
ulysses-you Aug 26, 2021
bc39694
address comment
ulysses-you Aug 26, 2021
bb2e713
address comment
ulysses-you Sep 2, 2021
d3f0131
nit
ulysses-you Sep 2, 2021
4712986
nit
ulysses-you Sep 2, 2021
23ebea0
address comment
ulysses-you Sep 5, 2021
ef0765f
pass EnsureRequirements
ulysses-you Sep 7, 2021
76c363d
simplify
ulysses-you Sep 7, 2021
8961084
nit
ulysses-you Sep 7, 2021
5ba73c4
EnsureRequirements
ulysses-you Sep 7, 2021
ca63321
pull out shuffle origin
ulysses-you Sep 9, 2021
f5e4b91
address comment
ulysses-you Sep 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
nit
  • Loading branch information
ulysses-you committed Jul 9, 2021
commit 7734d3eec1078a56f600ce7b6e718824c09a0e3b
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,25 @@ case class AdaptiveSparkPlanExec(
// plans are updated, we can clear the query stage list because at this point the two plans
// are semantically and physically in sync again.
val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
val (reOptimizePhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
val planWithExtraShuffle = rePlanWithExtraShuffle(reOptimizePhysicalPlan)
val (reOptimizationPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think reOptimize should return (Seq[SparkPlan], LogicalPlan), and here we pick the spark plan with the least cost between the current spark plan and newPhysicalPlans.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the conf is false, the returned Seq[SparkPlan] will be Nil, so that we don't introduce extra overhead if force skew join is disabled.

val planWithExtraShuffle = rePlanWithExtraShuffle(reOptimizationPhysicalPlan)
val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
val newCost = costEvaluator.evaluateCost(reOptimizePhysicalPlan)
val reOptimizationCost = costEvaluator.evaluateCost(reOptimizationPhysicalPlan)
val extraShuffleCost = costEvaluator.evaluateCost(planWithExtraShuffle)
def updateCurrentPlan(newPhysicalPlan: SparkPlan): Unit = {
logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
logOnLevel(s"Plan changed from\n$currentPhysicalPlan\nto\n$newPhysicalPlan")
cleanUpTempTags(newPhysicalPlan)
currentPhysicalPlan = newPhysicalPlan
currentLogicalPlan = newLogicalPlan
stagesToReplace = Seq.empty[QueryStageExec]
}

if (extraShuffleCost < newCost ||
(extraShuffleCost == newCost && planWithExtraShuffle != reOptimizePhysicalPlan)) {
if (extraShuffleCost < reOptimizationCost || (extraShuffleCost == reOptimizationCost &&
reOptimizationPhysicalPlan != planWithExtraShuffle)) {
updateCurrentPlan(planWithExtraShuffle)
} else if (newCost < origCost ||
(newCost == origCost && currentPhysicalPlan != reOptimizePhysicalPlan)) {
updateCurrentPlan(reOptimizePhysicalPlan)
} else if (reOptimizationCost < origCost ||
(reOptimizationCost == origCost && currentPhysicalPlan != reOptimizationPhysicalPlan)) {
updateCurrentPlan(reOptimizationPhysicalPlan)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan here use 3 costs to find the better plan

  1. plan with skew join if force optimize skew join
  2. plan with reOptimize if not force optimize skew join and has no extra shuffle
  3. origin plan if reOptimize has extra shuffle

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and we pick one from these 3 with the lowest cost.

// Now that some stages have finished, we can try creating new stages.
result = createQueryStages(currentPhysicalPlan)
Expand Down