diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ff5d796ee276..8bfa4c013672 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -520,10 +520,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
 
   /** Distribute a local Scala collection to form an RDD.
    *
-   * @note Parallelize acts lazily. If `seq` is a mutable collection and is
-   * altered after the call to parallelize and before the first action on the
-   * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
-   * the argument to avoid this.
+   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
+   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
+   * modified collection. Pass a copy of the argument to avoid this.
+   *
+   * @note When a Range is split into slices, each sub-range is exclusive; only the last slice of
+   * an inclusive Range is itself inclusive.
    */
   def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
     new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 87b22de6ae69..f12d0cffaba3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -111,7 +111,8 @@ private object ParallelCollectionRDD {
   /**
    * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
    * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
-   * it efficient to run Spark over RDDs representing large sets of numbers.
+   * it efficient to run Spark over RDDs representing large sets of numbers. If the collection is
+   * an inclusive Range, we use an inclusive Range for the last slice.
    */
   def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
     if (numSlices < 1) {
@@ -127,19 +128,15 @@ private object ParallelCollectionRDD {
       })
     }
     seq match {
-      case r: Range.Inclusive => {
-        val sign = if (r.step < 0) {
-          -1
-        } else {
-          1
-        }
-        slice(new Range(
-          r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
-      }
       case r: Range => {
-        positions(r.length, numSlices).map({
-          case (start, end) =>
+        positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+          // If the range is inclusive, use an inclusive range for the last slice
+          if (r.isInclusive && index == numSlices - 1) {
+            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
+          }
+          else {
             new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+          }
         }).toSeq.asInstanceOf[Seq[Seq[T]]]
       }
       case nr: NumericRange[_] => {
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 1b112f1a41ca..cd193ae4f523 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -76,6 +76,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices(0).mkString(",") === (0 to 32).mkString(","))
     assert(slices(1).mkString(",") === (33 to 66).mkString(","))
     assert(slices(2).mkString(",") === (67 to 100).mkString(","))
+    assert(slices(2).isInstanceOf[Range.Inclusive])
   }
 
   test("empty data") {
@@ -227,4 +228,28 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
     assert(slices.map(_.size).reduceLeft(_+_) === 100)
     assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
   }
+
+  test("inclusive ranges with Int.MaxValue and Int.MinValue") {
+    val data1 = 1 to Int.MaxValue
+    val slices1 = ParallelCollectionRDD.slice(data1, 3)
+    assert(slices1.size === 3)
+    assert(slices1.map(_.size).sum === Int.MaxValue)
+    assert(slices1(2).isInstanceOf[Range.Inclusive])
+    val data2 = -2 to Int.MinValue by -1
+    val slices2 = ParallelCollectionRDD.slice(data2, 3)
+    assert(slices2.size === 3)
+    assert(slices2.map(_.size).sum === Int.MaxValue)
+    assert(slices2(2).isInstanceOf[Range.Inclusive])
+  }
+
+  test("empty ranges with Int.MaxValue and Int.MinValue") {
+    val data1 = Int.MaxValue until Int.MaxValue
+    val slices1 = ParallelCollectionRDD.slice(data1, 5)
+    assert(slices1.size === 5)
+    for (i <- 0 until 5) assert(slices1(i).size === 0)
+    val data2 = Int.MinValue until Int.MinValue
+    val slices2 = ParallelCollectionRDD.slice(data2, 5)
+    assert(slices2.size === 5)
+    for (i <- 0 until 5) assert(slices2(i).size === 0)
+  }
 }
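Illustrative sketch (not part of the patch): the snippet below shows what the new `case r: Range` branch of ParallelCollectionRDD.slice produces for a small inclusive Range. The object name InclusiveSliceExample is hypothetical, and the code assumes it lives in the org.apache.spark.rdd package so the package-private ParallelCollectionRDD.slice helper is visible, just as the test suite above does.

    package org.apache.spark.rdd

    // Hypothetical example object, for illustration only.
    object InclusiveSliceExample {
      def main(args: Array[String]): Unit = {
        // positions(10, 3) yields (0, 3), (3, 6), (6, 10), so slicing 1 to 10 into 3 parts gives:
        //   slices(0): 1,2,3     -- exclusive sub-range
        //   slices(1): 4,5,6     -- exclusive sub-range
        //   slices(2): 7,8,9,10  -- Range.Inclusive, keeping the original end point 10
        val slices = ParallelCollectionRDD.slice(1 to 10, 3)
        slices.foreach(s => println(s.mkString(",")))
        assert(slices(2).isInstanceOf[Range.Inclusive])
        // Through the public API the same split shows up as partition contents, e.g. with a
        // SparkContext `sc`: sc.parallelize(1 to 10, 3).glom().collect()
        //   => Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
      }
    }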