Closed
26 commits
aa7120e
Initial Broadcast design
hvanhovell Feb 4, 2016
c2b7533
Fix Exchange and initial code gen attempt.
hvanhovell Feb 4, 2016
6a5568a
Move broadcast retrieval to SparkPlan
hvanhovell Feb 6, 2016
9adecdd
Merge remote-tracking branch 'spark/master' into SPARK-13136
hvanhovell Feb 6, 2016
d0194fb
Fix Codegen & Add other broadcast joins.
hvanhovell Feb 6, 2016
02a61b8
Minor touchup
hvanhovell Feb 6, 2016
c12c8e6
Move broadcast relation retrieval.
hvanhovell Feb 7, 2016
c7dd7ae
Remove codegen from broadcast.
hvanhovell Feb 8, 2016
e847383
Merge remote-tracking branch 'spark/master' into SPARK-13136
hvanhovell Feb 10, 2016
d73f11c
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 12, 2016
9c0f4bf
Remove closure passing.
hvanhovell Feb 14, 2016
da4a966
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 14, 2016
681f347
Move transform into BroadcastMode
hvanhovell Feb 15, 2016
7db240a
Clean-up
hvanhovell Feb 15, 2016
3ad839d
Code Review.
hvanhovell Feb 16, 2016
1116768
No newline at EOF :(
hvanhovell Feb 16, 2016
a5501cf
Rename exchanges and merge Broadcast.scala into exchange.scala.
hvanhovell Feb 17, 2016
c7429bb
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 17, 2016
b12bbc2
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 20, 2016
9d52650
Revert renaming of variables in LeftSemiJoinBNL.
hvanhovell Feb 20, 2016
54b558d
Revert renaming of variables in LeftSemiJoinBNL.
hvanhovell Feb 20, 2016
f33d2cb
Move all exchange related operators into the exchange package.
hvanhovell Feb 21, 2016
28363c8
CR
hvanhovell Feb 21, 2016
f812a31
Merge remote-tracking branch 'apache-github/master' into SPARK-13136
hvanhovell Feb 21, 2016
4b5978b
put broadcast mode in a separate file.
hvanhovell Feb 21, 2016
c8c175e
Fix style in sqlcontext.
hvanhovell Feb 21, 2016
Move transform into BroadcastMode
hvanhovell committed Feb 15, 2016
commit 681f34718f830e69a7a370a0b0bf465283a738e2
@@ -80,12 +80,14 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
  * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
  * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
  */
-trait BroadcastMode
+trait BroadcastMode extends (Array[InternalRow] => Any)
 
 /**
  * IdentityBroadcastMode requires that rows are broadcasted in their original form.
  */
-case object IdentityBroadcastMode extends BroadcastMode
+case object IdentityBroadcastMode extends BroadcastMode {
+  def apply(rows: Array[InternalRow]): Array[InternalRow] = rows
+}
 
 /**
  * Represents data where tuples are broadcasted to every node. It is quite common that the
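The hunk above turns `BroadcastMode` from a pure marker into a function from the collected rows to the value that gets broadcast. A minimal, self-contained sketch of that pattern follows. `Row` is a stand-in for Spark's `InternalRow` and `KeySetBroadcastMode` is a hypothetical mode invented for illustration; neither is part of this commit.

```scala
// Stand-in for Spark's InternalRow (an assumption, to keep the sketch self-contained).
final case class Row(values: Seq[Any])

// A broadcast mode is itself the function that shapes the collected rows.
trait BroadcastMode extends (Array[Row] => Any)

// Rows are broadcast unchanged.
case object IdentityBroadcastMode extends BroadcastMode {
  def apply(rows: Array[Row]): Array[Row] = rows
}

// Hypothetical mode: broadcast the distinct values of one column as a set.
final case class KeySetBroadcastMode(keyIndex: Int) extends BroadcastMode {
  def apply(rows: Array[Row]): Set[Any] = rows.map(_.values(keyIndex)).toSet
}
```

Because the modes are case classes and case objects, two modes with equal parameters still compare equal, which is what the `m1 == m2` check in `EnsureRequirements` relies on.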
@@ -31,7 +31,6 @@ import org.apache.spark.util.ThreadUtils
  */
 case class Broadcast(
Contributor

Do we need to merge this class with Exchange?

Contributor Author

You want to add this to Exchange.scala?

Contributor Author

@yhuai It took a while for your question to sink in. We could integrate this with Exchange. It makes sense because both operators distribute data and their structure is actually quite similar. I am not a huge fan of this, because we would end up conflating concerns by implementing two naturally separate code and data paths in one operator.

Lemme know what you think.

Contributor

It is a logical exchange, but it probably deserves two different physical operators.

Contributor

One way is ... rename the current exchange ShuffleExchange, and rename Broadcast BroadcastExchange.

     mode: BroadcastMode,
-    transform: Array[InternalRow] => Any,
     child: SparkPlan) extends UnaryNode {
 
   override def output: Seq[Attribute] = child.output
@@ -60,7 +59,7 @@ case class Broadcast(
       }.collect()
 
       // Construct and broadcast the relation.
-      sparkContext.broadcast(transform(input))
+      sparkContext.broadcast(mode(input))
     }
   }(Broadcast.executionContext)
 }
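The review thread on `case class Broadcast` suggests treating both operators as one logical exchange with two physical operators, named `ShuffleExchange` and `BroadcastExchange`. A rough sketch of that shape is below. `Plan`, `Scan`, the `Exchange` marker trait, and the field types are simplified, hypothetical stand-ins and not the classes Spark ended up with.

```scala
// Hypothetical, simplified plan node; Spark's SparkPlan is far richer.
sealed trait Plan { def children: Seq[Plan] }
final case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }

// Both operators move data between nodes, so they share a marker trait.
sealed trait Exchange extends Plan {
  def child: Plan
  def children: Seq[Plan] = Seq(child)
}

// Repartitions rows across the cluster (the operator currently called Exchange).
final case class ShuffleExchange(numPartitions: Int, child: Plan) extends Exchange

// Collects rows and ships a copy to every node (the operator currently called Broadcast).
final case class BroadcastExchange(mode: String, child: Plan) extends Exchange
```

Keeping two case classes preserves the separate code and data paths the author wants, while rules that only care about "data is being redistributed here" can match on the shared trait.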
@@ -388,19 +388,6 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
     withCoordinator
   }
 
-  /**
-   * Create a [[Broadcast]] operator for a given [[BroadcastMode]] and [[SparkPlan]].
-   */
-  private def createBroadcast(mode: BroadcastMode, plan: SparkPlan): Broadcast = mode match {
-    case IdentityBroadcastMode =>
-      Broadcast(mode, identity, plan)
-    case HashSetBroadcastMode(keys) =>
-      Broadcast(mode, HashSemiJoin.buildKeyHashSet(keys, plan, _), plan)
-    case HashedRelationBroadcastMode(canJoinKeyFitWithinLong, keys) =>
-      Broadcast(mode, HashedRelation(canJoinKeyFitWithinLong, keys, plan, _), plan)
-    case _ => sys.error(s"Unknown BroadcastMode: $mode")
-  }
-
   private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
     val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
@@ -415,11 +402,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
       case (child, BroadcastDistribution(m1)) =>
         child match {
           // The child is broadcasting the same variable: keep the child.
-          case Broadcast(m2, _, _) if m1 == m2 => child
+          case Broadcast(m2, _) if m1 == m2 => child
Contributor

Should we have a BroadcastPartitioning as the outputPartitioning of Broadcast? Then we could have different implementations.

Contributor

I have the same question. If we had a BroadcastPartitioning, it seems we could avoid these changes?

           // The child is broadcasting a different variable: replace the child.
-          case Broadcast(m2, _, src) => createBroadcast(m1, src)
+          case Broadcast(m2, src) => Broadcast(m1, src)
           // Create a broadcast on top of the child.
-          case _ => createBroadcast(m1, child)
+          case _ => Broadcast(m1, child)
         }
       case (child, distribution) =>
         Exchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
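With the transform folded into the mode, the rule above no longer needs `createBroadcast`: it only decides whether to keep, replace, or add a `Broadcast` node. The three cases can be sketched in isolation as follows. `Mode`, `Plan`, `Scan`, and `EnsureBroadcast` are hypothetical stand-ins for `BroadcastMode`, `SparkPlan`, and the logic inside `EnsureRequirements`.

```scala
// Hypothetical stand-ins for BroadcastMode and SparkPlan.
sealed trait Mode
case object IdentityMode extends Mode
final case class HashedMode(keys: Seq[String]) extends Mode

sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Broadcast(mode: Mode, child: Plan) extends Plan

object EnsureBroadcast {
  // Returns a plan whose root broadcasts its input in the `required` mode.
  def apply(required: Mode, child: Plan): Plan = child match {
    // The child already broadcasts in the required mode: keep it.
    case Broadcast(existing, _) if existing == required => child
    // The child broadcasts in a different mode: rebuild over the original source.
    case Broadcast(_, src) => Broadcast(required, src)
    // No broadcast yet: add one on top of the child.
    case _ => Broadcast(required, child)
  }
}
```

The first case depends on structural equality of modes, so a mode's fields (keys, attributes) need to compare equal whenever two joins ask for the same broadcast shape.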
@@ -53,7 +53,10 @@ case class BroadcastHashJoin(
   override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] = {
-    val mode = HashedRelationBroadcastMode(canJoinKeyFitWithinLong, rewriteKeyExpr(buildKeys))
+    val mode = HashedRelationBroadcastMode(
+      canJoinKeyFitWithinLong,
+      rewriteKeyExpr(buildKeys),
+      buildPlan.output)
     buildSide match {
       case BuildLeft =>
         BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
@@ -48,7 +48,10 @@ case class BroadcastHashOuterJoin(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def requiredChildDistribution: Seq[Distribution] = {
-    val mode = HashedRelationBroadcastMode(canJoinKeyFitWithinLong = false, buildKeys)
+    val mode = HashedRelationBroadcastMode(
+      canJoinKeyFitWithinLong = false,
+      buildKeys,
+      buildPlan.output)
     joinType match {
       case RightOuter =>
         BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil
@@ -41,9 +41,9 @@ case class BroadcastLeftSemiJoinHash(
 
   override def requiredChildDistribution: Seq[Distribution] = {
     val mode = if (condition.isEmpty) {
-      HashSetBroadcastMode(rightKeys)
+      HashSetBroadcastMode(rightKeys, right.output)
     } else {
-      HashedRelationBroadcastMode(canJoinKeyFitWithinLong = false, rightKeys)
+      HashedRelationBroadcastMode(canJoinKeyFitWithinLong = false, rightKeys, right.output)
     }
     UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil
   }
@@ -80,13 +80,6 @@ trait HashSemiJoin {
 }
 
 private[execution] object HashSemiJoin {
-  def buildKeyHashSet(
-      keys: Seq[Expression],
-      plan: SparkPlan,
-      rows: Array[InternalRow]): java.util.HashSet[InternalRow] = {
-    buildKeyHashSet(keys, plan.output, rows.iterator)
-  }
-
   def buildKeyHashSet(
       keys: Seq[Expression],
       attributes: Seq[Attribute],
@@ -110,4 +103,10 @@ private[execution] object HashSemiJoin {
 }
 
 /** HashSetBroadcastMode requires that the input rows are broadcasted as a set. */
-private[execution] case class HashSetBroadcastMode(keys: Seq[Expression]) extends BroadcastMode
+private[execution] case class HashSetBroadcastMode(
Contributor

This is OK for now, since the following isn't in the scope of this pull request.

In the future we should just use the hashed relations to do this, rather than using a hash set.

Contributor Author

+    keys: Seq[Expression],
+    attributes: Seq[Attribute]) extends BroadcastMode {
+  def apply(rows: Array[InternalRow]): java.util.HashSet[InternalRow] = {
+    HashSemiJoin.buildKeyHashSet(keys, attributes, rows.iterator)
+  }
+}
@@ -209,19 +209,6 @@ private[execution] object HashedRelation {
       new GeneralHashedRelation(hashTable)
     }
   }
-
-  def apply(
-      canJoinKeyFitWithinLong: Boolean,
-      keys: Seq[Expression],
-      child: SparkPlan,
-      rows: Array[InternalRow]): HashedRelation = {
-    val generator = UnsafeProjection.create(keys, child.output)
-    if (canJoinKeyFitWithinLong) {
-      LongHashedRelation(rows.iterator, generator, rows.length)
-    } else {
-      HashedRelation(rows.iterator, generator, rows.length)
-    }
-  }
 }
 
 /**
@@ -691,5 +678,15 @@ private[joins] object LongHashedRelation {
 /** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */
 private[execution] case class HashedRelationBroadcastMode(
     canJoinKeyFitWithinLong: Boolean,
-    keys: Seq[Expression]) extends BroadcastMode
+    keys: Seq[Expression],
+    attributes: Seq[Attribute]) extends BroadcastMode {
+  def apply(rows: Array[InternalRow]): HashedRelation = {
+    val generator = UnsafeProjection.create(keys, attributes)
+    if (canJoinKeyFitWithinLong) {
+      LongHashedRelation(rows.iterator, generator, rows.length)
+    } else {
+      HashedRelation(rows.iterator, generator, rows.length)
+    }
+  }
+}
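The mode now carries the build side's `attributes` instead of receiving the whole plan, so it holds everything it needs to build the relation from the collected rows. A simplified sketch of that idea follows, assuming a plain `Map` in place of `HashedRelation` and column names in place of `Expression` and `Attribute`. `Row` and `HashedRelationMode` are hypothetical names used only for illustration.

```scala
// Stand-in row type; Spark uses InternalRow and UnsafeProjection instead.
final case class Row(values: Vector[Any])

// Hypothetical simplified mode: the key is located by column name in `attributes`.
final case class HashedRelationMode(keyColumn: String, attributes: Seq[String])
    extends (Array[Row] => Map[Any, Seq[Row]]) {

  // Resolve the key position once from the schema the mode carries.
  private val keyIndex: Int = attributes.indexOf(keyColumn)
  require(keyIndex >= 0, s"Unknown key column: $keyColumn")

  // Build the lookup table from the collected rows, preserving row order per key.
  def apply(rows: Array[Row]): Map[Any, Seq[Row]] =
    rows.toList.groupBy(_.values(keyIndex))
}
```

Since the mode depends only on plain data (keys and attributes), two joins that request the same keys over the same output produce equal modes, and the planner can recognise an existing broadcast as reusable.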