Closed
25 commits
0a62098
Optimize cartesian order
Sephiroth-Lin Jul 15, 2015
61d1a7e
Fix code sytle
Sephiroth-Lin Jul 15, 2015
23deb4b
Update
Sephiroth-Lin Jul 16, 2015
eb9d155
Fix code style
Sephiroth-Lin Jul 16, 2015
8198648
Fix unit test failed
Sephiroth-Lin Jul 16, 2015
1006d46
Fix code style
Sephiroth-Lin Jul 17, 2015
f0ce447
Update
Sephiroth-Lin Jul 17, 2015
2bc0991
Update code style
Sephiroth-Lin Jul 17, 2015
547242e
code style
Sephiroth-Lin Jul 17, 2015
bca7a07
Fix unit test failed
Sephiroth-Lin Jul 17, 2015
a168900
Fix NullPointerException
Sephiroth-Lin Jul 20, 2015
99bcde7
Update thread pool name
Sephiroth-Lin Jul 22, 2015
4310536
Update
Sephiroth-Lin Aug 1, 2015
b2a0ae8
Update
Sephiroth-Lin Aug 1, 2015
5ca1d26
Use BroadcastNestedLoopJoin replace BroadcastCartesianProduct
Sephiroth-Lin Aug 1, 2015
04678d1
Fix unit test failed
Sephiroth-Lin Aug 1, 2015
f1cebae
Merge branch 'master' into SPARK-9066
Sephiroth-Lin Sep 8, 2015
8a8658c
Add Inner for do cartesian broadcast, SPARK-10484 point out this
Sephiroth-Lin Sep 8, 2015
60f2102
Update
Sephiroth-Lin Sep 9, 2015
e01c8f0
fix error
Sephiroth-Lin Sep 9, 2015
dd77444
Merge branch 'master' of https://github.com/apache/spark into SPARK-9066
Sephiroth-Lin Sep 12, 2015
d9aef91
Merge branch 'master' into SPARK-9066
Sephiroth-Lin Sep 29, 2015
a66f475
Add some unit test which PR#8652 have done, and fix unit test error
Sephiroth-Lin Sep 29, 2015
9812242
Fix unit test error
Sephiroth-Lin Sep 29, 2015
ce6ad25
Delete unused unit test
Sephiroth-Lin Sep 29, 2015
Update
Sephiroth-Lin committed Jul 16, 2015
commit 23deb4b736b70013b4a3ca66ab328fca245f6b33
@@ -213,10 +213,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object CartesianProduct extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Join(left, right, _, None) =>
-        execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
+        val buildSide =
+          if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
+            joins.BuildRight
+          } else {
+            joins.BuildLeft
+          }
+        execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
+        val buildSide =
+          if (left.statistics.sizeInBytes <= right.statistics.sizeInBytes) {
+            joins.BuildRight
+          } else {
+            joins.BuildLeft
+          }
         execution.Filter(condition,
-          execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
+          execution.joins.CartesianProduct(planLater(left), planLater(right), buildSide)) :: Nil
       case _ => Nil
     }
   }
@@ -27,24 +27,26 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
-  override def output: Seq[Attribute] = left.output ++ right.output
+case class CartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    buildSide: BuildSide) extends BinaryNode {

-  protected override def doExecute(): RDD[InternalRow] = {
-    val leftResults = left.execute().map(_.copy())
-    val rightResults = right.execute().map(_.copy())
+  private val (streamed, broadcast) = buildSide match {
+    case BuildRight => (left, right)
Member


Elsewhere, for example in org.apache.spark.sql.execution.joins.HashJoin, BuildRight means that the right side is the small table, which is the side we usually broadcast. Could you follow the same semantics here?

+    case BuildLeft => (right, left)
+  }

-    val cartesianRdd = if (leftResults.partitions.size > rightResults.partitions.size) {
-      rightResults.cartesian(leftResults).mapPartitions { iter =>
-        iter.map(tuple => (tuple._2, tuple._1))
-      }
-    } else {
-      leftResults.cartesian(rightResults)
-    }
+  override def output: Seq[Attribute] = left.output ++ right.output

-    cartesianRdd.mapPartitions { iter =>
+  protected override def doExecute(): RDD[InternalRow] = {
+    val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()))
+    broadcastedRelation.value.cartesian(streamed.execute().map(_.copy())).mapPartitions{ iter =>
       val joinedRow = new JoinedRow
Contributor


Quick question. Why not use sizeInBytes? I assume we want to move as little data as possible? Using sizeInBytes would be a bit more involved, since this would involve the planner, and (probably) adding a BuildSide parameter to CartesianProduct...

Contributor


Yes, using the number of partitions here is not accurate. Consider one RDD with 100 partitions of one record each and another RDD with 10 partitions of 100 million records each: the method above will cause more scans from HDFS.

Contributor Author


@hvanhovell Yes, using sizeInBytes is better, but it also has a problem: if leftResults has only one record and that record is large, while rightResults has many records whose total size is small, then sizeInBytes will lead to worse performance. The best approach would be to check the total number of records per partition, but we cannot get that at the moment.

-      iter.map(r => joinedRow(r._1, r._2))
+      buildSide match {
+        case BuildRight => iter.map(r => joinedRow(r._1, r._2))
+        case BuildLeft => iter.map(r => joinedRow(r._2, r._1))
+      }
     }
   }
 }
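
For readers following the review discussion, here is a minimal, Spark-free sketch of the idea under debate: pick the build (broadcast) side by estimated size, then emit joined rows with the left columns first whichever side was built. It is an illustration only, not the code in this PR. `Relation`, `CartesianSketch`, and the local `BuildSide` objects are hypothetical stand-ins for Spark's classes. The sketch assumes the convention requested in the review, where `BuildRight` means the right side is the small one.

```scala
// Hypothetical stand-ins for Spark's BuildSide hierarchy (not the real classes).
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical relation: in-memory rows plus an estimated size in bytes.
final case class Relation(rows: Seq[Seq[Any]], sizeInBytes: BigInt)

object CartesianSketch {
  // Assumed convention: the build side is the smaller relation,
  // so BuildRight means "the right side is small; build (broadcast) it".
  def chooseBuildSide(left: Relation, right: Relation): BuildSide =
    if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

  // Nested-loop cartesian product. The streamed side drives the outer loop
  // and the built side is scanned once per streamed row. Output rows always
  // place the left columns before the right columns.
  def cartesian(left: Relation, right: Relation): Seq[Seq[Any]] =
    chooseBuildSide(left, right) match {
      case BuildRight =>
        for (l <- left.rows; r <- right.rows) yield l ++ r
      case BuildLeft =>
        for (r <- right.rows; l <- left.rows) yield l ++ r
    }
}
```

As the last review comment notes, a byte-size estimate can still pick the wrong side when one relation holds a few very large rows and the other holds many small ones. A row-count estimate would be a better input to `chooseBuildSide` if one were available.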