Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Separate rule.
  • Loading branch information
rxin committed Feb 14, 2017
commit 51a73d510a5faf3bbed8bb76ffe5590c68a67e2c
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ class Analyzer(
val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new SubstituteHints.SubstituteBroadcastHints(conf),
SubstituteHints.RemoveAllHints),
Batch("Substitution", fixedPoint,
new SubstituteHints(conf),
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,62 +24,80 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin


/**
* Substitute Hints.
* - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters.
* Collection of rules related to hints. The only hint currently available is broadcast join hint.
*
* In the case of broadcast hint, we find the frontier of
*
* This rule substitutes `UnresolvedRelation`s in `Substitute` batch before `ResolveRelations`
* rule is applied. Here are two reasons.
* - To support `MetastoreRelation` in Hive module.
* - To reduce the effect of `Hint` on the other rules.
*
* After this rule, it is guaranteed that there exists no unknown `Hint` in the plan.
* All new `Hint`s should be transformed into concrete Hint classes `BroadcastHint` here.
* Note that this is separatedly into two rules because in the future we might introduce new hint
* rules that have different ordering requirements from broadcast.
*/
class SubstituteHints(conf: CatalystConf) extends Rule[LogicalPlan] {
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")
object SubstituteHints {

/**
* Substitute Hints.
*
* The only hint currently available is broadcast join hint.
*
* For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
* relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
* on top of any relation (that is not aliased differently), subquery, or common table expression
* that match the specified name.
*
* The hint resolution works by recursively traversing down the query plan to find a relation or
* subquery that matches one of the specified broadcast aliases. The traversal does not go past
* beyond any existing broadcast hints, subquery aliases.
*
* This rule must happen before common table expressions.
*/
class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

def resolver: Resolver = conf.resolver
def resolver: Resolver = conf.resolver

private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
// Whether to continue recursing down the tree
var recurse = true
private def applyBroadcastHint(plan: LogicalPlan, toBroadcast: Set[String]): LogicalPlan = {
// Whether to continue recursing down the tree
var recurse = true

val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
case r: UnresolvedRelation =>
val alias = r.alias.getOrElse(r.tableIdentifier.table)
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
case r: SubqueryAlias =>
if (toBroadcast.exists(resolver(_, r.alias))) {
BroadcastHint(plan)
} else {
// Don't recurse down subquery aliases if there are no match.
val newNode = CurrentOrigin.withOrigin(plan.origin) {
plan match {
case r: UnresolvedRelation =>
val alias = r.alias.getOrElse(r.tableIdentifier.table)
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
case r: SubqueryAlias =>
if (toBroadcast.exists(resolver(_, r.alias))) {
BroadcastHint(plan)
} else {
// Don't recurse down subquery aliases if there are no match.
recurse = false
plan
}
case _: BroadcastHint =>
// Found a broadcast hint; don't change the plan but also don't recurse down.
recurse = false
plan
}
case _: BroadcastHint =>
// Found a broadcast hint; don't change the plan but also don't recurse down.
recurse = false
plan
case _ =>
plan
case _ =>
plan
}
}

if ((plan fastEquals newNode) && recurse) {
newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the case of self-join, we may broadcast both side, is it expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine. Both being broadcastable doesn't mean we broadcast both.

} else {
newNode
}
}

if ((plan fastEquals newNode) && recurse) {
newNode.mapChildren(child => applyBroadcastHint(child, toBroadcast))
} else {
newNode
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
applyBroadcastHint(h.child, h.parameters.toSet)
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase) =>
applyBroadcastHint(h.child, h.parameters.toSet)

// Remove unrecognized hints
case h: Hint => h.child
/**
* Removes all the hints. This must be executed after all the other hint rules are executed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mention that, this can happen when users specify invalid hints and we will ignore it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea let me add that.

*/
object RemoveAllHints extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case h: Hint => h.child
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import org.apache.spark.sql.catalyst.plans.logical._
class SubstituteHintsSuite extends AnalysisTest {
import org.apache.spark.sql.catalyst.analysis.TestRelations._

test("invalid hints should be ignored") {
checkAnalysis(
Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
testRelation,
caseSensitive = false)
}

test("case-sensitive or insensitive parameters") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
Expand Down