[SPARK-23827] [SS] StreamingJoinExec should ensure that input data is partitioned into specific number of partitions #20941
brkyvz
left a comment
LGTM. This is great. Left one comment
    assert(d.requiredNumPartitions.isDefined)
    assert(d.requiredNumPartitions.get >= 1)
    if (d != AllTuples) {
      assert(d.requiredNumPartitions.get == s.stateInfo.get.numPartitions)
can you also verify that this is equal to the number of partitions in the metadata?
    assert(s.stateInfo.isDefined)
    assert(s.stateInfo.get.numPartitions >= 1)
    assert(
      s.stateInfo.map(_.numPartitions).contains(currentStream.lastExecution.numStateStores))
why this change?
Test build #88721 has finished for PR 20941 at commit
Test build #88727 has finished for PR 20941 at commit
  override def requiredChildDistribution: Seq[Distribution] =
-   ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+   ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+     ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil
+1
Started more tests to test for flakiness.
Test build #88761 has finished for PR 20941 at commit
Test build #4146 has finished for PR 20941 at commit
Test build #4145 has finished for PR 20941 at commit
Test build #4147 has finished for PR 20941 at commit
Test build #4148 has finished for PR 20941 at commit
…partitioned into specific number of partitions

## What changes were proposed in this pull request?

Currently, the requiredChildDistribution does not specify the partitions. This can cause the weird corner cases where the child's distribution is `SinglePartition` which satisfies the required distribution of `ClusteredDistribution(no-num-partition-requirement)`, thus eliminating the shuffle needed to repartition input data into the required number of partitions (i.e. same as state stores). That can lead to "file not found" errors on the state store delta files as the micro-batch-with-no-shuffle will not run certain tasks and therefore not generate the expected state store delta files.

This PR adds the required constraint on the number of partitions.

## How was this patch tested?

Modified test harness to always check that ANY stateful operator should have a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant.

Author: Tathagata Das

Closes #20941 from tdas/SPARK-23827.

(cherry picked from commit 15298b9)
Signed-off-by: Tathagata Das
What changes were proposed in this pull request?
Currently, `requiredChildDistribution` does not specify the number of partitions. This can cause a corner case in which the child's distribution is `SinglePartition`, which satisfies the required distribution of `ClusteredDistribution(no-num-partition-requirement)` and thus eliminates the shuffle needed to repartition the input data into the required number of partitions (i.e. the same as the number of state stores). That can lead to "file not found" errors on the state store delta files, because the micro-batch with no shuffle will not run certain tasks and therefore will not generate the expected state store delta files.

This PR adds the required constraint on the number of partitions.
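The corner case can be illustrated with a small toy model. This is only a sketch under stated assumptions: the types below are simplified stand-ins that borrow names from the PR for readability, not Spark's actual classes, and `ShuffleSketch`, `satisfies`, and `needsShuffle` are hypothetical helpers. It shows that a single-partition child satisfies a clustered requirement with no partition count, so no shuffle is planned, while the same requirement with a count forces one.

```scala
// Toy model, NOT Spark's real classes: names mirror the PR for readability only.
sealed trait Partitioning { def numPartitions: Int }
case object SinglePartition extends Partitioning { val numPartitions = 1 }
final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

// requiredNumPartitions = None models "no-num-partition-requirement".
final case class ClusteredDistribution(
    keys: Seq[String],
    requiredNumPartitions: Option[Int] = None)

object ShuffleSketch {
  // A child satisfies the requirement when rows with equal keys are co-located
  // and, if a partition count is required, the child's count matches it.
  def satisfies(child: Partitioning, required: ClusteredDistribution): Boolean = {
    val countOk = required.requiredNumPartitions.forall(_ == child.numPartitions)
    val clusteredOk = child match {
      case SinglePartition => true // all rows already live in one partition
      case HashPartitioning(keys, _) =>
        keys.nonEmpty && keys.forall(required.keys.contains)
    }
    countOk && clusteredOk
  }

  // In this model a shuffle is planned only when the requirement is not met.
  def needsShuffle(child: Partitioning, required: ClusteredDistribution): Boolean =
    !satisfies(child, required)
}
```

In this model, `ShuffleSketch.needsShuffle(SinglePartition, ClusteredDistribution(Seq("k")))` is false, which corresponds to the missing shuffle described above. Passing `Some(5)` as the required count makes it true, which is the effect of the constraint this PR adds.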
How was this patch tested?
The test harness was modified to always check that every stateful operator has a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant.
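A rough sketch of what such a harness check might look like, loosely following the assertions quoted in the review thread. Everything here is an assumption for illustration: `StateInfo`, `RequiredDistribution`, `StatefulOp`, and `PartitionCheckSketch` are hypothetical stand-ins rather than Spark's test code, and the check returns a list of violations instead of asserting so that it is easy to exercise.

```scala
// Hypothetical stand-ins for a stateful operator and its state metadata.
final case class StateInfo(numPartitions: Int)

// isAllTuples models a requirement that is exempt from matching the state store count.
final case class RequiredDistribution(
    requiredNumPartitions: Option[Int],
    isAllTuples: Boolean = false)

final case class StatefulOp(
    name: String,
    stateInfo: Option[StateInfo],
    requiredChildDistribution: Seq[RequiredDistribution])

object PartitionCheckSketch {
  // Returns one message per violated rule; an empty result means the operator passes.
  def violations(op: StatefulOp): Seq[String] = op.stateInfo match {
    case None =>
      Seq(s"${op.name}: stateInfo is not defined")
    case Some(info) =>
      val stateErrors =
        if (info.numPartitions >= 1) Nil
        else Seq(s"${op.name}: state partition count must be >= 1")
      val distErrors = op.requiredChildDistribution.flatMap { d =>
        d.requiredNumPartitions match {
          case None =>
            Seq(s"${op.name}: required distribution has no partition count")
          case Some(n) if n < 1 =>
            Seq(s"${op.name}: required partition count must be >= 1")
          case Some(n) if !d.isAllTuples && n != info.numPartitions =>
            Seq(s"${op.name}: requires $n partitions, state has ${info.numPartitions}")
          case Some(_) => Nil
        }
      }
      stateErrors ++ distErrors
  }
}
```

For example, an operator with `StateInfo(5)` and two child requirements of `Some(5)` yields no violations, whereas the same operator with `None` for both children yields two.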