resolve the comment and update the package of ReduceNumShufflePartitions rule
JkSelf committed Oct 31, 2019
commit e510e96d6ea489092385e4f8e50da05df8e0b6fb
@@ -24,9 +24,25 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartit
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.internal.SQLConf
 
+object BroadcastJoinWithShuffleLeft {
+  def unapply(plan: SparkPlan): Option[(QueryStageExec, BuildSide)] = plan match {
+    case join: BroadcastHashJoinExec if ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) =>
+      Some((join.left.asInstanceOf[QueryStageExec], join.buildSide))
+    case _ => None
+  }
+}
+
+object BroadcastJoinWithShuffleRight {
+  def unapply(plan: SparkPlan): Option[(QueryStageExec, BuildSide)] = plan match {
+    case join: BroadcastHashJoinExec if ShuffleQueryStageExec.isShuffleQueryStageExec(join.right) =>
+      Some((join.right.asInstanceOf[QueryStageExec], join.buildSide))
+    case _ => None
+  }
+}
+
 /**
  * A rule to optimize the shuffle reader to a local reader as far as possible,
  * when converting 'SortMergeJoinExec' to 'BroadcastHashJoinExec' at runtime.
@@ -55,12 +71,12 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
     }
     // Add the local reader on the probe side.
     val withProbeSideLocalReader = plan.transformDown {
-      case join: BroadcastHashJoinExec if canUseLocalShuffleReaderProbeRight(join) =>
-        val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
-        join.copy(right = localReader)
-      case join: BroadcastHashJoinExec if canUseLocalShuffleReaderProbeLeft(join) =>
-        val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
-        join.copy(left = localReader)
+      case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildRight) =>
+        val localReader = LocalShuffleReaderExec(shuffleStage)
+        join.asInstanceOf[BroadcastHashJoinExec].copy(left = localReader)
+      case join @ BroadcastJoinWithShuffleRight(shuffleStage, BuildLeft) =>
+        val localReader = LocalShuffleReaderExec(shuffleStage)
+        join.asInstanceOf[BroadcastHashJoinExec].copy(right = localReader)
     }
 
     def numExchanges(plan: SparkPlan): Int = {
@@ -81,14 +97,12 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
       // Add the local reader on the build side; no need to check whether
       // an additional shuffle is introduced.
       optimizedPlan.transformDown {
-        case join: BroadcastHashJoinExec if (join.buildSide == BuildLeft &&
-            ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)) =>
-          val localReader = LocalShuffleReaderExec(join.left.asInstanceOf[QueryStageExec])
-          join.copy(left = localReader)
-        case join: BroadcastHashJoinExec if (join.buildSide == BuildRight &&
-            ShuffleQueryStageExec.isShuffleQueryStageExec(join.right)) =>
-          val localReader = LocalShuffleReaderExec(join.right.asInstanceOf[QueryStageExec])
-          join.copy(right = localReader)
+        case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildLeft) =>
+          val localReader = LocalShuffleReaderExec(shuffleStage)
+          join.asInstanceOf[BroadcastHashJoinExec].copy(left = localReader)
+        case join @ BroadcastJoinWithShuffleRight(shuffleStage, BuildRight) =>
+          val localReader = LocalShuffleReaderExec(shuffleStage)
+          join.asInstanceOf[BroadcastHashJoinExec].copy(right = localReader)
       }
     }
   }
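The two extractor objects introduced above drive the pattern matches in both transforms. Below is a minimal, self-contained sketch of the same extractor-object (`unapply`) pattern; the types here (Plan, ShuffleStage, BroadcastHashJoin, Demo) are simplified stand-ins for illustration, not Spark's actual classes:

// Simplified stand-in types, not Spark's classes.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

sealed trait Plan
case class ShuffleStage(id: Int) extends Plan
case class BroadcastHashJoin(left: Plan, right: Plan, buildSide: BuildSide) extends Plan

object BroadcastJoinWithShuffleLeft {
  // Match a broadcast join whose left child is a shuffle stage and expose
  // that stage together with the join's build side.
  def unapply(plan: Plan): Option[(ShuffleStage, BuildSide)] = plan match {
    case BroadcastHashJoin(left: ShuffleStage, _, buildSide) => Some((left, buildSide))
    case _ => None
  }
}

object Demo extends App {
  val plan: Plan = BroadcastHashJoin(ShuffleStage(1), ShuffleStage(2), BuildRight)
  plan match {
    // `join @ ...` binds the whole node while the extractor destructures it;
    // requiring BuildRight here means the left child is the probe side.
    case join @ BroadcastJoinWithShuffleLeft(stage, BuildRight) =>
      println(s"left child $stage is a probe-side shuffle stage of $join")
    case _ => println("no match")
  }
}

Matching on the build side in the pattern (BuildRight vs. BuildLeft) is what lets the rewritten rule collapse each pair of guarded cases into one line per side.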
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.adaptive.rule
+package org.apache.spark.sql.execution.adaptive
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration.Duration