Commit 6910ed6

fix RDD.repartition()
1 parent: a8a3e9b

2 files changed, +38 -9 lines

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 9 additions & 8 deletions
@@ -447,20 +447,21 @@ abstract class RDD[T: ClassTag](
       : RDD[T] = withScope {
     require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
     if (shuffle) {
-      /** Distributes elements evenly across output partitions, starting from a random partition. */
-      val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
+      // Distributes elements by their hashes across output partitions.
+      // TODO: distribute elements evenly again once the failing case described in SPARK-23243
+      // is resolved.
+      val distributePartition = (items: Iterator[T]) => {
         items.map { t =>
+          val hash = if (t == null) 0 else t.hashCode()
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
-          position = position + 1
-          (position, t)
-        }
-      } : Iterator[(Int, T)]
+          (hash, t)
+        } : Iterator[(Int, T)]
+      }
 
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
+        new ShuffledRDD[Int, T, T](mapPartitions(distributePartition),
           new HashPartitioner(numPartitions)),
         numPartitions,
         partitionCoalescer).values
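
Why the change fixes the bug: the old round-robin scheme keyed each element by its position in the partition's iterator, offset by a per-partition random start. If an upstream task was recomputed and emitted the same elements in a different order, the positions, and hence the output partitions, changed between runs. Keying by hashCode() depends only on an element's value, so placement is stable under reordering. Below is a minimal sketch of the two keying schemes, plain Scala with hypothetical helper names (roundRobinKeys, hashKeys) and a simple seed in place of hashing.byteswap32(index), not Spark's actual code path:

import scala.util.Random

object RepartitionKeying {
  // Old scheme: the key is the element's position in the iterator, offset by
  // a per-partition random start. Reordering the input reorders the keys, so
  // a retried task can place the same element in a different partition.
  def roundRobinKeys[T](items: Seq[T], numPartitions: Int, seed: Int): Seq[(Int, T)] = {
    var position = new Random(seed).nextInt(numPartitions)
    items.map { t =>
      position += 1
      (position % numPartitions, t) // HashPartitioner would mod by numPartitions
    }
  }

  // New scheme: the key is the element's hash, so it depends only on the
  // value, never on the order in which elements arrive.
  def hashKeys[T](items: Seq[T], numPartitions: Int): Seq[(Int, T)] = {
    items.map { t =>
      val hash = if (t == null) 0 else t.hashCode()
      (((hash % numPartitions) + numPartitions) % numPartitions, t) // non-negative mod
    }
  }

  def main(args: Array[String]): Unit = {
    val run1 = Seq("a", "b", "c", "d")
    val run2 = Seq("d", "c", "b", "a") // same data, recomputed in a different order

    // Round-robin: "a" gets a different key in each run.
    println(roundRobinKeys(run1, 3, seed = 7))
    println(roundRobinKeys(run2, 3, seed = 7))

    // Hash-based: every value maps to the same key in both runs.
    println(hashKeys(run1, 3))
    println(hashKeys(run2, 3))
  }
}

The TODO in the diff records the trade-off: hash keys are only as balanced as the data's hashCode() distribution, so skewed inputs can produce uneven partitions. That is also why the load-balancing test in RDDSuite below is ignored for now.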

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 29 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.ClassTag
+import scala.util.Random
 
 import com.esotericsoftware.kryo.KryoException
 import org.apache.hadoop.io.{LongWritable, Text}
@@ -326,7 +327,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }
 
-  test("repartitioned RDDs perform load balancing") {
+  // Ignore this test case since round-robin partitioning can produce incorrect results in the
+  // cases discussed in SPARK-23207 and SPARK-23243. It will be re-enabled once the issue is
+  // resolved.
+  ignore("repartitioned RDDs perform load balancing") {
     // Coalesce partitions
     val input = Array.fill(1000)(1)
     val initialPartitions = 10
@@ -361,6 +365,30 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }
 
+  test("SPARK-23243: Make repartition() generate consistent output") {
+    def assertConsistency(rdd: RDD[Any]): Unit = {
+      rdd.persist()
+
+      val partitions1 = rdd.mapPartitions { iter =>
+        Random.shuffle(iter)
+      }.repartition(111).collectPartitions()
+      val partitions2 = rdd.repartition(111).collectPartitions()
+      assert(partitions1.size === partitions2.size)
+      assert(partitions1.zip(partitions2).forall { pair =>
+        pair._1.toSet === pair._2.toSet
+      })
+    }
+
+    // repartition() should generate consistent output.
+    assertConsistency(sc.parallelize(1 to 10000, 10))
+
+    // case when the input contains duplicated values.
+    assertConsistency(sc.parallelize(1 to 10000, 10).map(i => Random.nextInt(1000)))
+
+    // case when the input contains null values.
+    assertConsistency(sc.parallelize(1 to 100, 10).map(i => if (i % 2 == 0) null else i))
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
 

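The test's key trick is the Random.shuffle inside mapPartitions: it simulates an upstream stage whose output order differs between computations, for example after a task retry. With hash-based keys, the shuffled and unshuffled inputs must still place every element in the same output partition. A hedged sketch of the same check outside the test harness, runnable in spark-shell (it assumes a local SparkContext sc, uses the public glom() in place of the suite's collectPartitions(), and picks arbitrary partition counts):

import scala.util.Random

val rdd = sc.parallelize(1 to 10000, 10).persist()

// Simulate a nondeterministic upstream stage by reordering each input
// partition before repartitioning.
val reordered = rdd
  .mapPartitions(iter => Random.shuffle(iter.toSeq).iterator)
  .repartition(111)
val plain = rdd.repartition(111)

// With hash-based distribution, each output partition holds the same set of
// elements in both runs, whatever order the inputs arrived in.
val p1 = reordered.glom().collect().map(_.toSet)
val p2 = plain.glom().collect().map(_.toSet)
println(s"consistent = ${p1.sameElements(p2)}")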