Fix spilling tests in ExternalAppendOnlyMapSuite
Andrew Or committed Oct 14, 2015
commit 285c81c26b210bcbe88f055475b2a53ac61bb5c1
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -24,10 +24,14 @@ import java.util.Arrays
import java.util.jar.{JarEntry, JarOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils

/**
@@ -154,4 +158,51 @@ private[spark] object TestUtils {
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}

/**
* Run some code involving jobs submitted to the given context and assert that the jobs spilled.
*/
def assertSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = {
val spillListener = new SpillListener
sc.addSparkListener(spillListener)
body
assert(spillListener.numSpilledStages > 0, s"expected $identifier to spill, but did not")
}

/**
* Run some code involving jobs submitted to the given context and assert that the jobs
* did not spill.
*/
def assertNotSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = {
val spillListener = new SpillListener
sc.addSparkListener(spillListener)
body
assert(spillListener.numSpilledStages == 0, s"expected $identifier to not spill, but did")
}

}


/**
* A [[SparkListener]] that detects whether spills have occurred in Spark jobs.
*/
private class SpillListener extends SparkListener {
private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
private val spilledStageIds = new mutable.HashSet[Int]

def numSpilledStages: Int = spilledStageIds.size

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
stageIdToTaskMetrics.getOrElseUpdate(
taskEnd.stageId, new ArrayBuffer[TaskMetrics]) += taskEnd.taskMetrics
}

override def onStageCompleted(stageComplete: SparkListenerStageCompleted): Unit = {
val stageId = stageComplete.stageInfo.stageId
val metrics = stageIdToTaskMetrics.remove(stageId).toSeq.flatten
val spilled = metrics.map(_.memoryBytesSpilled).sum > 0
if (spilled) {
spilledStageIds += stageId
}
}
}
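For context, a minimal sketch of how these new helpers are meant to be used, following the pattern the updated suite below adopts. The master URL, app name, threshold value, and RDD bodies are illustrative assumptions, not part of the diff, and TestUtils is private[spark], so this only compiles inside Spark's own org.apache.spark test code:

import org.apache.spark.{SparkConf, SparkContext, TestUtils}

// Illustrative only: force frequent spills via the threshold the suite below relies on.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("spill-assertion-example")
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", "250")
val sc = new SparkContext(conf)

// assertSpilled registers a fresh SpillListener, runs the body, and fails
// if no completed stage reported memoryBytesSpilled > 0.
TestUtils.assertSpilled(sc, "groupByKey") {
  sc.parallelize(0 until 1000).map(i => (i / 2, i)).groupByKey().collect()
}

// assertNotSpilled is the inverse: it fails if any completed stage spilled.
TestUtils.assertNotSpilled(sc, "tiny reduceByKey") {
  sc.parallelize(1 to 10).map(i => (i, i)).reduceByKey(_ + _).count()
}

sc.stop()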
@@ -95,6 +95,12 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

/**
* Number of files this map has spilled so far.
* Exposed for testing.
*/
private[collection] def numSpills: Int = spilledMaps.size

/**
* Insert the given key and value into the map.
*/
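A minimal sketch of how a test can use this accessor directly, mirroring the assertions added to the suite below. The combiner functions and threshold value are illustrative, and the code must live in the org.apache.spark.util.collection test package because numSpills is private[collection]:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: lower the force-spill threshold so a small insert load spills,
// then assert on the spill count instead of relying on log output.
val conf = new SparkConf()
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", "500")
val sc = new SparkContext("local", "test", conf)
val map = new ExternalAppendOnlyMap[Int, Int, Int](i => i, _ + _, _ + _)
(1 to 1000).foreach { i => map.insert(i, i) }
assert(map.numSpills > 0, "map did not spill")
sc.stop()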
@@ -22,9 +22,10 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.io.CompressionCodec

// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)

class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
import TestUtils.{assertNotSpilled, assertSpilled}

private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private def createCombiner[T](i: T) = ArrayBuffer[T](i)
private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
@@ -244,54 +245,53 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
* If a compression codec is provided, use it. Otherwise, do not compress spills.
*/
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val size = 1000
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
conf.set("spark.shuffle.manager", "hash") // avoid using external sorter
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)

// reduceByKey - should spill ~8 times
val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
val resultA = rddA.reduceByKey(math.max).collect()
assert(resultA.length === 50000)
resultA.foreach { case (k, v) =>
assert(v === k * 2 + 1, s"Value for $k was wrong: expected ${k * 2 + 1}, got $v")
assertSpilled(sc, "reduceByKey") {
val result = sc.parallelize(0 until size)
.map { i => (i / 2, i) }.reduceByKey(math.max).collect()
assert(result.length === size / 2)
result.foreach { case (k, v) =>
val expected = k * 2 + 1
assert(v === expected, s"Value for $k was wrong: expected $expected, got $v")
}
}

// groupByKey - should spill ~17 times
val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
val resultB = rddB.groupByKey().collect()
assert(resultB.length === 25000)
resultB.foreach { case (i, seq) =>
val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
assert(seq.toSet === expected,
s"Value for $i was wrong: expected $expected, got ${seq.toSet}")
assertSpilled(sc, "groupByKey") {
val result = sc.parallelize(0 until size).map { i => (i / 2, i) }.groupByKey().collect()
assert(result.length == size / 2)
result.foreach { case (i, seq) =>
val actual = seq.toSet
val expected = Set(i * 2, i * 2 + 1)
assert(actual === expected, s"Value for $i was wrong: expected $expected, got $actual")
}
}

// cogroup - should spill ~7 times
val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
val resultC = rddC1.cogroup(rddC2).collect()
assert(resultC.length === 10000)
resultC.foreach { case (i, (seq1, seq2)) =>
i match {
case 0 =>
assert(seq1.toSet === Set[Int](0))
assert(seq2.toSet === Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
case 1 =>
assert(seq1.toSet === Set[Int](1))
assert(seq2.toSet === Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
case 5000 =>
assert(seq1.toSet === Set[Int](5000))
assert(seq2.toSet === Set[Int]())
case 9999 =>
assert(seq1.toSet === Set[Int](9999))
assert(seq2.toSet === Set[Int]())
case _ =>
assertSpilled(sc, "cogroup") {
val rdd1 = sc.parallelize(0 until size).map { i => (i / 2, i) }
val rdd2 = sc.parallelize(0 until size).map { i => (i / 2, i) }
val result = rdd1.cogroup(rdd2).collect()
assert(result.length === size / 2)
result.foreach { case (i, (seq1, seq2)) =>
val actual1 = seq1.toSet
val actual2 = seq2.toSet
val expected = Set(i * 2, i * 2 + 1)
assert(actual1 === expected, s"Value 1 for $i was wrong: expected $expected, got $actual1")
assert(actual2 === expected, s"Value 2 for $i was wrong: expected $expected, got $actual2")
}
}

sc.stop()
}

test("spilling with hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[String]

@@ -315,11 +315,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
assert(w1.hashCode === w2.hashCode)
}

map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i)))
map.insertAll((1 to size).iterator.map(_.toString).map(i => (i, i)))
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
map.insert(w2, w1)
}
assert(map.numSpills > 0, "map did not spill")

// A map of collision pairs in both directions
val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap
@@ -334,22 +335,25 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
assert(count === 100000 + collisionPairs.size * 2)
assert(count === size + collisionPairs.size * 2)
sc.stop()
}

test("spilling with many hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)

// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
for (i <- 1 to 10) {
for (j <- 1 to 10000) {
for (j <- 1 to size) {
map.insert(FixedHashObject(j, j % 2), 1)
}
}
assert(map.numSpills > 0, "map did not spill")

val it = map.iterator
var count = 0
@@ -358,17 +362,20 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
assert(kv._2 === 10)
count += 1
}
assert(count === 10000)
assert(count === size)
sc.stop()
}

test("spilling with hash collisions using the Int.MaxValue key") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]

(1 to 100000).foreach { i => map.insert(i, i) }
(1 to size).foreach { i => map.insert(i, i) }
map.insert(Int.MaxValue, Int.MaxValue)
assert(map.numSpills > 0, "map did not spill")

val it = map.iterator
while (it.hasNext) {
@@ -379,14 +386,17 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
}

test("spilling with null keys and values") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]

map.insertAll((1 to 100000).iterator.map(i => (i, i)))
map.insertAll((1 to size).iterator.map(i => (i, i)))
map.insert(null.asInstanceOf[Int], 1)
map.insert(1, null.asInstanceOf[Int])
map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])
assert(map.numSpills > 0, "map did not spill")

val it = map.iterator
while (it.hasNext) {
@@ -397,17 +407,22 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
}

test("external aggregation updates peak execution memory") {
val spillThreshold = 1000
val conf = createSparkConf(loadDefaults = false)
.set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
.set("spark.testing.memory", (10 * 1024 * 1024).toString)
.set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
sc = new SparkContext("local", "test", conf)
// No spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") {
sc.parallelize(1 to 10, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
assertNotSpilled(sc, "verify peak memory") {
sc.parallelize(1 to spillThreshold / 2, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
}
}
// With spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map with spilling") {
sc.parallelize(1 to 1000 * 1000, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
assertSpilled(sc, "verify peak memory") {
sc.parallelize(1 to spillThreshold * 3, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
}
}
}
