Commits
aa7120e  Initial Broadcast design (hvanhovell, Feb 4, 2016)
c2b7533  Fix Exchange and initial code gen attempt. (hvanhovell, Feb 4, 2016)
6a5568a  Move broadcast retreval to SparkPlan (hvanhovell, Feb 6, 2016)
9adecdd  Merge remote-tracking branch 'spark/master' into SPARK-13136 (hvanhovell, Feb 6, 2016)
d0194fb  Fix Codegen & Add other broadcast joins. (hvanhovell, Feb 6, 2016)
02a61b8  Minor touchup (hvanhovell, Feb 6, 2016)
c12c8e6  Move broadcast relation retrieval. (hvanhovell, Feb 7, 2016)
c7dd7ae  Remove codegen from broadcast. (hvanhovell, Feb 8, 2016)
e847383  Merge remote-tracking branch 'spark/master' into SPARK-13136 (hvanhovell, Feb 10, 2016)
d73f11c  Merge remote-tracking branch 'apache-github/master' into SPARK-13136 (hvanhovell, Feb 12, 2016)
9c0f4bf  Remove closure passing. (hvanhovell, Feb 14, 2016)
da4a966  Merge remote-tracking branch 'apache-github/master' into SPARK-13136 (hvanhovell, Feb 14, 2016)
681f347  Move transform into BroadcastMode (hvanhovell, Feb 15, 2016)
7db240a  Clean-up (hvanhovell, Feb 15, 2016)
3ad839d  Code Review. (hvanhovell, Feb 16, 2016)
1116768  No newline at EOF :( (hvanhovell, Feb 16, 2016)
a5501cf  Rename exchanges and merge Broadcast.scala into exchange.scala. (hvanhovell, Feb 17, 2016)
c7429bb  Merge remote-tracking branch 'apache-github/master' into SPARK-13136 (hvanhovell, Feb 17, 2016)
b12bbc2  Merge remote-tracking branch 'apache-github/master' into SPARK-13136 (hvanhovell, Feb 20, 2016)
9d52650  Revert renaming of variabels in LeftSemiJoinBNL. (hvanhovell, Feb 20, 2016)
54b558d  Revert renaming of variabels in LeftSemiJoinBNL. (hvanhovell, Feb 20, 2016)
f33d2cb  Move all exchange related operators into the exchange package. (hvanhovell, Feb 21, 2016)
28363c8  CR (hvanhovell, Feb 21, 2016)
f812a31  Merge remote-tracking branch 'apache-github/master' into SPARK-13136 (hvanhovell, Feb 21, 2016)
4b5978b  put broadcast mode in a separate file. (hvanhovell, Feb 21, 2016)
c8c175e  Fix style in sqlcontext. (hvanhovell, Feb 21, 2016)
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.physical

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, IntegerType}

@@ -75,6 +76,27 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
def clustering: Set[Expression] = ordering.map(_.child).toSet
}

/**
* Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
* identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
*/
trait BroadcastMode {
Review comment (Contributor): I'd move this and IdentityBroadcastMode into a new file.

def transform(rows: Array[InternalRow]): Any
}

/**
* IdentityBroadcastMode requires that rows are broadcasted in their original form.
*/
case object IdentityBroadcastMode extends BroadcastMode {
override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
}

/**
* Represents data where tuples are broadcasted to every node. It is quite common that the
* entire set of tuples is transformed into a different data structure.
*/
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
Review comment (Contributor): ok for now, but technically once we introduce this, "Distribution" is not just about "distribution" anymore, but rather some data property.


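To make the BroadcastMode contract concrete, here is a minimal sketch of a hypothetical custom mode; the name HashedRowsBroadcastMode, the use of a plain Scala Map, and the assumption that the key expressions are already bound to the input schema are all illustrative and not part of this diff (it relies on the imports already present at the top of this file).

// Hedged sketch: group the collected rows by a set of key expressions, so a broadcast
// join could probe a map instead of scanning the raw row array.
case class HashedRowsBroadcastMode(keys: Seq[Expression]) extends BroadcastMode {
  override def transform(rows: Array[InternalRow]): Any = {
    // Evaluate the key expressions against each row and group rows sharing the same key values.
    rows.groupBy(row => keys.map(_.eval(row)))
  }
}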
/**
* Describes how an operator's output is split across partitions. The `compatibleWith`,
* `guarantees`, and `satisfies` methods describe relationships between child partitionings,
@@ -213,7 +235,10 @@ case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
case object SinglePartition extends Partitioning {
val numPartitions = 1

override def satisfies(required: Distribution): Boolean = true
override def satisfies(required: Distribution): Boolean = required match {
case _: BroadcastDistribution => false
Review comment (Contributor): i think this is ok for now, but technically we don't need to introduce an exchange if both sides of the join have only one partition. i guess this framework does not currently handle that.

case _ => true
}

override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1

@@ -351,3 +376,21 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
partitionings.map(_.toString).mkString("(", " or ", ")")
}
}

/**
* Represents a partitioning where rows are collected, transformed and broadcasted to each
* node in the cluster.
*/
case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
Review comment (Contributor): and partitioning here isn't really just about partitioning anymore. it also includes how data is physically organized on each partition.

override val numPartitions: Int = 1

override def satisfies(required: Distribution): Boolean = required match {
case BroadcastDistribution(m) if m == mode => true
case _ => false
}

override def compatibleWith(other: Partitioning): Boolean = other match {
case BroadcastPartitioning(m) if m == mode => true
case _ => false
}
}
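
A minimal illustration of the intended semantics, assuming the definitions above are in scope (for example in a test or a REPL); the assertions simply mirror the pattern matches in satisfies and compatibleWith.

// Only a broadcast partitioning with the same mode satisfies a broadcast requirement.
val broadcastRequirement = BroadcastDistribution(IdentityBroadcastMode)
assert(BroadcastPartitioning(IdentityBroadcastMode).satisfies(broadcastRequirement))
assert(!SinglePartition.satisfies(broadcastRequirement))   // SinglePartition now opts out
assert(!BroadcastPartitioning(IdentityBroadcastMode).compatibleWith(SinglePartition))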
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.InternalRow
*
* A coordinator is constructed with three parameters, `numExchanges`,
* `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
* - `numExchanges` is used to indicated that how many [[Exchange]]s that will be registered to
* this coordinator. So, when we start to do any actual work, we have a way to make sure that
* we have got expected number of [[Exchange]]s.
* - `numExchanges` is used to indicate how many [[ShuffleExchange]]s will be registered
* to this coordinator. So, when we start to do any actual work, we have a way to make sure that
* we have got the expected number of [[ShuffleExchange]]s.
* - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
* input data size. With this parameter, we can estimate the number of post-shuffle partitions.
* This parameter is configured through
@@ -45,26 +45,27 @@ import org.apache.spark.sql.catalyst.InternalRow
* partitions.
*
* The workflow of this coordinator is described as follows:
* - Before the execution of a [[SparkPlan]], for an [[Exchange]] operator,
* - Before the execution of a [[SparkPlan]], for a [[ShuffleExchange]] operator,
* if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
* This happens in the `doPrepare` method.
* - Once we start to execute a physical plan, an [[Exchange]] registered to this coordinator will
* call `postShuffleRDD` to get its corresponding post-shuffle [[ShuffledRowRDD]].
* If this coordinator has made the decision on how to shuffle data, this [[Exchange]] will
* immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
* - Once we start to execute a physical plan, a [[ShuffleExchange]] registered to this
* coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
* [[ShuffledRowRDD]].
* If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchange]]
* will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
* - If this coordinator has not made the decision on how to shuffle data, it will ask those
* registered [[Exchange]]s to submit their pre-shuffle stages. Then, based on the the size
* statistics of pre-shuffle partitions, this coordinator will determine the number of
registered [[ShuffleExchange]]s to submit their pre-shuffle stages. Then, based on the
* size statistics of pre-shuffle partitions, this coordinator will determine the number of
* post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
* to a single post-shuffle partition whenever necessary.
* - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
* [[Exchange]]s. So, when an [[Exchange]] calls `postShuffleRDD`, this coordinator can
* lookup the corresponding [[RDD]].
[[ShuffleExchange]]s. So, when a [[ShuffleExchange]] calls `postShuffleRDD`, this coordinator
can look up the corresponding [[RDD]].
*
* The strategy used to determine the number of post-shuffle partitions is described as follows.
* To determine the number of post-shuffle partitions, we have a target input size for a
* post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
* corresponding to the registered [[Exchange]]s, we will do a pass of those statistics and
* corresponding to the registered [[ShuffleExchange]]s, we will do a pass of those statistics and
* pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
* the size of a post-shuffle partition is equal or greater than the target size.
* For example, we have two stages with the following pre-shuffle partition size statistics:
@@ -83,23 +84,23 @@ private[sql] class ExchangeCoordinator(
extends Logging {

// The registered Exchange operators.
private[this] val exchanges = ArrayBuffer[Exchange]()
private[this] val exchanges = ArrayBuffer[ShuffleExchange]()

// This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
private[this] val postShuffleRDDs: JMap[Exchange, ShuffledRowRDD] =
new JHashMap[Exchange, ShuffledRowRDD](numExchanges)
private[this] val postShuffleRDDs: JMap[ShuffleExchange, ShuffledRowRDD] =
new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)

// A boolean that indicates if this coordinator has made decision on how to shuffle data.
// This variable will only be updated by doEstimationIfNecessary, which is protected by
// synchronized.
@volatile private[this] var estimated: Boolean = false

/**
* Registers an [[Exchange]] operator to this coordinator. This method is only allowed to be
* called in the `doPrepare` method of an [[Exchange]] operator.
* Registers a [[ShuffleExchange]] operator to this coordinator. This method is only allowed to
* be called in the `doPrepare` method of a [[ShuffleExchange]] operator.
*/
@GuardedBy("this")
def registerExchange(exchange: Exchange): Unit = synchronized {
def registerExchange(exchange: ShuffleExchange): Unit = synchronized {
exchanges += exchange
}

@@ -199,7 +200,7 @@ private[sql] class ExchangeCoordinator(
// Make sure we have the expected number of registered Exchange operators.
assert(exchanges.length == numExchanges)

val newPostShuffleRDDs = new JHashMap[Exchange, ShuffledRowRDD](numExchanges)
val newPostShuffleRDDs = new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)

// Submit all map stages
val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
@@ -254,7 +255,7 @@
}
}

def postShuffleRDD(exchange: Exchange): ShuffledRowRDD = {
def postShuffleRDD(exchange: ShuffleExchange): ShuffledRowRDD = {
doEstimationIfNecessary()

if (!postShuffleRDDs.containsKey(exchange)) {
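
The partition-packing strategy documented above can be sketched as a standalone helper. This is a simplification (a single stage, assumed sizes, and an assumed helper name), not the coordinator's actual implementation.

// Pack pre-shuffle partitions with consecutive indices into post-shuffle partitions until
// each group reaches the target size; any remainder becomes a final, smaller group.
def packPartitions(preShuffleSizes: Array[Long], targetSize: Long): Seq[Seq[Int]] = {
  val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
  var current = Vector.empty[Int]
  var currentBytes = 0L
  preShuffleSizes.indices.foreach { i =>
    current :+= i
    currentBytes += preShuffleSizes(i)
    if (currentBytes >= targetSize) {   // group is large enough: close it and start a new one
      groups += current
      current = Vector.empty
      currentBytes = 0L
    }
  }
  if (current.nonEmpty) groups += current
  groups
}

// Illustrative numbers only: packPartitions(Array(10L, 5L, 20L, 5L, 5L), targetSize = 20L)
// yields Seq(Seq(0, 1, 2), Seq(3, 4)).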
@@ -22,6 +22,7 @@
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.broadcast
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -105,14 +106,29 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
* Concrete implementations of SparkPlan should override doExecute instead.
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute after
* preparations. Concrete implementations of SparkPlan should override doExecute.
*/
final def execute(): RDD[InternalRow] = {
final def execute(): RDD[InternalRow] = executeQuery {
doExecute()
}

/**
* Returns the result of this query as a broadcast variable by delegating to doExecuteBroadcast
* after preparations. Concrete implementations of SparkPlan should override doExecuteBroadcast.
*/
final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
doExecuteBroadcast()
}

/**
* Execute a query after preparing the query and adding query plan information to created RDDs
* for visualization.
*/
private final def executeQuery[T](query: => T): T = {
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
doExecute()
query
}
}

@@ -142,6 +158,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
protected def doExecute(): RDD[InternalRow]

/**
* Overridden by concrete implementations of SparkPlan.
* Produces the result of the query as a broadcast variable.
*/
protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
throw new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
}

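To illustrate how an operator can plug into this hook, here is a hedged sketch of a hypothetical broadcast-producing node; the class name, the executeCollect-based collection, and the unchecked cast are illustrative assumptions, not the broadcast exchange operator this PR ultimately adds, and the relevant catalyst imports are assumed to be in scope.

// Hypothetical operator: collect the child's rows, reshape them with a BroadcastMode,
// and publish the result as a broadcast variable.
case class ExampleBroadcastNode(mode: BroadcastMode, child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output

  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    val rows = child.executeCollect()        // Array[InternalRow] from the child plan
    val relation = mode.transform(rows)      // identity, hashed, etc.
    sparkContext.broadcast(relation).asInstanceOf[broadcast.Broadcast[T]]
  }

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("broadcast-only example node")
}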
/**
* Runs this query returning the result as an array.
*/
@@ -328,7 +328,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
execution.ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
execution.Coalesce(numPartitions, planLater(child)) :: Nil
}
@@ -367,7 +367,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case r @ logical.Range(start, end, step, numSlices, output) =>
execution.Range(start, step, numSlices, r.numElements, output) :: Nil
case logical.RepartitionByExpression(expressions, child, nPartitions) =>
execution.Exchange(HashPartitioning(
execution.ShuffleExchange(HashPartitioning(
expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
case e @ python.EvaluatePython(udf, child, _) =>
python.BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
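
A hedged usage illustration of the renamed operator, assuming a Spark shell session of this era with a `sqlContext` in scope; both forms of repartitioning should now plan to a ShuffleExchange rather than the old Exchange node.

val df = sqlContext.range(0, 1000)
// Expect ShuffleExchange(RoundRobinPartitioning(10), ...) in the physical plan.
df.repartition(10).explain()
// Expect ShuffleExchange(HashPartitioning(id, <numPartitions>), ...) in the physical plan.
df.repartition(df("id")).explain()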
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -170,6 +171,10 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
child.execute()
}

override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
child.doExecuteBroadcast()
}

override def supportCodegen: Boolean = false

override def upstream(): RDD[InternalRow] = {