17 changes: 9 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -447,20 +447,21 @@ abstract class RDD[T: ClassTag](
       : RDD[T] = withScope {
     require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
     if (shuffle) {
-      /** Distributes elements evenly across output partitions, starting from a random partition. */
-      val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
+      // Distributes elements across output partitions by their hash codes, so the target
+      // partition of an element does not depend on its position in the input iterator.
+      // TODO: distribute elements evenly after resolving the failing case described in SPARK-23243.
+      val distributePartition = (items: Iterator[T]) => {
         items.map { t =>
+          val hash = if (t == null) 0 else t.hashCode()
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.
-          position = position + 1
-          (position, t)
-        }
-      } : Iterator[(Int, T)]
+          (hash, t)
+        } : Iterator[(Int, T)]
+      }

       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
+        new ShuffledRDD[Int, T, T](mapPartitions(distributePartition),
           new HashPartitioner(numPartitions)),
         numPartitions,
         partitionCoalescer).values
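The new distributePartition keys each element by its own hash (0 for null) and lets HashPartitioner pick the output partition, so the element-to-partition mapping no longer depends on iteration order. A minimal standalone sketch of that key-to-partition step (assuming spark-core on the classpath; the object name and sample values are illustrative, not from the PR):

import org.apache.spark.HashPartitioner

object HashDistributionSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 8
    val partitioner = new HashPartitioner(numPartitions)

    // Mirror the key computation in distributePartition: null hashes to 0, everything else
    // to its hashCode. The shuffle key is an Int, and an Int's hashCode is the value itself,
    // so HashPartitioner effectively takes a non-negative modulo of the element's hash.
    Seq("a", "b", null, "a").foreach { t =>
      val hash = if (t == null) 0 else t.hashCode()
      println(s"$t -> partition ${partitioner.getPartition(hash)}")
    }
  }
}

Every occurrence of the same value prints the same partition, which is the determinism the patch relies on.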
30 changes: 29 additions & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.ClassTag
+import scala.util.Random

 import com.esotericsoftware.kryo.KryoException
 import org.apache.hadoop.io.{LongWritable, Text}
@@ -326,7 +327,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }

-  test("repartitioned RDDs perform load balancing") {
+  // Ignore this test case since round-robin partitioning can produce incorrect results in some
+  // cases, as discussed in SPARK-23207 and SPARK-23243. Re-enable it once the issue is
+  // resolved.
+  ignore("repartitioned RDDs perform load balancing") {
     // Coalesce partitions
     val input = Array.fill(1000)(1)
     val initialPartitions = 10
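The ignore comment above names the root cause: with the old round-robin scheme, an element's output partition is a function of its position in the input iterator, so a retried upstream task that yields the same elements in a different order scatters them across different partitions. A small plain-Scala sketch of that order sensitivity (simplified: the modulo is applied inline here, whereas the real code defers it to HashPartitioner; not part of the PR):

// Model of the removed round-robin assignment: each element gets the next counter value.
def roundRobinAssign[T](items: Seq[T], startPosition: Int, numPartitions: Int): Seq[(Int, T)] = {
  var position = startPosition
  items.map { t =>
    position += 1
    (position % numPartitions, t)
  }
}

val run1 = roundRobinAssign(Seq("a", "b", "c"), startPosition = 3, numPartitions = 2)
val run2 = roundRobinAssign(Seq("c", "a", "b"), startPosition = 3, numPartitions = 2)
// Same elements, different iteration order => a different element-to-partition mapping,
// which is why repartition() output was non-deterministic across task retries.
assert(run1.toSet != run2.toSet)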
@@ -361,6 +365,30 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }

+  test("SPARK-23243: Make repartition() generate consistent output") {
+    def assertConsistency(rdd: RDD[Any]): Unit = {
+      rdd.persist()
+
+      val partitions1 = rdd.mapPartitions { iter =>
+        Random.shuffle(iter.toSeq).iterator
+      }.repartition(111).collectPartitions()
+      val partitions2 = rdd.repartition(111).collectPartitions()
+      assert(partitions1.size === partitions2.size)
+      assert(partitions1.zip(partitions2).forall { pair =>
+        pair._1.toSet === pair._2.toSet
+      })
+    }
+
+    // repartition() should generate consistent output.
+    assertConsistency(sc.parallelize(1 to 10000, 10))
+
+    // Case where the input contains duplicate values.
+    assertConsistency(sc.parallelize(1 to 10000, 10).map(i => Random.nextInt(1000)))
+
+    // Case where the input contains null values.
+    assertConsistency(sc.parallelize(1 to 100, 10).map(i => if (i % 2 == 0) null else i))
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)

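The property the new test asserts can also be exercised outside the test harness. A hedged standalone sketch (not part of the PR; it uses the public glom().collect() rather than the collectPartitions() helper used in the test, and assumes a Spark build that includes this patch — on an unpatched round-robin build the check can come back false, which is exactly the reported bug):

import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionConsistencyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("repartition-demo"))
    val rdd = sc.parallelize(1 to 10000, 10).persist()

    // Repartition once with the per-partition order scrambled, and once as-is.
    val a = rdd.mapPartitions(it => Random.shuffle(it.toSeq).iterator)
      .repartition(111).glom().collect()
    val b = rdd.repartition(111).glom().collect()

    // Each output partition should hold the same set of elements in both runs.
    val consistent = a.zip(b).forall { case (p1, p2) => p1.toSet == p2.toSet }
    println(s"consistent = $consistent")
    sc.stop()
  }
}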