Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner
  • Loading branch information
stshe committed Apr 24, 2015
commit 5a3d84649b46df9fd670e951941e809e1e6d98a7
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (partitioners.size == 1) {
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I like this. I suppose that the pre-existing condition already caught the empty RDD case, which PartitionerAwareUnionRDD will reject. Although symmetry between this check and the following one would be nice, I don't think it's important. This looks correct since clearly PartitionerAwareUnionRDD intends to operate only on RDDs with partitioners.

new PartitionerAwareUnionRDD(this, rdds)
} else {
new UnionRDD(this, rdds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
var rdds: Seq[RDD[T]]
) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
require(rdds.length > 0)
require(rdds.forall(_.partitioner.isDefined))
require(rdds.flatMap(_.partitioner).toSet.size == 1,
"Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

Expand Down
21 changes: 21 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
}

// Mixes one hash-partitioned parent with one that has no partitioner and checks that
// sc.union hands back a plain UnionRDD.
test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
  val partitioned = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
  val unpartitioned = sc.parallelize(Seq(2 -> true))
  val combined = sc.union(unpartitioned, partitioned)
  assert(combined.isInstanceOf[UnionRDD[_]])
}

// Unions a hash-partitioned RDD with itself, so every parent carries the same partitioner,
// and checks that sc.union picks PartitionerAwareUnionRDD.
// The test name spells the class as it is declared (PartitionerAwareUnionRDD) so that
// searching for the class name also finds this test.
test("SparkContext.union creates PartitionerAwareUnionRDD if all RDDs have partitioners") {
  val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
  val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
  assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
}

// Builds PartitionerAwareUnionRDD directly (bypassing sc.union) with one parent that has no
// partitioner and expects construction to fail with IllegalArgumentException.
// The test name spells the class as it is declared (PartitionerAwareUnionRDD) so that
// searching for the class name also finds this test.
test("PartitionerAwareUnionRDD raises exception if at least one RDD has no partitioner") {
  val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
  val rddWithNoPartitioner = sc.parallelize(Seq(2 -> true))
  intercept[IllegalArgumentException] {
    new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, rddWithPartitioner))
  }
}

test("partitioner aware union") {
def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
sc.makeRDD(seq, 1)
Expand Down