Skip to content

Commit 76ca434

Browse files
committed
A simple strategy that broadcasts tables only when they are found in a configuration hint.
1 parent cf6b381 commit 76ca434

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,41 @@ import org.apache.spark.sql.{SQLContext, execution}
2121
import org.apache.spark.sql.catalyst.expressions._
2222
import org.apache.spark.sql.catalyst.planning._
2323
import org.apache.spark.sql.catalyst.plans._
24-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
2525
import org.apache.spark.sql.catalyst.plans.physical._
2626
import org.apache.spark.sql.parquet._
2727

2828
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
2929
self: SQLContext#SparkPlanner =>
3030

31+
/**
32+
* Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be
33+
* evaluated by matching hash keys.
34+
*/
3135
object HashJoin extends Strategy with PredicateHelper {
36+
var broadcastTables: Seq[String] =
37+
sparkContext.conf.get("spark.sql.hints.broadcastTables", "").split(",")
38+
3239
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
33-
// Find inner joins where at least some predicates can be evaluated by matching hash keys
34-
// using the HashFilteredJoin pattern.
40+
41+
case HashFilteredJoin(Inner, leftKeys, rightKeys, condition,
42+
left, PhysicalOperation(_, _, b: BaseRelation)) if broadcastTables.contains(b.tableName)=>
43+
val hashJoin =
44+
execution.BroadcastHashJoin(
45+
leftKeys, rightKeys, BuildRight, planLater(left), planLater(b))(sparkContext)
46+
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
47+
48+
case HashFilteredJoin(Inner, leftKeys, rightKeys, condition,
49+
PhysicalOperation(_, _, b: BaseRelation), right) if broadcastTables.contains(b.tableName)=>
50+
val hashJoin =
51+
execution.BroadcastHashJoin(
52+
leftKeys, rightKeys, BuildLeft, planLater(b), planLater(right))(sparkContext)
53+
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
54+
3555
case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
3656
val hashJoin =
37-
execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
57+
execution.ShuffledHashJoin(
58+
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
3859
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
3960
case _ => Nil
4061
}

0 commit comments

Comments
 (0)