@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.exchange

import java.util.Random
import java.util.function.Supplier

import scala.concurrent.Future
@@ -298,8 +297,7 @@ object ShuffleExchangeExec {
}
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
  case RoundRobinPartitioning(numPartitions) =>
    // Distributes elements evenly across output partitions, starting from a random partition.
Member

I think removing this can actually cause a regression such as skewed data, since the starting partition would always be the same?

Contributor Author

@HyukjinKwon Thanks for reviewing.

The original comment, "starting from a random partition", means (please correct me if I am wrong) "the reducer partition at which each map task begins its round-robin shuffle write". With large data, rows from each shuffle partition end up evenly distributed anyway. The issue here is that a shuffle partition may contain fewer rows than there are reducer partitions. If the starting position is the same for all shuffle partitions, all the data from all shuffle partitions lands in the same few reducer partitions, and some reducer partitions receive no data at all.

This PR just makes the partitionId the default starting position for the round-robin assignment, so each shuffle partition starts from a different position.

I tested the code below:

    val df = spark.range(0, 100, 1, 50).repartition(4)
    val v = df.rdd.mapPartitions { iter =>
      Iterator.single(iter.length)
    }.collect()
    println(v.mkString(","))

With my PR it outputs 24,25,26,25; without my PR it outputs 50,0,0,50.

Similarly, if I change it to repartition(8):

With my PR it outputs 12,13,14,13,12,12,12,12; without my PR it outputs 0,0,0,0,0,0,50,50.
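For illustration only (not part of the PR), here is a self-contained sketch that models the behaviour described above: 2 rows per map partition and 4 reducer partitions, mirroring the repartition(4) test. The object and method names are made up, and the modulo stands in for what the HashPartitioner does in the real code.

    // Hypothetical standalone model of the round-robin start position (not Spark code).
    object RoundRobinStartDemo {
      def main(args: Array[String]): Unit = {
        val numReducers = 4          // repartition(4)
        val rowsPerMapPartition = 2  // 100 rows spread over 50 input partitions
        val numMapPartitions = 50

        // startFor picks the starting position for each map partition; the modulo
        // plays the role of the HashPartitioner in the real shuffle write path.
        def reducerCounts(startFor: Int => Int): Seq[Int] = {
          val hits = Array.fill(numReducers)(0)
          for (mapId <- 0 until numMapPartitions) {
            var position = startFor(mapId)
            for (_ <- 0 until rowsPerMapPartition) {
              position += 1
              hits(position % numReducers) += 1
            }
          }
          hits.toSeq
        }

        // Same starting position for every map partition: only two reducers get data.
        println(reducerCounts(_ => 0).mkString(","))         // prints 0,50,50,0
        // Starting at the map partition id, as in this commit: data spreads out.
        println(reducerCounts(mapId => mapId).mkString(",")) // prints 24,25,26,25
      }
    }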

Contributor Author

I changed the Random to XORShiftRandom.

var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
Contributor

Sorry, I may be missing something. Shouldn't the original code already produce different starting positions for different mapper tasks?

Contributor

OK, I tried (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " ")) and the result is very counterintuitive: a small change in the seed does not change the random result.

Can we add some comments to explain why we add hashing.byteswap32?
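For reference (not from this PR), a standalone sketch that reproduces the observation above and shows the effect of scrambling the seed with scala.util.hashing.byteswap32, the kind of scrambling referenced in the SPARK-21782 discussion. The object name is made up; the same reasoning motivates switching to XORShiftRandom, which, as far as I understand, also hashes its seed.

    import java.util.Random
    import scala.util.hashing.byteswap32

    object SeedScramblingDemo {
      def main(args: Array[String]): Unit = {
        // Raw, consecutive seeds: nextInt with a power-of-two bound only uses the high
        // bits of the first internal state, which nearby seeds barely change.
        val raw = (1 to 200).map(id => new Random(id).nextInt(32))

        // Scrambled seeds: the starting positions spread across the whole range.
        val scrambled = (1 to 200).map(id => new Random(byteswap32(id)).nextInt(32))

        println(s"distinct values with raw seeds:        ${raw.distinct.size}")
        println(s"distinct values with byteswap32 seeds: ${scrambled.distinct.size}")
      }
    }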

Contributor

This was fixed in SPARK-21782 for RDD; it looks like the SQL version did not leverage it.

Contributor Author

@mridulm Wow, good finding. I didn't realize there was a similar issue.

var position = TaskContext.get().partitionId()
(row: InternalRow) => {
  // The HashPartitioner will handle the `mod` by the number of partitions
  position += 1
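Putting the thread together: a rough sketch (not the exact committed diff) of how the extractor could read after the follow-up switch to XORShiftRandom mentioned above. The trailing position return and the comment wording are assumptions, since the hunk is truncated here.

    // Sketch only; assumes: import org.apache.spark.util.random.XORShiftRandom
    case RoundRobinPartitioning(numPartitions) =>
      // Seed with the map partition id so each task starts at a different reducer,
      // and hash the seed (via XORShiftRandom) to avoid the java.util.Random
      // correlation discussed above (see SPARK-21782).
      var position = new XORShiftRandom(TaskContext.get().partitionId()).nextInt(numPartitions)
      (row: InternalRow) => {
        // The HashPartitioner will handle the `mod` by the number of partitions.
        position += 1
        position  // assumed: the running position is returned as the partition key
      }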