@@ -45,8 +45,10 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
* that schema.
*
* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
- * each time an input row is added. This significatly reduces the cost of calcuating the
- * projection, but means that it is not safe
+ * each time an input row is added. This significantly reduces the cost of calculating the
+ * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()`
+ * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
+ * and hold on to the returned [[Row]] before calling `next()`.
*/
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
@@ -67,7 +69,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row)
}

/**
- * A mutable wrapper that makes two rows appear appear as a single concatenated row. Designed to
+ * A mutable wrapper that makes two rows appear as a single concatenated row. Designed to
* be instantiated once per thread and reused.
*/
class JoinedRow extends Row {
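The rewritten MutableProjection scaladoc above spells out a usage contract: because the projection rewrites one underlying row object in place, a caller that wants to retain rows obtained through it must copy them before advancing the iterator. A minimal sketch of a conforming consumer, relying only on the Row.copy() method named in the comment; the iterator and buffer are illustrative and not part of this patch:

  import org.apache.spark.sql.catalyst.expressions.Row

  // Every element this iterator yields is the same mutable Row, overwritten
  // on each call to next(), so copy before retaining a reference.
  def collectProjected(projected: Iterator[Row]): Seq[Row] = {
    val retained = scala.collection.mutable.ArrayBuffer.empty[Row]
    while (projected.hasNext) {
      retained += projected.next().copy()
    }
    retained
  }

Without the copy, every entry in the buffer would end up pointing at the last row written by the projection.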
@@ -21,5 +21,4 @@ abstract class BaseRelation extends LeafNode {
self: Product =>

def tableName: String
- def isPartitioned: Boolean = false
}
17 changes: 17 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -29,9 +29,26 @@ import scala.collection.JavaConverters._
*/
trait SQLConf {

/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
* a broadcast value during the physical executions of join operations. Setting this to 0
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*/
private[spark] def autoConvertJoinSize: Int =
get("spark.sql.auto.convert.join.size", "10000").toInt

/** A comma-separated list of table names marked to be broadcasted during joins. */
private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")

/** ********************** SQLConf functionality methods ************ */

@transient
private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
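Both new hints are read straight out of the SQLConf key/value map, so they can be set like any other Spark SQL property before a query is planned. A hedged example of wiring them up, assuming the SQLConf set(key, value) accessor exposed through SQLContext and using invented table names:

  // Comma-separated names checked by the HashJoin strategy below; tables in
  // this list become candidates for the build side of a broadcast hash join.
  sqlContext.set("spark.sql.join.broadcastTables", "small_dim,lookup_codes")

  // Size threshold (in bytes) for automatic broadcast conversion; 0 disables it.
  sqlContext.set("spark.sql.auto.convert.join.size", "10000")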
@@ -170,7 +170,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
- catalog.registerTable(None, tableName, rdd.logicalPlan)
+ val name = tableName
+ val newPlan = rdd.logicalPlan transform {
+ case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
+ }
+ catalog.registerTable(None, tableName, newPlan)
}

/**
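Tagging the copied SparkLogicalPlan with the registered name is what connects a plain RDD-backed table to the broadcast machinery: SparkLogicalPlan now extends BaseRelation (see the SparkPlan change below), so the planner can compare its tableName against spark.sql.join.broadcastTables. A rough sketch of the flow, with an invented case class and table name, assuming the usual createSchemaRDD implicit and SchemaRDD.registerAsTable:

  case class Dim(key: Int, label: String)
  val dims = sparkContext.parallelize(Seq(Dim(1, "a"), Dim(2, "b")))

  import sqlContext.createSchemaRDD   // implicit RDD -> SchemaRDD conversion
  dims.registerAsTable("small_dim")   // delegates to registerRDDAsTable(..., "small_dim")
  // The ExistingRdd beneath this table is now wrapped in a SparkLogicalPlan whose
  // tableName is "small_dim", so listing "small_dim" in
  // spark.sql.join.broadcastTables lets the HashJoin strategy broadcast it.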
@@ -23,9 +23,9 @@ import org.apache.spark.sql.{Logging, Row}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
- import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical}
+ import org.apache.spark.sql.catalyst.plans.QueryPlan
+ import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.physical._
- import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

/**
* :: DeveloperApi ::
@@ -66,19 +66,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
* linking.
*/
@DeveloperApi
- case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
- extends logical.LogicalPlan with MultiInstanceRelation {
+ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
+ extends BaseRelation with MultiInstanceRelation {

def output = alreadyPlanned.output
- def references = Set.empty
- def children = Nil
+ override def references = Set.empty
+ override def children = Nil

override final def newInstance: this.type = {
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case _ => sys.error("Multiple instance of the same relation detected.")
- }).asInstanceOf[this.type]
+ }, tableName)
+ .asInstanceOf[this.type]
}
}

@@ -21,10 +21,10 @@ import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
- import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+ import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical._
- import org.apache.spark.sql.parquet._
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
+ import org.apache.spark.sql.parquet._

private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -45,14 +45,52 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

/**
* Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
*/
object HashJoin extends Strategy with PredicateHelper {
private[this] def broadcastHashJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
left: LogicalPlan,
right: LogicalPlan,
condition: Option[Expression],
side: BuildSide) = {
val broadcastHashJoin = execution.BroadcastHashJoin(
leftKeys, rightKeys, side, planLater(left), planLater(right))(sqlContext)
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}

def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- // Find inner joins where at least some predicates can be evaluated by matching hash keys
- // using the HashFilteredJoin pattern.
case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: BaseRelation))
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case HashFilteredJoin(
Inner,
leftKeys,
rightKeys,
condition,
left @ PhysicalOperation(_, _, b: BaseRelation),
right)
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
- execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+ execution.ShuffledHashJoin(
+ leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil

case _ => Nil
}
}
@@ -62,10 +100,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>
// Collect all aggregate expressions.
val allAggregates =
- aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a})
+ aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a })
// Collect all aggregate expressions that can be computed partially.
val partialAggregates =
- aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p})
+ aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p })

// Only do partial aggregation if supported by all aggregate expressions.
if (allAggregates.size == partialAggregates.size) {
@@ -242,7 +280,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
- case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
+ case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
case _ => Nil
}
}
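Taken together with the SQLConf and SQLContext changes above, the strategy now prefers a BroadcastHashJoin whenever one side of an inner equi-join bottoms out in a BaseRelation whose tableName is listed in spark.sql.join.broadcastTables, and otherwise falls back to the renamed ShuffledHashJoin. A hedged end-to-end sketch, reusing the invented small_dim table from the earlier examples plus an invented facts table:

  sqlContext.set("spark.sql.join.broadcastTables", "small_dim")

  // An inner equi-join that matches the HashFilteredJoin pattern. With the
  // right side resolving to the BaseRelation named "small_dim", the first
  // broadcast case should fire and plan execution.BroadcastHashJoin with
  // BuildRight; without the setting, the final case plans ShuffledHashJoin.
  val joined = sqlContext.sql(
    "SELECT f.id, d.label FROM facts f JOIN small_dim d ON f.key = d.key")

  joined.queryExecution.executedPlan  // inspect which physical join was chosen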
@@ -205,4 +205,3 @@ object ExistingRdd {
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
override def execute() = rdd
}
