
Conversation

@wangyum (Member) commented on Oct 5, 2022

### What changes were proposed in this pull request?

This PR replaces `Random(hashing.byteswap32(index))` with `XORShiftRandom(index)` to distribute elements more evenly across output partitions.
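Concretely, the seeding change lands in the round-robin distributor that `repartition` (i.e. `coalesce(shuffle = true)`) uses to pick a starting output partition for each input partition. The following is a minimal sketch of that pattern, not the verbatim Spark source; the commented-out line shows the pre-PR seeding:

```scala
import org.apache.spark.util.random.XORShiftRandom

// Sketch of repartition's round-robin distributor: each input partition picks a
// pseudo-random starting position, then hands its elements to consecutive
// target partitions. With few elements per partition, a poorly spread starting
// position leaves some target partitions empty.
def distributePartition[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
  // Before this PR: new java.util.Random(scala.util.hashing.byteswap32(index)).nextInt(numPartitions)
  var position = new XORShiftRandom(index).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position % numPartitions, t) // the key selects the target partition
  }
}
```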

### Why are the changes needed?

It seems that the distribution produced by `XORShiftRandom` is better. For example:

1. The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use `repartition` to pin down the number of output files in their tests.
   ```
   bin/spark-shell --master "local[2]"
   spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition")
   ```
   After SPARK-40407 but before this PR, only 8 output files are written: two of the ten target partitions receive no rows, so no files are created for them. With this PR (or before SPARK-40407) all 10 partitions receive a row and 10 files are written; a per-partition row count that makes this visible is sketched after this list.

2. The distribution produced by `XORShiftRandom` seems better:
   ```scala
   import java.util.Random
   import org.apache.spark.util.random.XORShiftRandom
   import scala.util.hashing

   // For each of `count` partition indexes, seed a RNG in three different ways,
   // draw one target partition, and print the sizes of the resulting groups.
   def distribution(count: Int, partition: Int): Unit = {
     // Baseline: the index used directly as a java.util.Random seed
     println((1 to count).map(partitionId => new Random(partitionId).nextInt(partition))
       .groupBy(f => f)
       .map(_._2.size).mkString(". "))

     // byteswap32-scrambled seed (the behavior before this PR)
     println((1 to count).map(partitionId => new Random(hashing.byteswap32(partitionId)).nextInt(partition))
       .groupBy(f => f)
       .map(_._2.size).mkString(". "))

     // XORShiftRandom seeded with the index (the behavior after this PR)
     println((1 to count).map(partitionId => new XORShiftRandom(partitionId).nextInt(partition))
       .groupBy(f => f)
       .map(_._2.size).mkString(". "))
   }

   distribution(200, 4)
   ```
   The output:
   ```
   200
   50. 60. 46. 44
   55. 48. 43. 54
   ```
   With the raw sequential seeds, all 200 indexes land in the same partition. `byteswap32` spreads them across all four partitions but with noticeable skew (44 to 60 per partition), while `XORShiftRandom` stays closer to the ideal 50 per partition (43 to 55).
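For the file-count example in item 1, a quick way to see which target partitions end up empty is to count rows per partition; partitions that receive no rows write no Parquet files. A minimal spark-shell check (the printed counts are illustrative, not taken from the PR):

```scala
// Count the rows that land in each of the 10 target partitions.
val sizes = spark.range(10).repartition(10)
  .rdd
  .mapPartitions(it => Iterator(it.size))
  .collect()

println(sizes.mkString(", "))                // e.g. 0, 2, 1, 1, 0, 2, 1, 1, 1, 1
println(s"non-empty: ${sizes.count(_ > 0)}") // 8 before this PR, 10 after
```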

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

@wangyum (Member, Author) commented on Oct 5, 2022

@srowen @cloud-fan @wbo4958 @HyukjinKwon @zhengruifeng

@cloud-fan (Contributor) commented:
thanks, merging to master!

@cloud-fan closed this in e6bebb6 on Oct 5, 2022
wangyum added a commit to wangyum/spark that referenced this pull request Oct 5, 2022
### What changes were proposed in this pull request?

This PR replaces `Random(hashing.byteswap32(index))` with `XORShiftRandom(index)` to distribute elements evenly across output partitions.

### Why are the changes needed?

It seems that the distribution using `XORShiftRandom` is better. For example:

1. The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use `repartition` to determine the number of output files in their tests.
   ```
   bin/spark-shell --master "local[2]"
   spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition")
   ```
   Before this PR (and after SPARK-40407), the number of output files is 8. After this PR (or before SPARK-40407), the number of output files is 10.

2. The distribution using `XORShiftRandom` seems better.
   ```scala
   import java.util.Random
   import org.apache.spark.util.random.XORShiftRandom
   import scala.util.hashing

   def distribution(count: Int, partition: Int) = {
     println((1 to count).map(partitionId => new Random(partitionId).nextInt(partition))
       .groupBy(f => f)
       .map(_._2.size).mkString(". "))

     println((1 to count).map(partitionId => new Random(hashing.byteswap32(partitionId)).nextInt(partition))
       .groupBy(f => f)
       .map(_._2.size).mkString(". "))

     println((1 to count).map(partitionId => new XORShiftRandom(partitionId).nextInt(partition))
       .groupBy(f => f)
       .map(_._2.size).mkString(". "))
   }

   distribution(200, 4)
   ```
   The output:
   ```
   200
   50. 60. 46. 44
   55. 48. 43. 54
   ```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#38106 from wangyum/SPARK-40660.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

(cherry picked from commit e6bebb6)
wangyum added a commit that referenced this pull request Oct 6, 2022
### What changes were proposed in this pull request?

Cherry-picked from #38106, with the changes in RDD.scala reverted:
https://github.com/apache/spark/blob/d2952b671a3579759ad9ce326ed8389f5270fd9f/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L507

### Why are the changes needed?

The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use `repartition` to determine the number of output files in their tests.
```
bin/spark-shell --master "local[2]"
spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition")
```
Before this PR (and after SPARK-40407), the number of output files is 8. After this PR (or before SPARK-40407), the number of output files is 10.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #38110 from wangyum/branch-3.3-SPARK-40660.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
wangyum added a commit that referenced this pull request Oct 6, 2022
(cherry picked from commit 5fe895a)
@zhengruifeng (Contributor) commented:
late LGTM

@wangyum deleted the SPARK-40660 branch on October 6, 2022 08:37
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
(cherry picked from commit 5fe895a)