Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Address Reynold's review comments
  • Loading branch information
JoshRosen committed Oct 21, 2014
commit 1e8268d6111e4ad45e2acfe47d837718f2170461
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
logInfo(s"Unpersisting TorrentBroadcast $id")
logDebug(s"Unpersisting TorrentBroadcast $id")
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ package org.apache.spark.broadcast

import scala.util.Random

import org.scalacheck.Gen
import org.scalatest.FunSuite
import org.scalatest.prop.GeneratorDrivenPropertyChecks

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage._

class BroadcastSuite extends FunSuite with LocalSparkContext with GeneratorDrivenPropertyChecks {
class BroadcastSuite extends FunSuite with LocalSparkContext {

private val httpConf = broadcastConf("HttpBroadcastFactory")
private val torrentConf = broadcastConf("TorrentBroadcastFactory")
Expand Down Expand Up @@ -95,15 +93,15 @@ class BroadcastSuite extends FunSuite with LocalSparkContext with GeneratorDrive
val conf = new SparkConf()
val compressionCodec = Some(new SnappyCompressionCodec(conf))
val serializer = new JavaSerializer(conf)
val objects = for (size <- Gen.choose(1, 1024 * 10)) yield {
val seed = 42
val rand = new Random(seed)
for (trial <- 1 to 100) {
val size = 1 + rand.nextInt(1024 * 10)
val data: Array[Byte] = new Array[Byte](size)
Random.nextBytes(data)
data
}
forAll (objects) { (obj: Array[Byte]) =>
val blocks = blockifyObject(obj, blockSize, serializer, compressionCodec)
rand.nextBytes(data)
val blocks = blockifyObject(data, blockSize, serializer, compressionCodec)
val unblockified = unBlockifyObject[Array[Byte]](blocks, serializer, compressionCodec)
assert(unblockified === obj)
assert(unblockified === data)
}
}

Expand Down