Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial commit
  • Loading branch information
Ishiihara committed Oct 28, 2014
commit 1c41f6f248f1145c7d730129795e50bdd8a53f2b
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
Expand Down Expand Up @@ -57,10 +58,19 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
}

val sortingExpressions = expressions.map(s => new SortOrder(s, Ascending))
implicit val ordering = new RowOrdering(sortingExpressions, child.output)
val part = new HashPartitioner(numPartitions)
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(ordering)
//val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
shuffled.map(_._2)
val temp = shuffled.map(_._2)
for ( x <- temp.collect()) {
println(x)
}
println("------------")
temp

case RangePartitioning(sortingExpressions, numPartitions) =>
val rdd = if (sortBasedShuffleOn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val mergeJoin = joins.MergeJoin(leftKeys, rightKeys, Inner, condition, planLater(left), planLater(right))
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just passing through, but is this right? It appears to be the same case as above, so I'm not sure when it would be matched.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, the hash inner join will not be matched. I am trying to resolve to merge join on inner join. I am still figuring out the best way to add merge join to query optimizer.

val buildSide =
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
Expand Down