Skip to content

Commit 65805ab

Browse files
committed
Revert "Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning""
This reverts commit 382b27b.
1 parent d563c8f commit 65805ab

File tree

3 files changed

+42
-9
lines changed

3 files changed

+42
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ case class Join(
332332
*/
333333
case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
334334
override def output: Seq[Attribute] = child.output
335+
336+
// We manually set statistics of BroadcastHint to smallest value to make sure
337+
// the plan wrapped by BroadcastHint will be considered to broadcast later.
338+
override def statistics: Statistics = Statistics(sizeInBytes = 1)
335339
}
336340

337341
case class InsertIntoTable(

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala renamed to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
2323
import org.apache.spark.sql.catalyst.dsl.plans._
2424
import org.apache.spark.sql.catalyst.expressions.Expression
2525
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
26-
import org.apache.spark.sql.catalyst.plans.PlanTest
27-
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
26+
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
27+
import org.apache.spark.sql.catalyst.plans.logical._
2828
import org.apache.spark.sql.catalyst.rules.RuleExecutor
2929

3030

31-
class JoinOrderSuite extends PlanTest {
31+
class JoinOptimizationSuite extends PlanTest {
3232

3333
object Optimize extends RuleExecutor[LogicalPlan] {
3434
val batches =
3535
Batch("Subqueries", Once,
3636
EliminateSubqueryAliases) ::
37-
Batch("Filter Pushdown", Once,
37+
Batch("Filter Pushdown", FixedPoint(100),
3838
CombineFilters,
3939
PushPredicateThroughProject,
4040
BooleanSimplification,
@@ -92,4 +92,31 @@ class JoinOrderSuite extends PlanTest {
9292

9393
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
9494
}
95+
96+
test("broadcasthint sets relation statistics to smallest value") {
97+
val input = LocalRelation('key.int, 'value.string)
98+
99+
val query =
100+
Project(Seq($"x.key", $"y.key"),
101+
Join(
102+
SubqueryAlias("x", input),
103+
BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
104+
105+
val optimized = Optimize.execute(query)
106+
107+
val expected =
108+
Project(Seq($"x.key", $"y.key"),
109+
Join(
110+
Project(Seq($"x.key"), SubqueryAlias("x", input)),
111+
BroadcastHint(
112+
Project(Seq($"y.key"), SubqueryAlias("y", input))),
113+
Inner, None)).analyze
114+
115+
comparePlans(optimized, expected)
116+
117+
val broadcastChildren = optimized.collect {
118+
case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
119+
}
120+
assert(broadcastChildren.size == 1)
121+
}
95122
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
8181
* Matches a plan whose output should be small enough to be used in broadcast join.
8282
*/
8383
object CanBroadcast {
84-
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
85-
case BroadcastHint(p) => Some(p)
86-
case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
87-
p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
88-
case _ => None
84+
def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
85+
if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
86+
plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
87+
Some(plan)
88+
} else {
89+
None
90+
}
8991
}
9092
}
9193

0 commit comments

Comments
 (0)