From a29b3f7f42ba30d1b94db059599c903bc3797123 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 9 Oct 2019 11:27:24 -0500
Subject: [PATCH 1/2] Additional cases where a .parallelize call with Array is ambiguous in 2.13

---
 .../scala/org/apache/spark/FileSuite.scala    | 22 +++++++++----------
 .../metrics/InputOutputMetricsSuite.scala     |  4 ++--
 .../spark/rdd/PairRDDFunctionsSuite.scala     |  5 ++---
 .../org/apache/spark/rdd/PipedRDDSuite.scala  |  2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  2 +-
 .../spark/rdd/ZippedPartitionsSuite.scala     |  6 ++---
 .../serializer/KryoSerializerSuite.scala      | 10 ++++-----
 docs/graphx-programming-guide.md              |  4 ++--
 .../org/apache/spark/graphx/GraphSuite.scala  |  6 ++---
 .../graphx/lib/ConnectedComponentsSuite.scala |  2 +-
 .../spark/graphx/lib/TriangleCountSuite.scala | 16 +++++++-------
 .../mesos/MesosSchedulerUtilsSuite.scala      |  2 +-
 12 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 3446c03973166..0368d77e3d5f1 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -402,7 +402,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
     }
@@ -411,7 +411,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
       tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -425,7 +425,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
     sc = new SparkContext(conf)
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
       tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -437,7 +437,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new JobConf()
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
@@ -450,7 +450,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("save Hadoop Dataset through new Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = Job.getInstance(sc.hadoopConfiguration)
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
@@ -559,7 +559,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(conf)

     def testIgnoreEmptySplits(
-        data: Array[Tuple2[String, String]],
+        data: Seq[Tuple2[String, String]],
         actualPartitionNum: Int,
         expectedPartitionNum: Int): Unit = {
       val output = new File(tempDir, "output")
@@ -581,13 +581,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

     // Ensure that if no split is empty, we don't lose any splits
     testIgnoreEmptySplits(
-      data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+      data = Seq(("key1", "a"), ("key2", "a"), ("key3", "b")),
       actualPartitionNum = 2,
       expectedPartitionNum = 2)

     // Ensure that if part of the splits are empty, we remove the splits correctly
     testIgnoreEmptySplits(
-      data = Array(("key1", "a"), ("key2", "a")),
+      data = Seq(("key1", "a"), ("key2", "a")),
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
@@ -600,7 +600,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(conf)

     def testIgnoreEmptySplits(
-        data: Array[Tuple2[String, String]],
+        data: Seq[Tuple2[String, String]],
         actualPartitionNum: Int,
         expectedPartitionNum: Int): Unit = {
       val output = new File(tempDir, "output")
@@ -624,13 +624,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

     // Ensure that if no split is empty, we don't lose any splits
     testIgnoreEmptySplits(
-      data = Array(("1", "a"), ("2", "a"), ("3", "b")),
+      data = Seq(("1", "a"), ("2", "a"), ("3", "b")),
       actualPartitionNum = 2,
       expectedPartitionNum = 2)

     // Ensure that if part of the splits are empty, we remove the splits correctly
     testIgnoreEmptySplits(
-      data = Array(("1", "a"), ("2", "b")),
+      data = Seq(("1", "a"), ("2", "b")),
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index dbcec647a3dbc..330347299ab56 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.metrics

-import java.io.{File, FileWriter, PrintWriter}
+import java.io.{File, PrintWriter}

 import scala.collection.mutable.ArrayBuffer

@@ -289,7 +289,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
       }
     })

-    val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)

     try {
       rdd.saveAsTextFile(outPath.toString)
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 135cfffa1ac43..2de4b109e40e9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -34,7 +34,6 @@ import org.scalatest.Assertions

 import org.apache.spark._
 import org.apache.spark.Partitioner
-import org.apache.spark.util.Utils

 class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -496,8 +495,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("default partitioner uses largest partitioner") {
-    val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
-    val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
+    val a = sc.makeRDD(Seq((1, "a"), (2, "b")), 2)
+    val b = sc.makeRDD(Seq((1, "a"), (2, "b")), 2000)
     val c = a.join(b)
     assert(c.partitions.size === 2000)
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 860cf4d7ed9b2..2da2854dfbcb9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -138,7 +138,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall
     assert(c(6) === "3_")
     assert(c(7) === "4_")

-    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val nums1 = sc.makeRDD(Seq("a\t1", "b\t2", "a\t3", "b\t4"), 2)
     val d = nums1.groupBy(str => str.split("\t")(0)).
       pipe(Seq("cat"),
         Map[String, String](),
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 859c25ff03819..18154d861a731 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -236,7 +236,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   }

   test("aggregate") {
-    val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
+    val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
     val emptyMap = new StringMap {
       override def default(key: String): Int = 0
diff --git a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
index 5d7b973fbd9ac..7079b9ea8eadc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
@@ -27,9 +27,9 @@ object ZippedPartitionsSuite {

 class ZippedPartitionsSuite extends SparkFunSuite with SharedSparkContext {
   test("print sizes") {
-    val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
-    val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
+    val data1 = sc.makeRDD(Seq(1, 2, 3, 4), 2)
+    val data2 = sc.makeRDD(Seq("1", "2", "3", "4", "5", "6"), 2)
+    val data3 = sc.makeRDD(Seq(1.0, 2.0), 2)

     val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)

diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index b5313fc24cd84..d7c151209fcac 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.ThreadUtils

 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
@@ -274,19 +274,19 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("kryo with parallelize for specialized tuples") {
-    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).count === 3)
+    assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).count === 3)
   }

   test("kryo with parallelize for primitive arrays") {
-    assert (sc.parallelize( Array(1, 2, 3) ).count === 3)
+    assert(sc.parallelize(Array(1, 2, 3)).count === 3)
   }

   test("kryo with collect for specialized tuples") {
-    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === ((1, 11)))
+    assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).collect().head === ((1, 11)))
   }

   test("kryo with SerializableHyperLogLog") {
-    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
+    assert(sc.parallelize(Array(1, 2, 3, 2, 3, 3, 2, 3, 1)).countApproxDistinct(0.01) === 3)
   }

   test("kryo with reduce") {
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 903f8027cc44d..167c44aa1b2e9 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -187,7 +187,7 @@ val users: RDD[(VertexId, (String, String))] =
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
 // Create an RDD for edges
 val relationships: RDD[Edge[String]] =
-  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
 // Define a default user in case there are relationship with missing user
 val defaultUser = ("John Doe", "Missing")
@@ -425,7 +425,7 @@ val users: RDD[(VertexId, (String, String))] =
                        (4L, ("peter", "student"))))
 // Create an RDD for edges
 val relationships: RDD[Edge[String]] =
-  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                        Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
 // Define a default user in case there are relationship with missing user
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6f9670f2f250a..459cddb9a302b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -164,12 +164,12 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {

   test("mapVertices changing type with same erased type") {
     withSpark { sc =>
-      val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
+      val vertices = sc.parallelize(Seq[(Long, Option[java.lang.Integer])](
        (1L, Some(1)),
        (2L, Some(2)),
        (3L, Some(3))
       ))
-      val edges = sc.parallelize(Array(
+      val edges = sc.parallelize(Seq(
        Edge(1L, 2L, 0),
        Edge(2L, 3L, 0),
        Edge(3L, 1L, 0)
@@ -219,7 +219,7 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
   test("reverse with join elimination") {
     withSpark { sc =>
       val vertices: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 2)))
-      val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
+      val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 0)))
       val graph = Graph(vertices, edges).reverse
       val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
         graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index d0231c885b989..baa1c42235c72 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -106,7 +106,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext {
                        (4L, ("peter", "student"))))
       // Create an RDD for edges
       val relationships: RDD[Edge[String]] =
-        sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+        sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                        Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
       // Edges are:
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index f19c3acdc85cf..abbd89b8eefaf 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -26,7 +26,7 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {

   test("Count a single triangle") {
     withSpark { sc =>
-      val rawEdges = sc.parallelize(Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ), 2)
+      val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
       val triangleCount = graph.triangleCount()
       val verts = triangleCount.vertices
@@ -36,8 +36,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {

   test("Count two triangles") {
     withSpark { sc =>
-      val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+      val triangles = Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val rawEdges = sc.parallelize(triangles, 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
       val triangleCount = graph.triangleCount()
@@ -55,8 +55,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
   test("Count two triangles with bi-directed edges") {
     withSpark { sc =>
       val triangles =
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+        Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val revTriangles = triangles.map { case (a, b) => (b, a) }
       val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
@@ -74,9 +74,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {

   test("Count a single triangle with duplicate edges") {
     withSpark { sc =>
-      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(1L -> 0L, 1L -> 1L), 2)
+      val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(1L -> 0L, 1L -> 1L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
       val triangleCount = graph.triangleCount()
       val verts = triangleCount.vertices
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 65f5c625b873a..4f9c7e3de9afe 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -68,7 +68,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS

   def arePortsEqual(array1: Array[Long], array2: Array[Long])
     : Boolean = {
-    array1.sortBy(identity).deep == array2.sortBy(identity).deep
+    array1.sortBy(identity).sameElements(array2.sortBy(identity))
   }

   def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {

From 81f5ede5b0517fe3aafb560566f6403692ce88b4 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 9 Oct 2019 11:30:10 -0500
Subject: [PATCH 2/2] Oops, revert an accidental inclusion

---
 .../scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 4f9c7e3de9afe..65f5c625b873a 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -68,7 +68,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS

   def arePortsEqual(array1: Array[Long], array2: Array[Long])
     : Boolean = {
-    array1.sortBy(identity).sameElements(array2.sortBy(identity))
+    array1.sortBy(identity).deep == array2.sortBy(identity).deep
   }

   def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
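
For context, a minimal sketch of the pattern these patches apply, shown outside the diffs. It assumes a spark-shell session where `sc` is already bound to a live SparkContext; the names and values below are illustrative only, not taken from the test suites above.

    // Before: passing an Array where parallelize expects a Seq relies on an
    // implicit Array-to-Seq conversion, which the commit subject describes as
    // ambiguous when compiled against Scala 2.13.
    // val pairs = sc.parallelize(Array(("key1", "a"), ("key2", "b")), 1)

    // After: a Seq literal matches parallelize(seq: Seq[T], numSlices: Int)
    // directly, so no conversion is needed and the call compiles unchanged on
    // both Scala 2.12 and 2.13.
    val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)
    assert(pairs.count() == 2)

    // Primitive-element cases such as Array(1, 2, 3) are left as Array in the
    // patch; only the tuple/object-element calls are switched to Seq.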