
Conversation

@imback82 (Contributor) commented Sep 7, 2020

What changes were proposed in this pull request?

This PR proposes to optimize SortMergeJoin (SMJ) when each of its children has a hash output partitioning that only "partially" satisfies the required distribution. When a child's output partitioning expressions are a subset of the required distribution expressions (the join key expressions), the shuffle can be removed, because rows will be sorted on the join keys before they are joined (the required child ordering for SMJ is on the join keys).

This PR introduces OptimizeSortMergeJoinWithPartialHashDistribution, which removes the shuffle for a sort merge join if the following conditions are met:

  • The child of ShuffleExchangeExec has HashPartitioning with the same number of partitions as the other side of the join.
  • The child of ShuffleExchangeExec has an output partitioning whose expressions are a subset of the join keys on the respective join side.

This rule can be turned on by setting spark.sql.execution.sortMergeJoin.optimizePartialHashDistribution.enabled to true (false by default).
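
Roughly, the per-side check can be pictured as follows. This is only a minimal sketch of the two conditions above, not the PR's actual code; canRemoveShuffle is a hypothetical helper name, and the real rule additionally has to map partitioning expressions between the two join sides.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// Sketch: could the shuffle above this child be removed for one join side?
def canRemoveShuffle(
    childPartitioning: HashPartitioning,
    otherSideNumPartitions: Int,
    joinKeys: Seq[Expression]): Boolean = {
  // Condition 1: the partition count matches the other side of the join.
  val sameNumPartitions = childPartitioning.numPartitions == otherSideNumPartitions
  // Condition 2: every partitioning expression is one of this side's join keys,
  // i.e. the partitioning is a subset of the join keys.
  val subsetOfJoinKeys = childPartitioning.expressions.forall { e =>
    joinKeys.exists(_.semanticEquals(e))
  }
  sameNumPartitions && subsetOfJoinKeys
}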

Why are the changes needed?

To remove unnecessary shuffles in certain scenarios.

Does this PR introduce any user-facing change?

Suppose the following case where t1 is bucketed by i1, and t2 by i2:

val df1 = (0 until 100).map(i => (i % 5, i % 13, i.toString)).toDF("i1", "j1", "k1")
val df2 = (0 until 100).map(i => (i % 3, i % 17, i.toString)).toDF("i2", "j2", "k2")
df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "i2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")

Now join the two tables on t1("i1") === t2("i2") && t1("j1") === t2("j2").
Before this change:

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
scala> t1.join(t2, t1("i1") === t2("i2") && t1("j1") === t2("j2")).explain
== Physical Plan ==
*(5) SortMergeJoin [i1#161, j1#162], [i2#167, j2#168], Inner
:- *(2) Sort [i1#161 ASC NULLS FIRST, j1#162 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#161, j1#162, 200), true, [id=#196]
:     +- *(1) Filter (isnotnull(i1#161) AND isnotnull(j1#162))
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t1[i1#161,j1#162,k1#163] Batched: true, DataFilters: [isnotnull(i1#161), isnotnull(j1#162)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1), IsNotNull(j1)], ReadSchema: struct<i1:int,j1:int,k1:string>, SelectedBucketsCount: 8 out of 8
+- *(4) Sort [i2#167 ASC NULLS FIRST, j2#168 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i2#167, j2#168, 200), true, [id=#205]
      +- *(3) Filter (isnotnull(i2#167) AND isnotnull(j2#168))
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[i2#167,j2#168,k2#169] Batched: true, DataFilters: [isnotnull(i2#167), isnotnull(j2#168)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i2), IsNotNull(j2)], ReadSchema: struct<i2:int,j2:int,k2:string>, SelectedBucketsCount: 8 out of 8

After the PR:

scala> spark.conf.set("spark.sql.execution.sortMergeJoin.optimizePartialHashDistribution.enabled", "true")
scala> t1.join(t2, t1("i1") === t2("i2") && t1("j1") === t2("j2")).explain
== Physical Plan ==
*(3) SortMergeJoin [i1#161, j1#162], [i2#167, j2#168], Inner
:- *(1) Sort [i1#161 ASC NULLS FIRST, j1#162 ASC NULLS FIRST], false, 0
:  +- *(1) Filter (isnotnull(i1#161) AND isnotnull(j1#162))
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t1[i1#161,j1#162,k1#163] Batched: true, DataFilters: [isnotnull(i1#161), isnotnull(j1#162)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1), IsNotNull(j1)], ReadSchema: struct<i1:int,j1:int,k1:string>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [i2#167 ASC NULLS FIRST, j2#168 ASC NULLS FIRST], false, 0
   +- *(2) Filter (isnotnull(i2#167) AND isnotnull(j2#168))
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t2[i2#167,j2#168,k2#169] Batched: true, DataFilters: [isnotnull(i2#167), isnotnull(j2#168)], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i2), IsNotNull(j2)], ReadSchema: struct<i2:int,j2:int,k2:string>, SelectedBucketsCount: 8 out of 8

How was this patch tested?

Added tests.

@SparkQA commented Sep 7, 2020

Test build #128330 has finished for PR 29655 at commit 1b5c4e9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82 (Contributor, Author) commented Sep 7, 2020

cc: @cloud-fan @viirya @maropu Thanks in advance!

On the following snippet from the diff:

* If the above conditions are met, shuffle can be eliminated for the sort merge join
* because rows are sorted before join logic is applied.
*/
case class OptimizeSortMergeJoinWithPartialHashDistribution(conf: SQLConf) extends Rule[SparkPlan] {
Member:

Can't we implement this optimization in EnsureRequirements instead? Any reason to apply this rule after EnsureRequirements inserts shuffles?

Member:

Also, could you add fine-grained tests for this rule?

Contributor (PR author):

Doing this inside EnsureRequirements.ensureDistributionAndOrdering would require a new Partitioning and Distribution that know about both sides of the join, so I didn't go that route. Doing it outside seemed less intrusive to me. But please let me know if doing this inside EnsureRequirements makes more sense. Thanks.

This is done after EnsureRequirements because reordering the join keys there may already eliminate the shuffles, in which case this rule does not apply.

@andyvanyperenAM commented Sep 11, 2020

Great to see this PR; I ran into the same issues.
While investigating what was going wrong, I also found the following PR:
#19054
which is old and was closed due to inactivity (which I obviously regret).
What is the difference between the current PR and that older one?

I'm not a Spark contributor, so I really can't judge the importance of the "unknown error code -9" in the test above, but maybe the above can help you.

@maropu (Member) commented Sep 11, 2020

I hadn't noticed that discussion; thanks for sharing, @andyvanyperenAM. I've checked it and, yeah, they look the same. Could you check the discussion in SPARK-18067/#19054, @imback82? I think we need to address all the issues described there, e.g., data skew. cc: @hvanhovell

@imback82 (Contributor, Author):
Thanks for the pointers! I wasn't aware of the existing PR. I will take a look.

@viirya (Member) commented Sep 11, 2020

My first thought is that I share the same concerns @hvanhovell raised in the previous discussion.

@imback82 (Contributor, Author):
> My first thought is that I share the same concerns @hvanhovell raised in the previous discussion.

Is the concern data skew, or are there other concerns? I couldn't find more in the discussion.

The main scenario this PR is going after is allowing bucketed tables to be utilized by more workloads. Since bucketed tables are created by users, we have rarely observed cases where users pick bucket columns with low cardinality, similar to how they pick partition columns.

I could make the rule more restrictive by checking whether the sources are bucketed tables. (BTW, if this approach is fine, I could extend the rule to support PartitioningCollection, still making sure that the sources are bucketed tables, which would help remove shuffles in nested joins.) WDYT?

cc: @c21

@c21 (Contributor) left a comment:

Thanks @imback82 for working on this, but #19054 seems to be a better approach to me (i.e. add leftDistributionKeys and rightDistributionKeys in SortMergeJoinExec/ShuffledHashJoinExec, and avoid the shuffle by adding logic in EnsureRequirements.reorderJoinPredicates). @tejasapatil and I are on the same team, so just bringing more context on this: we added #19054 in our internal fork and haven't seen many OOM issues. If #19054 is the better approach in other people's opinions as well, I can redo that PR against the latest master for review.

Adding the rule after EnsureRequirements seems to add more burden on future development. We would need to keep in mind during development that there is a rule after EnsureRequirements that can remove shuffles.
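
For context, a rough sketch of what the #19054-style approach looks like (class and field names here are illustrative, not the actual patch): the join node carries the narrower set of keys its children are already distributed by, and EnsureRequirements requires distribution only on those keys, so an existing bucketing can satisfy the requirement without a shuffle.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}

// Illustrative only: the join operator exposes the (possibly smaller) key sets
// its children are hash-distributed by, and requires distribution on those
// instead of on all join keys.
case class SortMergeJoinSketch(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    leftDistributionKeys: Seq[Expression],
    rightDistributionKeys: Seq[Expression]) {
  def requiredChildDistribution: Seq[Distribution] =
    HashClusteredDistribution(leftDistributionKeys) ::
      HashClusteredDistribution(rightDistributionKeys) :: Nil
}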

Comment on lines +48 to +51
ExtractShuffleExchangeExecChild(
  lChild,
  lChildOutputPartitioning: HashPartitioning),
_),
Contributor:

nit: why can't we just pattern match ShuffleExchangeExec(_, leftChild, _) here? It seems simpler to me.

Comment on lines +74 to +81
val mapping = leftKeyToRightKeyMapping(leftKeys, rightKeys)
(leftPartitioning.numPartitions == rightPartitioning.numPartitions) &&
  leftPartitioning.expressions.zip(rightPartitioning.expressions)
    .forall {
      case (le, re) => mapping.get(le.canonicalized)
        .map(_.exists(_.semanticEquals(re)))
        .getOrElse(false)
    }
Contributor:

Sorry if I'm missing anything, but I feel this might not be correct. We should make sure that leftPartitioning.expressions and rightPartitioning.expressions have the same size, and the order of the expressions matters, right?

The expression sizes are different, so we should not remove the shuffle:

t1 has 1024 buckets on column (a)
t2 has 1024 buckets on columns (a, b)

SELECT *
FROM t1
JOIN t2
ON t1.a = t2.a AND t1.b = t2.b

The expression sizes are the same, but the order is wrong, so we should not remove the shuffle:

t1 has 1024 buckets on column (a, b)
t2 has 1024 buckets on columns (b, a)

SELECT *
FROM t1
JOIN t2
ON t1.a = t2.a AND t1.a = t2.b AND t1.b = t2.a AND t1.b = t2.b

Contributor (PR author):

Thanks. I agree with your concerns for both cases. But, for the first example, only one side will be shuffled, so the rule should not kick in. For the second example, we have t1.a = t2.b AND t1.b = t2.a, which matches the bucket ordering, so this should also be fine.

Contributor:

Sorry if I'm missing anything:

> But, for the first example, only one side will be shuffled, so the rule should not kick in.

If the number of buckets for t1 is less than the number of shuffle partitions, shouldn't both sides be shuffled (in EnsureRequirements)? Then the rule kicks in here and removes both shuffles, even though we shouldn't remove any shuffle in this case.

> For the second example, we have t1.a = t2.b AND t1.b = t2.a, which matches the bucket ordering, so this should also be fine.

I think it's unsafe if we do not shuffle both sides. HashPartitioning(Seq(a, b)) and HashPartitioning(Seq(b, a)) are not the same thing; e.g., the tuple (a: 1, b: 2) will be assigned to different buckets given the current Murmur3Hash implementation.
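
(For intuition, Spark's built-in hash() SQL function uses the same Murmur3-style hashing and is order-sensitive over its arguments, so swapping the column order generally yields different hash values, and thus different partitions. An illustrative REPL check, not part of the patch:

scala> spark.sql("SELECT hash(1, 2) AS ab, hash(2, 1) AS ba").show()
// The two values differ, which is why HashPartitioning(Seq(a, b)) and
// HashPartitioning(Seq(b, a)) generally place the same row in different partitions.)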

@imback82 (Contributor, Author) commented Sep 13, 2020:

> If the number of buckets for t1 is less than the number of shuffle partitions, shouldn't both sides be shuffled (in EnsureRequirements)? Then the rule kicks in here and removes both shuffles, even though we shouldn't remove any shuffle in this case.

You are right. Thanks for the catch!

> I think it's unsafe if we do not shuffle both sides. HashPartitioning(Seq(a, b)) and HashPartitioning(Seq(b, a)) are not the same thing; e.g., the tuple (a: 1, b: 2) will be assigned to different buckets given the current Murmur3Hash implementation.

Yes, I understand they produce different hash values. However, the join condition includes t1.a = t2.b AND t1.b = t2.a. On the other hand, this rule will not be applied if the condition were only t1.a = t2.a AND t1.b = t2.b. Please let me know if I missed something. Thanks!

@c21 (Contributor) commented Sep 12, 2020

cc @cloud-fan I think we would like to get your opinions here, as you reviewed #19054 in the past and have context on this. Thanks.

@imback82 (Contributor, Author) commented Sep 12, 2020

Thanks @c21 for your feedback! If #19054 is the better approach, we should proceed with it; this PR just seemed like a slightly less intrusive approach.

@maropu (Member) commented Sep 15, 2020

> we added #19054 in our internal fork and haven't seen many OOM issues.

Even so, I think removing shuffles in the middle of stages (e.g., in many-join cases) can, in theory, make the probability of OOM higher in case of data skew. Since we can control input distributions somewhat, e.g., via bucketing, it might be worth trying the restrictive approach that @imback82 suggested above.

@andyvanyperenAM:

> Thanks @imback82 for working on this, but #19054 seems to be a better approach to me (i.e. add leftDistributionKeys and rightDistributionKeys in SortMergeJoinExec/ShuffledHashJoinExec, and avoid the shuffle by adding logic in EnsureRequirements.reorderJoinPredicates). @tejasapatil and I are on the same team, so just bringing more context on this: we added #19054 in our internal fork and haven't seen many OOM issues. If #19054 is the better approach in other people's opinions as well, I can redo that PR against the latest master for review.
>
> Adding the rule after EnsureRequirements seems to add more burden on future development. We would need to keep in mind during development that there is a rule after EnsureRequirements that can remove shuffles.

Hi @c21,
I was wondering what the status of this issue is? I still see it as closed on the GitHub page and on the JIRA page.
What is the best way to get this issue resolved? Is there anything I can do as a non-expert to help the process?
You mentioned you added #19054 to your internal fork without OOM issues; did other issues pop up, or is it good to go? Is there a way to get it into the main branch so that a simple end user can use it?

Thanks for the update and sorry if this is the incorrect place to give this a bump.

kind regards,
Andy

@github-actions bot commented Apr 1, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions bot added the Stale label on Apr 1, 2021
@github-actions bot closed this on Apr 2, 2021