@@ -21,6 +21,7 @@ import java.util.Random
import java.util.function.Supplier

import scala.concurrent.Future
import scala.util.hashing

import org.apache.spark._
import org.apache.spark.internal.config
@@ -299,7 +300,14 @@ object ShuffleExchangeExec {
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
case RoundRobinPartitioning(numPartitions) =>
// Distributes elements evenly across output partitions, starting from a random partition.
Member
I think removing this can actually cause a regression such as skewed data, since the starting partition would always be the same?

Contributor Author

@HyukjinKwon Thx for reviewing.

The original comment "starting from a random partition" means, I think (please correct me if I am wrong), the reducer partition at which each map task starts its round-robin shuffle write. With large data, the rows of a map partition still get distributed evenly. The issue here is that each shuffle (map) partition contains very little data, fewer rows than the total number of reducer partitions, so if the starting position is the same for every map partition, all of the data ends up in the same few reducer partitions and some reducer partitions receive no data at all.

This PR just makes the partitionId the default starting position for the round-robin assignment, so each map partition gets a different starting position.

I tested the code below:

      val df = spark.range(0, 100, 1, 50).repartition(4)
      val v = df.rdd.mapPartitions { iter =>
        Iterator.single(iter.length)
      }.collect()
      println(v.mkString(","))

With my PR it outputs 24,25,26,25; without my PR it outputs 50,0,0,50.

Similarly, if I change it to repartition(8):

With my PR it outputs 12,13,14,13,12,12,12,12; without my PR it outputs 0,0,0,0,0,0,50,50.
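To make the failure mode concrete, here is a minimal sketch using a hypothetical helper assignedReducers that simplifies the extractor (the real code derives the start from a seeded Random and defers the mod to the HashPartitioner, so the exact reducer ids differ):

      // Hypothetical simplification: each map task sends its rows to reducers
      // start+1, start+2, ... modulo the number of reducers.
      def assignedReducers(start: Int, rowsInTask: Int, numReducers: Int): Seq[Int] =
        (1 to rowsInTask).map(i => (start + i) % numReducers)

      // 50 map tasks with 2 rows each (spark.range(0, 100, 1, 50)), 4 reducers.
      // Shared start for every task: only two reducers ever receive rows.
      println((0 until 50).flatMap(_ => assignedReducers(0, 2, 4)).distinct.sorted)  // Vector(1, 2)
      // Per-task start positions: rows spread across all four reducers.
      println((0 until 50).flatMap(t => assignedReducers(t, 2, 4)).distinct.sorted)  // Vector(0, 1, 2, 3)

Under this simplification the shared-start case matches the 50,0,0,50 shape above: all 100 rows land on just two of the four reducers.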

Contributor Author

I changed the Random to XORShiftRandom.

var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
Contributor

Sorry, I may be missing something. Shouldn't the original code already produce different starting positions for different mapper tasks?

Contributor

OK, I tried (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " ")) and the result is very counterintuitive: a small change to the seed does not change the first random value.

Can we add some comments to explain why we add hashing.byteswap32?
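For reference, here is a quick sketch (only java.util.Random and scala.util.hashing.byteswap32 from the standard library; not code from this PR) contrasting the first draw from consecutive raw seeds with byteswapped seeds:

      import java.util.Random
      import scala.util.hashing.byteswap32

      // For a power-of-two bound, nextInt takes the top bits of the lightly scrambled seed,
      // so consecutive integer seeds tend to produce the same first value.
      val raw = (1 to 16).map(i => new Random(i).nextInt(32))
      // byteswap32 scrambles the seed first, so the first values spread across 0..31.
      val scrambled = (1 to 16).map(i => new Random(byteswap32(i)).nextInt(32))
      println(raw.mkString(" "))        // clustered; mostly the same value
      println(scrambled.mkString(" "))  // spread across 0..31

The scrambled version mirrors what the fix does before seeding Random.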

Contributor

This was fixed in SPARK-21782 for the RDD API; it looks like the SQL version did not pick up the same fix.

Contributor Author

@mridulm wow, good find. I didn't realize there was a similar issue.

// nextInt(numPartitions) implementation has a special case when bound is a power of 2,
// which is basically taking several highest bits from the initial seed, with only a
// minimal scrambling. Due to deterministic seed, using the generator only once,
// and lack of scrambling, the position values for power-of-two numPartitions always
// end up being almost the same regardless of the index. Substantially scrambling the
// seed by hashing will help. Refer to SPARK-21782 for more details.
val partitionId = TaskContext.get().partitionId()
var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
(row: InternalRow) => {
// The HashPartitioner will handle the `mod` by the number of partitions
position += 1
@@ -2147,6 +2147,12 @@ class DatasetSuite extends QueryTest
(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
}

test("SPARK-40407: repartition should not result in severe data skew") {
val df = spark.range(0, 100, 1, 50).repartition(4)
val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
assert(result.sorted.toSeq === Seq(19, 25, 25, 31))
}
}

case class Bar(a: Int)
@@ -2127,8 +2127,8 @@ class AdaptiveQueryExecSuite
withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
// partition size [0,258,72,72,72]
checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
- // partition size [72,216,216,144,72]
- checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 4, 7)
+ // partition size [144,72,144,216,144]
+ checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6)
}

// no skewed partition should be optimized