Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add more agg exec
  • Loading branch information
LantaoJin committed Jul 8, 2020
commit 80bef0d2c22bf91d216784f29839b95f22fb230f
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -133,13 +133,21 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {

private def canSplitLeftSide(joinType: JoinType, plan: SparkPlan) = {
(joinType == Inner || joinType == Cross || joinType == LeftSemi ||
joinType == LeftAnti || joinType == LeftOuter) &&
plan.find(_.isInstanceOf[HashAggregateExec]).isEmpty
joinType == LeftAnti || joinType == LeftOuter) && !containsAggregateExec(plan)
}

private def canSplitRightSide(joinType: JoinType, plan: SparkPlan) = {
(joinType == Inner || joinType == Cross || joinType == RightOuter) &&
plan.find(_.isInstanceOf[HashAggregateExec]).isEmpty
(joinType == Inner || joinType == Cross ||
joinType == RightOuter) && !containsAggregateExec(plan)
}

private def containsAggregateExec(plan: SparkPlan) = {
plan.find {
case _: HashAggregateExec => true
case _: SortAggregateExec => true
case _: ObjectHashAggregateExec => true
case _ => false
}.isDefined
}

private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = {
Expand Down