Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.DataType

/**
* A pattern that matches any number of filter operations on top of another relational operator.
Expand Down Expand Up @@ -231,3 +232,48 @@ object Unions {
case other => other :: Nil
}
}

/**
 * A pattern that finds inner joins whose condition is the conjunction of exactly two range
 * predicates, so that the join can be evaluated using a range join.
 *
 * On a match the extractor yields, in order:
 *  - the left and right child plans of the join (never swapped),
 *  - the (low, high) range keys that can be evaluated on the left child,
 *  - the (low, high) range keys that can be evaluated on the right child,
 *  - two flags telling whether each of the two comparisons is inclusive.
 *
 * TODO support partial range joins.
 */
object ExtractRangeJoinKeys extends PredicateHelper {
  type ReturnType = (LogicalPlan, LogicalPlan, Seq[Expression], Seq[Expression], Seq[Boolean])

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    // Both conjuncts must be range predicates that report the same data type.
    case Join(left, right, Inner, Some(And(
        RangePredicate(d1, l1, h1, equi1),
        RangePredicate(d2, l2, h2, equi2)))) if d1 == d2 =>
      if (evaluateOrder(l1, h1, l2, h2, left, right)) {
        // Condition reads: L.Low < R.High && R.Low < L.High
        Some((left, right, Seq(l1, h2), Seq(l2, h1), Seq(equi1, equi2)))
      } else if (evaluateOrder(l1, h1, l2, h2, right, left)) {
        // Condition reads: R.Low < L.High && L.Low < R.High
        Some((left, right, Seq(l2, h1), Seq(l1, h2), Seq(equi2, equi1)))
      } else {
        // The bounds cannot be attributed to the two sides in either orientation.
        None
      }

    case _ => None
  }

  /**
   * Checks that the first predicate runs from `left` (low bound) to `right` (high bound) and
   * the second predicate runs from `right` (low bound) to `left` (high bound).
   */
  def evaluateOrder(low1: Expression, high1: Expression,
      low2: Expression, high2: Expression,
      left: LogicalPlan, right: LogicalPlan): Boolean = {
    val firstGoesLeftToRight = canEvaluate(low1, left) && canEvaluate(high1, right)
    firstGoesLeftToRight && canEvaluate(low2, right) && canEvaluate(high2, left)
  }
}

/**
 * A pattern that normalizes all range expressions.
 *
 * Every ordering comparison (`<`, `<=`, `>`, `>=`) is rewritten into the shape `low (<|<=) high`
 * and returned as (data type of the compared operands, low, high, inclusive), where `inclusive`
 * is true for `<=` and `>=`.
 */
object RangePredicate {
  def unapply(expression: Expression): Option[(DataType, Expression, Expression, Boolean)] =
    expression match {
      case LessThan(low, high) => normalized(expression, low, high, inclusive = false)
      case LessThanOrEqual(low, high) => normalized(expression, low, high, inclusive = true)
      case GreaterThan(high, low) => normalized(expression, low, high, inclusive = false)
      case GreaterThanOrEqual(high, low) => normalized(expression, low, high, inclusive = true)
      case _ => None
    }

  /**
   * Builds the normalized result for a comparison.
   *
   * The reported type is the type of the operands, not of the comparison itself: the comparison
   * always evaluates to a boolean, which would make any type check on the result meaningless.
   * Unresolved comparisons are rejected because the data type of an unresolved operand cannot
   * be read.
   */
  private def normalized(
      comparison: Expression,
      low: Expression,
      high: Expression,
      inclusive: Boolean): Option[(DataType, Expression, Expression, Boolean)] = {
    if (comparison.resolved) Some((low.dataType, low, high, inclusive)) else None
  }
}
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ private[spark] object SQLConf {
defaultValue = Some(false),
doc = "<TODO>")

// Switch for planning range joins; disabled unless explicitly turned on.
val RANGE_JOIN = booleanConf("spark.sql.planner.rangeJoin",
  defaultValue = Some(false),
  doc = "When true, the planner will use the range join operator (instead of a broadcast " +
    "nested loop join) for joins with range conditions.")

// This is only used for the thriftserver
val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
doc = "Set a Fair Scheduler pool for a JDBC client session")
Expand Down Expand Up @@ -457,6 +461,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
*/
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)

/**
 * When true the planner will use the range join operator (instead of a broadcast nested loop
 * join) for range queries. The value is read from the `RANGE_JOIN` configuration entry.
 */
private[spark] def rangeJoinEnabled: Boolean = getConf(RANGE_JOIN)

/**
* When set to true, Spark SQL will use the Janino at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
InMemoryScans ::
ParquetOperations ::
BasicOperators ::
BroadcastRangeJoin ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

/**
 * Plans joins matched by [[ExtractRangeJoinKeys]] as a `joins.BroadcastRangeJoin`, provided the
 * range join feature is enabled in the SQL configuration and one of the join children matches
 * `CanBroadcast`. The left child is tried as the build side first, then the right child.
 */
object BroadcastRangeJoin extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ExtractRangeJoinKeys(CanBroadcast(left), right, leftKeys, rightKeys, equality)
        if sqlContext.conf.rangeJoinEnabled =>
      Seq(planRangeJoin(joins.BuildLeft, left, right, leftKeys, rightKeys, equality))

    case ExtractRangeJoinKeys(left, CanBroadcast(right), leftKeys, rightKeys, equality)
        if sqlContext.conf.rangeJoinEnabled =>
      Seq(planRangeJoin(joins.BuildRight, left, right, leftKeys, rightKeys, equality))

    case _ => Seq.empty
  }

  /** Creates the physical range join operator; both children are planned lazily. */
  private[this] def planRangeJoin(
      side: joins.BuildSide,
      left: LogicalPlan,
      right: LogicalPlan,
      leftRangeKeys: Seq[Expression],
      rightRangeKeys: Seq[Expression],
      equality: Seq[Boolean]): SparkPlan = {
    val leftChild = planLater(left)
    val rightChild = planLater(right)
    new joins.BroadcastRangeJoin(
      leftRangeKeys, rightRangeKeys, equality, side, leftChild, rightChild)
  }
}

object CartesianProduct extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Join(left, right, _, None) =>
Expand Down
Loading