Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update the comments
  • Loading branch information
JkSelf committed Oct 23, 2019
commit 1bc418e5793afb488cd25ccb88331b97c9608d86
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, subqueryCache),
// Here we need put the OptimizeLocalShuffleReader rule before
// ReduceNumShufflePartitions rule to avoid the further optimizaiton.
// We will revert the all local shuffle reader node in OptimizeLocalShuffleReader rule
Copy link
Contributor

@cloud-fan cloud-fan Oct 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To polish it a little bit:

When adding local shuffle readers in `OptimizeLocalShuffleReader`, we revert all the local
readers if additional shuffles are introduced. This may be too conservative: maybe there is
only one local reader that introduces shuffle, and we can still keep other local readers.
Here we re-execute this rule with the sub-plan-tree of a query stage, to make sure necessary
local readers are added before executing the query stage.
This rule must be executed before `ReduceNumShufflePartitions`, as local shuffle readers can't
change number of partitions.

// when introduce additional shuffle. It may be too conservative. So we also re-execute this
// rule when creating new query stage.
// Why need put the OptimizeLocalShuffleReader rule before ReduceNumShufflePartitions rule:
// After we optimize the shuffle reader to local shuffle reader(leaf node),
// it does not need to further optimize the reduce number. So we need to put
// the OptimizeLocalShuffleReader rule before ReduceNumShufflePartitions rule.
OptimizeLocalShuffleReader(conf),
ReduceNumShufflePartitions(conf),
ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)
// additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
// The child of remaining one BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 2)
Copy link
Contributor

@HeartSaVioR HeartSaVioR Oct 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, would the change make this value consistently be 2? Because the value has changed to 2 but the value was actually flaky (neither 1 or 2 consistently) depending on the situation/randomness (maybe).

You may want to run the same for what I've discovered, 1) solely in local dev, 2) test suite in local dev, 3) trigger CI for 5 times or alike.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR With this patch, the value will consistently be 2. Because we already optimize all the possible local shuffle reader. And I have run in local dev and also the test suite, the value are all 2. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK thanks for confirming.

}
}
Expand All @@ -188,7 +189,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)
// additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
// The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
}
}
Expand All @@ -213,7 +215,8 @@ class AdaptiveQueryExecSuite
assert(smj.size == 3)
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
assert(bhj.size == 3)
// additional shuffle exchange introduced, only one shuffle reader to local shuffle reader.
// The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
}
}
Expand Down