@tejasapatil tejasapatil commented Aug 25, 2017

Jira : https://issues.apache.org/jira/browse/SPARK-18067

What problem is being addressed in this PR ?

Currently, shuffle-based joins require their children to be shuffled over all the columns in the join condition. If a child node is already distributed over a subset of the columns in the join condition, this shuffle is not needed (e.g. if the input is bucketed, or if the input is the output of a subquery). Avoiding the shuffle makes the join run faster and more stably, since it then runs as a single stage.

To dive deeper, let's look at an example. Both input tables, table1 and table2, are bucketed on columns i and j and have 8 buckets. The query joins the two tables over i, j, k. With bucketing, all rows with the same values of i and j reside in the same bucket in both inputs. So if we simply sort the corresponding buckets over the join columns and perform the join, that is sufficient to produce the correct result.

partitions table1 (i,j,k) values table2 (i,j,k) values
bucket 0 (0,0,1) (0,0,2) (1,0,4) (0,0,1) (0,0,3)
bucket 1 (1,0,2) (1,1,1) (1,0,1) (1,0,2) (1,1,2)
bucket 2 (0,1,8) (0,1,6) (0,1,1)
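
The co-location argument above can be checked with a small standalone sketch. It does not use Spark: `Rec`, `bucketOf`, and the modulus-based bucket function are hypothetical stand-ins (Spark's actual bucketing hash is different), and the rows are made up for illustration. The point is only that a bucket id computed from (i, j) alone still lets a join on (i, j, k) be evaluated bucket by bucket.

```scala
// Hypothetical row type, for illustration only.
final case class Rec(i: Int, j: Int, k: Int)

object BucketColocation {
  val numBuckets = 8

  // Bucket id depends only on the bucketing columns (i, j), never on k.
  // Assumption: a simple tuple hash stands in for Spark's real bucketing hash.
  def bucketOf(r: Rec): Int = Math.floorMod((r.i, r.j).hashCode, numBuckets)

  // Join each bucket of the left side only against the same bucket of the right side.
  def bucketedJoin(left: Seq[Rec], right: Seq[Rec]): Set[(Rec, Rec)] = {
    val l = left.groupBy(bucketOf)
    val r = right.groupBy(bucketOf)
    (for {
      b <- l.keySet.intersect(r.keySet).toSeq
      a <- l(b)
      c <- r(b)
      if a.i == c.i && a.j == c.j && a.k == c.k
    } yield (a, c)).toSet
  }

  // Reference: join every left row against every right row, with no bucketing.
  def fullJoin(left: Seq[Rec], right: Seq[Rec]): Set[(Rec, Rec)] =
    (for { a <- left; c <- right; if a == c } yield (a, c)).toSet
}
```

Because rows that agree on (i, j, k) necessarily agree on (i, j), they always land in the same bucket, so `bucketedJoin` and `fullJoin` return the same pairs without any data moving between buckets.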

What changes were proposed in this pull request?

Both shuffled hash join and sort merge join now keep track of which keys their children should be distributed on. Initially, these are the same as the join keys. The rule ReorderJoinPredicates is modified to detect whether the child's output partitioning is over a subset of the join keys and, if so, to revise the distribution keys for the join operator accordingly.
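
A minimal sketch of the idea, not the PR's actual code: column names stand in for Catalyst expressions, `distributionKeys` is a hypothetical helper, and the number of partitions on each side is assumed to match already. It falls back to the full join keys unless both children are partitioned on matching join-key pairs in the same order.

```scala
object DistributionKeys {
  // Column names stand in for Catalyst expressions (an illustrative simplification).
  type Key = String

  /**
   * Given the join keys of both sides and the columns each child is already
   * hash partitioned on, return the (left, right) keys the join should require
   * its children to be distributed on.
   */
  def distributionKeys(
      leftKeys: Seq[Key],
      rightKeys: Seq[Key],
      leftPartitioning: Seq[Key],
      rightPartitioning: Seq[Key]): (Seq[Key], Seq[Key]) = {
    // Positions in the join-key list occupied by each side's partitioning columns.
    val leftIdx = leftPartitioning.map(k => leftKeys.indexOf(k))
    val rightIdx = rightPartitioning.map(k => rightKeys.indexOf(k))
    val usable =
      leftPartitioning.nonEmpty &&
        !leftIdx.contains(-1) && // every partitioning column is a join key
        leftIdx == rightIdx      // both sides use matching key pairs, in the same order
    if (usable) (leftIdx.map(leftKeys), leftIdx.map(rightKeys))
    else (leftKeys, rightKeys)   // otherwise keep requiring the full join keys
  }
}
```

For the example query, join keys (i, j, k) with both children partitioned on (i, j) yield distribution keys (i, j) on each side, which matches the extra key lists shown in the AFTER plan.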

How was this patch tested?

  • Added unit test.
  • manual test:

Query:

-- both the input tables are bucketed over columns `i` and `j`
SELECT * FROM table1 a JOIN table2 b ON a.i = b.i AND a.j = b.j AND a.k = b.k

BEFORE

SortMergeJoin [i#5, j#6, k#7], [i#8, j#9, k#10], Inner
:- *Sort [i#5 ASC NULLS FIRST, j#6 ASC NULLS FIRST, k#7 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#5, j#6, k#7, 200)
:     +- *FileScan orc default.table1[i#5,j#6,k#7] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [i#8 ASC NULLS FIRST, j#9 ASC NULLS FIRST, k#10 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#8, j#9, k#10, 200)
      +- *FileScan orc default.table2[i#8,j#9,k#10] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>

AFTER

SortMergeJoin [i#5, j#6, k#7], [i#8, j#9, k#10], [i#5, j#6], [i#8, j#9], Inner
:- *Sort [i#5 ASC NULLS FIRST, j#6 ASC NULLS FIRST, k#7 ASC NULLS FIRST], false, 0
:  +- *FileScan orc default.table1[i#5,j#6,k#7] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [i#8 ASC NULLS FIRST, j#9 ASC NULLS FIRST, k#10 ASC NULLS FIRST], false, 0
   +- *FileScan orc default.table2[i#8,j#9,k#10] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:warehouse/table2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<i:int,j:int,k:string>

SparkQA commented Aug 25, 2017

Test build #81131 has finished for PR 19054 at commit 9ba8add.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil
Contributor Author

cc @hvanhovell @cloud-fan for review

SparkQA commented Sep 8, 2017

Test build #81562 has finished for PR 19054 at commit ec8bd80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 1, 2017

Test build #84366 has finished for PR 19054 at commit b0db6aa.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 1, 2017

Test build #84368 has finished for PR 19054 at commit 69e288e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 11, 2018

Test build #85985 has finished for PR 19054 at commit c689ff1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil
Contributor Author

cc @hvanhovell @cloud-fan for review

Contributor

We should add some documentation to explain what the return value is.

Contributor Author

Added more documentation. I wasn't sure how to make it easier to understand; I hope the example helps with that.

Contributor

if leftPartitioning is HashPartitioning, we don't need to care about rightPartitioning at all?

Contributor Author

Given that this was only applied to SortMergeJoinExec and ShuffledHashJoinExec, where both partitionings are HashPartitioning, things worked fine. I have changed this to a stricter check.

   */
  private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
    plan.transformUp {
      case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left,
Contributor Author

Removal of BroadcastHashJoinExec is intentional. Its children are expected to have BroadcastDistribution or UnspecifiedDistribution, so this method won't help here (this optimization only helps in the case of shuffle-based joins).

SparkQA commented Jan 20, 2018

Test build #86406 has finished for PR 19054 at commit 00bb14b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val rightKeysBuffer = ArrayBuffer[Expression]()
expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning
currentOrderOfKeys: Seq[Expression]): // comes from join predicate
(Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {

can you please add a comment describing the return type? a tuple4 is not such a descriptive type 😃
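
One way to address this, sketched under assumptions: replace the Tuple4 with a small case class whose field names document each position. `Expr` is a hypothetical stand-in for Catalyst's `Expression`, `ReorderedJoinKeys` is an invented name, and the meaning assigned to the four sequences (join keys followed by distribution keys for each side) is a guess based on the AFTER plan, not something the diff fragment confirms.

```scala
// Hypothetical stand-in for Catalyst's Expression, so the sketch compiles on its own.
final case class Expr(name: String)

// Assumed meaning and ordering of the four sequences; the PR's actual
// ordering may differ.
final case class ReorderedJoinKeys(
    leftKeys: Seq[Expr],              // join keys of the left child, reordered
    rightKeys: Seq[Expr],             // join keys of the right child, reordered
    leftDistributionKeys: Seq[Expr],  // keys the left child must be distributed on
    rightDistributionKeys: Seq[Expr]) // keys the right child must be distributed on

object ReorderedJoinKeysExample {
  private def keys(names: String*): Seq[Expr] = names.map(Expr(_))

  // Mirrors the example query: join on (i, j, k), distribute on (i, j).
  val result: ReorderedJoinKeys = ReorderedJoinKeys(
    leftKeys = keys("a.i", "a.j", "a.k"),
    rightKeys = keys("b.i", "b.j", "b.k"),
    leftDistributionKeys = keys("a.i", "a.j"),
    rightDistributionKeys = keys("b.i", "b.j"))
}
```

Call sites can then read `result.leftDistributionKeys` instead of `result._3`, which makes a swapped position much harder to miss in review.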

rightKeysBuffer.append(rightKeys(index))
val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) =>
!processedIndicies.contains(i) && currKey.semanticEquals(expression)
}.get._2

is the find guaranteed to always succeed?
if so, worth a comment on method's pre/post conditions.

a getOrElse(sys error "...") might also be a good way of documenting this.
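
A minimal sketch of that suggestion, using plain strings in place of expressions and `==` in place of `semanticEquals` (both simplifications, and `indexOfUnprocessed` is a hypothetical helper name). It states the precondition in a comment and fails with a descriptive message when it is violated, instead of the bare exception that `.get` on an empty `Option` would throw.

```scala
object FindKey {
  // Returns the index of the first not-yet-processed key equal to `expected`.
  // Precondition: such a key exists. If it does not, fail with a descriptive
  // error rather than the NoSuchElementException thrown by Option.get.
  def indexOfUnprocessed(
      currentOrderOfKeys: Seq[String],
      processedIndices: Set[Int],
      expected: String): Int =
    currentOrderOfKeys.zipWithIndex
      .collectFirst {
        case (key, i) if !processedIndices.contains(i) && key == expected => i
      }
      .getOrElse(sys.error(
        s"Join key $expected not found among unprocessed keys $currentOrderOfKeys"))
}
```

`collectFirst` combines the `find` and the `._2` projection, so the index is extracted only when a match exists.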

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
