diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7859781e9822..dc4532599cc8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -447,20 +447,21 @@ abstract class RDD[T: ClassTag](
       : RDD[T] = withScope {
     require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
     if (shuffle) {
-      /** Distributes elements evenly across output partitions, starting from a random partition. */
-      val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
+      // Distributes elements across output partitions by their hash codes.
+      // TODO: distribute elements evenly once the failing case described in SPARK-23243 is
+      // resolved.
+      val distributePartition = (items: Iterator[T]) => {
         items.map { t =>
+          val hash = if (t == null) 0 else t.hashCode()
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
-          position = position + 1
-          (position, t)
-        }
-      } : Iterator[(Int, T)]
+          (hash, t)
+        } : Iterator[(Int, T)]
+      }
 
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
+        new ShuffledRDD[Int, T, T](mapPartitions(distributePartition),
          new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
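The heart of the change above is that an element's shuffle key no longer depends on its position in the input iterator, only on the element itself. A minimal standalone sketch of the two keying schemes, in plain Scala with no Spark dependencies (the object and method names and numPartitions = 4 are illustrative, not part of the patch):

import scala.util.Random
import scala.util.hashing

object RepartitionKeys {
  val numPartitions = 4

  // Old scheme: keys come from a round-robin counter seeded by the partition
  // index, so a key depends on where the element sits in the iterator. If a
  // retried upstream task replays its input in a different order, the same
  // element gets a different key and lands in a different output partition.
  def roundRobinKeys[T](index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  // New scheme: the key is derived from the element itself, so the
  // element -> key mapping is a pure function and is stable across retries.
  def hashKeys[T](items: Iterator[T]): Iterator[(Int, T)] = {
    items.map { t =>
      val hash = if (t == null) 0 else t.hashCode()
      (hash, t)
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq("a", "b", "c", "d")
    // Feed the same elements in two different orders: round-robin keys
    // diverge, hash-based keys do not.
    println(roundRobinKeys(0, data.iterator).toMap ==
      roundRobinKeys(0, data.reverseIterator).toMap) // false
    println(hashKeys(data.iterator).toMap ==
      hashKeys(data.reverseIterator).toMap) // true
  }
}

The trade-off is that content-derived keys no longer guarantee evenly sized output partitions, which is why the load-balancing test is ignored in the hunk below.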
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e994d724c462..9c1c35a37aba 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.ClassTag
+import scala.util.Random
 
 import com.esotericsoftware.kryo.KryoException
 import org.apache.hadoop.io.{LongWritable, Text}
@@ -326,7 +327,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }
 
-  test("repartitioned RDDs perform load balancing") {
+  // Ignore this test since round-robin partitioning can produce incorrect results in some of
+  // the cases discussed in SPARK-23207 and SPARK-23243. It will be re-enabled once the issue
+  // is resolved.
+  ignore("repartitioned RDDs perform load balancing") {
     // Coalesce partitions
     val input = Array.fill(1000)(1)
     val initialPartitions = 10
@@ -361,6 +365,30 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }
 
+  test("SPARK-23243: Make repartition() generate consistent output") {
+    def assertConsistency(rdd: RDD[Any]): Unit = {
+      rdd.persist()
+
+      val partitions1 = rdd.mapPartitions { iter =>
+        Random.shuffle(iter)
+      }.repartition(111).collectPartitions()
+      val partitions2 = rdd.repartition(111).collectPartitions()
+      assert(partitions1.size === partitions2.size)
+      assert(partitions1.zip(partitions2).forall { pair =>
+        pair._1.toSet === pair._2.toSet
+      })
+    }
+
+    // repartition() should generate consistent output.
+    assertConsistency(sc.parallelize(1 to 10000, 10))
+
+    // case when input contains duplicated values.
+    assertConsistency(sc.parallelize(1 to 10000, 10).map(i => Random.nextInt(1000)))
+
+    // case when input contains null values.
+    assertConsistency(sc.parallelize(1 to 100, 10).map(i => if (i % 2 == 0) null else i))
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
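The new test exercises the same property end to end: persist() pins the input data (essential for the Random.nextInt case, whose lineage would otherwise regenerate different values on re-evaluation), and each output partition must contain the same set of elements whether or not the rows inside each input partition are shuffled first. A rough spark-shell equivalent using only public API, with glom() standing in for the test's collectPartitions() helper (a hypothetical session; sc is the usual SparkContext):

// The invariant under test: reordering the rows inside each input
// partition must not change which output partition an element lands in.
val rdd = sc.parallelize(1 to 10000, 10).persist()
val direct = rdd.repartition(111).glom().collect().map(_.toSet)
val shuffled = rdd.mapPartitions(it => scala.util.Random.shuffle(it))
  .repartition(111).glom().collect().map(_.toSet)
assert(direct.zip(shuffled).forall { case (a, b) => a == b })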