22 changes: 11 additions & 11 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -402,7 +402,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
}
@@ -411,7 +411,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -425,7 +425,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(conf)
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -437,7 +437,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
@@ -450,7 +450,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
@@ -559,7 +559,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(conf)

def testIgnoreEmptySplits(
data: Array[Tuple2[String, String]],
data: Seq[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPartitionNum: Int): Unit = {
val output = new File(tempDir, "output")
@@ -581,13 +581,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

// Ensure that if no split is empty, we don't lose any splits
testIgnoreEmptySplits(
data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
data = Seq(("key1", "a"), ("key2", "a"), ("key3", "b")),
actualPartitionNum = 2,
expectedPartitionNum = 2)

// Ensure that if part of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array(("key1", "a"), ("key2", "a")),
data = Seq(("key1", "a"), ("key2", "a")),
actualPartitionNum = 5,
expectedPartitionNum = 2)
}
@@ -600,7 +600,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(conf)

def testIgnoreEmptySplits(
data: Array[Tuple2[String, String]],
data: Seq[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPartitionNum: Int): Unit = {
val output = new File(tempDir, "output")
@@ -624,13 +624,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

// Ensure that if no split is empty, we don't lose any splits
testIgnoreEmptySplits(
data = Array(("1", "a"), ("2", "a"), ("3", "b")),
data = Seq(("1", "a"), ("2", "a"), ("3", "b")),
actualPartitionNum = 2,
expectedPartitionNum = 2)

// Ensure that if part of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array(("1", "a"), ("2", "b")),
data = Seq(("1", "a"), ("2", "b")),
actualPartitionNum = 5,
expectedPartitionNum = 2)
}
@@ -17,7 +17,7 @@

package org.apache.spark.metrics

import java.io.{File, FileWriter, PrintWriter}
import java.io.{File, PrintWriter}

import scala.collection.mutable.ArrayBuffer

@@ -289,7 +289,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
})

val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)

try {
rdd.saveAsTextFile(outPath.toString)
@@ -34,7 +34,6 @@ import org.scalatest.Assertions

import org.apache.spark._
import org.apache.spark.Partitioner
import org.apache.spark.util.Utils

class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
test("aggregateByKey") {
@@ -496,8 +495,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}

test("default partitioner uses largest partitioner") {
val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
val a = sc.makeRDD(Seq((1, "a"), (2, "b")), 2)
val b = sc.makeRDD(Seq((1, "a"), (2, "b")), 2000)
val c = a.join(b)
assert(c.partitions.size === 2000)
}
@@ -138,7 +138,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall
assert(c(6) === "3_")
assert(c(7) === "4_")

val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val nums1 = sc.makeRDD(Seq("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str => str.split("\t")(0)).
pipe(Seq("cat"),
Map[String, String](),
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -236,7 +236,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
}

test("aggregate") {
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
val emptyMap = new StringMap {
override def default(key: String): Int = 0
@@ -27,9 +27,9 @@ object ZippedPartitionsSuite {

class ZippedPartitionsSuite extends SparkFunSuite with SharedSparkContext {
test("print sizes") {
val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
val data1 = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Seq("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Seq(1.0, 2.0), 2)

val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)

@@ -37,7 +37,7 @@ import org.apache.spark.internal.config.Kryo._
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ThreadUtils

class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
@@ -274,19 +274,19 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}

test("kryo with parallelize for specialized tuples") {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).count === 3)
assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).count === 3)
}

test("kryo with parallelize for primitive arrays") {
assert (sc.parallelize( Array(1, 2, 3) ).count === 3)
assert(sc.parallelize(Array(1, 2, 3)).count === 3)
}

test("kryo with collect for specialized tuples") {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === ((1, 11)))
assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).collect().head === ((1, 11)))
}

test("kryo with SerializableHyperLogLog") {
assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
assert(sc.parallelize(Array(1, 2, 3, 2, 3, 3, 2, 3, 1)).countApproxDistinct(0.01) === 3)
}

test("kryo with reduce") {
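A note on the pattern in the test changes above: every edit swaps a literal Array for a Seq at the point where data is handed to sc.parallelize or sc.makeRDD, while tests whose subject is the primitive array itself (the Kryo primitive-array and SerializableHyperLogLog cases) keep Array. Below is a minimal sketch of the idiom, assuming only that Spark is on the classpath; the object name, master setting, and assertions are illustrative and not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}

object SeqOverArrayExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("seq-over-array")
    val sc = new SparkContext(conf)
    try {
      // Literal datasets go in as Seq: parallelize is declared over Seq[T],
      // so no implicit Array-to-Seq wrapping is involved.
      val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b"), ("key3", "c")), 2)
      assert(pairs.count() == 3)

      // Primitive arrays stay as Array where the array itself is what a test
      // exercises, e.g. Kryo serialization of an Array[Int].
      val ints = sc.parallelize(Array(1, 2, 3), 2)
      assert(ints.count() == 3)
    } finally {
      sc.stop()
    }
  }
}

One plausible reason for preferring Seq here, though the diff itself does not state it, is that passing an Array where a Seq is expected relies on an implicit conversion, and under Scala 2.13 that conversion copies the array into an immutable wrapper; writing Seq directly sidesteps it.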
4 changes: 2 additions & 2 deletions docs/graphx-programming-guide.md
@@ -187,7 +187,7 @@ val users: RDD[(VertexId, (String, String))] =
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
@@ -425,7 +425,7 @@ val users: RDD[(VertexId, (String, String))] =
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
@@ -164,12 +164,12 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {

test("mapVertices changing type with same erased type") {
withSpark { sc =>
val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
val vertices = sc.parallelize(Seq[(Long, Option[java.lang.Integer])](
(1L, Some(1)),
(2L, Some(2)),
(3L, Some(3))
))
val edges = sc.parallelize(Array(
val edges = sc.parallelize(Seq(
Edge(1L, 2L, 0),
Edge(2L, 3L, 0),
Edge(3L, 1L, 0)
@@ -219,7 +219,7 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
test("reverse with join elimination") {
withSpark { sc =>
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
@@ -106,7 +106,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext {
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Edges are:
@@ -26,7 +26,7 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {

test("Count a single triangle") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ), 2)
val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
@@ -36,8 +36,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {

test("Count two triangles") {
withSpark { sc =>
val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val triangles = Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = graph.triangleCount()
@@ -55,8 +55,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count two triangles with bi-directed edges") {
withSpark { sc =>
val triangles =
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
val revTriangles = triangles.map { case (a, b) => (b, a) }
val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
@@ -74,9 +74,9 @@

test("Count a single triangle with duplicate edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(1L -> 0L, 1L -> 1L), 2)
val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(1L -> 0L, 1L -> 1L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
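The GraphX changes apply the same substitution to vertex and edge data. A sketch of the construction as it reads after the edit to docs/graphx-programming-guide.md, assuming it runs in spark-shell with an existing SparkContext named sc; the vertex list is abbreviated to the two entries visible in the hunk above, so this is an illustration rather than the guide's full snippet.

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Vertex RDD: (id, (username, role)) pairs.
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Edge RDD built from a Seq of Edge objects rather than an Array. Edges that
// reference vertices missing from `users` (3L and 7L here) fall back to
// defaultUser when the graph is constructed.
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val defaultUser = ("John Doe", "Missing")
val graph = Graph(users, relationships, defaultUser)
assert(graph.edges.count() == 4)
assert(graph.vertices.count() == 4)  // 2 given + 2 filled in with defaultUser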